第三节 Matcher的运行(handle_event)

realhuhu 157 0

以OneBot V11为例,看看Matcher是怎么运行的
定义一个简单插件

hello = on_startswith("hello")

@hello.handle()
async def _(bot:Bot, event:PrivateMessageEvent):
    hello.finish("hello")
  • 根据之前的内容,我们知道hello是一个Matcher子类
    它的type为message,并且有一个startswith的Rule
  • 之后hello注册了一个handler,handler会转化为Dependent handler
    由于OneBot V11的Bot、PrivateMessageEvent分别继承了基类Bot、Event,能够通过BotParam和EventParam的_check_perm
    因此Dependent handler的params包括两个ModelField,field_info分别为BotParam和EventParam
    由于不符合基类写法,这种handler只能由OneBot V11触发
    Dependent handler的parameterless为空

那么Matcher和handler是怎么触发的呢?
对于Onebot V11,消息入口为nonebot.adapters.onebot.v11.bot.Bot.handle_event
本文只讨论handle_event,即event的处理,下一节讨论call_api,即消息的回复

class Bot(BaseBot):
    async def handle_event(self, event: Event) -> None:
        if isinstance(event, MessageEvent):
            event.message.reduce()
            await _check_reply(self, event)
            _check_at_me(self, event)
            _check_nickname(self, event)

        await handle_event(self, event)

由于本章不讨论适配,因此只需要看handle_event函数,这是NoneBot的函数,位于nonebot.message

async def handle_event(bot: "Bot", event: "Event") -> None:
    show_log = True
    log_msg = f"{escape_tag(bot.type)} {escape_tag(bot.self_id)} | "
    try:
        log_msg += event.get_log_string()
    except NoLogException:
        show_log = False
    if show_log:
        logger.opt(colors=True).success(log_msg) # 日志打印,没什么好说的

    state: Dict[Any, Any] = {} # 用于保存一些状态,state在bot收到消息后生成,在消息发出后销毁,期间可以对其进行操作传递数据
    dependency_cache: T_DependencyCache = {}
    ```
    dependency_cache保存了各种**可调用函数**,可调用函数来源于Param或普通函数(Depends的参数)
    Param或普通函数的调用可能有许多次类型转换,将最后转换成的可调用对象存起来,方便再次调用
    类型为Dict[_DependentCallable[Any], Task[Any]]
    ```

    async with AsyncExitStack() as stack:
        if coros := [
            run_coro_with_catch(
                proc(
                    bot=bot,
                    event=event,
                    state=state,
                    stack=stack,
                    dependency_cache=dependency_cache,
                ),
                (SkippedException,),
            )
            for proc in _event_preprocessors
        ]:
            try:
                if show_log:
                    logger.debug("Running PreProcessors...")
                await asyncio.gather(*coros)
            except IgnoredException as e:
                logger.opt(colors=True).info(
                    f"Event {escape_tag(event.get_event_name())} is ignored"
                )
                return
            except Exception as e:
                logger.opt(colors=True, exception=e).error(
                    "Error when running EventPreProcessors. "
                    "Event ignored!"
                )
                return

        try:
            TrieRule.get_value(bot, event, state)
        except Exception as e:
            logger.opt(colors=True, exception=e).warning(
                "Error while parsing command for event"
            )

        break_flag = False
        for priority in sorted(matchers.keys()):
            if break_flag:
                break

            if show_log:
                logger.debug(f"Checking for matchers in priority {priority}...")

            pending_tasks = [
                _check_matcher(
                    priority, matcher, bot, event, state.copy(), stack, dependency_cache
                )
                for matcher in matchers[priority]
            ]

            results = await asyncio.gather(*pending_tasks, return_exceptions=True)

            for result in results:
                if not isinstance(result, Exception):
                    continue
                if isinstance(result, StopPropagation):
                    break_flag = True
                    logger.debug("Stop event propagation")
                else:
                    logger.opt(colors=True, exception=result).error(
                        "Error when checking Matcher."
                    )

        if coros := [
            run_coro_with_catch(
                proc(
                    bot=bot,
                    event=event,
                    state=state,
                    stack=stack,
                    dependency_cache=dependency_cache,
                ),
                (SkippedException,),
            )
            for proc in _event_postprocessors
        ]:
            try:
                if show_log:
                    logger.debug("Running PostProcessors...")
                await asyncio.gather(*coros)
            except Exception as e:
                logger.opt(colors=True, exception=e).error(
                    "Error when running EventPostProcessors"
                )

除去日志打印,分为四部分,下面一一介绍

event_preprocess

事件预处理最先触发,通过nonebot.message.event_preprocessor注册事件预处理器,使用IgnoredException可以终止事件传递
事件预处理器是Dependent类型,具有依赖注入功能,允许的Param有DependParam、BotParam、EventParam、StateParam、DefaultParam
例如

@event_preprocessor
async def _(bot) # 还记得这种写法的意思吗?任何adapter都能触发
    if bot.type == "OneBot V11" # 跳过OneBot V11适配器的所有事件
        raise IgnoredException

@event_preprocessor
async def _(event:v11.PrivateMessageEvent) # 由于Event不是基类,该event_preprocessor只能由OneBot V11适配器触发
    if event.sender.user_id == 1000 # 忽视qq号为1000的用户的私聊
        raise IgnoredException

下面分析event_preprocessor的运行

if coros := [
    run_coro_with_catch( # 运行协程并且忽视特定报错(这里是SkippedException)
        proc( # proc是Dependent,调用Dependent的__call__
            bot=bot,
            event=event,
            state=state,
            stack=stack,
            dependency_cache=dependency_cache, # 
        ),
        (SkippedException,), # 忽视协程的SkippedException,意味着你可以使用nonebot.exception.SkippedException提前结束event_preprocessors但不结束事件传递
    )
    for proc in _event_preprocessors
]:
    try:
        if show_log:
            logger.debug("Running PreProcessors...")
        await asyncio.gather(*coros) # 同时运行所有协程
    except IgnoredException as e: # 如果event_preprocessor抛出IgnoredException就终止事件的传递
        logger.opt(colors=True).info(
            f"Event {escape_tag(event.get_event_name())} is ignored"
        )
        return
    except Exception as e: # 异常捕获
        logger.opt(colors=True, exception=e).error(
            "Error when running EventPreProcessors. "
            "Event ignored!"
        )
        return

代码都很简单,下面我们来看Dependent是如何调用的

补充:Dependent的调用

@dataclass(frozen=True)
class Dependent(Generic[R]):
    async def __call__(self, **kwargs: Any) -> R:
        ```
        params可能包括bot、event、state、stack、matcher、dependency_cache,具体看是在哪调用的
        ```
        await self.check(**kwargs) # 检查

        values = await self.solve(**kwargs) # 从上下文获取变量

        if is_coroutine_callable(self.call): # 调用call
            return await cast(Callable[..., Awaitable[R]], self.call)(**values)
        else:
            return await run_sync(cast(Callable[..., R], self.call))(**values)

Dependent.check

@dataclass(frozen=True)
class Dependent(Generic[R]):
    async def check(self, **params: Any) -> None:
        try:
            await asyncio.gather(
                *(param._check(**params) for param in self.parameterless)
            ) # 先检查parameterless,由于parameterless的元素就是Param子类,可以直调用Param._check
            await asyncio.gather(
                *(
                    cast(Param, param.field_info)._check(**params) # 等价于param.field_info._check(**params)
                    for param in self.params
                )
            ) # 再检查param,由于self.params元素是ModelField,需要先获取field_info,类型为Param子类
        except SkippedException as e: 如果检查过程中出现SkippedException就抛出
            logger.trace(f"{self} skipped due to {e}")
            raise

因此Dependent.check本质上是调用Param._check,之前我们已经讲过Param的实例化,现在讲Param的调用
StateParam、MatcherParam、ArgParam、ExceptionParam、DefaultParam没有_check,会调用Param基类,因此无事发生

class DependParam(Param):
	async def _check(self, **kwargs: Any) -> None:
		sub_dependent: Dependent = self.extra["dependent"]
		await sub_dependent.check(**kwargs)
	```
	还记得sub_dependent是什么吗?
	例如
	async def user_id(event:PrviateMessageEvent):
		return event.sender.user_id
	@xxx.handle()
	async def _(user_id=Depends(user_id)):
		pass
	Depends会调用DependsInner类,将user_id作为DependsInner的对象属性self.dependency
	之后DependParam._check_param时,会将self.dependency转化为Dependent作为变量dependent存到DependParam
	由于DependParam是Param子类,Param是FieldInfo子类,因此可以通过self.extra["dependent"]获取DependParam实例化时的变量dependent
	所以在这里sub_dependent就是函数user_id转化的Dependent
	于是DependParam._check时,就是调用sub_dependent._check,这样就能够实现嵌套检查
	```
class BotParam(Param):
	async def _check(self, bot: "Bot", **kwargs: Any) -> None:
		if checker := self.extra.get("checker"):
			check_field_type(checker, bot)
	```
	self.extra.get("checker")就是_check_param时实例化BotParam时传入的checker,是ModelField类型
	第一节已经介绍过BotParam的checker了,这里不再详细介绍
	对于基类Bot,checker为None,对于协议Bot,只有相应Bot能通过check
	比如如果同时接入了qq协议端和微信协议端,那么类型注释是qq Bot的handler无法被微信协议的消息触发,因为BotParam._check无法通过
	```
class EventParam(Param):
	async def _check(self, bot: "EventParam", **kwargs: Any) -> None:
		if checker := self.extra.get("checker"):
			check_field_type(checker, bot)
	```
	和BotParam如出一辙
	```

Dependent.solve

def check_field_type(field: ModelField, value: V) -> V: # 验证value是不是ModelField,是的话返回value
    _, errs_ = field.validate(value, {}, loc=())
    if errs_:
        raise TypeMisMatch(field, value)
    return value

@dataclass(frozen=True)
class Dependent(Generic[R]):
    async def _solve_field(self, field: ModelField, params: Dict[str, Any]) -> Any:
        value = await cast(Param, field.field_info)._solve(**params)
        if value is Undefined:
            ```
            如果Param._solve的结果是Undefined,就获取field的默认值
            还记得之前说过只有DefautParam的ModelField有默认值吗这个判断就是为DefautParam设计的
            ```
            value = field.get_default()
        return check_field_type(field, value) # 检查类型

    async def solve(self, **params: Any) -> Dict[str, Any]:
        for param in self.parameterless: # 由于parameterless的元素是Param子类,可以直接调用_solve
            await param._solve(**params)

        values = await asyncio.gather(
            *(self._solve_field(field, params) for field in self.params) # params的元素是ModelField,需要先取出field_info再调用_solve
        )
        return {field.name: value for field, value in zip(self.params, values)}
    ```
    可以看出,对于parameterless参数,运行时只是调用它的_solve
    也就是说parameterless参数的返回值不重要,而在于进行一些操作,例如修改上下文
    对于params,将_solve的结果与参数名构成字典
    ```

下面来看各种Param._solve

class DependParam(Param):
	async def _solve(
		self,
		stack: Optional[AsyncExitStack] = None,
		dependency_cache: Optional[T_DependencyCache] = None,
		**kwargs: Any,
	) -> Any:
		use_cache: bool = self.extra["use_cache"] # 是否使用dependency_cache缓存
		dependency_cache = {} if dependency_cache is None else dependency_cache

		sub_dependent: Dependent = self.extra["dependent"] # 例如前文例子中的Dependent user_id函数
		call = cast(Callable[..., Any], sub_dependent.call) # 相当于call = sub_dependent.call,例如前文的user_id函数

		# 先使用已有的dependency_cache缓存运行sub_dependent,有缓存总比没有好
		sub_values = await sub_dependent.solve(
			stack=stack,
			dependency_cache=dependency_cache,
			**kwargs,
		)
		```
		sub_dependent是Dependent,嵌套嵌套依赖注入就是这么实现的
		sub_values是通过依赖注入获取的参数,如Dependent use_id的sub_values就是{"event":xxx},因为use_id的参数是event
		```

		```
		已经有了sub_dependent需要的参数sub_values,接下来调用sub_dependent
		```
		task: asyncio.Task[Any]
		if use_cache and call in dependency_cache:
			```
			如果使用缓存并且已有call的可调用函数,那就调用它
			如果没有对应call的则需要将其转化为可调用函数并存入dependency_cache
			```
			return await dependency_cache[call]
		elif is_gen_callable(call) or is_async_gen_callable(call): # 如果是生成器或异步生成器
			assert isinstance(
				stack, AsyncExitStack
			), "Generator dependency should be called in context"
			if is_gen_callable(call): # 如果是生成器
				cm = run_sync_ctx_manager(contextmanager(call)(**sub_values))
			else: # 如果是异步生成器
				cm = asynccontextmanager(call)(**sub_values)
			task = asyncio.create_task(stack.enter_async_context(cm))
			dependency_cache[call] = task
			return await task
		elif is_coroutine_callable(call): # 如果是协程函数,例如上例子的user_id函数
			task = asyncio.create_task(call(**sub_values)) # 例如上例的[Task user_id(**{event:xxx})]
			dependency_cache[call] = task
			return await task
		else: # 如果是普通函数
			task = asyncio.create_task(run_sync(call)(**sub_values))
			dependency_cache[call] = task
			return await task
		```
		把call统一转化为Task并存入dependency_cache,方便多次调用
		await task获取返回值,例如上例就是从event中获取的user_id
		```
class BotParam(Param):
	async def _solve(self, event: "Event", **kwargs: Any) -> Any:
		return bot # 直接返回bot,没什么好说的
class EventParam(Param):
	async def _solve(self, bot: "Bot", **kwargs: Any) -> Any:
		return event # 直接返回event,没什么好说的
class StateParam(Param):
	async def _solve(self, state: T_State, **kwargs: Any) -> Any:
		return state # 直接返回state,没什么好说的
class MatcherParam(Param):
	async def _solve(self, matcher: "Matcher", **kwargs: Any) -> Any:
		return matcher # 直接返回matcher,没什么好说的
class ArgParam(Param):
	async def _solve(self, matcher: "Matcher", **kwargs: Any) -> Any:
		message = matcher.get_arg(self.extra["key"]) # 从state中根据key获取响应数据,就是前文所提到的state
		if message is None: # 默认为Arg()
			return message
		if self.extra["type"] == "message": # 对应Arg()
			return message
		elif self.extra["type"] == "str": # 对应ArgStr()
			return str(message)
		else: # 对应ArgPlainText()
			return message.extract_plain_text()
class ExceptionParam(Param):
	async def _solve(self, exception: Optional[Exception] = None, **kwargs: Any) -> Any:
		return exception # 直接返回传入的exception
class DefaultParam(Param):
	async def _solve(self, **kwargs: Any) -> Any:
		return Undefined # 特殊用法,见前文

这样就通过Bot和Event构造出了各种Param的参数,由于ModelField还存有参数名,因此可以构造字典用于call的调用

Dependent.__call__

@dataclass(frozen=True)
class Dependent(Generic[R]):
    async def __call__(self, **kwargs: Any) -> R:
        await self.check(**kwargs)

        values = await self.solve(**kwargs) # 获取了handler所需的参数

        if is_coroutine_callable(self.call):
            return await cast(Callable[..., Awaitable[R]], self.call)(**values) # 如果handler是异步函数
        else:
            return await run_sync(cast(Callable[..., R], self.call))(**values) # 如果handler是同步函数
至此我们就讲完了所有关于Dependent的代码

Trie Match

try:
    TrieRule.get_value(bot, event, state)
except Exception as e:
    logger.opt(colors=True, exception=e).warning(
        "Error while parsing command for event"
    )

TrieRule位于nonebot.rule
NoneBot支持以类似命令行的方式调用插件,如/xxx -a -b 10
利用TrieRule.get_value将这种命令拆分解析出来,放进state里,这样插件就可以直接使用

CMD_RESULT = TypedDict(
    "CMD_RESULT",
    {
        "command": Optional[Tuple[str, ...]],
        "raw_command": Optional[str],
        "command_arg": Optional[Message[MessageSegment]],
        "command_start": Optional[str],
    },
) # 一个有代码提示的字典

TRIE_VALUE = NamedTuple(
    "TRIE_VALUE", [("command_start", str), ("command", Tuple[str, ...])]
) # 一个有代码提示的元组

class TrieRule:
    prefix: CharTrie = CharTrie()

    @classmethod
    def add_prefix(cls, prefix: str, value: TRIE_VALUE) -> None:
        if prefix in cls.prefix:
            logger.warning(f'Duplicated prefix rule "{prefix}"')
            return
        cls.prefix[prefix] = value

    @classmethod
    def get_value(cls, bot: Bot, event: Event, state: T_State) -> CMD_RESULT:
        prefix = CMD_RESULT(
            command=None, raw_command=None, command_arg=None, command_start=None
        ) # 相当于prefix = {command:None, raw_command:None, command_arg:None, command_start:None}
        state[PREFIX_KEY] = prefix # 存入state
        if event.get_type() != "message": # 如果事件类型不是message则直接结束
            return prefix

        message = event.get_message()
        message_seg: MessageSegment = message[0] # 获取message第一个字段
        if message_seg.is_text(): # 如果 第一个字段是文本
            segment_text = str(message_seg).lstrip() # 删除第一个字段左边的空格
            if pf := cls.prefix.longest_prefix(segment_text): # 找出cls.prefix中与segment_text的最长公共序列
                value: TRIE_VALUE = pf.value
                prefix[RAW_CMD_KEY] = pf.key # /xxx -a -b 10
                prefix[CMD_START_KEY] = value.command_start# /xxx
                prefix[CMD_KEY] = value.command # -a -b 10
                msg = message.copy()
                msg.pop(0)
                new_message = msg.__class__(segment_text[len(pf.key) :].lstrip())
                for new_segment in reversed(new_message):
                    msg.insert(0, new_segment)
                prefix[CMD_ARG_KEY] = msg # msg是去除了命令前缀的消息

        return prefix

Run Matcher

break_flag = False
for priority in sorted(matchers.keys()): # 根据优先级运行,同一优先级的同时运行
    if break_flag:
        break

    if show_log:
        logger.debug(f"Checking for matchers in priority {priority}...")

    pending_tasks = [
        _check_matcher(
            priority, matcher, bot, event, state.copy(), stack, dependency_cache # 复制state防止污染
        )
        for matcher in matchers[priority]
    ]

    results = await asyncio.gather(*pending_tasks, return_exceptions=True) # 如果报错将报错作为结果返回

    for result in results:
        if not isinstance(result, Exception): # 只关注报错
            continue
        if isinstance(result, StopPropagation): # 出现StopPropagation后,下一优先级的Matcher将全都跳过,不影响event_postprocessors
            break_flag = True
            logger.debug("Stop event propagation")
        else: # 异常捕获
            logger.opt(colors=True, exception=result).error(
                "Error when checking Matcher."
            )

_check_matcher

async def _check_matcher(
    priority: int,
    Matcher: Type[Matcher],
    bot: "Bot",
    event: "Event",
    state: T_State,
    stack: Optional[AsyncExitStack] = None,
    dependency_cache: Optional[T_DependencyCache] = None,
) -> None:
    if Matcher.expire_time and datetime.now() > Matcher.expire_time: # 判断Matcher是否超过expire_time
        with contextlib.suppress(Exception):
            matchers[priority].remove(Matcher) # 移除过期的Matcher并返回
        return

    try:
        if not await Matcher.check_perm( # 见后文
            bot, event, stack, dependency_cache
        ) or not await Matcher.check_rule(bot, event, state, stack, dependency_cache):
            return
    except Exception as e:
        logger.opt(colors=True, exception=e).error(
            f"Rule check failed for {Matcher}."
        )
        return

    if Matcher.temp: # 如果Matcher是temp的
        with contextlib.suppress(Exception):
            matchers[priority].remove(Matcher) # 运行一次就删除
    await _run_matcher(Matcher, bot, event, state, stack, dependency_cache)

先校验permission和rule

Matcher.check_perm

class Matcher(metaclass=MatcherMeta):
    @classmethod
    async def check_perm(
        cls,
        bot: Bot,
        event: Event,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ) -> bool:
        event_type = event.get_type()
        return event_type == (cls.type or event_type) and await cls.permission(
            bot, event, stack, dependency_cache
        )

首先检查事件类型是否正确(如果Matcher.type为None则任何事件类型都可以通过检查),再检查permission

class Permission:
    async def __call__(
        self,
        bot: Bot,
        event: Event,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ) -> bool:
        if not self.checkers: # 空Permission直接通过检查
            return True
        results = await asyncio.gather(
            *(
                run_coro_with_catch(
                    checker(
                        bot=bot,
                        event=event,
                        stack=stack,
                        dependency_cache=dependency_cache,
                    ),
                    (SkippedException,),
                    False,
                )
                for checker in self.checkers # checker是Dependent[bool],返回值是bool值
            ),
        )
        return any(results) # 如果有checker返回值是False就不通过检查

Matcher.check_rule

class Matcher(metaclass=MatcherMeta):
    @classmethod
    async def check_rule(
        cls,
        bot: Bot,
        event: Event,
        state: T_State,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ) -> bool:
        event_type = event.get_type()
        return event_type == (cls.type or event_type) and await cls.rule(
            bot, event, state, stack, dependency_cache
        )

检查Rule时也是检查事件类型是否正确(如果Matcher.type为None则任何事件类型都可以通过检查),再检查Rule

class Rule:
    async def __call__(
        self,
        bot: Bot,
        event: Event,
        state: T_State,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ) -> bool:
        if not self.checkers: # 空Rule直接通过
            return True
        try:
            results = await asyncio.gather(
                *(
                    checker(
                        bot=bot,
                        event=event,
                        state=state,
                        stack=stack,
                        dependency_cache=dependency_cache,
                    )
                    for checker in self.checkers # checker是Dependent[bool],返回值是bool值
                )
            )
        except SkippedException: # 在Rule的checker中可以使用SkippedException使检查直接通过
            return False
        return all(results) # 如果有checker返回值是False就不通过检查

_run_matcher

permission和rule通过后调用_run_matcher

async def _run_matcher(
    Matcher: Type[Matcher],
    bot: "Bot",
    event: "Event",
    state: T_State,
    stack: Optional[AsyncExitStack] = None,
    dependency_cache: Optional[T_DependencyCache] = None,
) -> None:
    logger.info(f"Event will be handled by {Matcher}")
    matcher = Matcher()
    ```
    Matcher实例化,将handlers和state复制一份
    他们与Matcher类的handlers和_default_state独立
    self.handlers = self.handlers.copy()
    self.state = self._default_state.copy()
    ```
    if coros := [
        run_coro_with_catch(
            proc(
                matcher=matcher,
                bot=bot,
                event=event,
                state=state,
                stack=stack,
                dependency_cache=dependency_cache,
            ),
            (SkippedException,),
        )
        for proc in _run_preprocessors
    ]:
        ```
        run_preprocess,和event_preprocess几乎一样
        用run_preprocessor装饰函数来注册
        支持的Param、DependParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam
        ```
        try:
            await asyncio.gather(*coros)
        except IgnoredException:
            logger.opt(colors=True).info(f"{matcher} running is cancelled")
            return
        except Exception as e:
            logger.opt(colors=True, exception=e).error(
                "Error when running RunPreProcessors. Running cancelled!"
            )
            return

    exception = None
    try:
        logger.debug(f"Running {matcher}")
        await matcher.run(bot, event, state, stack, dependency_cache) # 后文介绍
    except Exception as e:
        logger.opt(colors=True, exception=e).error(
            f"Running {matcher} failed."
        )
        exception = e

    if coros := [
        run_coro_with_catch(
            proc(
                matcher=matcher,
                exception=exception,
                bot=bot,
                event=event,
                state=matcher.state,
                stack=stack,
                dependency_cache=dependency_cache,
        ),
        (SkippedException,),
        )
        for proc in _run_postprocessors
    ]:
        ```
        run_postprocess,和event_preprocess几乎一样
        用run_postprocess装饰函数来注册
        但是,它可以传入exception,也就是Matcher运行时抛出的错误,因此可以使用ExceptionParam
        支持的Param、DependParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam、ExceptionParam
        ```
        try:
            await asyncio.gather(*coros)
        except Exception as e:
            logger.opt(colors=True, exception=e).error(
                "Error when running RunPostProcessors"
            )
        if matcher.block: # 如果matcher是block,就抛出StopPropagation异常,可以被捕获并阻止低优先度的Matcher运行
            raise StopPropagation
        return

Matcher.simple_run

Matcher.run会先调用Matcher.simple_run

class Matcher(metaclass=MatcherMeta):
    async def simple_run(
        self,
        bot: Bot,
        event: Event,
        state: T_State,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ):
        logger.trace(
            f"{self} run with incoming args: "
            f"bot={bot}, event={event!r}, state={state!r}"
        )
        b_t = current_bot.set(bot) # 将bot设为当前bot
        e_t = current_event.set(event) # 将event设为当前event
        m_t = current_matcher.set(self) # 将matcher设为当前matcher
        try:
            # Refresh preprocess state
            self.state.update(state) # 合并全局state到self.state

            while self.handlers: # 从第一个handler开始依次运行
                handler = self.handlers.pop(0)
                current_handler.set(handler) # 将handler设为当前handler
                logger.debug(f"Running handler {handler}")
                try:
                    await handler(
                        matcher=self,
                        bot=bot,
                        event=event,
                        state=self.state,
                        stack=stack,
                        dependency_cache=dependency_cache,
                    ) # handler是Dependent,可以这样运行
                except SkippedException: # 遇到SkippedException继续正常遍历handlers,对于其它异常会抛出simple_run
                    logger.debug(f"Handler {handler} skipped")
        except StopPropagation: # 如果遇到StopPropagation,修改Matcher为block
            self.block = True
        finally:
            ```
            结束后清空current_bot、current_event、current_matcher
            注意保留了current_handler,因为后面会用到
            ```
            logger.info(f"{self} running complete")
            current_bot.reset(b_t)
            current_event.reset(e_t)
            current_matcher.reset(m_t)

Matcher.run

class User:
    def __init__(
        self, users: Tuple[str, ...], perm: Optional[Permission] = None
    ) -> None:
        self.users = users # 白名单用户
        self.perm = perm # 用户权限

    async def __call__(self, bot: Bot, event: Event) -> bool:
        try:
            session = event.get_session_id()
        except Exception:
            return False
        return bool(
            session in self.users and (self.perm is None or await self.perm(bot, event))
        ) # 需要当前用户在白名单且通过其它权限校验

def USER(*users: str, perm: Optional[Permission] = None):
    return Permission(User(users, perm))

class Matcher(metaclass=MatcherMeta):
    @classmethod
    def type_updater(cls, func: T_TypeUpdater) -> T_TypeUpdater:
        cls._default_type_updater = Dependent[str].parse(
            call=func, allow_types=cls.HANDLER_PARAM_TYPES
        )
        return func
    # 注册_default_type_updater,会被转化为Dpendent,因此可以依赖注入

    async def update_type(self, bot: Bot, event: Event) -> str:
        updater = self.__class__._default_type_updater
        return (
            await updater(bot=bot, event=event, state=self.state, matcher=self)
            if updater
            else "message"
        )
    # 运行通过type_updater装饰器注册的updater,返回值为updater返回值或message,就是event的新type

    @classmethod
    def permission_updater(cls, func: T_PermissionUpdater) -> T_PermissionUpdater:
        cls._default_permission_updater = Dependent[Permission].parse(
            call=func, allow_types=cls.HANDLER_PARAM_TYPES
        )
        return func
    # 注册permission_updater,会被转化为Dpendent,因此可以依赖注入

    async def update_permission(self,  bot: Bot, event: Event) -> Permission:
        if updater := self.__class__._default_permission_updater:
            ```
            如果有通过permission_updater装饰器注册的updater,就运行它,返回值是新的Permission
            ```
            return await updater(bot=bot, event=event, state=self.state, matcher=self)
        # 如果没有通过permission_updater装饰器注册的updater
        permission = self.permission # 获取原先的permission
        if len(permission.checkers) == 1 and isinstance(
            user_perm := tuple(permission.checkers)[0].call, User
        ): # ,则
            permission = user_perm.perm
        return USER(event.get_session_id(), perm=permission)
        ```
        将当前用户设为会话对象
        如果如果有permission且permission的第一个checker不是User类型的Permisson,则保留原先permission
        - 也就是说将任何人都能触发的Matcher变成只有一个人能触发的Matcher,并保留原先permission
        否则只保留原先permission中关于User的permission
        - 也就是说User permission对所有用户适用,其它permission只对单个用户有效,更新User后废除
        ```

    async def run(
        self,
        bot: Bot,
        event: Event,
        state: T_State,
        stack: Optional[AsyncExitStack] = None,
        dependency_cache: Optional[T_DependencyCache] = None,
    ):
        try:
            await self.simple_run(bot, event, state, stack, dependency_cache) # 见上文
        except RejectedException: # 如果handler抛出RejectedException,该情况将在下节具体介绍
            await self.resolve_reject()
            type_ = await self.update_type(bot, event) # 更新事件类型
            permission = await self.update_permission(bot, event) # 更新permission
            Matcher.new(
                type_,
                Rule(),
                permission,
                self.handlers,
                temp=True, # 只允行一次
                priority=0, # 优先级为0,所以说不建议把Matcher的priority设为负数,可能出现异常
                block=True, # 是block,因此总是在Matcher第一个运行、运行后会阻止之后的Matcher,并且删除自己
                plugin=self.plugin,
                module=self.module,
                expire_time=bot.config.session_expire_timeout,
                default_state=self.state,
                default_type_updater=self.__class__._default_type_updater,
                default_permission_updater=self.__class__._default_permission_updater,
            )
        except PausedException: # 如果handler抛出PausedException
            type_ = await self.update_type(bot, event) # 更新事件类型
            permission = await self.update_permission(bot, event) # 更新permission
            Matcher.new(
                type_,
                Rule(),
                permission,
                self.handlers,
                temp=True, # 只允行一次
                priority=0, # 优先级为0
                block=True, # 是block,因此总是在Matcher第一个运行、运行后会阻止之后的Matcher,并且删除自己
                plugin=self.plugin,
                module=self.module,
                expire_time=bot.config.session_expire_timeout,
                default_state=self.state,
                default_type_updater=self.__class__._default_type_updater,
                default_permission_updater=self.__class__._default_permission_updater,
            ) # 创建新Matcher
        except FinishedException: # 如果handler抛出FinishedException
            pass # 无后续操作
        # 不管什么异常都会导致handler和Matcher的运行结束

后两种异常很好理解,但对于RejectedException可能难理解,因为需要结合下节的内容才能知道为什么这么设计

event_postprocessors

没什么好说的,不过支持所有Param(DependParam、ExceptionParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam)

if coros := [
    run_coro_with_catch(
        proc(
            bot=bot,
            event=event,
            state=state,
            stack=stack,
            dependency_cache=dependency_cache,
        ),
        (SkippedException,),
    )
    for proc in _event_postprocessors
]:
    try:
        if show_log:
            logger.debug("Running PostProcessors...")
        await asyncio.gather(*coros)
    except Exception as e:
        logger.opt(colors=True, exception=e).error(
            "Error when running EventPostProcessors"
        )

发表评论 取消回复
表情 图片 链接 代码

分享