Blacken and isort code

This commit is contained in:
Tulir Asokan 2022-03-25 14:22:37 +02:00
parent 6257979e7c
commit 068e268c63
97 changed files with 1781 additions and 1086 deletions

View file

@ -1,5 +1,5 @@
# maubot - A plugin-based Matrix bot system.
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2022 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -13,29 +13,46 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import (Union, Callable, Sequence, Pattern, Awaitable, NewType, Optional, Any, List,
Dict, Tuple, Set, Iterable)
from typing import (
Any,
Awaitable,
Callable,
Dict,
Iterable,
List,
NewType,
Optional,
Pattern,
Sequence,
Set,
Tuple,
Union,
)
from abc import ABC, abstractmethod
import asyncio
import functools
import inspect
import re
from mautrix.types import MessageType, EventType
from mautrix.types import EventType, MessageType
from ..matrix import MaubotMessageEvent
from . import event
PrefixType = Optional[Union[str, Callable[[], str], Callable[[Any], str]]]
AliasesType = Union[List[str], Tuple[str, ...], Set[str], Callable[[str], bool],
Callable[[Any, str], bool]]
CommandHandlerFunc = NewType("CommandHandlerFunc",
Callable[[MaubotMessageEvent, Any], Awaitable[Any]])
CommandHandlerDecorator = NewType("CommandHandlerDecorator",
Callable[[Union['CommandHandler', CommandHandlerFunc]],
'CommandHandler'])
PassiveCommandHandlerDecorator = NewType("PassiveCommandHandlerDecorator",
Callable[[CommandHandlerFunc], CommandHandlerFunc])
AliasesType = Union[
List[str], Tuple[str, ...], Set[str], Callable[[str], bool], Callable[[Any, str], bool]
]
CommandHandlerFunc = NewType(
"CommandHandlerFunc", Callable[[MaubotMessageEvent, Any], Awaitable[Any]]
)
CommandHandlerDecorator = NewType(
"CommandHandlerDecorator",
Callable[[Union["CommandHandler", CommandHandlerFunc]], "CommandHandler"],
)
PassiveCommandHandlerDecorator = NewType(
"PassiveCommandHandlerDecorator", Callable[[CommandHandlerFunc], CommandHandlerFunc]
)
def _split_in_two(val: str, split_by: str) -> List[str]:
@ -67,15 +84,26 @@ class CommandHandler:
return self.__bound_copies__[instance]
except KeyError:
new_ch = type(self)(self.__mb_func__)
keys = ["parent", "subcommands", "arguments", "help", "get_name", "is_command_match",
"require_subcommand", "arg_fallthrough", "event_handler", "event_type",
"msgtypes"]
keys = [
"parent",
"subcommands",
"arguments",
"help",
"get_name",
"is_command_match",
"require_subcommand",
"arg_fallthrough",
"event_handler",
"event_type",
"msgtypes",
]
for key in keys:
key = f"__mb_{key}__"
setattr(new_ch, key, getattr(self, key))
new_ch.__bound_instance__ = instance
new_ch.__mb_subcommands__ = [subcmd.__get__(instance, instancetype)
for subcmd in self.__mb_subcommands__]
new_ch.__mb_subcommands__ = [
subcmd.__get__(instance, instancetype) for subcmd in self.__mb_subcommands__
]
self.__bound_copies__[instance] = new_ch
return new_ch
@ -83,8 +111,13 @@ class CommandHandler:
def __command_match_unset(self, val: str) -> bool:
raise NotImplementedError("Hmm")
async def __call__(self, evt: MaubotMessageEvent, *, _existing_args: Dict[str, Any] = None,
remaining_val: str = None) -> Any:
async def __call__(
self,
evt: MaubotMessageEvent,
*,
_existing_args: Dict[str, Any] = None,
remaining_val: str = None,
) -> Any:
if evt.sender == evt.client.mxid or evt.content.msgtype not in self.__mb_msgtypes__:
return
if remaining_val is None:
@ -120,21 +153,25 @@ class CommandHandler:
return await self.__mb_func__(self.__bound_instance__, evt, **call_args)
return await self.__mb_func__(evt, **call_args)
async def __call_subcommand__(self, evt: MaubotMessageEvent, call_args: Dict[str, Any],
remaining_val: str) -> Tuple[bool, Any]:
async def __call_subcommand__(
self, evt: MaubotMessageEvent, call_args: Dict[str, Any], remaining_val: str
) -> Tuple[bool, Any]:
command, remaining_val = _split_in_two(remaining_val.strip(), " ")
for subcommand in self.__mb_subcommands__:
if subcommand.__mb_is_command_match__(subcommand.__bound_instance__, command):
return True, await subcommand(evt, _existing_args=call_args,
remaining_val=remaining_val)
return True, await subcommand(
evt, _existing_args=call_args, remaining_val=remaining_val
)
return False, None
async def __parse_args__(self, evt: MaubotMessageEvent, call_args: Dict[str, Any],
remaining_val: str) -> Tuple[bool, str]:
async def __parse_args__(
self, evt: MaubotMessageEvent, call_args: Dict[str, Any], remaining_val: str
) -> Tuple[bool, str]:
for arg in self.__mb_arguments__:
try:
remaining_val, call_args[arg.name] = arg.match(remaining_val.strip(), evt=evt,
instance=self.__bound_instance__)
remaining_val, call_args[arg.name] = arg.match(
remaining_val.strip(), evt=evt, instance=self.__bound_instance__
)
if arg.required and call_args[arg.name] is None:
raise ValueError("Argument required")
except ArgumentSyntaxError as e:
@ -155,8 +192,9 @@ class CommandHandler:
@property
def __mb_usage_args__(self) -> str:
arg_usage = " ".join(f"<{arg.label}>" if arg.required else f"[{arg.label}]"
for arg in self.__mb_arguments__)
arg_usage = " ".join(
f"<{arg.label}>" if arg.required else f"[{arg.label}]" for arg in self.__mb_arguments__
)
if self.__mb_subcommands__ and self.__mb_arg_fallthrough__:
arg_usage += " " + self.__mb_usage_subcommand__
return arg_usage
@ -172,15 +210,19 @@ class CommandHandler:
@property
def __mb_prefix__(self) -> str:
if self.__mb_parent__:
return (f"!{self.__mb_parent__.__mb_get_name__(self.__bound_instance__)} "
f"{self.__mb_name__}")
return (
f"!{self.__mb_parent__.__mb_get_name__(self.__bound_instance__)} "
f"{self.__mb_name__}"
)
return f"!{self.__mb_name__}"
@property
def __mb_usage_inline__(self) -> str:
if not self.__mb_arg_fallthrough__:
return (f"* {self.__mb_name__} {self.__mb_usage_args__} - {self.__mb_help__}\n"
f"* {self.__mb_name__} {self.__mb_usage_subcommand__}")
return (
f"* {self.__mb_name__} {self.__mb_usage_args__} - {self.__mb_help__}\n"
f"* {self.__mb_name__} {self.__mb_usage_subcommand__}"
)
return f"* {self.__mb_name__} {self.__mb_usage_args__} - {self.__mb_help__}"
@property
@ -192,8 +234,10 @@ class CommandHandler:
if not self.__mb_arg_fallthrough__:
if not self.__mb_arguments__:
return f"**Usage:** {self.__mb_prefix__} [subcommand] [...]"
return (f"**Usage:** {self.__mb_prefix__} {self.__mb_usage_args__}"
f" _OR_ {self.__mb_prefix__} {self.__mb_usage_subcommand__}")
return (
f"**Usage:** {self.__mb_prefix__} {self.__mb_usage_args__}"
f" _OR_ {self.__mb_prefix__} {self.__mb_usage_subcommand__}"
)
return f"**Usage:** {self.__mb_prefix__} {self.__mb_usage_args__}"
@property
@ -202,14 +246,25 @@ class CommandHandler:
return f"{self.__mb_usage_without_subcommands__} \n{self.__mb_subcommands_list__}"
return self.__mb_usage_without_subcommands__
def subcommand(self, name: PrefixType = None, *, help: str = None, aliases: AliasesType = None,
required_subcommand: bool = True, arg_fallthrough: bool = True,
) -> CommandHandlerDecorator:
def subcommand(
self,
name: PrefixType = None,
*,
help: str = None,
aliases: AliasesType = None,
required_subcommand: bool = True,
arg_fallthrough: bool = True,
) -> CommandHandlerDecorator:
def decorator(func: Union[CommandHandler, CommandHandlerFunc]) -> CommandHandler:
if not isinstance(func, CommandHandler):
func = CommandHandler(func)
new(name, help=help, aliases=aliases, require_subcommand=required_subcommand,
arg_fallthrough=arg_fallthrough)(func)
new(
name,
help=help,
aliases=aliases,
require_subcommand=required_subcommand,
arg_fallthrough=arg_fallthrough,
)(func)
func.__mb_parent__ = self
func.__mb_event_handler__ = False
self.__mb_subcommands__.append(func)
@ -218,10 +273,17 @@ class CommandHandler:
return decorator
def new(name: PrefixType = None, *, help: str = None, aliases: AliasesType = None,
event_type: EventType = EventType.ROOM_MESSAGE, msgtypes: Iterable[MessageType] = None,
require_subcommand: bool = True, arg_fallthrough: bool = True,
must_consume_args: bool = True) -> CommandHandlerDecorator:
def new(
name: PrefixType = None,
*,
help: str = None,
aliases: AliasesType = None,
event_type: EventType = EventType.ROOM_MESSAGE,
msgtypes: Iterable[MessageType] = None,
require_subcommand: bool = True,
arg_fallthrough: bool = True,
must_consume_args: bool = True,
) -> CommandHandlerDecorator:
def decorator(func: Union[CommandHandler, CommandHandlerFunc]) -> CommandHandler:
if not isinstance(func, CommandHandler):
func = CommandHandler(func)
@ -242,8 +304,9 @@ def new(name: PrefixType = None, *, help: str = None, aliases: AliasesType = Non
else:
func.__mb_is_command_match__ = aliases
elif isinstance(aliases, (list, set, tuple)):
func.__mb_is_command_match__ = lambda self, val: (val == func.__mb_get_name__(self)
or val in aliases)
func.__mb_is_command_match__ = lambda self, val: (
val == func.__mb_get_name__(self) or val in aliases
)
else:
func.__mb_is_command_match__ = lambda self, val: val == func.__mb_get_name__(self)
# Decorators are executed last to first, so we reverse the argument list.
@ -267,8 +330,9 @@ class ArgumentSyntaxError(ValueError):
class Argument(ABC):
def __init__(self, name: str, label: str = None, *, required: bool = False,
pass_raw: bool = False) -> None:
def __init__(
self, name: str, label: str = None, *, required: bool = False, pass_raw: bool = False
) -> None:
self.name = name
self.label = label or name
self.required = required
@ -286,8 +350,15 @@ class Argument(ABC):
class RegexArgument(Argument):
def __init__(self, name: str, label: str = None, *, required: bool = False,
pass_raw: bool = False, matches: str = None) -> None:
def __init__(
self,
name: str,
label: str = None,
*,
required: bool = False,
pass_raw: bool = False,
matches: str = None,
) -> None:
super().__init__(name, label, required=required, pass_raw=pass_raw)
matches = f"^{matches}" if self.pass_raw else f"^{matches}$"
self.regex = re.compile(matches)
@ -298,14 +369,23 @@ class RegexArgument(Argument):
val = re.split(r"\s", val, 1)[0]
match = self.regex.match(val)
if match:
return (orig_val[:match.start()] + orig_val[match.end():],
match.groups() or val[match.start():match.end()])
return (
orig_val[: match.start()] + orig_val[match.end() :],
match.groups() or val[match.start() : match.end()],
)
return orig_val, None
class CustomArgument(Argument):
def __init__(self, name: str, label: str = None, *, required: bool = False,
pass_raw: bool = False, matcher: Callable[[str], Any]) -> None:
def __init__(
self,
name: str,
label: str = None,
*,
required: bool = False,
pass_raw: bool = False,
matcher: Callable[[str], Any],
) -> None:
super().__init__(name, label, required=required, pass_raw=pass_raw)
self.matcher = matcher
@ -316,7 +396,7 @@ class CustomArgument(Argument):
val = re.split(r"\s", val, 1)[0]
res = self.matcher(val)
if res is not None:
return orig_val[len(val):], res
return orig_val[len(val) :], res
return orig_val, None
@ -325,12 +405,18 @@ class SimpleArgument(Argument):
if self.pass_raw:
return "", val
res = re.split(r"\s", val, 1)[0]
return val[len(res):], res
return val[len(res) :], res
def argument(name: str, label: str = None, *, required: bool = True, matches: Optional[str] = None,
parser: Optional[Callable[[str], Any]] = None, pass_raw: bool = False
) -> CommandHandlerDecorator:
def argument(
name: str,
label: str = None,
*,
required: bool = True,
matches: Optional[str] = None,
parser: Optional[Callable[[str], Any]] = None,
pass_raw: bool = False,
) -> CommandHandlerDecorator:
if matches:
return RegexArgument(name, label, required=required, matches=matches, pass_raw=pass_raw)
elif parser:
@ -339,11 +425,17 @@ def argument(name: str, label: str = None, *, required: bool = True, matches: Op
return SimpleArgument(name, label, required=required, pass_raw=pass_raw)
def passive(regex: Union[str, Pattern], *, msgtypes: Sequence[MessageType] = (MessageType.TEXT,),
field: Callable[[MaubotMessageEvent], str] = lambda evt: evt.content.body,
event_type: EventType = EventType.ROOM_MESSAGE, multiple: bool = False,
case_insensitive: bool = False, multiline: bool = False, dot_all: bool = False
) -> PassiveCommandHandlerDecorator:
def passive(
regex: Union[str, Pattern],
*,
msgtypes: Sequence[MessageType] = (MessageType.TEXT,),
field: Callable[[MaubotMessageEvent], str] = lambda evt: evt.content.body,
event_type: EventType = EventType.ROOM_MESSAGE,
multiple: bool = False,
case_insensitive: bool = False,
multiline: bool = False,
dot_all: bool = False,
) -> PassiveCommandHandlerDecorator:
if not isinstance(regex, Pattern):
flags = re.RegexFlag.UNICODE
if case_insensitive:
@ -372,12 +464,14 @@ def passive(regex: Union[str, Pattern], *, msgtypes: Sequence[MessageType] = (Me
return
data = field(evt)
if multiple:
val = [(data[match.pos:match.endpos], *match.groups())
for match in regex.finditer(data)]
val = [
(data[match.pos : match.endpos], *match.groups())
for match in regex.finditer(data)
]
else:
match = regex.search(data)
if match:
val = (data[match.pos:match.endpos], *match.groups())
val = (data[match.pos : match.endpos], *match.groups())
else:
val = None
if val: