import asyncio
from typing import Type

import aiohttp
from maubot import Plugin, MessageEvent
from maubot.handlers import command
# Needed for configuration
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper

# Key used in both "allowed_regions" and "region_configs" for the entry that
# applies to every region.
_GLOBAL_REGION = "__global__"


class Config(BaseProxyConfig):
    """Plugin configuration.

    Ensures a running instance gets an updated config from the Maubot
    interface.
    """

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every supported top-level key via the update helper."""
        helper.copy("command_prefix")
        helper.copy("allowed_regions")
        helper.copy("region_configs")
        helper.copy("send_reaction")


class ConsumerNTFY(Plugin):
    """Forward ``!<command_prefix> <region> <message>`` reports to HTTP endpoints.

    Each region listed in the config may have an endpoint (``server_url`` +
    ``server_topic``, optionally ``username``/``password``); the
    ``__global__`` entry is processed in addition to the region's own.
    """

    async def start(self) -> None:
        """Load the configuration at startup."""
        self.config.load_and_update()

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the config class used for this plugin."""
        return Config

    def get_command_name(self) -> str:
        """Return the configured command name (the ``command_prefix`` setting)."""
        return self.config["command_prefix"]

    def validate_sender(self, region: str, sender: str) -> bool:
        """Check whether ``sender`` is allowed to send for ``region``.

        Returns:
            ``True`` if the region's allow-list is empty (open to everyone)
            or contains ``sender``; ``False`` if the region is not present in
            ``allowed_regions``, its entry is null, or the sender is not
            listed.
        """
        try:
            allowed_list = self.config["allowed_regions"][region]
        except (KeyError, TypeError):
            # KeyError: region not listed. TypeError: "allowed_regions"
            # itself is missing or not a mapping (the lookup presumably
            # yields a non-subscriptable value such as None -- confirm
            # against mautrix's config implementation).
            return False

        if allowed_list is None:
            # Malformed entry (e.g. a bare "region:" in YAML): fail closed
            # instead of crashing on len(None).
            return False

        # An empty allow-list means anyone may send for this region.
        if len(allowed_list) == 0:
            return True

        return sender in allowed_list

    def validate_report(self, evt: MessageEvent, message: str) -> list:
        """Run the config checks for a report and pick the endpoints to use.

        Args:
            evt: The triggering event; only ``evt.sender`` is read.
            message: Raw command text after ``!command_name``; the first
                whitespace-separated token is the region.

        Returns:
            The list of endpoint configs to post to (the ``__global__`` one
            and the region's own, where present), or an empty list when the
            message is malformed, the region is unknown, or the sender is not
            allowed.
        """
        tokens = message.split()

        # Each command must have a state/territory designation and a message.
        # Checked before indexing so an empty command body cannot raise.
        if len(tokens) < 2:
            return []

        region = tokens[0].lower()

        # The region must be listed in "allowed_regions".
        try:
            self.config["allowed_regions"][region]
        except (KeyError, TypeError):
            return []

        # A sender allowed globally and/or for the region gets both the
        # __global__ and the region endpoint processed.
        allowed = (
            self.validate_sender(_GLOBAL_REGION, evt.sender)
            or self.validate_sender(region, evt.sender)
        )
        if not allowed:
            return []

        configs = []
        # dict.fromkeys de-duplicates while keeping order, so a report whose
        # region token is literally "__global__" is not posted twice.
        for name in dict.fromkeys((_GLOBAL_REGION, region)):
            try:
                configs.append(self.config["region_configs"][name])
            except (KeyError, TypeError):
                # No endpoint configured under this name; skip it.
                continue
        return configs

    @command.new(name=get_command_name, help="Report Something")
    @command.argument("message", pass_raw=True)
    async def report(self, evt: MessageEvent, message: str) -> None:
        """Handle ``!command_name <region> <message>``.

        Posts ``[<region>] <message>`` to every endpoint returned by
        ``validate_report`` and, if ``send_reaction`` is enabled, reacts with
        👍 when every POST returned HTTP 200 or 👎 when any of them failed.
        Nothing is sent and no reaction is made when there are no endpoints.
        """
        region_configs = self.validate_report(evt, message)
        if not region_configs:
            return

        # validate_report guarantees at least two tokens at this point.
        region_token, *words = message.split()
        text = f"[{region_token}] {' '.join(words)}"

        all_posted = True
        for region_config in region_configs:
            url = (
                region_config["server_url"].rstrip("/")
                + "/"
                + region_config["server_topic"]
            )
            # Log only the URL: the full config may contain credentials.
            self.log.debug("Posting report to %s", url)

            auth = None
            if "username" in region_config and "password" in region_config:
                auth = aiohttp.BasicAuth(
                    region_config["username"], region_config["password"]
                )

            try:
                async with self.http.post(url, data=text, auth=auth) as response:
                    if response.status != 200:
                        self.log.warning(
                            "POST to %s returned HTTP %s", url, response.status
                        )
                        all_posted = False
            except (aiohttp.ClientError, asyncio.TimeoutError):
                # A transport failure on one endpoint must not prevent the
                # remaining endpoints (or the reaction) from being handled.
                self.log.exception("POST to %s failed", url)
                all_posted = False

        # Send reaction based on successful or failed POSTs.
        if self.config["send_reaction"]:
            await evt.react("👍" if all_posted else "👎")