以OneBot V11为例,看看Matcher是怎么运行的
定义一个简单插件
hello = on_startswith("hello") @hello.handle() async def _(bot:Bot, event:PrivateMessageEvent): hello.finish("hello")
- 根据之前的内容,我们知道hello是一个Matcher子类
它的type为message,并且有一个startswith的Rule - 之后hello注册了一个handler,handler会转化为Dependent handler
由于OneBot V11的Bot、PrivateMessageEvent分别继承了基类Bot、Event,能够通过BotParam和EventParam的_check_perm
因此Dependent handler的params包括两个ModelField,field_info分别为BotParam和EventParam
由于不符合基类写法,这种handler只能由OneBot V11触发
Dependent handler的parameterless为空
那么Matcher和handler是怎么触发的呢?
对于Onebot V11,消息入口为nonebot.adapters.onebot.v11.bot.Bot.handle_event
本文只讨论handle_event,即event的处理,下一节讨论call_api,即消息的回复
class Bot(BaseBot): async def handle_event(self, event: Event) -> None: if isinstance(event, MessageEvent): event.message.reduce() await _check_reply(self, event) _check_at_me(self, event) _check_nickname(self, event) await handle_event(self, event)
由于本章不讨论适配,因此只需要看handle_event函数,这是NoneBot的函数,位于nonebot.message
async def handle_event(bot: "Bot", event: "Event") -> None: show_log = True log_msg = f"{escape_tag(bot.type)} {escape_tag(bot.self_id)} | " try: log_msg += event.get_log_string() except NoLogException: show_log = False if show_log: logger.opt(colors=True).success(log_msg) # 日志打印,没什么好说的 state: Dict[Any, Any] = {} # 用于保存一些状态,state在bot收到消息后生成,在消息发出后销毁,期间可以对其进行操作传递数据 dependency_cache: T_DependencyCache = {} ``` dependency_cache保存了各种**可调用函数**,可调用函数来源于Param或普通函数(Depends的参数) Param或普通函数的调用可能有许多次类型转换,将最后转换成的可调用对象存起来,方便再次调用 类型为Dict[_DependentCallable[Any], Task[Any]] ``` async with AsyncExitStack() as stack: if coros := [ run_coro_with_catch( proc( bot=bot, event=event, state=state, stack=stack, dependency_cache=dependency_cache, ), (SkippedException,), ) for proc in _event_preprocessors ]: try: if show_log: logger.debug("Running PreProcessors...") await asyncio.gather(*coros) except IgnoredException as e: logger.opt(colors=True).info( f"Event {escape_tag(event.get_event_name())} is ignored" ) return except Exception as e: logger.opt(colors=True, exception=e).error( "" ) return try: TrieRule.get_value(bot, event, state) except Exception as e: logger.opt(colors=True, exception=e).warning( "Error while parsing command for event" ) break_flag = False for priority in sorted(matchers.keys()): if break_flag: break if show_log: logger.debug(f"Checking for matchers in priority {priority}...") pending_tasks = [ _check_matcher( priority, matcher, bot, event, state.copy(), stack, dependency_cache ) for matcher in matchers[priority] ] results = await asyncio.gather(*pending_tasks, return_exceptions=True) for result in results: if not isinstance(result, Exception): continue if isinstance(result, StopPropagation): break_flag = True logger.debug("Stop event propagation") else: logger.opt(colors=True, exception=result).error( " Error when running EventPreProcessors. " "Event ignored! " ) if coros := [ run_coro_with_catch( proc( bot=bot, event=event, state=state, stack=stack, dependency_cache=dependency_cache, ), (SkippedException,), ) for proc in _event_postprocessors ]: try: if show_log: logger.debug("Running PostProcessors...") await asyncio.gather(*coros) except Exception as e: logger.opt(colors=True, exception=e).error( " Error when checking Matcher. " ) Error when running EventPostProcessors
除去日志打印,分为四部分,下面一一介绍
event_preprocess
事件预处理最先触发,通过nonebot.message.event_preprocessor注册事件预处理器,使用IgnoredException可以终止事件传递
事件预处理器是Dependent类型,具有依赖注入功能,允许的Param有DependParam、BotParam、EventParam、StateParam、DefaultParam
例如
@event_preprocessor
async def _(bot) # 还记得这种写法的意思吗?任何adapter都能触发
if bot.type == "OneBot V11" # 跳过OneBot V11适配器的所有事件
raise IgnoredException
@event_preprocessor
async def _(event:v11.PrivateMessageEvent) # 由于Event不是基类,该event_preprocessor只能由OneBot V11适配器触发
if event.sender.user_id == 1000 # 忽视qq号为1000的用户的私聊
raise IgnoredException
下面分析event_preprocessor的运行
if coros := [ run_coro_with_catch( # 运行协程并且忽视特定报错(这里是SkippedException) proc( # proc是Dependent,调用Dependent的__call__ bot=bot, event=event, state=state, stack=stack, dependency_cache=dependency_cache, # ), (SkippedException,), # 忽视协程的SkippedException,意味着你可以使用nonebot.exception.SkippedException提前结束event_preprocessors但不结束事件传递 ) for proc in _event_preprocessors ]: try: if show_log: logger.debug("Running PreProcessors...") await asyncio.gather(*coros) # 同时运行所有协程 except IgnoredException as e: # 如果event_preprocessor抛出IgnoredException就终止事件的传递 logger.opt(colors=True).info( f"Event {escape_tag(event.get_event_name())} is ignored" ) return except Exception as e: # 异常捕获 logger.opt(colors=True, exception=e).error( "" ) return Error when running EventPreProcessors. " "Event ignored!
代码都很简单,下面我们来看Dependent是如何调用的
补充:Dependent的调用
@dataclass(frozen=True) class Dependent(Generic[R]): async def __call__(self, **kwargs: Any) -> R: ``` params可能包括bot、event、state、stack、matcher、dependency_cache,具体看是在哪调用的 ``` await self.check(**kwargs) # 检查 values = await self.solve(**kwargs) # 从上下文获取变量 if is_coroutine_callable(self.call): # 调用call return await cast(Callable[..., Awaitable[R]], self.call)(**values) else: return await run_sync(cast(Callable[..., R], self.call))(**values)
Dependent.check
@dataclass(frozen=True) class Dependent(Generic[R]): async def check(self, **params: Any) -> None: try: await asyncio.gather( *(param._check(**params) for param in self.parameterless) ) # 先检查parameterless,由于parameterless的元素就是Param子类,可以直调用Param._check await asyncio.gather( *( cast(Param, param.field_info)._check(**params) # 等价于param.field_info._check(**params) for param in self.params ) ) # 再检查param,由于self.params元素是ModelField,需要先获取field_info,类型为Param子类 except SkippedException as e: 如果检查过程中出现SkippedException就抛出 logger.trace(f"{self} skipped due to {e}") raise
因此Dependent.check本质上是调用Param._check,之前我们已经讲过Param的实例化,现在讲Param的调用
StateParam、MatcherParam、ArgParam、ExceptionParam、DefaultParam没有_check,会调用Param基类,因此无事发生
class DependParam(Param): async def _check(self, **kwargs: Any) -> None: sub_dependent: Dependent = self.extra["dependent"] await sub_dependent.check(**kwargs) ``` 还记得sub_dependent是什么吗? 例如 async def user_id(event:PrviateMessageEvent): return event.sender.user_id @xxx.handle() async def _(user_id=Depends(user_id)): pass Depends会调用DependsInner类,将user_id作为DependsInner的对象属性self.dependency 之后DependParam._check_param时,会将self.dependency转化为Dependent作为变量dependent存到DependParam 由于DependParam是Param子类,Param是FieldInfo子类,因此可以通过self.extra["dependent"]获取DependParam实例化时的变量dependent 所以在这里sub_dependent就是函数user_id转化的Dependent 于是DependParam._check时,就是调用sub_dependent._check,这样就能够实现嵌套检查 ```
class BotParam(Param): async def _check(self, bot: "Bot", **kwargs: Any) -> None: if checker := self.extra.get("checker"): check_field_type(checker, bot) ``` self.extra.get("checker")就是_check_param时实例化BotParam时传入的checker,是ModelField类型 第一节已经介绍过BotParam的checker了,这里不再详细介绍 对于基类Bot,checker为None,对于协议Bot,只有相应Bot能通过check 比如如果同时接入了qq协议端和微信协议端,那么类型注释是qq Bot的handler无法被微信协议的消息触发,因为BotParam._check无法通过 ```
class EventParam(Param): async def _check(self, bot: "EventParam", **kwargs: Any) -> None: if checker := self.extra.get("checker"): check_field_type(checker, bot) ``` 和BotParam如出一辙 ```
Dependent.solve
def check_field_type(field: ModelField, value: V) -> V: # 验证value是不是ModelField,是的话返回value _, errs_ = field.validate(value, {}, loc=()) if errs_: raise TypeMisMatch(field, value) return value @dataclass(frozen=True) class Dependent(Generic[R]): async def _solve_field(self, field: ModelField, params: Dict[str, Any]) -> Any: value = await cast(Param, field.field_info)._solve(**params) if value is Undefined: ``` 如果Param._solve的结果是Undefined,就获取field的默认值 还记得之前说过只有DefautParam的ModelField有默认值吗这个判断就是为DefautParam设计的 ``` value = field.get_default() return check_field_type(field, value) # 检查类型 async def solve(self, **params: Any) -> Dict[str, Any]: for param in self.parameterless: # 由于parameterless的元素是Param子类,可以直接调用_solve await param._solve(**params) values = await asyncio.gather( *(self._solve_field(field, params) for field in self.params) # params的元素是ModelField,需要先取出field_info再调用_solve ) return {field.name: value for field, value in zip(self.params, values)} ``` 可以看出,对于parameterless参数,运行时只是调用它的_solve 也就是说parameterless参数的返回值不重要,而在于进行一些操作,例如修改上下文 对于params,将_solve的结果与参数名构成字典 ```
下面来看各种Param._solve
class DependParam(Param): async def _solve( self, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, **kwargs: Any, ) -> Any: use_cache: bool = self.extra["use_cache"] # 是否使用dependency_cache缓存 dependency_cache = {} if dependency_cache is None else dependency_cache sub_dependent: Dependent = self.extra["dependent"] # 例如前文例子中的Dependent user_id函数 call = cast(Callable[..., Any], sub_dependent.call) # 相当于call = sub_dependent.call,例如前文的user_id函数 # 先使用已有的dependency_cache缓存运行sub_dependent,有缓存总比没有好 sub_values = await sub_dependent.solve( stack=stack, dependency_cache=dependency_cache, **kwargs, ) ``` sub_dependent是Dependent,嵌套嵌套依赖注入就是这么实现的 sub_values是通过依赖注入获取的参数,如Dependent use_id的sub_values就是{"event":xxx},因为use_id的参数是event ``` ``` 已经有了sub_dependent需要的参数sub_values,接下来调用sub_dependent ``` task: asyncio.Task[Any] if use_cache and call in dependency_cache: ``` 如果使用缓存并且已有call的可调用函数,那就调用它 如果没有对应call的则需要将其转化为可调用函数并存入dependency_cache ``` return await dependency_cache[call] elif is_gen_callable(call) or is_async_gen_callable(call): # 如果是生成器或异步生成器 assert isinstance( stack, AsyncExitStack ), "Generator dependency should be called in context" if is_gen_callable(call): # 如果是生成器 cm = run_sync_ctx_manager(contextmanager(call)(**sub_values)) else: # 如果是异步生成器 cm = asynccontextmanager(call)(**sub_values) task = asyncio.create_task(stack.enter_async_context(cm)) dependency_cache[call] = task return await task elif is_coroutine_callable(call): # 如果是协程函数,例如上例子的user_id函数 task = asyncio.create_task(call(**sub_values)) # 例如上例的[Task user_id(**{event:xxx})] dependency_cache[call] = task return await task else: # 如果是普通函数 task = asyncio.create_task(run_sync(call)(**sub_values)) dependency_cache[call] = task return await task ``` 把call统一转化为Task并存入dependency_cache,方便多次调用 await task获取返回值,例如上例就是从event中获取的user_id ```
class BotParam(Param): async def _solve(self, event: "Event", **kwargs: Any) -> Any: return bot # 直接返回bot,没什么好说的
class EventParam(Param): async def _solve(self, bot: "Bot", **kwargs: Any) -> Any: return event # 直接返回event,没什么好说的
class StateParam(Param): async def _solve(self, state: T_State, **kwargs: Any) -> Any: return state # 直接返回state,没什么好说的
class MatcherParam(Param): async def _solve(self, matcher: "Matcher", **kwargs: Any) -> Any: return matcher # 直接返回matcher,没什么好说的
class ArgParam(Param): async def _solve(self, matcher: "Matcher", **kwargs: Any) -> Any: message = matcher.get_arg(self.extra["key"]) # 从state中根据key获取响应数据,就是前文所提到的state if message is None: # 默认为Arg() return message if self.extra["type"] == "message": # 对应Arg() return message elif self.extra["type"] == "str": # 对应ArgStr() return str(message) else: # 对应ArgPlainText() return message.extract_plain_text()
class ExceptionParam(Param): async def _solve(self, exception: Optional[Exception] = None, **kwargs: Any) -> Any: return exception # 直接返回传入的exception
class DefaultParam(Param): async def _solve(self, **kwargs: Any) -> Any: return Undefined # 特殊用法,见前文
这样就通过Bot和Event构造出了各种Param的参数,由于ModelField还存有参数名,因此可以构造字典用于call的调用
Dependent.__call__
@dataclass(frozen=True) class Dependent(Generic[R]): async def __call__(self, **kwargs: Any) -> R: await self.check(**kwargs) values = await self.solve(**kwargs) # 获取了handler所需的参数 if is_coroutine_callable(self.call): return await cast(Callable[..., Awaitable[R]], self.call)(**values) # 如果handler是异步函数 else: return await run_sync(cast(Callable[..., R], self.call))(**values) # 如果handler是同步函数
Trie Match
try: TrieRule.get_value(bot, event, state) except Exception as e: logger.opt(colors=True, exception=e).warning( "Error while parsing command for event" )
TrieRule位于nonebot.rule
NoneBot支持以类似命令行的方式调用插件,如/xxx -a -b 10
利用TrieRule.get_value将这种命令拆分解析出来,放进state里,这样插件就可以直接使用
CMD_RESULT = TypedDict( "CMD_RESULT", { "command": Optional[Tuple[str, ...]], "raw_command": Optional[str], "command_arg": Optional[Message[MessageSegment]], "command_start": Optional[str], }, ) # 一个有代码提示的字典 TRIE_VALUE = NamedTuple( "TRIE_VALUE", [("command_start", str), ("command", Tuple[str, ...])] ) # 一个有代码提示的元组 class TrieRule: prefix: CharTrie = CharTrie() @classmethod def add_prefix(cls, prefix: str, value: TRIE_VALUE) -> None: if prefix in cls.prefix: logger.warning(f'Duplicated prefix rule "{prefix}"') return cls.prefix[prefix] = value @classmethod def get_value(cls, bot: Bot, event: Event, state: T_State) -> CMD_RESULT: prefix = CMD_RESULT( command=None, raw_command=None, command_arg=None, command_start=None ) # 相当于prefix = {command:None, raw_command:None, command_arg:None, command_start:None} state[PREFIX_KEY] = prefix # 存入state if event.get_type() != "message": # 如果事件类型不是message则直接结束 return prefix message = event.get_message() message_seg: MessageSegment = message[0] # 获取message第一个字段 if message_seg.is_text(): # 如果 第一个字段是文本 segment_text = str(message_seg).lstrip() # 删除第一个字段左边的空格 if pf := cls.prefix.longest_prefix(segment_text): # 找出cls.prefix中与segment_text的最长公共序列 value: TRIE_VALUE = pf.value prefix[RAW_CMD_KEY] = pf.key # /xxx -a -b 10 prefix[CMD_START_KEY] = value.command_start# /xxx prefix[CMD_KEY] = value.command # -a -b 10 msg = message.copy() msg.pop(0) new_message = msg.__class__(segment_text[len(pf.key) :].lstrip()) for new_segment in reversed(new_message): msg.insert(0, new_segment) prefix[CMD_ARG_KEY] = msg # msg是去除了命令前缀的消息 return prefix
Run Matcher
break_flag = False for priority in sorted(matchers.keys()): # 根据优先级运行,同一优先级的同时运行 if break_flag: break if show_log: logger.debug(f"Checking for matchers in priority {priority}...") pending_tasks = [ _check_matcher( priority, matcher, bot, event, state.copy(), stack, dependency_cache # 复制state防止污染 ) for matcher in matchers[priority] ] results = await asyncio.gather(*pending_tasks, return_exceptions=True) # 如果报错将报错作为结果返回 for result in results: if not isinstance(result, Exception): # 只关注报错 continue if isinstance(result, StopPropagation): # 出现StopPropagation后,下一优先级的Matcher将全都跳过,不影响event_postprocessors break_flag = True logger.debug("Stop event propagation") else: # 异常捕获 logger.opt(colors=True, exception=result).error( "" ) Error when checking Matcher.
_check_matcher
async def _check_matcher( priority: int, Matcher: Type[Matcher], bot: "Bot", event: "Event", state: T_State, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, ) -> None: if Matcher.expire_time and datetime.now() > Matcher.expire_time: # 判断Matcher是否超过expire_time with contextlib.suppress(Exception): matchers[priority].remove(Matcher) # 移除过期的Matcher并返回 return try: if not await Matcher.check_perm( # 见后文 bot, event, stack, dependency_cache ) or not await Matcher.check_rule(bot, event, state, stack, dependency_cache): return except Exception as e: logger.opt(colors=True, exception=e).error( f"" ) return if Matcher.temp: # 如果Matcher是temp的 with contextlib.suppress(Exception): matchers[priority].remove(Matcher) # 运行一次就删除 await _run_matcher(Matcher, bot, event, state, stack, dependency_cache) Rule check failed for {Matcher}.
先校验permission和rule
Matcher.check_perm
class Matcher(metaclass=MatcherMeta): @classmethod async def check_perm( cls, bot: Bot, event: Event, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, ) -> bool: event_type = event.get_type() return event_type == (cls.type or event_type) and await cls.permission( bot, event, stack, dependency_cache )
首先检查事件类型是否正确(如果Matcher.type为None则任何事件类型都可以通过检查),再检查permission
class Permission: async def __call__( self, bot: Bot, event: Event, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, ) -> bool: if not self.checkers: # 空Permission直接通过检查 return True results = await asyncio.gather( *( run_coro_with_catch( checker( bot=bot, event=event, stack=stack, dependency_cache=dependency_cache, ), (SkippedException,), False, ) for checker in self.checkers # checker是Dependent[bool],返回值是bool值 ), ) return any(results) # 如果有checker返回值是False就不通过检查
Matcher.check_rule
class Matcher(metaclass=MatcherMeta): @classmethod async def check_rule( cls, bot: Bot, event: Event, state: T_State, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, ) -> bool: event_type = event.get_type() return event_type == (cls.type or event_type) and await cls.rule( bot, event, state, stack, dependency_cache )
检查Rule时也是检查事件类型是否正确(如果Matcher.type为None则任何事件类型都可以通过检查),再检查Rule
class Rule: async def __call__( self, bot: Bot, event: Event, state: T_State, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, ) -> bool: if not self.checkers: # 空Rule直接通过 return True try: results = await asyncio.gather( *( checker( bot=bot, event=event, state=state, stack=stack, dependency_cache=dependency_cache, ) for checker in self.checkers # checker是Dependent[bool],返回值是bool值 ) ) except SkippedException: # 在Rule的checker中可以使用SkippedException使检查直接通过 return False return all(results) # 如果有checker返回值是False就不通过检查
_run_matcher
permission和rule通过后调用_run_matcher
async def _run_matcher( Matcher: Type[Matcher], bot: "Bot", event: "Event", state: T_State, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, ) -> None: logger.info(f"Event will be handled by {Matcher}") matcher = Matcher() ``` Matcher实例化,将handlers和state复制一份 他们与Matcher类的handlers和_default_state独立 self.handlers = self.handlers.copy() self.state = self._default_state.copy() ``` if coros := [ run_coro_with_catch( proc( matcher=matcher, bot=bot, event=event, state=state, stack=stack, dependency_cache=dependency_cache, ), (SkippedException,), ) for proc in _run_preprocessors ]: ``` run_preprocess,和event_preprocess几乎一样 用run_preprocessor装饰函数来注册 支持的Param、DependParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam ``` try: await asyncio.gather(*coros) except IgnoredException: logger.opt(colors=True).info(f"{matcher} running is cancelled") return except Exception as e: logger.opt(colors=True, exception=e).error( "" ) return exception = None try: logger.debug(f"Running {matcher}") await matcher.run(bot, event, state, stack, dependency_cache) # 后文介绍 except Exception as e: logger.opt(colors=True, exception=e).error( f" Error when running RunPreProcessors. Running cancelled! " ) exception = e if coros := [ run_coro_with_catch( proc( matcher=matcher, exception=exception, bot=bot, event=event, state=matcher.state, stack=stack, dependency_cache=dependency_cache, ), (SkippedException,), ) for proc in _run_postprocessors ]: ``` run_postprocess,和event_preprocess几乎一样 用run_postprocess装饰函数来注册 但是,它可以传入exception,也就是Matcher运行时抛出的错误,因此可以使用ExceptionParam 支持的Param、DependParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam、ExceptionParam ``` try: await asyncio.gather(*coros) except Exception as e: logger.opt(colors=True, exception=e).error( " Running {matcher} failed. " ) if matcher.block: # 如果matcher是block,就抛出StopPropagation异常,可以被捕获并阻止低优先度的Matcher运行 raise StopPropagation return Error when running RunPostProcessors
Matcher.simple_run
Matcher.run会先调用Matcher.simple_run
class Matcher(metaclass=MatcherMeta): async def simple_run( self, bot: Bot, event: Event, state: T_State, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, ): logger.trace( f"{self} run with incoming args: " f"bot={bot}, event={event!r}, state={state!r}" ) b_t = current_bot.set(bot) # 将bot设为当前bot e_t = current_event.set(event) # 将event设为当前event m_t = current_matcher.set(self) # 将matcher设为当前matcher try: # Refresh preprocess state self.state.update(state) # 合并全局state到self.state while self.handlers: # 从第一个handler开始依次运行 handler = self.handlers.pop(0) current_handler.set(handler) # 将handler设为当前handler logger.debug(f"Running handler {handler}") try: await handler( matcher=self, bot=bot, event=event, state=self.state, stack=stack, dependency_cache=dependency_cache, ) # handler是Dependent,可以这样运行 except SkippedException: # 遇到SkippedException继续正常遍历handlers,对于其它异常会抛出simple_run logger.debug(f"Handler {handler} skipped") except StopPropagation: # 如果遇到StopPropagation,修改Matcher为block self.block = True finally: ``` 结束后清空current_bot、current_event、current_matcher 注意保留了current_handler,因为后面会用到 ``` logger.info(f"{self} running complete") current_bot.reset(b_t) current_event.reset(e_t) current_matcher.reset(m_t)
Matcher.run
class User: def __init__( self, users: Tuple[str, ...], perm: Optional[Permission] = None ) -> None: self.users = users # 白名单用户 self.perm = perm # 用户权限 async def __call__(self, bot: Bot, event: Event) -> bool: try: session = event.get_session_id() except Exception: return False return bool( session in self.users and (self.perm is None or await self.perm(bot, event)) ) # 需要当前用户在白名单且通过其它权限校验 def USER(*users: str, perm: Optional[Permission] = None): return Permission(User(users, perm)) class Matcher(metaclass=MatcherMeta): @classmethod def type_updater(cls, func: T_TypeUpdater) -> T_TypeUpdater: cls._default_type_updater = Dependent[str].parse( call=func, allow_types=cls.HANDLER_PARAM_TYPES ) return func # 注册_default_type_updater,会被转化为Dpendent,因此可以依赖注入 async def update_type(self, bot: Bot, event: Event) -> str: updater = self.__class__._default_type_updater return ( await updater(bot=bot, event=event, state=self.state, matcher=self) if updater else "message" ) # 运行通过type_updater装饰器注册的updater,返回值为updater返回值或message,就是event的新type @classmethod def permission_updater(cls, func: T_PermissionUpdater) -> T_PermissionUpdater: cls._default_permission_updater = Dependent[Permission].parse( call=func, allow_types=cls.HANDLER_PARAM_TYPES ) return func # 注册permission_updater,会被转化为Dpendent,因此可以依赖注入 async def update_permission(self, bot: Bot, event: Event) -> Permission: if updater := self.__class__._default_permission_updater: ``` 如果有通过permission_updater装饰器注册的updater,就运行它,返回值是新的Permission ``` return await updater(bot=bot, event=event, state=self.state, matcher=self) # 如果没有通过permission_updater装饰器注册的updater permission = self.permission # 获取原先的permission if len(permission.checkers) == 1 and isinstance( user_perm := tuple(permission.checkers)[0].call, User ): # ,则 permission = user_perm.perm return USER(event.get_session_id(), perm=permission) ``` 将当前用户设为会话对象 如果如果有permission且permission的第一个checker不是User类型的Permisson,则保留原先permission - 也就是说将任何人都能触发的Matcher变成只有一个人能触发的Matcher,并保留原先permission 否则只保留原先permission中关于User的permission - 也就是说User permission对所有用户适用,其它permission只对单个用户有效,更新User后废除 ``` async def run( self, bot: Bot, event: Event, state: T_State, stack: Optional[AsyncExitStack] = None, dependency_cache: Optional[T_DependencyCache] = None, ): try: await self.simple_run(bot, event, state, stack, dependency_cache) # 见上文 except RejectedException: # 如果handler抛出RejectedException,该情况将在下节具体介绍 await self.resolve_reject() type_ = await self.update_type(bot, event) # 更新事件类型 permission = await self.update_permission(bot, event) # 更新permission Matcher.new( type_, Rule(), permission, self.handlers, temp=True, # 只允行一次 priority=0, # 优先级为0,所以说不建议把Matcher的priority设为负数,可能出现异常 block=True, # 是block,因此总是在Matcher第一个运行、运行后会阻止之后的Matcher,并且删除自己 plugin=self.plugin, module=self.module, expire_time=bot.config.session_expire_timeout, default_state=self.state, default_type_updater=self.__class__._default_type_updater, default_permission_updater=self.__class__._default_permission_updater, ) except PausedException: # 如果handler抛出PausedException type_ = await self.update_type(bot, event) # 更新事件类型 permission = await self.update_permission(bot, event) # 更新permission Matcher.new( type_, Rule(), permission, self.handlers, temp=True, # 只允行一次 priority=0, # 优先级为0 block=True, # 是block,因此总是在Matcher第一个运行、运行后会阻止之后的Matcher,并且删除自己 plugin=self.plugin, module=self.module, expire_time=bot.config.session_expire_timeout, default_state=self.state, default_type_updater=self.__class__._default_type_updater, default_permission_updater=self.__class__._default_permission_updater, ) # 创建新Matcher except FinishedException: # 如果handler抛出FinishedException pass # 无后续操作 # 不管什么异常都会导致handler和Matcher的运行结束
后两种异常很好理解,但对于RejectedException可能难理解,因为需要结合下节的内容才能知道为什么这么设计
event_postprocessors
没什么好说的,不过支持所有Param(DependParam、ExceptionParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam)
if coros := [ run_coro_with_catch( proc( bot=bot, event=event, state=state, stack=stack, dependency_cache=dependency_cache, ), (SkippedException,), ) for proc in _event_postprocessors ]: try: if show_log: logger.debug("Running PostProcessors...") await asyncio.gather(*coros) except Exception as e: logger.opt(colors=True, exception=e).error( "" ) Error when running EventPostProcessors