from maubot import Plugin, MessageEvent from maubot.handlers import command # Needed for configuration from typing import Type from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper import aiohttp # Ensures a running instance gets an updated config from the Maubot interface class Config(BaseProxyConfig): def do_update(self, helper: ConfigUpdateHelper) -> None: helper.copy("command_prefix") helper.copy("allowed_reigons") helper.copy("reigon_configs") helper.copy("send_reaction") class ConsumerNTFY(Plugin): # Get configuration at startup async def start(self) -> None: self.config.load_and_update() # Get config @classmethod def get_config_class(cls) -> Type[BaseProxyConfig]: return Config # Get !command_name setting from config to register it def get_command_name(self) -> str: return self.config["command_prefix"] # Checks if a sender if allowed to send for a particular reigon def validateSender(self, reigon: str, sender: str): # Check that config value exists if not self.config["allowed_reigons"]: return False # Check that reigon is allowed if not self.config["allowed_reigons"][reigon]: return False # All senders allowed for this reigon if len(self.config["allowed_reigons"][reigon]) == 0: return True # Check that sender is allowed for reigon if not sender in self.config["allowed_reigons"][reigon]: return False return True # Does the necesary config checks for the given event # Returns list of reigons to process (strings) # Currently just the specified reigon and "__global__" def validateReport(self, evt: MessageEvent, message: str): # Split command (minus !command_name) into tokens tokens = message.split() reigon = tokens[0].lower() # Each command must have a state/territory designation and a message if len(tokens) < 2: return None self.log.debug(reigon) # This is a list of reigons to process for this specific message # This is only used to consider __global__ reigons_to_process = [] if (self.validateSender("__global__", evt.sender)): reigons_to_process.append("__global__") if (self.validateSender(reigon, evt.sender)): reigons_to_process.append(reigon) return reigons_to_process # What gets called when !command_name message is sent @command.new(name=get_command_name, help="Report Something") @command.argument("message", pass_raw=True) async def report(self, evt: MessageEvent, message: str) -> None: # If all have passed ntfy_posts_passed = None # Iterate through each endpoint that the message should be pushed to # (if any) for reigon in self.validateReport(evt, message): # Detect no reigons in reaction if ntfy_posts_passed is None: ntfy_posts_passed = True # Grab reigon-specific conrfig reigon_config = self.config["reigon_configs"][reigon] # Create notification text split_message = message.split() text = "[" + split_message[0] + "] " + ' '.join(message.split()[1:]) # Build URL url = reigon_config["server_url"] + "/" + reigon_config["server_topic"] # Consider authentication authentication = None if reigon_config["server_use_authentication"]: authentication = aiohttp.BasicAuth(reigon_config["server_username"], reigon_config["server_password"]) # Send notification async with self.http.post(url, data=text, auth=authentication) as response: if not response.status == 200: ntfy_posts_passed = False # Send reaction based on successful or failedPOSTs if self.config["send_reaction"]: if ntfy_posts_passed is True: await evt.react("👍") elif ntfy_posts_passed is False: await evt.react("👎") # That's all, folks