import asyncio
import subprocess
from typing import Type  # Needed for configuration

import aiohttp
from maubot import MessageEvent, Plugin
from maubot.handlers import command
from mautrix.util.config import BaseProxyConfig, ConfigUpdateHelper

# Pseudo-region whose allow-list and endpoint config apply to every report.
GLOBAL_REGION = "__global__"


class Config(BaseProxyConfig):
    """Plugin configuration.

    Ensures a running instance gets an updated config from the Maubot
    interface.
    """

    def do_update(self, helper: ConfigUpdateHelper) -> None:
        """Copy every setting this plugin reads into the updated config."""
        helper.copy("command_prefix")
        helper.copy("allowed_regions")
        helper.copy("region_configs")
        helper.copy("send_reaction")


class ConsumerSignal(Plugin):
    """Forward chat reports of the form "REGION TEXT..." to configured endpoints.

    The endpoints, their credentials and the per-region sender allow-lists
    all come from the plugin configuration.
    """

    async def start(self) -> None:
        """Get configuration at startup."""
        self.config.load_and_update()

    @classmethod
    def get_config_class(cls) -> Type[BaseProxyConfig]:
        """Return the configuration class used by this plugin."""
        return Config

    def get_command_name(self) -> str:
        """Return the ``command_prefix`` setting, used to register the command."""
        return self.config["command_prefix"]

    def validate_sender(self, region: str, sender: str) -> bool:
        """Check whether a sender is allowed to report for a region.

        Args:
            region: Key to look up under ``allowed_regions``.
            sender: Identifier of the user who sent the command.

        Returns:
            ``False`` when the region has no entry under ``allowed_regions``;
            ``True`` when the entry exists but is empty (``None``), meaning
            no restriction; otherwise whether ``sender`` is in the entry.
        """
        try:
            allowed_list = self.config["allowed_regions"][region]
        except (LookupError, TypeError):
            # Covers both a missing region key and an ``allowed_regions``
            # section that is absent or not a mapping.
            return False

        # An entry without a value means everyone may report for the region.
        if allowed_list is None:
            return True

        return sender in allowed_list

    def validate_report(self, evt: MessageEvent, message: str) -> list:
        """Run the config checks for a report and collect its endpoint configs.

        Args:
            evt: The message event that triggered the command.
            message: Raw command argument (everything after the command name).

        Returns:
            The ``region_configs`` entries to push the report to: the global
            entry and the entry of the addressed region, each only if
            present. Empty when the message is malformed, the sender is not
            allowed, or nothing is configured.
        """
        # Each command must have a state/territory designation and a
        # message. Check this before touching tokens[0] so an empty message
        # cannot raise IndexError.
        tokens = message.split()
        if len(tokens) < 2:
            return []
        region = tokens[0].lower()

        try:
            region_configs = self.config["region_configs"]
        except (LookupError, TypeError):
            return []

        # A user allowed globally and/or for the region gets both the
        # global and the regional endpoint.
        allowed = self.validate_sender(
            GLOBAL_REGION, evt.sender
        ) or self.validate_sender(region, evt.sender)
        if not allowed:
            return []

        configs = []
        # dict.fromkeys drops the duplicate when the user addresses the
        # global pseudo-region directly, so the report is not sent twice.
        for name in dict.fromkeys((GLOBAL_REGION, region)):
            try:
                configs.append(region_configs[name])
            except (LookupError, TypeError):
                # No endpoint configured under this name; nothing to add.
                continue
        return configs

    # What gets called when the configured command is sent
    @command.new(name=get_command_name, help="Report Something")
    @command.argument("message", pass_raw=True)
    async def report(self, evt: MessageEvent, message: str) -> None:
        """Push a report to every applicable endpoint and react to the result.

        Args:
            evt: The message event that triggered the command.
            message: Raw command argument, expected as "REGION TEXT...".

        When ``send_reaction`` is enabled, reacts with thumbs-up if every
        request returned HTTP 200 and thumbs-down if any failed. No reaction
        is sent when there was nothing to push to.
        """
        region_configs = self.validate_report(evt, message)

        # None: nothing was sent; True: all passed; False: at least one failed.
        signal_posts_passed = None

        body = ""
        if region_configs:
            # validate_report only returns configs for messages with at
            # least two tokens, so this unpacking cannot fail.
            region_tag, *words = message.split()
            text = " ".join(words)
            body = f"[{region_tag}] {text}"

        for region_config in region_configs:
            if signal_posts_passed is None:
                signal_posts_passed = True

            try:
                url = region_config["endpoint"]
                bearer = region_config["bearer"]
                group_id = region_config["group_id"]
            except (LookupError, TypeError):
                self.log.error(
                    "Region config lacks endpoint, bearer or group_id; skipping it"
                )
                signal_posts_passed = False
                continue

            # The bearer token is a secret and is deliberately not logged.
            self.log.debug("Sending report to %s (group %s)", url, group_id)

            payload = {
                "jsonrpc": "2.0",
                "method": "send",
                "params": {
                    "groupId": [group_id],
                    "message": body,
                },
            }
            # NOTE(review): "Authentication" is not the standard
            # "Authorization" header; kept as-is because the receiving
            # endpoint is not visible here. Confirm what it expects.
            headers = {
                "Authentication": bearer,
                "Content-Type": "application/json",
                "Accept": "application/json",
            }

            try:
                # NOTE(review): this sends a JSON body with GET although the
                # comments in this file speak of POSTs. Confirm the method
                # the endpoint requires before changing it.
                async with self.http.get(
                    url, json=payload, headers=headers
                ) as response:
                    status = response.status
            except (aiohttp.ClientError, asyncio.TimeoutError) as err:
                # One unreachable endpoint must not stop the remaining ones
                # or suppress the failure reaction.
                self.log.warning("Request to %s failed: %s", url, err)
                signal_posts_passed = False
                continue

            if status == 200:
                self.log.debug("Endpoint %s answered with status %s", url, status)
            else:
                self.log.warning("Endpoint %s answered with status %s", url, status)
                signal_posts_passed = False

        # Send reaction based on successful or failed requests
        if self.config["send_reaction"]:
            if signal_posts_passed is True:
                await evt.react("👍")
            elif signal_posts_passed is False:
                await evt.react("👎")

# That's all, folks