Source code for masterpoints.factories
import html
import re
import requests
from accounts.models import User
from cobalt.settings import GLOBAL_MPSERVER, MP_USE_FILE
[docs]
def masterpoint_query_list(query):
    """Fetch ``query`` from the masterpoints server and return the decoded JSON.

    The request is sent to ``{GLOBAL_MPSERVER}/{query}`` with a 10 second
    timeout. If anything goes wrong while fetching or decoding the response,
    the exception is printed and an empty list is returned instead.
    """
    endpoint = f"{GLOBAL_MPSERVER}/{query}"

    try:
        return requests.get(endpoint, timeout=10).json()
    except Exception as err:
        # Best-effort lookup: report the problem and fall back to "no rows".
        print(err)
        return []
[docs]
def masterpoint_query_row(query):
    """Return the first row produced by ``query``, or ``None`` if nothing came back."""
    rows = masterpoint_query_list(query)
    return rows[0] if rows else None
[docs]
def mp_file_grep(pattern, path="media/masterpoints/MPData.csv"):
    """Return the fields of the first line of the masterpoints CSV matching ``pattern``.

    Args:
        pattern: regular expression searched for anywhere in each line.
        path: file to scan. Defaults to the masterpoints data file, so
            existing single-argument callers are unaffected.

    Returns:
        The first matching line split on commas, with the line terminator
        removed so the last field is clean, or ``None`` when no line matches.

    Raises:
        OSError: if the file cannot be opened.
    """
    # Compile once rather than handing the pattern to re.search for every line.
    matcher = re.compile(pattern)

    with open(path, "r", encoding="utf-8") as mp_file:
        for line in mp_file:
            if matcher.search(line):
                # Strip the newline; otherwise it ends up inside the last field.
                return line.rstrip("\r\n").split(",")

    return None
[docs]
class MasterpointFactory:
    """Common interface for the masterpoint data sources.

    The methods defined here are placeholders: each one does nothing and
    returns ``None``. Concrete subclasses provide the real behaviour.
    """

    def get_masterpoints(self, system_number):
        """Return the total masterpoints held by ``system_number``."""

    def system_number_lookup(self, system_number):
        """Return the name that belongs to ``system_number``."""

    def system_number_valid(self, system_number):
        """Return True when ``system_number`` may be used to add a new user."""

    def user_summary(self, system_number):
        """Return basic information about the player ``system_number``."""
[docs]
class MasterpointDB(MasterpointFactory):
    """Concrete implementation of a masterpoint factory using a database to get the data.

    Every lookup goes through ``masterpoint_query_row``, which applies a
    request timeout and turns transport/decoding failures into "no data".
    """

    def get_masterpoints(self, system_number):
        """Return the points and rank for ``system_number``.

        Returns:
            dict with keys ``"points"`` and ``"rank"``. Both values are the
            string ``"Not found"`` when the server returns no row.
        """
        summary = masterpoint_query_row(f"mps/{system_number}")
        if not summary:
            return {"points": "Not found", "rank": "Not found"}

        return {
            "points": summary["TotalMPs"],
            "rank": summary["RankName"] + " Master",
        }

    def system_number_lookup(self, system_number):
        """Look up the system number and return the player's name.

        Returns:
            ``"<first given name> <surname>"`` (HTML entities unescaped) when
            the number is active and not yet registered, otherwise a string
            starting with ``"Error: "``.
        """
        # Same decision logic as the API variant; only the return shape differs.
        found, payload = self.system_number_lookup_api(system_number)
        if not found:
            return f"Error: {payload}"

        given_name, surname = payload
        return html.unescape(f"{given_name} {surname}")

    def system_number_valid(self, system_number):
        """Checks if this is valid, returns boolean. To be valid this must exist in the MPC with IsActive True
        and not already be a user in the system"""
        result = masterpoint_query_row(f"id/{system_number}")
        return bool(
            result
            and result["IsActive"] == "Y"
            and not User.objects.filter(
                system_number=system_number, is_active=True
            ).exists()
        )

    def system_number_lookup_api(self, system_number):
        """Called by the API.

        Returns:
            ``(True, (given_name, surname))`` when the number is active and
            not yet registered, otherwise ``(False, reason)``.
        """
        result = masterpoint_query_row(f"id/{system_number}")
        if result:
            if User.objects.filter(
                system_number=system_number, is_active=True
            ).exists():
                return False, "User already registered"

            if result["IsActive"] == "Y":
                # only use first name from given names
                given_name = result["GivenNames"].split(" ")[0]
                surname = result["Surname"]
                return True, (given_name, surname)

        return False, "Invalid or inactive number"

    def user_summary(self, system_number):
        """Return the summary row for ``system_number`` with a few extras.

        ``IsActive`` is converted to a boolean and ``home_club`` is set to the
        name of the player's home club, or ``None`` if the club lookup fails
        or returns nothing.

        Returns:
            The summary dict, or ``None`` when no summary row is available.
        """
        # Use the shared helper so the request has a timeout and failures
        # are reported as "no data" instead of hanging or raising.
        summary = masterpoint_query_row(f"mps/{system_number}")
        if not summary:
            return None

        # Set active to a boolean
        summary["IsActive"] = summary["IsActive"] == "Y"

        # Get home club name; a failed or empty lookup must not break the summary
        club = masterpoint_query_row(f"club/{summary['HomeClubID']}")
        summary["home_club"] = club["ClubName"] if club else None

        return summary
[docs]
class MasterpointFile(MasterpointFactory):
    """Concrete implementation of a masterpoint factory using a file to get the data.

    Rows come from ``mp_file_grep`` as lists of strings. The columns used
    here are: 1 surname, 2 given names, 6 active flag (``"Y"`` when active),
    7 total points, 19 rank name.
    """

    @staticmethod
    def _find_row(system_number):
        """Return the file row for ``system_number``, or ``None`` if unavailable.

        A system number that cannot be converted to an integer is treated as
        "not found" rather than raising.
        """
        try:
            pattern = f"{int(system_number):07}"
        except (TypeError, ValueError):
            return None

        # NOTE(review): the zero-padded number is searched for anywhere in the
        # line, not just in the system number column - confirm that is intended.
        return mp_file_grep(pattern)

    def get_masterpoints(self, system_number):
        """Return the points and rank for ``system_number``.

        Returns:
            dict with keys ``"points"`` and ``"rank"``. Both values are the
            string ``"Not found"`` when there is no matching row.
        """
        result = self._find_row(system_number)
        if not result:
            return {"points": "Not found", "rank": "Not found"}

        return {"points": result[7], "rank": f"{result[19]} Master"}

    def system_number_lookup(self, system_number):
        """Look up the system number and return the player's name.

        Returns:
            ``"<first given name> <surname>"`` (HTML entities unescaped) when
            the number is active and not yet registered, otherwise a string
            starting with ``"Error: "``.
        """
        result = self._find_row(system_number)
        if result:
            if User.objects.filter(
                system_number=system_number, is_active=True
            ).exists():
                return "Error: User already registered"

            if result[6] == "Y":
                # only use first name from given names
                given_name = result[2].split(" ")[0]
                surname = result[1]
                return html.unescape(f"{given_name} {surname}")

        return "Error: Inactive or invalid number"

    def system_number_valid(self, system_number):
        """Checks if this is valid, returns boolean. To be valid this must exist in the MPC with IsActive True
        and not already be a user in the system"""
        result = self._find_row(system_number)
        return bool(
            result
            and result[6] == "Y"
            and not User.objects.filter(
                system_number=system_number, is_active=True
            ).exists()
        )

    def system_number_lookup_api(self, system_number):
        """Called by the API; mirrors ``MasterpointDB.system_number_lookup_api``.

        Returns:
            ``(True, (given_name, surname))`` when the number is active and
            not yet registered, otherwise ``(False, reason)``.
        """
        result = self._find_row(system_number)
        if result:
            if User.objects.filter(
                system_number=system_number, is_active=True
            ).exists():
                return False, "User already registered"

            if result[6] == "Y":
                # only use first name from given names
                return True, (result[2].split(" ")[0], result[1])

        return False, "Invalid or inactive number"

    def user_summary(self, system_number):
        """Return basic information about ``system_number`` from the file.

        Returns:
            dict with ``GivenNames``, ``Surname``, ``IsActive`` (boolean, as
            in the database implementation) and ``home_club`` (always
            ``None`` here), or ``None`` when there is no matching row.
        """
        result = self._find_row(system_number)
        if not result:
            return None

        return {
            "GivenNames": result[2],
            "Surname": result[1],
            # Boolean rather than the raw flag: the string "N" would be truthy.
            "IsActive": result[6] == "Y",
            "home_club": None,
        }
[docs]
def masterpoint_factory_creator():
    """Build the masterpoint data source selected by the ``MP_USE_FILE`` setting.

    Returns a ``MasterpointFile`` when ``MP_USE_FILE`` is truthy and a
    ``MasterpointDB`` otherwise.
    """
    if MP_USE_FILE:
        return MasterpointFile()
    return MasterpointDB()