Source code for masterpoints.factories

import html
import re

import requests

from accounts.models import User
from cobalt.settings import GLOBAL_MPSERVER, MP_USE_FILE


[docs] def masterpoint_query_list(query): """Generic function to talk to the masterpoints SQL Server and return data as a list""" url = f"{GLOBAL_MPSERVER}/{query}" try: response = requests.get(url, timeout=10).json() except Exception as exc: print(exc) response = [] return response
[docs] def masterpoint_query_row(query): """Generic function to get first row from query""" ret = masterpoint_query_list(query) if ret: return ret[0] return None
[docs] def mp_file_grep(pattern): with open("media/masterpoints/MPData.csv", "r", encoding="utf-8") as mp_file: for line in mp_file: if re.search(pattern, line): return line.split(",")
[docs] class MasterpointFactory: """Abstract class for accessing masterpoint data"""
[docs] class Meta: abstract = True
[docs] def get_masterpoints(self, system_number): """Get total masterpoints"""
[docs] def system_number_lookup(self, system_number): """Look up the system number and return name"""
[docs] def system_number_valid(self, system_number): """Look up the system number and return True if okay to add this user"""
[docs] def user_summary(self, system_number): """Get basic information about a user"""
[docs] class MasterpointDB(MasterpointFactory): """Concrete implementation of a masterpoint factory using a database to get the data"""
[docs] def get_masterpoints(self, system_number): summary = masterpoint_query_row(f"mps/{system_number}") if summary: points = summary["TotalMPs"] rank = summary["RankName"] + " Master" else: points = "Not found" rank = "Not found" return {"points": points, "rank": rank}
[docs] def system_number_lookup(self, system_number): result = masterpoint_query_row(f"id/{system_number}") if result: if User.objects.filter( system_number=system_number, is_active=True ).exists(): return "Error: User already registered" if result["IsActive"] == "Y": # only use first name from given names given_name = result["GivenNames"].split(" ")[0] surname = result["Surname"] return html.unescape(f"{given_name} {surname}") return "Error: Invalid or inactive number"
[docs] def system_number_valid(self, system_number): """Checks if this is valid, returns boolean. To be valid this must exist in the MPC with IsActive True and not already be a user in the system""" result = masterpoint_query_row(f"id/{system_number}") return bool( result and result["IsActive"] == "Y" and not User.objects.filter( system_number=system_number, is_active=True ).exists() )
[docs] def system_number_lookup_api(self, system_number): """Called by the API""" result = masterpoint_query_row(f"id/{system_number}") if result: if User.objects.filter( system_number=system_number, is_active=True ).exists(): return False, "User already registered" if result["IsActive"] == "Y": # only use first name from given names given_name = result["GivenNames"].split(" ")[0] surname = result["Surname"] return True, (given_name, surname) return False, "Invalid or inactive number"
[docs] def user_summary(self, system_number): # Get summary data qry = f"{GLOBAL_MPSERVER}/mps/{system_number}" try: r = requests.get(qry).json() except ( IndexError, requests.exceptions.InvalidSchema, requests.exceptions.MissingSchema, requests.exceptions.ConnectionError, ConnectionError, ): r = [] if not r: return None summary = r[0] # Set active to a boolean summary["IsActive"] = summary["IsActive"] == "Y" # Get home club name qry = f'{GLOBAL_MPSERVER}/club/{summary["HomeClubID"]}' summary["home_club"] = requests.get(qry).json()[0]["ClubName"] return summary
[docs] class MasterpointFile(MasterpointFactory): """Concrete implementation of a masterpoint factory using a file to get the data"""
[docs] def get_masterpoints(self, system_number): pattern = f"{int(system_number):07}" result = mp_file_grep(pattern) if result: points = result[7] rank = f"{result[19]} Master" else: points = "Not found" rank = "Not found" return {"points": points, "rank": rank}
[docs] def system_number_lookup(self, system_number): pattern = f"{int(system_number):07}" result = mp_file_grep(pattern) if result: if User.objects.filter( system_number=system_number, is_active=True ).exists(): return "Error: User already registered" if result[6] == "Y": # only use first name from given names given_name = result[2].split(" ")[0] surname = result[1] return html.unescape(f"{given_name} {surname}") return "Error: Inactive or invalid number"
[docs] def system_number_valid(self, system_number): """Checks if this is valid, returns boolean. To be valid this must exist in the MPC with IsActive True and not already be a user in the system""" pattern = f"{int(system_number):07}" result = mp_file_grep(pattern) return bool( result and result[6] == "Y" and not User.objects.filter( system_number=system_number, is_active=True ).exists() )
[docs] def user_summary(self, system_number): pattern = f"{int(system_number):07}" result = mp_file_grep(pattern) if not result: return None return { "GivenNames": result[2], "Surname": result[1], "IsActive": result[6], "home_club": None, }
[docs] def masterpoint_factory_creator(): return MasterpointFile() if MP_USE_FILE else MasterpointDB()