import asyncio
from typing import List, Type

import aiohttp
from maubot import MessageEvent, Plugin
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper

# Pseudo-region that is considered for every report, in addition to the
# region named in the command itself.
GLOBAL_REGION = "__global__"


class Config(BaseProxyConfig):
    """Plugin configuration.

    Ensures a running instance gets an updated config from the Maubot
    interface.
    """

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Carry over every configuration key this plugin reads."""
        for key in (
            "command_prefix",
            "allowed_regions",
            "region_configs",
            "send_reaction",
        ):
            helper.copy(key)


class ConsumerNTFY(Plugin):
    """Maubot plugin that relays ``!<command_prefix> <region> <text>`` reports.

    Each accepted report is POSTed to the ntfy endpoint configured for the
    named region and, when the sender is allowed there, to the endpoint of
    the ``__global__`` pseudo-region as well.
    """

    async def start(self) -> None:
        """Load the configuration at startup."""
        self.config.load_and_update()

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    def get_command_name(self) -> str:
        """Return the ``command_prefix`` setting, used to register the command."""
        return self.config["command_prefix"]

    def validateSender(self, region: str, sender: str) -> bool:
        """Check whether ``sender`` may send reports for ``region``.

        Args:
            region: Key to look up in the ``allowed_regions`` setting.
            sender: Identifier of the user who sent the command.

        Returns:
            ``False`` when ``allowed_regions`` is unset or empty, when the
            region is not listed, or when its value is null. ``True`` when
            the region maps to an empty list (every sender is allowed) or
            when ``sender`` appears in the region's list.
        """
        allowed_regions = self.config["allowed_regions"]
        if not allowed_regions:
            return False

        # Regions that are not listed are denied rather than raising KeyError.
        if region not in allowed_regions:
            return False

        senders = allowed_regions[region]

        # An explicit empty list opens the region to every sender. This has to
        # be tested before the generic falsiness check, which would otherwise
        # swallow it and make the "everyone allowed" case unreachable.
        if isinstance(senders, list) and not senders:
            return True

        # Null (or any other falsy value) means the region is not usable.
        if not senders:
            return False

        return sender in senders

    def validateReport(self, evt: MessageEvent, message: str) -> List[str]:
        """Run the configuration checks for one report command.

        Args:
            evt: Event of the command message; only ``evt.sender`` is read.
            message: Raw command text without the ``!command_name`` part,
                expected to look like ``<region> <text...>``.

        Returns:
            The regions to process, without duplicates: ``__global__`` and
            the lower-cased region from the command, each included only if
            the sender is allowed for it. The list is empty when the command
            lacks a region or a message.
        """
        tokens = message.split()

        # Each command must have a state/territory designation and a message.
        # Checked before touching tokens[0] so an empty command cannot raise.
        if len(tokens) < 2:
            return []

        region = tokens[0].lower()
        self.log.debug(region)

        regions_to_process: List[str] = []
        for candidate in (GLOBAL_REGION, region):
            # A command addressed to "__global__" itself must not be posted
            # to the same endpoint twice.
            if candidate in regions_to_process:
                continue
            if self.validateSender(candidate, evt.sender):
                regions_to_process.append(candidate)
        return regions_to_process

    async def _post_to_region(self, region: str, text: str) -> bool:
        """POST ``text`` to the endpoint configured for ``region``.

        Returns:
            ``True`` if the server answered with HTTP 200. ``False`` if the
            region has no (complete) entry in ``region_configs``, the request
            failed or timed out, or any other status code came back.
        """
        region_config = (self.config["region_configs"] or {}).get(region)
        if not region_config:
            self.log.warning("No endpoint configured for region %s", region)
            return False

        try:
            # Tolerate a trailing slash on the server URL.
            url = (
                region_config["server_url"].rstrip("/")
                + "/"
                + region_config["server_topic"]
            )
            authentication = None
            if region_config.get("server_use_authentication"):
                authentication = aiohttp.BasicAuth(
                    region_config["server_username"],
                    region_config["server_password"],
                )
        except KeyError as missing:
            self.log.warning(
                "Incomplete endpoint config for region %s: missing %s",
                region,
                missing,
            )
            return False

        try:
            async with self.http.post(
                url, data=text, auth=authentication
            ) as response:
                if response.status != 200:
                    self.log.warning(
                        "Endpoint for region %s answered with HTTP %s",
                        region,
                        response.status,
                    )
                    return False
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # A failing endpoint must not abort the remaining regions or the
            # reaction, so the error is logged and reported as a failed post.
            self.log.exception("Request for region %s failed", region)
            return False
        return True

    # What gets called when a !command_name message is sent
    @command.new(name=get_command_name, help="Report Something")
    @command.argument("message", pass_raw=True)
    async def report(self, evt: MessageEvent, message: str) -> None:
        """Push a report to every endpoint the sender may use, then react.

        Args:
            evt: Event of the command message.
            message: Raw text after the command name.

        When ``send_reaction`` is enabled the command message gets a thumbs-up
        if every POST succeeded and a thumbs-down if at least one failed.
        Nothing is sent, and no reaction is added, when there is no region to
        process.
        """
        regions = self.validateReport(evt, message)
        if not regions:
            return

        # The notification text is the same for every region, so build it
        # once. The region keeps the casing the sender typed.
        tokens = message.split()
        text = "[" + tokens[0] + "] " + " ".join(tokens[1:])

        ntfy_posts_passed = True
        for region in regions:
            if not await self._post_to_region(region, text):
                ntfy_posts_passed = False

        # Send reaction based on successful or failed POSTs
        if self.config["send_reaction"]:
            await evt.react("👍" if ntfy_posts_passed else "👎")