Source code for tests.management.commands.add_test_data

"""Script to create cobalt test data from csv files

CSV Files are processed in sort order and depend upon data that is created in earlier steps.
For example, an Event needs to be created before an EventEntry.

File format is:

Row 1 - app, model [, duplicates]

    app - name of Django application, e.g. events
    model - Django model name e.g. EventEntry
    duplicates - optional literal. If the word duplicates is present then they are allowed

Row 2 - headings

    These map to the field names of the table.

Example:

accounts             , TeamMate
id.user.accounts.User, id.team_mate.accounts.User, make_payments
aa                   , mark                      , 0
bb                   , mark                      , 0



"""

import contextlib
from django.core.exceptions import SuspiciousOperation

from cobalt.settings import (
    RBAC_EVERYONE,
    TIME_ZONE,
    TBA_PLAYER,
    COBALT_HOSTNAME,
)
from accounts.models import User
from django.core.management.base import BaseCommand
from accounts.management.commands.accounts_core import create_fake_user
import datetime
import pytz
from django.utils.timezone import make_aware, now
import glob
import sys
from inspect import currentframe, getframeinfo

# This import is needed, although your IDE may disagree - it is used through exec
from importlib import import_module

TZ = pytz.timezone(TIME_ZONE)
DATA_DIR = "tests/test_data"
CORE_DATA_DIR = "tests/test_data_core"


def _get_instance_of_app_model(app, model):
    """See if a app model combination is valid, returns an instance of the model or None"""

    exec_cmd = "module = import_module('%s.models')\ninstance = module.%s()" % (
        app,
        model,
    )
    # Set local array before we call the exec command and its value will change when returned
    local_array = {}

    # Execute
    exec(exec_cmd, globals(), local_array)

    # Return what we got
    return local_array["instance"]


def _parse_csv(file):
    """
    try to sort out the mess Excel makes of CSV files.
    Requires csv files to have the app and model in the first row and
    the fieldnames in the second row.
    This works on a single CSV file
    """

    with open(file, encoding="utf-8") as csv_file:

        lines = []
        for line in csv_file:

            # skip line if it is commented out (# in first column)
            if line.find("#") == 0:
                continue

            # skip blank lines
            if line.strip() == "":
                continue

            # skip if line is only commas, can happen if edited by Excel
            if line.replace(",", "").strip() == "":
                continue

            # Add apparently valid lines
            lines.append(line)

    # Data will hold rows of data excluding the first two which are configuration info
    data = []

    try:
        # First line should be the app, model
        app, model = lines[0].split(",")[:2]
    except ValueError:
        print("\n\nError\n")
        print("Didn't find App, Model on first line of file")
        print(f"File is: {file}")
        print("Line is: %s\n" % lines[0])
        sys.exit()

    try:
        # Optional third parameter to allow duplicates
        allow_dupes = lines[0].split(",")[3] == "duplicates"
    except (ValueError, IndexError):
        allow_dupes = False

    # Second line should have the headers which define the fields and their type
    # We don't validate them here, we just load them
    headers = lines[1]
    header_list = [header.strip() for header in headers.split(",")]

    # loop through records, line 3 onwards
    for line in lines[2:]:

        # split to parts using commas
        columns = line.split(",")

        # loop through columns
        row = {}
        for i in range(len(header_list)):

            try:
                # Skip missing columns - Excel adds these at the end of the row
                if columns[i].strip() == "":
                    continue
                # use header name as index and this field as the data
                row[header_list[i]] = columns[i].strip()
            except IndexError:
                row[header_list[i]] = None

        data.append(row)

    return app.strip(), model.strip(), data, allow_dupes


def _handle_not_found_error(app, model, csv):

    print("\n\nError\n")
    print(f"Failed to create instance of {app}.{model}")
    print(f"Processing file: {csv}\n")
    frame_tags_info = getframeinfo(currentframe())
    print(
        "Error somewhere above: ",
        frame_tags_info.filename,
        frame_tags_info.lineno,
        "\n",
    )
    sys.exit()


def _print_error_and_exit(error, csv, key, value):
    print("\n\nError\n")
    print(error)
    print()
    print("Options are:")
    print("  d.  exact date YYYMMDD")
    print("  m.  exact time 24hr clock HH:MM")
    print("  id. Link to foreign key")
    print(
        "  t.  relative date integer. Positive is in the past, negative in the future."
    )
    print()
    print(f"CSV File: {csv}")
    print(f"Heading: {key}")
    print(f"Data: {value}")
    print()
    sys.exit()


[docs] class Command(BaseCommand): def __init__(self): super().__init__() # we map the Django id of the table to the object # e.g. self.id_array["accounts.User"][1] = Everyone user self.id_array = {}
[docs] def add_arguments(self, parser): parser.add_argument( "--core_test_files", action="store_true", help="Use the core files directory instead of default", )
[docs] def handle(self, *args, **options): # see which directory to use to find the files data_dir = CORE_DATA_DIR if options["core_test_files"] else DATA_DIR # Check we aren't in production if COBALT_HOSTNAME in ["myabf.com.au", "www.myabf.com.au"]: raise SuspiciousOperation( "Not for use in production. This cannot be used in a production system." ) print("Running add_test_data") try: for file_name in sorted(glob.glob(f"{data_dir}/*.csv")): print("\n#########################################################") print(f"Processing: {file_name}") self.process_csv(file_name) except KeyboardInterrupt: print("\n\nTest data loading interrupted by user\n") sys.exit(0)
[docs] def process_csv(self, csv): """do the work on the csv data""" # get the data from the file app, model, data, allow_dupes = _parse_csv(csv) print(f"App Model is: {app}.{model}\n") # special case for creating users if app == "accounts" and model == "User": self.accounts_user(app, model, data) return # Pass dictionary of app models through process_csv_row to build it up app_model_dic = {} for row in data: self.process_csv_row(row, app, model, allow_dupes, csv, app_model_dic) # Add this app model dictionary to the global list for use later self.id_array[f"{app}.{model}"] = app_model_dic
[docs] def process_csv_row(self, row, app, model, allow_dupes, csv, app_model_dic): """handles processing a single row of data from the CSV""" # Get instance of app model - effectively an empty database row instance = self._get_instance_from_db(row, app, model) # Check and process instance self._check_if_exists_or_add( instance, allow_dupes, app, model, row, app_model_dic, csv )
def _check_if_exists_or_add( self, instance, allow_dupes, app, model, row, app_model_dic, csv ): if instance and not allow_dupes: print(f"already present: {instance}") else: instance = self._add_row_to_db(app, model, row, csv) # add to dic if we have an id field if "id" in row.keys(): app_model_dic[row["id"]] = instance return app_model_dic def _add_row_to_db(self, app, model, row, csv): # See if this is a valid app model combination app_model_instance = _get_instance_of_app_model(app, model) if not app_model_instance: # This will sys.exit() _handle_not_found_error(app, model, csv) # key is the field, value is what to put in it for key, value in row.items(): # CSV can use ^ symbol to represent a comma with contextlib.suppress(AttributeError): value = value.replace("^", ",") # id is assigned by Django, the id field in the CSV is what we refer to it as so skip if key == "id": continue # Foreign key if len(key) > 3 and key[:3] == "id.": # foreign key foreign_key, foreign_key_value = self._handle_foreign_key( key, row, value, app, model ) setattr(app_model_instance, foreign_key, foreign_key_value) # Relative date, X days ago or if negative X days in the future elif key[:2] == "t.": field = key[2:] try: adjusted_date = now() - datetime.timedelta(days=int(value)) except ValueError: _print_error_and_exit( "Relative date (t.) is not an integer.", csv, key, value ) datetime_local = adjusted_date.astimezone(TZ) setattr(app_model_instance, field, datetime_local) # Specific date YYYYMMDD elif key[:2] == "d.": field = key[2:] val_str = f"{value}" try: year = val_str[:4] month = val_str[4:6] day = val_str[6:8] this_date = make_aware( datetime.datetime(int(year), int(month), int(day), 0, 0), TZ, ) except ValueError: _print_error_and_exit( "Exact date (d.) is not in format YYYYMMDD.", csv, key, value ) setattr(app_model_instance, field, this_date) # Time elif key[:2] == "m.": field = key[2:] try: date_field = datetime.datetime.strptime(value, "%H:%M").time() except ValueError: _print_error_and_exit( "Time (m.) is not in format HH:MM", csv, key, value ) setattr(app_model_instance, field, date_field) # Any other normal field else: setattr(app_model_instance, key, value) app_model_instance.save() print(f"Added: {app_model_instance}") return app_model_instance def _get_instance_from_db(self, row, app, model): """see if this data is already present. We go through the headings found in row and build a query string to call using exec. We ignore the fields that aren't fieldnames - t. d. m. and id For foreign keys (id.) we build a slightly different query. """ # build up a string of Python commands to execute exec_cmd = ( f"module = import_module('{app}.models')\ninstance = module.{model}.objects" ) # Go through each value - key is the field name, value is the data for key, value in row.items(): # Ignore unset values if not value: continue # Ignore id fields if key == "id": continue # Ignore fields that aren't field names if key[:2] in ["d.", "m.", "t."]: continue # Foreign key queries if key[:3] == "id.": parts = key.split(".") foreign_key = parts[1] foreign_key_app = parts[2] foreign_key_model = parts[3] exec_cmd += f".filter({foreign_key}=this_array[f'{foreign_key_app}.{foreign_key_model}']['{value}'])" # The only other case is normal fields where the heading is the field name else: exec_cmd2 = f"module = import_module('{app}.models')\nfield_type=module.{model}._meta.get_field('{key}').get_internal_type()" exec(exec_cmd2, globals()) if field_type in ["CharField", "TextField"]: # noqa: F821 exec_cmd += f".filter({key}='{value}')" else: exec_cmd += f".filter({key}={value})" # Complete the query by getting the first match exec_cmd += ".first()" # execute dynamic Python and pass in out local data this_array = self.id_array local_array = {"this_array": this_array} try: exec(exec_cmd, globals(), local_array) except (KeyError, NameError) as exc: self.print_error_found_and_exit(exc, exec_cmd) # Return the instance, should be an app model instance or None return local_array["instance"]
[docs] def print_error_found_and_exit(self, exc, exec_cmd): """This handles exceptions""" print("\n\nError\n") print(f"{exc}") for block in self.id_array: for key2, val2 in self.id_array[block].items(): print(block, key2, val2) print("\nStatement was:") print(exec_cmd) print(exc) sys.exit()
[docs] def accounts_user(self, app, model, data): """Accounts get created first (must be first file) and we keep a reference to them for use when processing the other files""" # Dictionary to store users Django id of user maps to user object user_dic = {} # Process each row to create a user for row in data: # Handle the about section and picture file not being provided if "about" not in row: row["about"] = None if "pic" not in row: row["pic"] = None # Get or Create user user = create_fake_user( self, row["system_number"], row["first_name"], row["last_name"], row["about"], row["pic"], ) user_dic[row["id"]] = user # also get the "system" accounts user_dic["TBA"] = User.objects.filter(pk=TBA_PLAYER).first() user_dic["EVERYONE"] = User.objects.filter(pk=RBAC_EVERYONE).first() user_dic["mark"] = User.objects.filter(system_number="620246").first() user_dic["julian"] = User.objects.filter(system_number="518891").first() # Add the user dictionary to our id_array self.id_array["accounts.User"] = user_dic
def _handle_foreign_key(self, key, row, value, app, model): """helper to deal with data that points to a foreign key""" parts = key.split(".") foreign_key = parts[1] foreign_key_app = parts[2] foreign_key_model = parts[3] try: field_value = self.id_array[f"{foreign_key_app}.{foreign_key_model}"][value] except KeyError: print("\n\nError\n") print(row) print( f"Foreign key not found: {foreign_key_app}.{foreign_key_model}: {value}" ) print( f"Check that the file with {app}.{model} has id {value} and that it is loaded before this file.\n" ) sys.exit() return foreign_key, field_value