import logging
import re
import mimetypes
from datetime import datetime, date
from threading import Thread
from itertools import chain
from urllib.parse import urlencode
# JPG TESTING - to test queue progress
# import time
import boto3
import firebase_admin.messaging
from botocore.exceptions import ClientError
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core.mail import EmailMultiAlternatives
from django.core.paginator import Paginator
from django.db import connection, IntegrityError
from django.db.models import Count, OuterRef, Subquery, CharField, Q
from django.db.models.functions import Cast
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils import timezone
from django.utils.html import strip_tags
from django.utils.safestring import mark_safe
from fcm_django.models import FCMDevice
from firebase_admin.messaging import (
Message,
Notification,
AndroidConfig,
AndroidNotification,
APNSConfig,
APNSPayload,
Aps,
)
from post_office import mail as po_email
from cobalt.settings import MEDIA_ROOT
from accounts.models import User, UserAdditionalInfo, UnregisteredUser
from cobalt.settings import (
COBALT_HOSTNAME,
DISABLE_PLAYPEN,
RBAC_EVERYONE,
DEFAULT_FROM_EMAIL,
GLOBAL_TITLE,
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY,
AWS_REGION_NAME,
TBA_PLAYER,
ALL_SYSTEM_ACCOUNTS,
ALL_SYSTEM_ACCOUNT_SYSTEM_NUMBERS,
apply_large_email_batch_config,
)
from events.models import (
CongressMaster,
Congress,
Event,
EventEntryPlayer,
)
from notifications.forms import (
EmailContactForm,
EmailOptionsForm,
EmailContentForm,
EmailAttachmentForm,
)
from notifications.models import (
Snooper,
BatchID,
BatchActivity,
BatchContent,
BatchAttachment,
EmailBatchRBAC,
Email,
EmailAttachment,
RealtimeNotificationHeader,
RealtimeNotification,
Recipient,
InAppNotification,
UnregisteredBlockedEmail,
)
from organisations.club_admin_core import (
clear_club_email_bounced,
get_club_contact_list,
get_club_members,
get_club_member_list,
get_member_count,
get_member_details,
has_club_email_bounced,
MEMBERSHIP_STATES_ACTIVE,
MEMBERSHIP_STATES_DO_NOT_USE,
MEMBERSHIP_STATES_TERMINAL,
)
from organisations.models import (
Organisation,
ClubTag,
MemberClubTag,
OrgEmailTemplate,
)
from organisations.decorators import check_club_menu_access
from rbac.core import rbac_user_has_role, rbac_get_users_with_role
from rbac.views import rbac_forbidden
from post_office.models import Email as PostOfficeEmail
logger = logging.getLogger("cobalt")
# Max no of emails to send in a batch
MAX_EMAILS = 45
# Max number of threads
MAX_EMAIL_THREADS = 20
# Artificial id for EVERYONE club tag
EVERYONE_TAG_ID = 9999999
# Artificial id for all contacts club tag
CONTACTS_TAG_ID = 11111111
def _to_address_checker(to_address, context):
"""Check environment to see what the to_address should be. This protects us from sending to
real users from test environments
Args:
to_address(str): email address to verify based upon environment
context(dict): dict with email_body (hopefully)
"""
# If DISABLE_PLAYPEN is set, then just return this unmodified, e.g. production
if DISABLE_PLAYPEN == "ON":
return to_address, context
# TODO: Change this to a variable if we ever use anything other than AWS SES
# https://docs.aws.amazon.com/ses/latest/DeveloperGuide/send-email-simulator.html
safe_address = "success@simulator.amazonses.com"
# If the everyone user is set to a valid email then we send to that
# If still set to the default (a@b.com) then we ignore
everyone = User.objects.get(pk=RBAC_EVERYONE)
if everyone.email == "a@b.com":
return_address = safe_address
if "email_body" in context:
context[
"email_body"
] = f"""<h1>Non-production environment<h1>
<h2>This email was not sent</h2>
<h3>To send this in future, update the email address of EVERYONE
from a@b.com to a real email address.</h3>
{context["email_body"]}
"""
logger.warning(
f"DISABLE_PLAYPEN is OFF. Overriding email address from '{to_address}' to '{return_address}' "
f"We will use the email address of the EVERYONE user if it is not set to a@b.com."
)
else:
return_address = everyone.email
logger.warning(
f"DISABLE_PLAYPEN is OFF. Overriding email address from '{to_address}' to '{return_address}'"
)
return return_address, context
def _email_address_on_bounce_list(to_address):
"""Check if we are not sending to this address"""
# First check if it bounced
user_additional_info = UserAdditionalInfo.objects.filter(
user__email=to_address
).first()
club_email_bounced = has_club_email_bounced(to_address)
if (
user_additional_info and user_additional_info.email_hard_bounce
) or club_email_bounced:
logger.info(f"Not sending email to suppressed address - {to_address}")
return True
# Now check for unregistered users blocking sending
if UnregisteredBlockedEmail.objects.filter(email=to_address).exists():
logger.info(f"Not sending email to unregistered user at address - {to_address}")
return True
return False
[docs]
def custom_sender(from_name):
"""Returns a sender address string of the form "from_name<default_email_addres>" or None
The default email address is picked up from settings (eg "MyABF<donotreply@myabf.com.au>")
"""
if not from_name:
return None
match = re.search(r"<([^<>]+)>", DEFAULT_FROM_EMAIL)
if match:
return f"{from_name}<{match.group(1)}>"
else:
return None
[docs]
def club_default_template(club):
"""Determine a reasonable default email template for the club, or None
This uses the following rules:
0. If there are no club templates return None
1. If there is only one club template, regardless of name, use it (ie could be "Results")
2. If there is one called "Default" use it, regardless of what others exist
3. If there is no "Default", but "Results" and one other, use the other
4. If there is more than one and no "Default" (and not case 3) return None
"""
org_templates = OrgEmailTemplate.objects.filter(organisation=club).all()
default_template = None
if len(org_templates) == 0:
# no club templates (rule 0)
return None
elif len(org_templates) == 1:
# only one template, use it (rule 1)
default_template = org_templates.first()
else:
# more than one template, so see what we have
named_default = None
named_results = None
other_names = []
for org_template in org_templates:
if org_template.template_name.upper() == "RESULTS":
named_results = org_template
elif org_template.template_name.upper() == "DEFAULT":
named_default = org_template
else:
other_names.append(org_template)
if named_default:
# there is a "Default" so use it (rule 2)
default_template = named_default
elif named_results and len(other_names) == 1:
# there is a "Results" and only one other, so use the other (rule 3)
default_template = other_names[0]
return default_template
[docs]
def update_context_for_club_default_template(club, context):
"""Update the context dictionary with club default styling
Returns the OrgEmailTemplate objected used or None"""
default_template = club_default_template(club)
if default_template:
# update the context
if default_template.banner:
context["img_src"] = default_template.banner.url
if default_template.footer:
context["footer"] = default_template.footer
if default_template.box_colour:
context["box_colour"] = default_template.box_colour
if default_template.box_font_colour:
context["box_font_colour"] = default_template.box_font_colour
return default_template
[docs]
def send_cobalt_email_with_template(
to_address,
context,
template="system - default flex",
sender=None,
priority="medium",
batch_id=None,
reply_to=None,
attachments=None,
batch_size=1,
apply_default_template_for_club=None,
show_club_footer=True,
):
"""Queue an email using a template and context.
Args:
to_address (str or list): who to send to (JPG Comment - does not appear to support list)
context (dict): values to substitute into email template
template (str or EmailTemplate instance): it is more efficient to use an instance for multiple calls
sender (str): who to send from (None will use default from settings file)
priority (str): Django Post Office priority
batch_id (BatchID): batch_id for this batch of emails
reply_to (str): email address to send replies to
attachments (dict): optional dictionary of attachments
apply_default_template_for_club (Organisation): apply style settings from a default template for this club
show_club_footer (bool): allows caller to hide the footer from any club template being applied
Returns:
boolean: True if the message was sent, False otherwise
Context for the default template can have:
img_src: logo to override default MyABF logo
name: Users first name
title: Goes in title box
email_body: main part of email
additional_words: goes after main body
link: link for button e.g. /dashboard
link_text: words to go on link button
link_colour: default, primary, warning, danger, success, info
box_colour: default, primary, warning, danger, success, info
unregistered_identifier: will use alternative footer and show link to unregistered user preferences
"""
# Check if on bounce list
if _email_address_on_bounce_list(to_address):
logger.info(f"Ignoring email on bounce list {to_address}")
return False
# Augment context
context["host"] = COBALT_HOSTNAME
context["show_club_footer"] = show_club_footer
if apply_default_template_for_club:
default_org_template = update_context_for_club_default_template(
apply_default_template_for_club, context
)
else:
default_org_template = None
if "img_src" not in context:
context["img_src"] = "notifications/img/myabf-email.png"
# no need to defaul box colour now - default is in template html
# if "box_colour" not in context:
# context["box_colour"] = "primary"
# Note - link colour is not used in the template html
if "link_colour" not in context:
context["link_colour"] = "primary"
if "subject" not in context and "title" in context:
context["subject"] = context["title"]
# mark subject as safe or characters get changed
if context["subject"]:
context["subject"] = mark_safe(context["subject"])
# Check for playpen - don't send emails to users unless on production or similar
to_address, context = _to_address_checker(to_address, context)
# COB-793 - add custom header with batch size
headers = {"X-Myabf-Batch-Size": batch_size}
limited_notifications = apply_large_email_batch_config(batch_size)
if limited_notifications:
logger.debug(f"Email is part of a large batch of {batch_size}")
if reply_to:
headers["Reply-to"] = reply_to
elif default_org_template and default_org_template.reply_to:
headers["Reply-to"] = default_org_template.reply_to
this_sender = sender
if this_sender is None:
if default_org_template and default_org_template.from_name:
# this_sender = f"{default_org_template.from_name}<donotreply@myabf.com.au>"
this_sender = custom_sender(default_org_template.from_name)
if "img_src" in context:
context["inline_banner"] = context["img_src"][0] != "/"
else:
context["inline_banner"] = True
context["img_src"] = "notifications/img/myabf-email.png"
email = po_email.send(
sender=this_sender,
recipients=to_address,
template=template,
context=context,
render_on_delivery=True,
priority=priority,
headers=headers,
attachments=attachments,
)
Snooper(
post_office_email=email,
batch_id=batch_id,
limited_notifications=limited_notifications,
).save()
return True
[docs]
def create_rbac_batch_id(
rbac_role: str,
batch_id: BatchID = None,
user: User = None,
organisation: Organisation = None,
batch_type: str = "UNK",
batch_size: int = 0,
description: str = None,
complete: bool = False,
):
"""Create a new EmailBatchRBAC object to allow an RBAC role to access a batch of emails
Updated in sprint-48 to add type and description
Args:
rbac_role (str): the RBAC role to allow. e.g. "org.orgs.34.view"
batch_id (BatchID): batch ID, if None a new batch Id will be created
organisation: Org responsible for sending this
user: User responsible for sending this
batch_type: Type of batch (BatchID.BATCH_TYPE)
description: Email subject line or description
Returns: BatchID
"""
if not batch_id:
batch_id = BatchID()
batch_id.create_new()
batch_id.batch_type = batch_type
batch_id.batch_size = batch_size
batch_id.description = description if description else None
batch_id.state = (
BatchID.BATCH_STATE_COMPLETE if complete else BatchID.BATCH_STATE_WIP
)
batch_id.organisation = organisation
batch_id.save()
EmailBatchRBAC(
batch_id=batch_id,
rbac_role=rbac_role,
meta_sender=user,
meta_organisation=organisation,
).save()
return batch_id
[docs]
def send_cobalt_bulk_email(bcc_addresses, subject, message, reply_to=""):
"""Sends the same message to multiple people.
Args:
bcc_addresses (list): who to send to, list of strings
subject (str): subject line for email
message (str): message to send in HTML or plain format
reply_to (str): who to send replies to
Returns:
Nothing
"""
# start thread
thread = Thread(
target=send_cobalt_bulk_email_thread,
args=[bcc_addresses, subject, message, reply_to],
)
thread.setDaemon(True)
thread.start()
[docs]
def send_cobalt_bulk_email_thread(bcc_addresses, subject, message, reply_to):
"""Send bulk emails. Asynchronous thread
Args:
bcc_addresses (list): who to send to, list of strings
subject (str): subject line for email
message (str): message to send in HTML or plain format
reply_to (str): who to send replies to
Returns:
Nothing
"""
plain_message = strip_tags(message)
# split emails into chunks using an ugly list comprehension stolen from the internet
# turn [a,b,c,d] into [[a,b],[c,d]]
# fmt: off
emails_as_list = [
bcc_addresses[i * MAX_EMAILS: (i + 1) * MAX_EMAILS]
for i in range((len(bcc_addresses) + MAX_EMAILS - 1) // MAX_EMAILS)
]
# fmt: on
for emails in emails_as_list:
msg = EmailMultiAlternatives(
subject,
plain_message,
to=[],
bcc=emails,
from_email=DEFAULT_FROM_EMAIL,
reply_to=[reply_to],
)
msg.attach_alternative(message, "text/html")
msg.send()
for email in emails:
Email(
subject=subject,
message=message,
recipient=email,
status="Sent",
).save()
# Django creates a new database connection for this thread so close it
connection.close()
[docs]
def send_cobalt_bulk_notifications(
msg_list,
admin,
description,
invalid_lines=None,
total_file_rows=0,
sender_identification=None,
):
"""This originally sent messages over SMS, but now we only support FCM.
Args:
sender_identification(str): e.g. Compscore licence number to identify the sender
msg_list(list): list of tuples of system number and message to send (system_number, "message")
admin(User): administrator responsible for sending these messages
description(str): Text description of this batch of messages
invalid_lines(list): list of invalid lines in upload file
total_file_rows(int): Number of rows in original file
Returns:
sent_users(list): Who we think we sent messages to
unregistered_users(list): list of users who we do not know about
fcm_users(list): list of users we sent FCM messages to. Users may have multiple devices registered
un_contactable_users(list): list of users who don't have mobiles or haven't ticked to receive SMS
"""
unregistered_users = []
uncontactable_users = []
sent_users = []
# For now we just store the users, could change this to store users and devices, for non-blank headers this can be
# worked out anyway
fcm_sent_users = []
fcm_failed_users = []
# Log this batch
header = RealtimeNotificationHeader(
admin=admin,
description=description,
attempted_send_number=len(msg_list),
invalid_lines=invalid_lines,
total_record_number=total_file_rows,
sender_identification=sender_identification,
)
header.save()
# load data
app_users, fcm_lookup = _send_cobalt_bulk_notification_get_data(msg_list)
# Go through and try to send the messages
for item in msg_list:
system_number, msg = item
# Reformat string
msg = msg.replace("<br>", "\n")
fcm_device_list = fcm_lookup.get(system_number)
if fcm_device_list:
# If it works for any device, count that as successful
worked = False
for index, fcm_device in enumerate(fcm_device_list):
# Only add the first message to the database
add_message_to_database = index == 0
if send_fcm_message(
fcm_device, msg, admin, header, add_message_to_database
):
worked = True
if worked:
fcm_sent_users.append(system_number)
else:
fcm_failed_users.append(system_number)
uncontactable_users.append(system_number)
else:
unregistered_users.append(system_number)
# Update header
header.send_status = bool(sent_users + fcm_sent_users)
header.successful_send_number = len(sent_users) + len(fcm_sent_users)
# Save lists as strings using model functions
header.set_uncontactable_users(uncontactable_users)
header.set_unregistered_users(unregistered_users)
header.set_invalid_lines(invalid_lines)
header.save()
return sent_users + fcm_sent_users, unregistered_users, uncontactable_users
def _send_cobalt_bulk_notification_get_data(msg_list):
"""sub of send_cobalt_bulk_notifications to load required data"""
# Get system_numbers as list
system_numbers = [item[0] for item in msg_list]
# Get the App users (people set up with FCM)
app_users = FCMDevice.objects.filter(
user__system_number__in=system_numbers
).select_related("user")
# create dict of ABF number to FCM, can be multiple devices per person
fcm_lookup = {}
for app_user in app_users:
if app_user.user.system_number not in fcm_lookup:
fcm_lookup[app_user.user.system_number] = []
fcm_lookup[app_user.user.system_number].append(app_user)
return app_users, fcm_lookup
[docs]
def send_cobalt_sms(
phone_number, msg, from_name=GLOBAL_TITLE, header=None, member=None
):
"""Send single SMS. This will be replaced with a mobile app later
Args:
phone_number (str): who to send to
msg (str): message to send
from_name(str): Display name of sender
header(RealtimeNotificationHeader): parent for this message
member(User): user for this message
Returns:
None
"""
# from_name must be alpha-numeric or hyphens only, must start and end with alphanumeric, 11 chars max
if len(from_name) > 11:
from_name = from_name[:11]
# replace non alphanumerics with -
from_name = re.sub("[^0-9a-zA-Z]+", "-", from_name)
# Check start and end
if from_name[0] == "-":
from_name[0] = "A"
if len(from_name) == 11 and from_name[10] == "-":
from_name[10] = "A"
client = boto3.client(
"sns",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION_NAME,
)
# Assume the worst
return_code = False
try:
return_values = client.publish(
PhoneNumber=phone_number,
Message=msg,
MessageAttributes={
"AWS.SNS.SMS.SenderID": {
"DataType": "String",
"StringValue": from_name,
},
"AWS.SNS.SMS.SMSType": {
"DataType": "String",
"StringValue": "Transactional",
},
},
)
if return_values["ResponseMetadata"]["HTTPStatusCode"] == 200:
return_code = True
except ClientError:
logger.exception(f"Couldn't publish message to {phone_number}")
# Log it
RealtimeNotification(
member=member,
admin=header.admin,
status=return_code,
msg=msg,
header=header,
aws_message_id=return_values["MessageId"],
).save()
[docs]
def add_in_app_notification(member, msg, link=None):
"""Add a notification to the menu bar telling a user they have a message"""
InAppNotification(member=member, message=msg[:100], link=link).save()
def _cloudwatch_reader(log_group, notification):
"""Get data from Cloudwatch"""
client = boto3.client(
"logs",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION_NAME,
)
filter_pattern = f'{{ $.notification.messageId = "{notification.aws_message_id}" }}'
# TODO: Start and end times need investigated. Probably okay not to use them.
# start_time = int((datetime.now() - timedelta(hours=240)).timestamp()) * 1000
# end_time = int((datetime.now() + timedelta(hours=240)).timestamp()) * 1000
# It is possible to get multiple messages and to need a cursor (nextToken) to get all messages
# Get first response
response = client.filter_log_events(
logGroupName=log_group,
# startTime=start_time,
# endTime=end_time,
filterPattern=filter_pattern,
)
results = response["events"]
# Continue to build results if we got a nextToken
while "nextToken" in response:
response = client.filter_log_events(
logGroupName=log_group,
# startTime=start_time,
# endTime=end_time,
filterPattern=filter_pattern,
nextToken=response["nextToken"],
)
results.extend(response["events"])
return results
[docs]
@login_required()
def send_test_fcm_message(request, fcm_device_id):
"""Send a test message to a users registered FCM device"""
fcm_device = FCMDevice.objects.filter(pk=fcm_device_id).first()
# Check access
if fcm_device and (
fcm_device.user == request.user
or rbac_user_has_role(member=request.user, role="notifications.admin.view")
):
now = timezone.localtime().strftime("%a %d-%b-%Y %-I:%M")
now += timezone.localtime().strftime("%p").lower()
test_msg = (
f"This is a test message.\n\n"
f"It was sent to {fcm_device.user}.\n\n"
f"It was sent by {request.user}.\n\n"
f"It was sent on {now}."
)
send_fcm_message(fcm_device, test_msg, request.user)
return HttpResponse("Message sent")
return HttpResponse("Device not found or access denied")
[docs]
def send_fcm_message(
fcm_device, msg, admin=None, header=None, add_message_to_database=True
):
"""Send a message to a users registered FCM device"""
if not admin:
admin = User.objects.get(pk=RBAC_EVERYONE)
if add_message_to_database:
# For people with multiple devices we only add the message to the database once
RealtimeNotification(
member=fcm_device.user,
admin=admin,
msg=msg,
header=header,
fcm_device=fcm_device,
).save()
msg = Message(
notification=Notification(
title=f"Message for {fcm_device.user.first_name}", body=msg
),
android=AndroidConfig(
priority="high",
notification=AndroidNotification(sound="default", default_sound=True),
),
apns=APNSConfig(
payload=APNSPayload(
aps=Aps(sound="default"),
),
),
)
# Try to send the message, handle any error, so we don't break the whole sending group
try:
rc = fcm_device.send_message(msg)
except Exception as exc:
logger.error(exc.__str__())
return False
# log it
if type(rc) is firebase_admin.messaging.SendResponse:
logger.info(f"Sent message to {fcm_device.user} on device: {fcm_device.name}")
return True
# If we get an error then handle it
else:
logger.error(f"Error from FCM for {fcm_device.user} - {rc}")
logger.error(f"Deleting FCM device {fcm_device.name} for {fcm_device.user}")
fcm_device.delete()
return False
[docs]
def send_cobalt_email_to_system_number(
system_number, subject, message, club=None, administrator=None
):
"""Generic function to send a simple email to a user or unregistered user
if we get a club then we will use that to look for club specific email addresses
Updated for sprint-48 to pass additional header information to BatchID
Note: all emails sent via this function with a club specified are assumed to
be of batch_type Admin.
"""
from accounts.views.core import (
get_email_address_and_name_from_system_number,
)
email_address, first_name = get_email_address_and_name_from_system_number(
system_number, club
)
if not email_address:
logger.warning(
f"Unable to send email to {system_number}. No email address found."
)
return
un_registered_user = UnregisteredUser.objects.filter(
system_number=system_number
).first()
if un_registered_user:
unregistered_identifier = un_registered_user.identifier
else:
unregistered_identifier = None
context = {
"box_colour": "#00bcd4",
"name": first_name,
"title": subject,
"email_body": message,
"img_src": "/static/notifications/img/myabf-email.png",
"unregistered_identifier": unregistered_identifier,
}
if club:
# Create batch id so admins can see this email
batch_id = create_rbac_batch_id(
rbac_role=f"notifications.orgcomms.{club.id}.edit",
user=administrator,
organisation=club,
batch_type=BatchID.BATCH_TYPE_ADMIN,
batch_size=1,
description=subject,
complete=True,
)
else:
batch_id = None
send_cobalt_email_with_template(
to_address=email_address,
context=context,
batch_id=batch_id,
template="system - club",
apply_default_template_for_club=club if club else None,
)
[docs]
def remove_email_from_blocked_list(email_address):
"""Remove an email address from our internal list of blocked addresses"""
users = User.objects.filter(email=email_address)
for user in users:
user_additional_info, _ = UserAdditionalInfo.objects.get_or_create(user=user)
user_additional_info.email_hard_bounce = False
user_additional_info.email_hard_bounce_reason = None
user_additional_info.email_hard_bounce_date = None
user_additional_info.save()
clear_club_email_bounced(email_address)
[docs]
def get_notifications_statistics():
"""get stats about notifications. Called by util statistics"""
total_emails = PostOfficeEmail.objects.count()
total_real_time_notifications = RealtimeNotification.objects.count()
total_fcm_notifications = RealtimeNotification.objects.filter(
fcm_device__isnull=False
).count()
total_sms_notifications = total_real_time_notifications - total_fcm_notifications
total_registered_fcm_devices = FCMDevice.objects.count()
return {
"total_emails": total_emails,
"total_real_time_notifications": total_real_time_notifications,
"total_sms_notifications": total_sms_notifications,
"total_fcm_notifications": total_fcm_notifications,
"total_registered_fcm_devices": total_registered_fcm_devices,
}
def _add_user_to_recipients(club, batch, user, initial=True):
"""Add a user to the recipients of a batch.
Returns a tuple of (number added, message string)
If the user is already a recipient, set as included"""
# COB-940 ALL_SYSTEM_ACCOUNTS contains ids not system numbers
# so use ALL_SYSTEM_ACCOUNT_SYSTEM_NUMBERS instead
if user.system_number in ALL_SYSTEM_ACCOUNT_SYSTEM_NUMBERS:
return (0, f"{user.full_name} is a system account")
if not user.is_active or user.deceased:
return (0, f"{user.full_name} is inactive")
recipients = Recipient.objects.filter(batch=batch, system_number=user.system_number)
if recipients.exists():
recipient = recipients.first()
if not recipient.include:
recipient.include = True
recipient.save()
return (1, f"{user.full_name} included")
return (0, f"{user.full_name} already included")
else:
recipient = Recipient()
recipient.create_from_user(batch, user, initial=initial)
recipient.save()
return (1, f"{user.full_name} added")
_ADD_RECIPIENT_RESULT_OK = "OK"
_ADD_RECIPIENT_RESULT_SYSTEM_AC = "SYS"
_ADD_RECIPIENT_RESULT_DUPLICATE = "DUP"
_ADD_RECIPIENT_RESULT_INACTIVE = "INA"
_ADD_RECIPIENT_RESULT_NO_EMAIL = "NOE"
_ADD_RECIPIENT_RESULT_NOT_FOUND = "NOF"
_ADD_RECIPIENT_RESULTS = [
(_ADD_RECIPIENT_RESULT_OK, "added"),
(_ADD_RECIPIENT_RESULT_SYSTEM_AC, "system accounts"),
(_ADD_RECIPIENT_RESULT_DUPLICATE, "duplicates"),
(_ADD_RECIPIENT_RESULT_INACTIVE, "inactive users"),
(_ADD_RECIPIENT_RESULT_NO_EMAIL, "no email address"),
(_ADD_RECIPIENT_RESULT_NOT_FOUND, "not found"),
]
def _add_to_recipient_with_system_number(
batch, club, system_number, current_only=False
):
"""Add a club member or contact to the batch
Returns:
A result code
A user message
"""
# COB-940 ALL_SYSTEM_ACCOUNTS contains ids not system numbers
# so use ALL_SYSTEM_ACCOUNT_SYSTEM_NUMBERS instead
if system_number in ALL_SYSTEM_ACCOUNT_SYSTEM_NUMBERS:
return (_ADD_RECIPIENT_RESULT_SYSTEM_AC, "A system account")
# is the system number already a recipient?
existing = Recipient.objects.filter(
batch=batch, system_number=system_number
).first()
if existing:
if existing.include:
return (_ADD_RECIPIENT_RESULT_DUPLICATE, "Recipient already included")
else:
existing.include = True
existing.save()
return (_ADD_RECIPIENT_RESULT_OK, "Recipient added")
member_details = get_member_details(club, system_number)
if not member_details:
return (_ADD_RECIPIENT_RESULT_NOT_FOUND, "Recipient not found")
if not member_details.club_email:
return (_ADD_RECIPIENT_RESULT_NO_EMAIL, "No club email address available")
if member_details.membership_status in MEMBERSHIP_STATES_DO_NOT_USE:
return (
_ADD_RECIPIENT_RESULT_INACTIVE,
f"{member_details.first_name} {member_details.last_name} is inactive",
)
if (
current_only
and member_details.membership_status not in MEMBERSHIP_STATES_ACTIVE
):
return (
_ADD_RECIPIENT_RESULT_INACTIVE,
f"{member_details.first_name} {member_details.last_name} is not a current member",
)
recipient = Recipient()
recipient.system_number = system_number
recipient.batch = batch
recipient.first_name = member_details.first_name
recipient.last_name = member_details.last_name
recipient.email = member_details.club_email
recipient.include = True
recipient.initial = False
recipient.save()
return (_ADD_RECIPIENT_RESULT_OK, "Recipient added")
[docs]
@login_required
def compose_club_email(request, club_id):
"""Entry point for starting a new club batch email
Just create the batchId and then start the composition flow"""
role = f"notifications.orgcomms.{club_id}.edit"
if not rbac_user_has_role(request.user, role):
return rbac_forbidden(request, role)
club = get_object_or_404(Organisation, pk=club_id)
# let anyone with comms access to this org view them
batch = create_rbac_batch_id(
rbac_role=f"notifications.orgcomms.{club.id}.edit",
user=request.user,
organisation=club,
batch_type=BatchID.BATCH_TYPE_COMMS,
description=None,
complete=False,
)
return redirect("notifications:compose_email_recipients", club_id, batch.id)
[docs]
@login_required
def initiate_admin_multi_email(request, club_id):
"""Entry point for multi congress / event selection view
Just create the batch record and start the composition process"""
role = f"events.org.{club_id}.edit"
if not rbac_user_has_role(request.user, role):
return rbac_forbidden(request, role)
org = get_object_or_404(Organisation, pk=club_id)
# create the batch header
batch = create_rbac_batch_id(
f"events.org.{club_id}.edit",
organisation=org,
batch_type=BatchID.BATCH_TYPE_MULTI,
batch_size=0,
complete=False,
)
# go to club menu, comms tab, edit batch
return redirect("notifications:compose_email_multi_select", club_id, batch.id)
[docs]
def check_user_has_batch_access(user, batch):
"""Check whether the user has the appropriate RBAC role for the batch
Returns a tuple of boolean successs and the role checked (None if invalid batch type)
"""
if (
batch.batch_type
in batch.batch_type
in [
BatchID.BATCH_TYPE_ADMIN,
BatchID.BATCH_TYPE_COMMS,
BatchID.BATCH_TYPE_RESULTS,
]
):
role = f"notifications.orgcomms.{batch.organisation.id}.edit"
elif (
batch.batch_type
in batch.batch_type
in [
BatchID.BATCH_TYPE_CONGRESS,
BatchID.BATCH_TYPE_EVENT,
BatchID.BATCH_TYPE_MULTI,
BatchID.BATCH_TYPE_ENTRY,
]
):
role = f"events.org.{batch.organisation.id}.edit"
else:
return (False, None)
return (rbac_user_has_role(user, role), role)
[docs]
def check_club_and_batch_access():
"""Decorator to check club and batch email access rights when editing batches
Expects a request, club_id and batch_id_id.
Passes request, club and batch to the called function
Modelled on organisations/decorators.py/check_club_menu_access
"""
def _method_wrapper(function):
def _arguments_wrapper(request, club_id, batch_id_id, *args, **kwargs):
# Test if logged in
if not request.user.is_authenticated:
return redirect("/")
club = get_object_or_404(Organisation, pk=club_id)
batch = BatchID.objects.filter(pk=batch_id_id).first()
if not batch:
messages.error(
request,
"This batch no longer exists",
extra_tags="cobalt-message-error",
)
# return redirect("organisations:club_menu", club.id)
return redirect(
"organisations:club_menu_tab_entry_point",
club.id,
"comms",
)
# need to check for comms or congress permissions depending
# on the batch type being manipulated
access_granted, role_required = check_user_has_batch_access(
request.user, batch
)
if not access_granted:
if role_required:
return rbac_forbidden(request, role_required)
else:
return HttpResponse("Error - not an editable batch type")
if batch.state != BatchID.BATCH_STATE_WIP:
# batch is no longer editable, presumably has been sent by another user
messages.error(
request,
"This batch is no longer editable",
extra_tags="cobalt-message-error",
)
# return redirect("organisations:club_menu", club.id)
return redirect(
"organisations:club_menu_tab_entry_point",
club.id,
"comms",
)
# all ok
return function(request, club, batch, *args, **kwargs)
return _arguments_wrapper
return _method_wrapper
def _batch_has_been_customised(batch):
"""Returns whether a batch has been customised at all
Used to determine whether to show cancel or delete"""
# check the batch header information
if batch.description or batch.template or batch.reply_to or batch.from_name:
return True
# check for batch activities
if BatchActivity.objects.filter(batch=batch).count() > 0:
return True
# check for recipients
if Recipient.objects.filter(batch=batch).count() > 0:
return True
# check for batch content
if BatchContent.objects.filter(batch=batch).count() > 0:
return True
# check for attachements
if BatchAttachment.objects.filter(batch=batch).count() > 0:
return True
return False
[docs]
@check_club_and_batch_access()
def compose_email_multi_select(request, club, batch):
"""Compose batch emails - step 0 - select events (multis only)"""
# non_draft_congress_count=Count('congress_set', filter=~Q(congress_set__status='Draft'))
# select masters and count non-draft congresses, exclude any master with no non-draft congresses
masters = (
CongressMaster.objects.filter(org=club)
.annotate(
non_draft_congress_count=Count(
"congress",
filter=Q(congress__status="Published") | Q(congress__status="Closed"),
)
)
.filter(non_draft_congress_count__gt=0)
)
if request.method == "POST":
# update the selected batch activities and rebuild the recipients
# delete existing activities and recipients
BatchActivity.objects.filter(batch=batch).delete()
Recipient.objects.filter(batch=batch).delete()
# The form will return all selected items in the tree, including components
# of a higher level item (eg all events within a selected congress), so need
# to only add activities for the highest level items (eg the congress, not
# the events). The set of events, however, can be used to select the recipients.
selected_masters = []
selected_congresses = []
selected_events = []
added_count = 0
# process masters first (note - cannot trust the order keys are returned)
for key, value in request.POST.items():
parts = key.split("-")
if parts[0] != "master":
continue
master = get_object_or_404(CongressMaster, pk=int(value))
selected_masters.append(master.pk)
BatchActivity(
batch=batch,
activity_id=int(value),
activity_type=BatchActivity.ACTIVITY_TYPE_SERIES,
).save()
# process congresses, ignoring if part of a selected series
for key, value in request.POST.items():
parts = key.split("-")
if parts[0] != "congress":
continue
congress = get_object_or_404(Congress, pk=int(value))
selected_congresses.append(congress.pk)
if congress.congress_master.pk not in selected_masters:
BatchActivity(
batch=batch,
activity_id=int(value),
activity_type=BatchActivity.ACTIVITY_TYPE_CONGRESS,
).save()
# process events, adding an activity if not in a selected congress / master
# and always adding entrants to recipients
for key, value in request.POST.items():
parts = key.split("-")
if parts[0] != "event":
continue
event = get_object_or_404(Event, pk=int(value))
selected_events.append(event.pk)
if (
event.congress.pk not in selected_congresses
and event.congress.congress_master.pk not in selected_masters
):
BatchActivity(
batch=batch,
activity_id=int(value),
activity_type=BatchActivity.ACTIVITY_TYPE_EVENT,
).save()
# create recipients for the event
entered_players = (
EventEntryPlayer.objects.filter(
event_entry__event=event,
player__is_active=True,
)
.exclude(event_entry__entry_status="Cancelled")
.select_related("player")
)
for entered_player in entered_players:
# COB-940 ALL_SYSTEM_ACCOUNTS contains ids not system numbers
# so use ALL_SYSTEM_ACCOUNT_SYSTEM_NUMBERS instead
if (
entered_player.player.system_number
not in ALL_SYSTEM_ACCOUNT_SYSTEM_NUMBERS
):
recipient = Recipient()
recipient.create_from_user(batch, entered_player.player)
try:
recipient.save()
added_count += 1
except IntegrityError:
# ignore duplicate system_numbers within the batch
pass
if added_count > 0:
# and redirect to the next step
return redirect("notifications:compose_email_recipients", club.id, batch.id)
else:
messages.add_message(request, messages.INFO, "No entrants found")
else:
# build the view from the selected batch activities
# Note that when a branch is selected (eg a series or congress), all elements
# on that branch must be added (eg if all events for a selected congress)
selected_masters = []
selected_congresses = []
selected_events = []
activities = BatchActivity.objects.filter(
batch=batch,
).all()
for activity in activities:
# add all congresses and events for a series
if activity.activity_type == BatchActivity.ACTIVITY_TYPE_SERIES:
master = get_object_or_404(CongressMaster, pk=activity.activity_id)
selected_masters.append(master.pk)
for congress in master.congress_set.all():
selected_congresses.append(congress.pk)
for event in congress.event_set.all():
selected_events.append(event.pk)
if activity.activity_type == BatchActivity.ACTIVITY_TYPE_CONGRESS:
congress = get_object_or_404(Congress, pk=activity.activity_id)
selected_congresses.append(congress.pk)
for event in congress.event_set.all():
selected_events.append(event.pk)
if activity.activity_type == BatchActivity.ACTIVITY_TYPE_EVENT:
event = get_object_or_404(Event, pk=activity.activity_id)
selected_events.append(event.pk)
start_date_str = (
batch.date_range_from.strftime("%d/%m/%Y") if batch.date_range_from else ""
)
end_date_str = (
batch.date_range_to.strftime("%d/%m/%Y") if batch.date_range_to else ""
)
return render(
request,
"notifications/batch_email_multi_event.html",
{
"step": 0,
"batch": batch,
"club": club,
"cancelable": not _batch_has_been_customised(batch),
"masters": masters,
"selected_masters": selected_masters,
"selected_congresses": selected_congresses,
"selected_events": selected_events,
"existing_selection": (
len(selected_masters) + len(selected_congresses) + len(selected_events)
)
> 0,
"start_date_str": start_date_str,
"end_date_str": end_date_str,
},
)
[docs]
@check_club_and_batch_access()
def compose_email_multi_select_by_date(request, club, batch):
"""User should have specified a start and end date to select by
Creates BatchActivities based on date and then redirects to main select view"""
# get and validate the date range
start_date_str = request.POST.get("start_date")
end_date_str = request.POST.get("end_date")
date_format = "%d/%m/%Y"
error_msg = None
try:
if start_date_str:
start_date = datetime.strptime(start_date_str, date_format).date()
else:
start_date = date(2020, 12, 1)
if end_date_str:
end_date = datetime.strptime(end_date_str, date_format).date()
else:
end_date = date.today()
except ValueError:
error_msg = "Invalid date"
if error_msg or end_date < start_date:
# invalid date range. Note message must be added to the request, not the response
messages.error(
request,
error_msg if error_msg else "Invalid date range",
extra_tags="cobalt-message-error",
)
base_url = reverse(
"notifications:compose_email_multi_select",
kwargs={
"club_id": club.id,
"batch_id_id": batch.id,
},
)
query_params = urlencode(
{"start_date_str": start_date_str, "end_date_str": end_date_str}
)
response = HttpResponse("Redirecting...", status=302)
response["HX-Redirect"] = f"{base_url}?{query_params}"
return response
# save the date range
batch.date_range_from = start_date
batch.date_range_to = end_date
batch.save()
# delete the existing batch activities
BatchActivity.objects.filter(batch=batch).delete()
# build the new set of batch activities
masters = CongressMaster.objects.filter(org=club)
total_event_count = 0
for master in masters:
selected_congresses = []
excluded_congresses = []
for congress in master.congress_set.all():
if congress.status != "Draft":
selected_events = []
excluded_events = []
# check each of the events in this congress
for event in congress.event_set.all():
# calculate event date range from session dates
event_start_date = None
event_end_date = None
for session in event.session_set.all():
if event_start_date is None:
event_start_date = session.session_date
event_end_date = session.session_date
elif session.session_date < event_start_date:
event_start_date = session.session_date
elif session.session_date > event_end_date:
event_end_date = session.session_date
if event_start_date >= start_date and event_end_date <= end_date:
selected_events.append(event)
total_event_count += 1
else:
excluded_events.append(event)
if len(excluded_events) == 0:
# all events selected (ie none excluded), so include the congress
selected_congresses.append(congress)
else:
# count the congress as excluded and create batch activities for the events
excluded_congresses.append(congress)
# for selected_event in selected_congresses:
for selected_event in selected_events:
BatchActivity(
batch=batch,
activity_type=BatchActivity.ACTIVITY_TYPE_EVENT,
activity_id=selected_event.id,
).save()
if len(excluded_congresses) == 0:
# all congresses in the master selected so create a batch activity for the master
BatchActivity(
batch=batch,
activity_type=BatchActivity.ACTIVITY_TYPE_SERIES,
activity_id=master.id,
).save()
else:
# some subset of congresses selected (possibly none), so create congress batch activities
for selected_congress in selected_congresses:
BatchActivity(
batch=batch,
activity_type=BatchActivity.ACTIVITY_TYPE_CONGRESS,
activity_id=selected_congress.id,
).save()
# display the new list, with an appropriate message about the results
if total_event_count == 0:
messages.warning(
request,
f"Nothing selected for the date range {start_date.strftime(date_format)} to {end_date.strftime(date_format)}",
extra_tags="cobalt-message-warning",
)
else:
messages.info(
request,
f"{total_event_count} event{'' if total_event_count == 1 else 's'} selected for the date range {start_date.strftime(date_format)} to {end_date.strftime(date_format)}",
)
base_url = reverse(
"notifications:compose_email_multi_select",
kwargs={
"club_id": club.id,
"batch_id_id": batch.id,
},
)
query_params = urlencode(
{"start_date_str": start_date_str, "end_date_str": end_date_str}
)
response = HttpResponse("Redirecting...", status=302)
response["HX-Redirect"] = f"{base_url}?{query_params}"
return response
[docs]
@check_club_and_batch_access()
def compose_email_multi_clear_date_range_htmx(request, club, batch):
"""Clear the date range stored in the batch
Called whenever a change is made after a select by date"""
batch.date_range_from = None
batch.date_range_to = None
batch.save()
# Return a 204 No Content response
return HttpResponse(status=204)
[docs]
@check_club_and_batch_access()
def compose_email_recipients(request, club, batch):
"""Compose batch emails - step 1 - review recipients"""
congress_stream = batch.batch_type in [
BatchID.BATCH_TYPE_CONGRESS,
BatchID.BATCH_TYPE_EVENT,
BatchID.BATCH_TYPE_MULTI,
]
if request.method == "GET":
try:
page_number = int(request.GET.get("page"))
except ValueError:
page_number = 1
except TypeError:
# None passed
page_number = 1
else:
page_number = 1
# get all of the recients for the batch and paginate
recipients = Recipient.objects.filter(
batch=batch,
).order_by("initial", "last_name", "first_name")
recipient_count = recipients.filter(include=True).count()
if recipient_count != batch.batch_size:
batch.batch_size = recipient_count
batch.save()
page_size = 20
pages = Paginator(recipients, page_size)
page = pages.get_page(page_number)
# work out where the added and initial headers should be placed on the current page
added_count = Recipient.objects.filter(batch=batch, initial=False).count()
initial_count = Recipient.objects.filter(batch=batch, initial=True).count()
if added_count == 0 or initial_count == 0:
initial_header_before_row = None
added_header_before_row = None
else:
first_row_on_page = (page_number - 1) * page_size + 1
last_row_on_page = min(
(initial_count + added_count), first_row_on_page + page_size - 1
)
if added_count <= first_row_on_page:
# have paged past the beginning of the initial selection, so show a header
initial_header_before_row = 1
added_header_before_row = None
else:
# top of page is in the added section, so show a header
added_header_before_row = 1
if added_count < last_row_on_page:
initial_header_before_row = added_count - (first_row_on_page - 1) + 1
else:
initial_header_before_row = None
# determine range of pages to show in pagination row
half_span = 4
# the number of pages to the left and right if in the middle of a large number of pages
full_span = half_span * 2 + 1
if pages.num_pages <= full_span:
# simple case - able to show all pages at once
page_range = range(1, pages.num_pages + 1)
else:
if page_number <= (half_span + 1):
# near the beginning
page_range = range(1, full_span + 1)
elif page_number >= (pages.num_pages - half_span - 1):
# near the end
page_range = range(pages.num_pages - full_span + 1, pages.num_pages + 1)
else:
# in the middle
page_range = range(page_number - half_span, page_number + half_span + 1)
return render(
request,
"notifications/batch_email_recipients.html",
{
"step": 1,
"batch": batch,
"club": club,
"cancelable": not _batch_has_been_customised(batch),
"page": page,
"page_range": page_range,
"initial_header_before_row": initial_header_before_row,
"added_header_before_row": added_header_before_row,
"congress_stream": congress_stream,
"recipient_count": recipient_count,
},
)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_add_self(request, club, batch):
"""Add current user to the recipient list"""
_, feedback = _add_user_to_recipients(club, batch, request.user, initial=False)
messages.add_message(request, messages.INFO, feedback)
return redirect("notifications:compose_email_recipients", club.id, batch.id)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_add_congress_email(request, club, batch):
"""Add the congress contact email(s) to the recipient list"""
# build a list of congress email addresses from batch activities
congress_emails = set() # set to avoid duplicates
for activity in batch.activities.all():
if activity.activity_type == BatchActivity.ACTIVITY_TYPE_CONGRESS:
congress = get_object_or_404(Congress, pk=activity.activity_id)
congress_emails.add(congress.contact_email)
elif activity.activity_type == BatchActivity.ACTIVITY_TYPE_EVENT:
event = get_object_or_404(Event, pk=activity.activity_id)
congress_emails.add(event.congress.contact_email)
elif activity.activity_type == BatchActivity.ACTIVITY_TYPE_SERIES:
series = get_object_or_404(CongressMaster, pk=activity.activity_id)
for congress in series.congress_set.all():
congress_emails.add(congress.contact_email)
# add the contact emails as recipients
added_count = 0
for email in congress_emails:
already_in = Recipient.objects.filter(email=email)
if already_in.exists():
# exists, but make sure that it is included
recipient = already_in.first()
if not recipient.include:
recipient.include = True
recipient.save()
added_count += 1
else:
# add it
recipient = Recipient()
recipient.batch = batch
recipient.email = email
recipient.first_name = None
recipient.last_name = f"Contact Email {email}"
recipient.system_number = None
recipient.include = True
recipient.initial = False
recipient.save()
added_count += 1
if added_count == 0:
messages.add_message(request, messages.WARNING, "No contact emails added")
else:
messages.add_message(
request,
messages.INFO,
f"{added_count} contact email{'s' if added_count > 1 else ''} added",
)
return redirect("notifications:compose_email_recipients", club.id, batch.id)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_add_tadmins(request, club, batch):
"""Add club tournament organisers to the recipients"""
tournament_admins = rbac_get_users_with_role(f"events.org.{club.id}.edit")
added_count = 0
for td in tournament_admins:
delta, _ = _add_user_to_recipients(club, batch, td, initial=False)
added_count += delta
if added_count == 0:
messages.add_message(request, messages.WARNING, "No tournament admins added")
else:
messages.add_message(
request,
messages.INFO,
f"{added_count} tournament admin{'s' if added_count > 1 else ''} added",
)
return redirect("notifications:compose_email_recipients", club.id, batch.id)
def _updated_recipient_count(request, batch):
"""Return an HTML snippet with te updated recipent count for the batch"""
recipient_count = Recipient.objects.filter(
batch=batch,
include=True,
).count()
if recipient_count != batch.batch_size:
batch.batch_size = recipient_count
batch.save()
return HttpResponse(
f"{recipient_count if recipient_count else 'No'} recipient{'' if recipient_count == 1 else 's'}"
)
[docs]
@login_required()
def compose_email_recipients_toggle_recipient_htmx(request, recipient_id):
"""Toggle the include state of the recipient"""
recipient = get_object_or_404(Recipient, pk=recipient_id)
# check access
access_granted, role_required = check_user_has_batch_access(
request.user, recipient.batch
)
if not access_granted:
return rbac_forbidden(request, role_required, htmx=True)
recipient.include = not recipient.include
recipient.save()
# return HttpResponse(status=204)
return _updated_recipient_count(request, recipient.batch)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_select_all(request, club, batch):
"""Include all current recipients"""
Recipient.objects.filter(batch=batch, include=False).update(include=True)
return redirect("notifications:compose_email_recipients", club.id, batch.id)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_deselect_all(request, club, batch):
"""Deselect all current recipients"""
Recipient.objects.filter(batch=batch, include=True).update(include=False)
return redirect("notifications:compose_email_recipients", club.id, batch.id)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_remove_unselected_htmx(request, club, batch):
"""Remove all unselected recipients from a batch"""
Recipient.objects.filter(batch=batch, include=False).delete()
return redirect("notifications:compose_email_recipients", club.id, batch.id)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_member_search_htmx(request, club, batch):
"""Returns a list of club member and contact search candidates
Searches by first name, last name or system number (not a combination)
Matches on the start of the relevent field, and can include members
with no club email address (unregistered users).
Such unregsistered users will be shown in the UI without a link.
It may be less confusing if a known member is on the list but
not selectable, rather than not there at all.
"""
first_name_search = request.POST.get("member-search-first", "")
last_name_search = request.POST.get("member-search-last", "")
system_number_search = request.POST.get("member-search-number", "")
# if there is nothing to search for, don't search
if not first_name_search and not last_name_search and not system_number_search:
return HttpResponse("")
member_details = get_club_members(club, exclude_contacts=False, active_only=False)
if first_name_search and not last_name_search:
first_name_search_upper = first_name_search.upper()
members = [
member
for member in member_details
if member.first_name.upper().startswith(first_name_search_upper)
]
elif last_name_search and not first_name_search:
last_name_search_upper = last_name_search.upper()
members = [
member
for member in member_details
if member.last_name.upper().startswith(last_name_search_upper)
]
elif first_name_search and last_name_search:
first_name_search_upper = first_name_search.upper()
last_name_search_upper = last_name_search.upper()
members = [
member
for member in member_details
if member.first_name.upper().startswith(first_name_search_upper)
and member.last_name.upper().startswith(last_name_search_upper)
]
else:
members = [
member
for member in member_details
if not member.internal
and str(member.system_number).startswith(system_number_search)
]
return render(
request,
"notifications/batch_email_recipients_member_search_htmx.html",
{
"club": club,
"batch": batch,
"members": members,
"inactive_states": MEMBERSHIP_STATES_TERMINAL,
},
)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_add_tag(request, club, batch, tag_id):
"""Add recipients from a club tag"""
reason_counts = {reason_code: 0 for (reason_code, _) in _ADD_RECIPIENT_RESULTS}
if tag_id in [EVERYONE_TAG_ID, CONTACTS_TAG_ID]:
# add all members
if tag_id == EVERYONE_TAG_ID:
system_numbers = get_club_member_list(club)
else:
system_numbers = get_club_contact_list(club)
for system_number in system_numbers:
result, _ = _add_to_recipient_with_system_number(batch, club, system_number)
reason_counts[result] += 1
else:
# add from a real club tag
tag = get_object_or_404(ClubTag, pk=tag_id)
tag_members = MemberClubTag.objects.filter(club_tag=tag)
for mct in tag_members:
result, _ = _add_to_recipient_with_system_number(
batch, club, mct.system_number
)
reason_counts[result] += 1
msg = f"{reason_counts[_ADD_RECIPIENT_RESULT_OK]} recipient{'s' if reason_counts[_ADD_RECIPIENT_RESULT_OK] != 1 else ''} added"
excluded_count = 0
excluded_msg = ""
for excluded_reason, excluded_desc in _ADD_RECIPIENT_RESULTS[1:]:
if reason_counts[excluded_reason]:
excluded_count += reason_counts[excluded_reason]
excluded_msg += f"{reason_counts[excluded_reason]} {excluded_desc}, "
if excluded_count:
msg += f" ({excluded_count} not added: {excluded_msg[:-2]})"
messages.add_message(
request,
messages.INFO,
msg,
)
return redirect("notifications:compose_email_recipients", club.id, batch.id)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_add_member(request, club, batch, system_number):
"""Add a club member by system number as a recipient"""
_, feedback = _add_to_recipient_with_system_number(batch, club, system_number)
messages.add_message(request, messages.INFO, feedback)
return redirect("notifications:compose_email_recipients", club.id, batch.id)
[docs]
@check_club_and_batch_access()
def compose_email_recipients_remove_tag(request, club, batch, tag_id, from_all):
"""Remove tagged club members from a batch's recipients
Either from all recipeinets, or from added (ie not initial) recipients only"""
if tag_id == EVERYONE_TAG_ID:
system_numbers = get_club_member_list(club)
else:
tag = get_object_or_404(ClubTag, pk=tag_id)
source = MemberClubTag.objects.filter(club_tag=tag)
system_numbers = [item.system_number for item in source.all()]
if from_all:
# un-include all occurances
Recipient.objects.filter(
batch=batch, system_number__in=system_numbers, include=True
).update(include=False)
else:
# only un-include from non-initial recipients
Recipient.objects.filter(
batch=batch, system_number__in=system_numbers, include=True, initial=False
).update(include=False)
messages.add_message(
request,
messages.INFO,
f"{'EVERYONE' if tag_id == EVERYONE_TAG_ID else tag.tag_name} removed",
)
return redirect("notifications:compose_email_recipients", club.id, batch.id)
[docs]
@check_club_and_batch_access()
def compose_email_options(request, club, batch):
"""Compose batch emails - step 2 - email options"""
if request.method == "POST":
email_options_form = EmailOptionsForm(request.POST, club=club)
if email_options_form.is_valid():
# When a template is selected it populates the other two fields
# from the template. These values can be changed and would then override
# the template values. Rather than implement complex logic to determine
# whether the values are being overridden, just save the values as provided.
if email_options_form.cleaned_data.get("template"):
selected_template_id = email_options_form.cleaned_data.get("template")
if selected_template_id != 0:
template = get_object_or_404(
OrgEmailTemplate, pk=selected_template_id
)
else:
template = None
else:
template = None
selected_template_id = None
batch.template = template
batch.reply_to = email_options_form.cleaned_data.get("reply_to")
batch.from_name = email_options_form.cleaned_data.get("from_name")
batch.save()
# proceed to step 3 - content
return redirect("notifications:compose_email_content", club.id, batch.id)
else:
email_options_form = EmailOptionsForm(club=club)
if batch.template:
# use the stored template, but override the template value for the other two fields
selected_template_id = batch.template.id
email_options_form.fields["from_name"].initial = batch.from_name
email_options_form.fields["reply_to"].initial = batch.reply_to
elif len(email_options_form.fields["template"].choices) > 0:
# first time through
# apply the rules for determining a default
# (could return None even if there are templates defined)
org_default_template = club_default_template(club)
if org_default_template:
selected_template_id = org_default_template.id
else:
# use the first one, unless it is RESULTS and there is another option
selected_template_id = email_options_form.fields["template"].choices[0][
0
]
if (
email_options_form.fields["template"].choices[0][1].upper()
== "RESULTS"
):
if len(email_options_form.fields["template"].choices) > 1:
selected_template_id = email_options_form.fields[
"template"
].choices[1][0]
template = get_object_or_404(OrgEmailTemplate, pk=selected_template_id)
email_options_form.fields["from_name"].initial = template.from_name
email_options_form.fields["reply_to"].initial = template.reply_to
# save these to the batch
batch.template = template
batch.from_name = template.from_name
batch.reply_to = template.reply_to
batch.save()
else:
# no templates so just use defaults
selected_template_id = None
email_options_form.fields["from_name"].initial = batch.from_name
email_options_form.fields["reply_to"].initial = batch.reply_to
return render(
request,
"notifications/batch_email_options.html",
{
"selected_template_id": selected_template_id,
"step": 2,
"batch": batch,
"club": club,
"cancelable": not _batch_has_been_customised(batch),
"email_options_form": email_options_form,
},
)
[docs]
@check_club_and_batch_access()
def compose_email_options_from_and_reply_to_htmx(request, club, batch):
"""Rebuild the from and reply_to fields in the send email form if the template changes"""
template_id = request.POST.get("template")
template = get_object_or_404(OrgEmailTemplate, pk=template_id)
email_options_form = EmailOptionsForm(club=club)
email_options_form.fields["from_name"].initial = template.from_name
email_options_form.fields["reply_to"].initial = template.reply_to
# save these to the batch
batch.template = template
batch.from_name = template.from_name
batch.reply_to = template.reply_to
batch.save()
return render(
request,
"notifications/batch_email_options_from_and_reply_to_htmx.html",
{"batch": batch, "club": club, "email_options_form": email_options_form},
)
[docs]
@check_club_and_batch_access()
def compose_email_content(request, club, batch):
"""Compose batch emails - step 1 - review recipients"""
ready_to_send = False
if request.method == "POST":
email_content_form = EmailContentForm(request.POST)
if email_content_form.is_valid():
if hasattr(batch, "batchcontent"):
batch.batchcontent.email_body = email_content_form.cleaned_data.get(
"email_body", ""
)
batch.batchcontent.save()
else:
new_content = BatchContent()
new_content.batch = batch
new_content.email_body = email_content_form.cleaned_data.get(
"email_body", ""
)
new_content.save()
if email_content_form.cleaned_data.get("subject"):
batch.description = email_content_form.cleaned_data.get(
"subject", "Batch email"
)
batch.save()
ready_to_send = True
pass
else:
email_content_form = EmailContentForm()
email_content_form.fields["subject"].initial = batch.description
if hasattr(batch, "batchcontent"):
email_content_form.fields["email_body"].initial = (
batch.batchcontent.email_body
)
ready_to_send = True
return render(
request,
"notifications/batch_email_content.html",
{
"step": 3,
"batch": batch,
"club": club,
"cancelable": not _batch_has_been_customised(batch),
"email_content_form": email_content_form,
"ready_to_send": ready_to_send,
},
)
[docs]
@check_club_and_batch_access()
def compose_email_content_preview_htmx(request, club, batch):
"""Show a preview of the emails in the batch in a separate window"""
context = {
"host": COBALT_HOSTNAME,
"batch": batch,
"name": "Member",
"subject": batch.description,
}
# determine the html template and title values based on the batch type
if batch.batch_type in [
BatchID.BATCH_TYPE_CONGRESS,
BatchID.BATCH_TYPE_EVENT,
]:
context["po_template_name"] = "two_headings"
context["po_template_html_name"] = "po_email_with_two_headings_flex.html"
activity = BatchActivity.objects.filter(batch=batch).first()
if activity.activity_type == BatchActivity.ACTIVITY_TYPE_CONGRESS:
congress = get_object_or_404(Congress, pk=activity.activity_id)
else:
event = get_object_or_404(Event, pk=activity.activity_id)
congress = event.congress
context["title1"] = (
f"Message from {request.user.full_name} on behalf of {congress}"
)
context["title2"] = batch.description
elif batch.batch_type in [
BatchID.BATCH_TYPE_MULTI,
]:
context["po_template_name"] = "two_headings"
context["po_template_html_name"] = "po_email_with_two_headings_flex.html"
context["title1"] = f"Message from {request.user.full_name} on behalf of {club}"
context["title2"] = batch.description
elif batch.batch_type in [
BatchID.BATCH_TYPE_COMMS,
BatchID.BATCH_TYPE_RESULTS,
]:
context["po_template_name"] = "club"
context["po_template_html_name"] = "po_club_template.html"
context["title"] = batch.description
else:
context["po_template_name"] = "defaults"
context["po_template_html_name"] = "po_email_default_flex.html"
context["title"] = batch.description
batch_content = get_object_or_404(BatchContent, batch=batch)
context["email_body"] = batch_content.email_body
if batch.template:
org_template = batch.template
else:
org_template = club_default_template(club) or OrgEmailTemplate(
organisation=club
)
context["img_src"] = org_template.banner.url
context["box_colour"] = org_template.box_colour
context["box_font_colour"] = org_template.box_font_colour
context["footer"] = org_template.footer
context["from_name"] = org_template.from_name
context["reply_to"] = org_template.reply_to
context["attachment_objects"] = EmailAttachment.objects.filter(batches__batch=batch)
# host
# img_src
# box_colour
# box_font_colour
# subject
# email_body
return render(
request,
"notifications/batch_email_content_preview_htmx.html",
context,
)
[docs]
@check_club_and_batch_access()
def compose_email_content_send_htmx(request, club, batch):
"""Handle sending a test message or the full batch
Redirects to one of the process steps if there is an issue, otherwise
redirects to the watch email view.
"""
(ok_to_send, error_message, rectification_step) = _validate_batch_details(batch)
if ok_to_send:
(attachments, attachment_size) = _attachment_dict_for_batch(batch)
if attachment_size > 10_000_000:
(ok_to_send, error_message, rectification_step) = (
False,
"Attachments are too large",
3,
)
if ok_to_send:
dispatched = _dispatch_batch(
request,
club,
batch,
attachments,
test_user=request.user if "test" in request.POST else None,
)
if dispatched:
if "test" in request.POST and request.POST["test"] == "test":
messages.success(
request,
f"Test message sent to {request.user.email}",
extra_tags="cobalt-message-success",
)
return HttpResponse(f"Test message sent to {request.user.email}")
else:
# redirect to email watch view
# is this really useful for very samll batches (eg 1-4 recipients)
response = HttpResponse("Redirecting...", status=302)
# response["HX-Redirect"] = reverse(
# "notifications:watch_emails", kwargs={"batch_id": batch.batch_id}
# )
response["HX-Redirect"] = reverse(
"organisations:club_menu_tab_entry_point",
kwargs={"club_id": club.id, "tab_name": "comms"},
)
return response
else:
(ok_to_send, error_message, rectification_step) = (
False,
"Unable to send",
3,
)
messages.error(
request,
error_message,
extra_tags="cobalt-message-error",
)
response = HttpResponse("Redirecting...", status=302)
if rectification_step == 1:
response["HX-Redirect"] = reverse(
"notifications:compose_email_recipients",
kwargs={"club_id": club.id, "batch_id": batch.id},
)
elif rectification_step == 2:
response["HX-Redirect"] = reverse(
"notifications:compose_email_options",
kwargs={"club_id": club.id, "batch_id": batch.id},
)
else:
response["HX-Redirect"] = reverse(
"notifications:compose_email_content",
kwargs={"club_id": club.id, "batch_id": batch.id},
)
return response
def _attachment_dict_for_batch(batch):
"""Returns an attachment dictionary and total attachment size (bytes)"""
attachment_ids = BatchAttachment.objects.filter(batch=batch).values_list(
"attachment_id", flat=True
)
attachment_id_list = list(attachment_ids)
attachments = {}
total_size = 0.0
if len(attachment_id_list) > 0:
attachments_objects = EmailAttachment.objects.filter(id__in=attachment_id_list)
for attachments_object in attachments_objects:
mime_type, _ = mimetypes.guess_type(attachments_object.filename())
if mime_type is None:
attachments[attachments_object.filename()] = (
attachments_object.attachment.path
)
else:
attachments[attachments_object.filename()] = {
"file": attachments_object.attachment.path,
"mimetype": mime_type,
}
total_size += attachments_object.attachment.size
return (attachments, total_size)
def _validate_batch_details(batch):
"""Check whether the batch is really ready to send
Returns a tuple of:
Success (Trie/False)
User error message
Process step to rectify (1,2, 3)
"""
if batch.state != BatchID.BATCH_STATE_WIP:
return (False, "Batch has already been sent")
if len(batch.description) == 0:
return (False, "Please specify a subject")
recipient_count = Recipient.objects.filter(batch=batch, include=True).count()
if recipient_count == 0:
return (False, "Batch has no recipients")
return (True, None, None)
def _dispatch_batch(request, club, batch, attachments, test_user=None):
"""Queue a batch of emails to be sent
If a test_user is specified the email is only sent to that user.
Returns success (true/false)
"""
# get the recipients
if test_user:
recipients = [test_user]
else:
recipients = Recipient.objects.filter(
batch=batch,
include=True,
)
# build the template rendering context
context = {
"subject": batch.description,
}
if hasattr(batch, "batchcontent"):
context["email_body"] = batch.batchcontent.email_body
if batch.template:
org_template = batch.template
else:
org_template = club_default_template(club) or OrgEmailTemplate(
organisation=club
)
if org_template.banner:
context["img_src"] = org_template.banner.url
if org_template.footer:
context["footer"] = org_template.footer
if org_template.box_colour:
context["box_colour"] = org_template.box_colour
if org_template.box_font_colour:
context["box_font_colour"] = org_template.box_font_colour
# determine which EmailTemplate to use, and update context with
# and template specific parameters
if batch.batch_type in [
BatchID.BATCH_TYPE_CONGRESS,
BatchID.BATCH_TYPE_EVENT,
]:
po_template = "system - two headings flex"
activity = BatchActivity.objects.filter(batch=batch).first()
if activity.activity_type == BatchActivity.ACTIVITY_TYPE_CONGRESS:
congress = get_object_or_404(Congress, pk=activity.activity_id)
else:
event = get_object_or_404(Event, pk=activity.activity_id)
congress = event.congress
context["title1"] = (
f"Message from {request.user.full_name} on behalf of {congress}"
)
context["title2"] = batch.description
elif batch.batch_type in [
BatchID.BATCH_TYPE_MULTI,
]:
po_template = "system - two headings flex"
context["title1"] = f"Message from {request.user.full_name} on behalf of {club}"
context["title2"] = batch.description
elif batch.batch_type in [
BatchID.BATCH_TYPE_COMMS,
BatchID.BATCH_TYPE_RESULTS,
]:
po_template = "system - club"
context["title"] = batch.description
else:
context["title"] = batch.description
po_template = "system - default flex"
# other arguements required to send the email
# from_name = batch.from_name # where is this used ?
reply_to = batch.reply_to
if len(recipients) == 1:
context["name"] = recipients[0].first_name
# sender = f"{batch.from_name}<donotreply@myabf.com.au>" if batch.from_name else None
sender = custom_sender(batch.from_name)
send_cobalt_email_with_template(
to_address=recipients[0].email,
context=context,
template=po_template,
batch_id=None if test_user else batch,
reply_to=reply_to,
sender=sender,
attachments=attachments if len(attachments) > 0 else None,
batch_size=1,
)
if test_user is None:
_finalise_email_batch(batch, batch_size=1)
else:
# send in a separate thread
# start thread
thread = Thread(
target=_dispatch_batch_thread,
args=[
batch,
recipients,
context,
po_template,
reply_to,
attachments,
],
)
thread.setDaemon(True)
thread.start()
return True
def _dispatch_batch_thread(
batch,
recipients,
context,
po_template,
reply_to,
attachments,
):
"""Asynchronous thread to send bulk emails for a batch"""
# Mark the batch as in flight
batch.state = BatchID.BATCH_STATE_IN_FLIGHT
batch.batch_size = len(recipients)
batch.save()
# sender = f"{batch.from_name}<donotreply@myabf.com.au>" if batch.from_name else None
sender = custom_sender(batch.from_name)
try:
for recipient in recipients:
# JPG TESTING - to test queuing progress
# time.sleep(3)
context["name"] = recipient.first_name
send_cobalt_email_with_template(
to_address=recipient.email,
context=context,
template=po_template,
batch_id=batch,
reply_to=reply_to,
sender=sender,
attachments=attachments if len(attachments) > 0 else None,
batch_size=batch.batch_size,
)
logger.info(
f"Queued email to {recipient.first_name} {recipient.last_name}, {recipient.email}"
)
except Exception as e:
# something went wrong, so mark the batch as errored and reraise the exception
batch.state = BatchID.BATCH_STATE_ERRORED
batch.save()
logger.error(f"Error queuing email batch, Exception {e}")
raise
_finalise_email_batch(batch)
[docs]
@check_club_and_batch_access()
def compose_email_content_attachment_htmx(request, club, batch):
"""Handle the attachments pane"""
email_attachments = EmailAttachment.objects.filter(organisation=club).order_by(
"-pk"
)[:50]
# Add hx_vars for the delete function
for email_attachment in email_attachments:
email_attachment.hx_vars = (
f"club_id:{club.id},email_attachment_id:{email_attachment.id}"
)
email_attachment.modal_id = f"del_attachment{email_attachment.id}"
return render(
request,
"notifications//batch_email_content_email_attachment_htmx.html",
{"club": club, "batch": batch, "email_attachments": email_attachments},
)
[docs]
@check_club_and_batch_access()
def compose_email_content_upload_new_email_attachment_htmx(request, club, batch):
"""Upload a new email attachment for a club
Use the HTMX hx-trigger response header to tell the browser about it
"""
form = EmailAttachmentForm(request.POST, request.FILES)
if form.is_valid():
email_attachment = form.save(commit=False)
email_attachment.organisation = club
email_attachment.save()
trigger = f"""{{"post_attachment_add":{{"id": "{email_attachment.id}" , "name": "{email_attachment.filename()}"}}}}"""
return _email_attachment_list_htmx(
request, club, batch, hx_trigger_response=trigger
)
return HttpResponse("Error")
def _email_attachment_list_htmx(request, club, batch, hx_trigger_response=None):
"""Shows just the list of attachments, called if we delete or add an attachment"""
email_attachments = EmailAttachment.objects.filter(organisation=club).order_by(
"-pk"
)[:50]
# Add hx_vars for the delete function
for email_attachment in email_attachments:
email_attachment.hx_vars = (
f"club_id:{club.id},email_attachment_id:{email_attachment.id}"
)
email_attachment.modal_id = f"del_attachment{email_attachment.id}"
# For delete we need to trigger a response in the browser to remove this from the list (if present)
# We use the hx_trigger response header for this
response = render(
request,
"notifications/batch_email_content_email_attachments_list_htmx.html",
{"club": club, "batch": batch, "email_attachments": email_attachments},
)
if hx_trigger_response:
response["HX-Trigger"] = hx_trigger_response
return response
[docs]
@check_club_and_batch_access()
def compose_email_content_include_attachment_htmx(request, club, batch, attachment_id):
"""Include an attachment in the email
Save to the model and return the list of included attachments"""
attachment = get_object_or_404(EmailAttachment, pk=attachment_id)
existing = BatchAttachment.objects.filter(
batch=batch, attachment=attachment
).first()
if not existing:
batch_attachment = BatchAttachment()
batch_attachment.batch = batch
batch_attachment.attachment = attachment
batch_attachment.save()
return _compose_email_content_included_attachments_htmx(request, club, batch)
[docs]
@check_club_and_batch_access()
def compose_email_content_remove_attachment_htmx(
request, club, batch, batch_attachment_id
):
"""Remove a batch attachment from the email
Update the model and return the list of included attachments"""
batch_attachment = get_object_or_404(BatchAttachment, pk=batch_attachment_id)
batch_attachment.delete()
return _compose_email_content_included_attachments_htmx(request, club, batch)
[docs]
@check_club_and_batch_access()
def compose_email_content_included_attachments_htmx(request, club, batch):
"""Return the list of included attachments (ie batch attachments)"""
return _compose_email_content_included_attachments_htmx(request, club, batch)
def _compose_email_content_included_attachments_htmx(request, club, batch):
"""Return the list of included attachments (ie batch attachments)"""
batch_attachments = BatchAttachment.objects.filter(batch=batch)
return render(
request,
"notifications/batch_email_content_included_attachments_htmx.html",
{
"batch": batch,
"club": club,
"batch_attachments": batch_attachments,
},
)
def _finalise_email_batch(batch, batch_size=None):
"""Clean-up processing once a batch has been sent"""
if batch_size is not None:
batch.batch_size = batch_size
batch.created = timezone.now()
batch.state = BatchID.BATCH_STATE_COMPLETE
batch.save()
if hasattr(batch, "batchcontent"):
BatchContent.objects.filter(batch=batch).delete()
Recipient.objects.filter(batch=batch).delete()
BatchAttachment.objects.filter(batch=batch).delete()
[docs]
@check_club_and_batch_access()
def delete_email_batch(request, club, batch):
"""Delete an incomplete batch"""
batch.delete()
return redirect(
"organisations:club_menu_tab_entry_point", batch.organisation.id, "comms"
)
[docs]
def batch_queue_progress_htmx(request, batch_id_id):
"""Return an HTML fragment with the batches queuing progress"""
def _final_response(msg, refresh=True):
response = HttpResponse(msg, status=286)
response["HX-Refresh"] = "true"
return response
if not request.user.is_authenticated:
return redirect("/")
batch = BatchID.objects.filter(pk=batch_id_id).first()
if not batch:
# batch has been deleted by someone?
return _final_response("Deleted")
if batch.state != BatchID.BATCH_STATE_IN_FLIGHT:
# batch is no longer in flight
return _final_response("All queued")
if batch.batch_size == 0:
return HttpResponse("Unknown", status=286)
queued = (
Snooper.objects.select_related("post_office_email")
.filter(batch_id=batch)
.count()
)
if queued == batch.batch_size:
return _final_response("All queued")
else:
return HttpResponse(f"{queued / batch.batch_size:.0%} queued")
[docs]
def get_emails_sent_to_address(email_address, club, viewing_user, slice=20):
"""
Return a list of Post Office Email objects sent to the specified email address.
Only emails relevant to the club are returned, and only those that the viewing
user has access rights to read.
Returns the most recent <slice> emails, or None
"""
if not email_address:
return []
if rbac_user_has_role(
viewing_user, "notifications.admin.view"
) or rbac_user_has_role(viewing_user, "orgs.admin.edit"):
# user has global access so return all recent emails
post_office_emails = PostOfficeEmail.objects.filter(
to=[email_address]
).order_by("-pk")[:slice]
else:
# check relevant user access
comms_access = rbac_user_has_role(
viewing_user, f"notifications.orgcomms.{club.id}.edit"
)
congress_access = rbac_user_has_role(viewing_user, f"events.org.{club.id}.edit")
if not congress_access:
congress_access = rbac_user_has_role(
viewing_user, f"events.org.{club.id}.view"
)
if comms_access or congress_access:
# build a list of permitted batch types to view for this user
if comms_access:
permitted_batch_types = [
BatchID.BATCH_TYPE_ADMIN,
BatchID.BATCH_TYPE_COMMS,
BatchID.BATCH_TYPE_RESULTS,
]
else:
permitted_batch_types = []
if congress_access:
permitted_batch_types += [
BatchID.BATCH_TYPE_CONGRESS,
BatchID.BATCH_TYPE_EVENT,
BatchID.BATCH_TYPE_MULTI,
BatchID.BATCH_TYPE_ENTRY,
]
# Query PostOfficeEmail objects through the reverse relation from Snooper
post_office_emails = PostOfficeEmail.objects.filter(
snooper__batch_id__batch_type__in=permitted_batch_types,
snooper__batch_id__organisation=club,
to=[email_address],
).order_by("-pk")[:slice]
# JPG Query - should this really be testing for the role in EmailBatchRBAC?
# I think it gives the same result and is more efficient this way, but
# is perhaps building in a hidden dependency between RBAC roels and batch types
# The RBAC role is checked if the user tries to access the email.
else:
# No releavnt access so return nothing
post_office_emails = None
return post_office_emails