以OneBot V11为例,看看Matcher是怎么运行的
定义一个简单插件
hello = on_startswith("hello")
@hello.handle()
async def _(bot:Bot, event:PrivateMessageEvent):
hello.finish("hello")
- 根据之前的内容,我们知道hello是一个Matcher子类
它的type为message,并且有一个startswith的Rule - 之后hello注册了一个handler,handler会转化为Dependent handler
由于OneBot V11的Bot、PrivateMessageEvent分别继承了基类Bot、Event,能够通过BotParam和EventParam的_check_perm
因此Dependent handler的params包括两个ModelField,field_info分别为BotParam和EventParam
由于不符合基类写法,这种handler只能由OneBot V11触发
Dependent handler的parameterless为空
那么Matcher和handler是怎么触发的呢?
对于Onebot V11,消息入口为nonebot.adapters.onebot.v11.bot.Bot.handle_event
本文只讨论handle_event,即event的处理,下一节讨论call_api,即消息的回复
class Bot(BaseBot):
async def handle_event(self, event: Event) -> None:
if isinstance(event, MessageEvent):
event.message.reduce()
await _check_reply(self, event)
_check_at_me(self, event)
_check_nickname(self, event)
await handle_event(self, event)
由于本章不讨论适配,因此只需要看handle_event函数,这是NoneBot的函数,位于nonebot.message
async def handle_event(bot: "Bot", event: "Event") -> None:
show_log = True
log_msg = f"{escape_tag(bot.type)} {escape_tag(bot.self_id)} | "
try:
log_msg += event.get_log_string()
except NoLogException:
show_log = False
if show_log:
logger.opt(colors=True).success(log_msg) # 日志打印,没什么好说的
state: Dict[Any, Any] = {} # 用于保存一些状态,state在bot收到消息后生成,在消息发出后销毁,期间可以对其进行操作传递数据
dependency_cache: T_DependencyCache = {}
```
dependency_cache保存了各种**可调用函数**,可调用函数来源于Param或普通函数(Depends的参数)
Param或普通函数的调用可能有许多次类型转换,将最后转换成的可调用对象存起来,方便再次调用
类型为Dict[_DependentCallable[Any], Task[Any]]
```
async with AsyncExitStack() as stack:
if coros := [
run_coro_with_catch(
proc(
bot=bot,
event=event,
state=state,
stack=stack,
dependency_cache=dependency_cache,
),
(SkippedException,),
)
for proc in _event_preprocessors
]:
try:
if show_log:
logger.debug("Running PreProcessors...")
await asyncio.gather(*coros)
except IgnoredException as e:
logger.opt(colors=True).info(
f"Event {escape_tag(event.get_event_name())} is ignored"
)
return
except Exception as e:
logger.opt(colors=True, exception=e).error(
"Error when running EventPreProcessors. "
"Event ignored! "
)
return
try:
TrieRule.get_value(bot, event, state)
except Exception as e:
logger.opt(colors=True, exception=e).warning(
"Error while parsing command for event"
)
break_flag = False
for priority in sorted(matchers.keys()):
if break_flag:
break
if show_log:
logger.debug(f"Checking for matchers in priority {priority}...")
pending_tasks = [
_check_matcher(
priority, matcher, bot, event, state.copy(), stack, dependency_cache
)
for matcher in matchers[priority]
]
results = await asyncio.gather(*pending_tasks, return_exceptions=True)
for result in results:
if not isinstance(result, Exception):
continue
if isinstance(result, StopPropagation):
break_flag = True
logger.debug("Stop event propagation")
else:
logger.opt(colors=True, exception=result).error(
"Error when checking Matcher. "
)
if coros := [
run_coro_with_catch(
proc(
bot=bot,
event=event,
state=state,
stack=stack,
dependency_cache=dependency_cache,
),
(SkippedException,),
)
for proc in _event_postprocessors
]:
try:
if show_log:
logger.debug("Running PostProcessors...")
await asyncio.gather(*coros)
except Exception as e:
logger.opt(colors=True, exception=e).error(
"Error when running EventPostProcessors "
)
除去日志打印,分为四部分,下面一一介绍
event_preprocess
事件预处理最先触发,通过nonebot.message.event_preprocessor注册事件预处理器,使用IgnoredException可以终止事件传递
事件预处理器是Dependent类型,具有依赖注入功能,允许的Param有DependParam、BotParam、EventParam、StateParam、DefaultParam
例如
@event_preprocessor
async def _(bot) # 还记得这种写法的意思吗?任何adapter都能触发
if bot.type == "OneBot V11" # 跳过OneBot V11适配器的所有事件
raise IgnoredException
@event_preprocessor
async def _(event:v11.PrivateMessageEvent) # 由于Event不是基类,该event_preprocessor只能由OneBot V11适配器触发
if event.sender.user_id == 1000 # 忽视qq号为1000的用户的私聊
raise IgnoredException
下面分析event_preprocessor的运行
if coros := [
run_coro_with_catch( # 运行协程并且忽视特定报错(这里是SkippedException)
proc( # proc是Dependent,调用Dependent的__call__
bot=bot,
event=event,
state=state,
stack=stack,
dependency_cache=dependency_cache, #
),
(SkippedException,), # 忽视协程的SkippedException,意味着你可以使用nonebot.exception.SkippedException提前结束event_preprocessors但不结束事件传递
)
for proc in _event_preprocessors
]:
try:
if show_log:
logger.debug("Running PreProcessors...")
await asyncio.gather(*coros) # 同时运行所有协程
except IgnoredException as e: # 如果event_preprocessor抛出IgnoredException就终止事件的传递
logger.opt(colors=True).info(
f"Event {escape_tag(event.get_event_name())} is ignored"
)
return
except Exception as e: # 异常捕获
logger.opt(colors=True, exception=e).error(
"Error when running EventPreProcessors. "
"Event ignored! "
)
return
代码都很简单,下面我们来看Dependent是如何调用的
补充:Dependent的调用
@dataclass(frozen=True)
class Dependent(Generic[R]):
async def __call__(self, **kwargs: Any) -> R:
```
params可能包括bot、event、state、stack、matcher、dependency_cache,具体看是在哪调用的
```
await self.check(**kwargs) # 检查
values = await self.solve(**kwargs) # 从上下文获取变量
if is_coroutine_callable(self.call): # 调用call
return await cast(Callable[..., Awaitable[R]], self.call)(**values)
else:
return await run_sync(cast(Callable[..., R], self.call))(**values)
Dependent.check
@dataclass(frozen=True)
class Dependent(Generic[R]):
async def check(self, **params: Any) -> None:
try:
await asyncio.gather(
*(param._check(**params) for param in self.parameterless)
) # 先检查parameterless,由于parameterless的元素就是Param子类,可以直调用Param._check
await asyncio.gather(
*(
cast(Param, param.field_info)._check(**params) # 等价于param.field_info._check(**params)
for param in self.params
)
) # 再检查param,由于self.params元素是ModelField,需要先获取field_info,类型为Param子类
except SkippedException as e: 如果检查过程中出现SkippedException就抛出
logger.trace(f"{self} skipped due to {e}")
raise
因此Dependent.check本质上是调用Param._check,之前我们已经讲过Param的实例化,现在讲Param的调用
StateParam、MatcherParam、ArgParam、ExceptionParam、DefaultParam没有_check,会调用Param基类,因此无事发生
class DependParam(Param): async def _check(self, **kwargs: Any) -> None: sub_dependent: Dependent = self.extra["dependent"] await sub_dependent.check(**kwargs) ``` 还记得sub_dependent是什么吗? 例如 async def user_id(event:PrviateMessageEvent): return event.sender.user_id @xxx.handle() async def _(user_id=Depends(user_id)): pass Depends会调用DependsInner类,将user_id作为DependsInner的对象属性self.dependency 之后DependParam._check_param时,会将self.dependency转化为Dependent作为变量dependent存到DependParam 由于DependParam是Param子类,Param是FieldInfo子类,因此可以通过self.extra["dependent"]获取DependParam实例化时的变量dependent 所以在这里sub_dependent就是函数user_id转化的Dependent 于是DependParam._check时,就是调用sub_dependent._check,这样就能够实现嵌套检查 ```
class BotParam(Param):
async def _check(self, bot: "Bot", **kwargs: Any) -> None:
if checker := self.extra.get("checker"):
check_field_type(checker, bot)
```
self.extra.get("checker")就是_check_param时实例化BotParam时传入的checker,是ModelField类型
第一节已经介绍过BotParam的checker了,这里不再详细介绍
对于基类Bot,checker为None,对于协议Bot,只有相应Bot能通过check
比如如果同时接入了qq协议端和微信协议端,那么类型注释是qq Bot的handler无法被微信协议的消息触发,因为BotParam._check无法通过
```
class EventParam(Param):
async def _check(self, bot: "EventParam", **kwargs: Any) -> None:
if checker := self.extra.get("checker"):
check_field_type(checker, bot)
```
和BotParam如出一辙
```
Dependent.solve
def check_field_type(field: ModelField, value: V) -> V: # 验证value是不是ModelField,是的话返回value
_, errs_ = field.validate(value, {}, loc=())
if errs_:
raise TypeMisMatch(field, value)
return value
@dataclass(frozen=True)
class Dependent(Generic[R]):
async def _solve_field(self, field: ModelField, params: Dict[str, Any]) -> Any:
value = await cast(Param, field.field_info)._solve(**params)
if value is Undefined:
```
如果Param._solve的结果是Undefined,就获取field的默认值
还记得之前说过只有DefautParam的ModelField有默认值吗这个判断就是为DefautParam设计的
```
value = field.get_default()
return check_field_type(field, value) # 检查类型
async def solve(self, **params: Any) -> Dict[str, Any]:
for param in self.parameterless: # 由于parameterless的元素是Param子类,可以直接调用_solve
await param._solve(**params)
values = await asyncio.gather(
*(self._solve_field(field, params) for field in self.params) # params的元素是ModelField,需要先取出field_info再调用_solve
)
return {field.name: value for field, value in zip(self.params, values)}
```
可以看出,对于parameterless参数,运行时只是调用它的_solve
也就是说parameterless参数的返回值不重要,而在于进行一些操作,例如修改上下文
对于params,将_solve的结果与参数名构成字典
```
下面来看各种Param._solve
class DependParam(Param):
async def _solve(
self,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
**kwargs: Any,
) -> Any:
use_cache: bool = self.extra["use_cache"] # 是否使用dependency_cache缓存
dependency_cache = {} if dependency_cache is None else dependency_cache
sub_dependent: Dependent = self.extra["dependent"] # 例如前文例子中的Dependent user_id函数
call = cast(Callable[..., Any], sub_dependent.call) # 相当于call = sub_dependent.call,例如前文的user_id函数
# 先使用已有的dependency_cache缓存运行sub_dependent,有缓存总比没有好
sub_values = await sub_dependent.solve(
stack=stack,
dependency_cache=dependency_cache,
**kwargs,
)
```
sub_dependent是Dependent,嵌套嵌套依赖注入就是这么实现的
sub_values是通过依赖注入获取的参数,如Dependent use_id的sub_values就是{"event":xxx},因为use_id的参数是event
```
```
已经有了sub_dependent需要的参数sub_values,接下来调用sub_dependent
```
task: asyncio.Task[Any]
if use_cache and call in dependency_cache:
```
如果使用缓存并且已有call的可调用函数,那就调用它
如果没有对应call的则需要将其转化为可调用函数并存入dependency_cache
```
return await dependency_cache[call]
elif is_gen_callable(call) or is_async_gen_callable(call): # 如果是生成器或异步生成器
assert isinstance(
stack, AsyncExitStack
), "Generator dependency should be called in context"
if is_gen_callable(call): # 如果是生成器
cm = run_sync_ctx_manager(contextmanager(call)(**sub_values))
else: # 如果是异步生成器
cm = asynccontextmanager(call)(**sub_values)
task = asyncio.create_task(stack.enter_async_context(cm))
dependency_cache[call] = task
return await task
elif is_coroutine_callable(call): # 如果是协程函数,例如上例子的user_id函数
task = asyncio.create_task(call(**sub_values)) # 例如上例的[Task user_id(**{event:xxx})]
dependency_cache[call] = task
return await task
else: # 如果是普通函数
task = asyncio.create_task(run_sync(call)(**sub_values))
dependency_cache[call] = task
return await task
```
把call统一转化为Task并存入dependency_cache,方便多次调用
await task获取返回值,例如上例就是从event中获取的user_id
```
class BotParam(Param): async def _solve(self, event: "Event", **kwargs: Any) -> Any: return bot # 直接返回bot,没什么好说的
class EventParam(Param): async def _solve(self, bot: "Bot", **kwargs: Any) -> Any: return event # 直接返回event,没什么好说的
class StateParam(Param): async def _solve(self, state: T_State, **kwargs: Any) -> Any: return state # 直接返回state,没什么好说的
class MatcherParam(Param): async def _solve(self, matcher: "Matcher", **kwargs: Any) -> Any: return matcher # 直接返回matcher,没什么好说的
class ArgParam(Param): async def _solve(self, matcher: "Matcher", **kwargs: Any) -> Any: message = matcher.get_arg(self.extra["key"]) # 从state中根据key获取响应数据,就是前文所提到的state if message is None: # 默认为Arg() return message if self.extra["type"] == "message": # 对应Arg() return message elif self.extra["type"] == "str": # 对应ArgStr() return str(message) else: # 对应ArgPlainText() return message.extract_plain_text()
class ExceptionParam(Param): async def _solve(self, exception: Optional[Exception] = None, **kwargs: Any) -> Any: return exception # 直接返回传入的exception
class DefaultParam(Param): async def _solve(self, **kwargs: Any) -> Any: return Undefined # 特殊用法,见前文
这样就通过Bot和Event构造出了各种Param的参数,由于ModelField还存有参数名,因此可以构造字典用于call的调用
Dependent.__call__
@dataclass(frozen=True)
class Dependent(Generic[R]):
async def __call__(self, **kwargs: Any) -> R:
await self.check(**kwargs)
values = await self.solve(**kwargs) # 获取了handler所需的参数
if is_coroutine_callable(self.call):
return await cast(Callable[..., Awaitable[R]], self.call)(**values) # 如果handler是异步函数
else:
return await run_sync(cast(Callable[..., R], self.call))(**values) # 如果handler是同步函数
Trie Match
try:
TrieRule.get_value(bot, event, state)
except Exception as e:
logger.opt(colors=True, exception=e).warning(
"Error while parsing command for event"
)
TrieRule位于nonebot.rule
NoneBot支持以类似命令行的方式调用插件,如/xxx -a -b 10
利用TrieRule.get_value将这种命令拆分解析出来,放进state里,这样插件就可以直接使用
CMD_RESULT = TypedDict(
"CMD_RESULT",
{
"command": Optional[Tuple[str, ...]],
"raw_command": Optional[str],
"command_arg": Optional[Message[MessageSegment]],
"command_start": Optional[str],
},
) # 一个有代码提示的字典
TRIE_VALUE = NamedTuple(
"TRIE_VALUE", [("command_start", str), ("command", Tuple[str, ...])]
) # 一个有代码提示的元组
class TrieRule:
prefix: CharTrie = CharTrie()
@classmethod
def add_prefix(cls, prefix: str, value: TRIE_VALUE) -> None:
if prefix in cls.prefix:
logger.warning(f'Duplicated prefix rule "{prefix}"')
return
cls.prefix[prefix] = value
@classmethod
def get_value(cls, bot: Bot, event: Event, state: T_State) -> CMD_RESULT:
prefix = CMD_RESULT(
command=None, raw_command=None, command_arg=None, command_start=None
) # 相当于prefix = {command:None, raw_command:None, command_arg:None, command_start:None}
state[PREFIX_KEY] = prefix # 存入state
if event.get_type() != "message": # 如果事件类型不是message则直接结束
return prefix
message = event.get_message()
message_seg: MessageSegment = message[0] # 获取message第一个字段
if message_seg.is_text(): # 如果 第一个字段是文本
segment_text = str(message_seg).lstrip() # 删除第一个字段左边的空格
if pf := cls.prefix.longest_prefix(segment_text): # 找出cls.prefix中与segment_text的最长公共序列
value: TRIE_VALUE = pf.value
prefix[RAW_CMD_KEY] = pf.key # /xxx -a -b 10
prefix[CMD_START_KEY] = value.command_start# /xxx
prefix[CMD_KEY] = value.command # -a -b 10
msg = message.copy()
msg.pop(0)
new_message = msg.__class__(segment_text[len(pf.key) :].lstrip())
for new_segment in reversed(new_message):
msg.insert(0, new_segment)
prefix[CMD_ARG_KEY] = msg # msg是去除了命令前缀的消息
return prefix
Run Matcher
break_flag = False
for priority in sorted(matchers.keys()): # 根据优先级运行,同一优先级的同时运行
if break_flag:
break
if show_log:
logger.debug(f"Checking for matchers in priority {priority}...")
pending_tasks = [
_check_matcher(
priority, matcher, bot, event, state.copy(), stack, dependency_cache # 复制state防止污染
)
for matcher in matchers[priority]
]
results = await asyncio.gather(*pending_tasks, return_exceptions=True) # 如果报错将报错作为结果返回
for result in results:
if not isinstance(result, Exception): # 只关注报错
continue
if isinstance(result, StopPropagation): # 出现StopPropagation后,下一优先级的Matcher将全都跳过,不影响event_postprocessors
break_flag = True
logger.debug("Stop event propagation")
else: # 异常捕获
logger.opt(colors=True, exception=result).error(
"Error when checking Matcher. "
)
_check_matcher
async def _check_matcher(
priority: int,
Matcher: Type[Matcher],
bot: "Bot",
event: "Event",
state: T_State,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
) -> None:
if Matcher.expire_time and datetime.now() > Matcher.expire_time: # 判断Matcher是否超过expire_time
with contextlib.suppress(Exception):
matchers[priority].remove(Matcher) # 移除过期的Matcher并返回
return
try:
if not await Matcher.check_perm( # 见后文
bot, event, stack, dependency_cache
) or not await Matcher.check_rule(bot, event, state, stack, dependency_cache):
return
except Exception as e:
logger.opt(colors=True, exception=e).error(
f"Rule check failed for {Matcher}. "
)
return
if Matcher.temp: # 如果Matcher是temp的
with contextlib.suppress(Exception):
matchers[priority].remove(Matcher) # 运行一次就删除
await _run_matcher(Matcher, bot, event, state, stack, dependency_cache)
先校验permission和rule
Matcher.check_perm
class Matcher(metaclass=MatcherMeta):
@classmethod
async def check_perm(
cls,
bot: Bot,
event: Event,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
) -> bool:
event_type = event.get_type()
return event_type == (cls.type or event_type) and await cls.permission(
bot, event, stack, dependency_cache
)
首先检查事件类型是否正确(如果Matcher.type为None则任何事件类型都可以通过检查),再检查permission
class Permission:
async def __call__(
self,
bot: Bot,
event: Event,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
) -> bool:
if not self.checkers: # 空Permission直接通过检查
return True
results = await asyncio.gather(
*(
run_coro_with_catch(
checker(
bot=bot,
event=event,
stack=stack,
dependency_cache=dependency_cache,
),
(SkippedException,),
False,
)
for checker in self.checkers # checker是Dependent[bool],返回值是bool值
),
)
return any(results) # 如果有checker返回值是False就不通过检查
Matcher.check_rule
class Matcher(metaclass=MatcherMeta):
@classmethod
async def check_rule(
cls,
bot: Bot,
event: Event,
state: T_State,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
) -> bool:
event_type = event.get_type()
return event_type == (cls.type or event_type) and await cls.rule(
bot, event, state, stack, dependency_cache
)
检查Rule时也是检查事件类型是否正确(如果Matcher.type为None则任何事件类型都可以通过检查),再检查Rule
class Rule:
async def __call__(
self,
bot: Bot,
event: Event,
state: T_State,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
) -> bool:
if not self.checkers: # 空Rule直接通过
return True
try:
results = await asyncio.gather(
*(
checker(
bot=bot,
event=event,
state=state,
stack=stack,
dependency_cache=dependency_cache,
)
for checker in self.checkers # checker是Dependent[bool],返回值是bool值
)
)
except SkippedException: # 在Rule的checker中可以使用SkippedException使检查直接通过
return False
return all(results) # 如果有checker返回值是False就不通过检查
_run_matcher
permission和rule通过后调用_run_matcher
async def _run_matcher(
Matcher: Type[Matcher],
bot: "Bot",
event: "Event",
state: T_State,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
) -> None:
logger.info(f"Event will be handled by {Matcher}")
matcher = Matcher()
```
Matcher实例化,将handlers和state复制一份
他们与Matcher类的handlers和_default_state独立
self.handlers = self.handlers.copy()
self.state = self._default_state.copy()
```
if coros := [
run_coro_with_catch(
proc(
matcher=matcher,
bot=bot,
event=event,
state=state,
stack=stack,
dependency_cache=dependency_cache,
),
(SkippedException,),
)
for proc in _run_preprocessors
]:
```
run_preprocess,和event_preprocess几乎一样
用run_preprocessor装饰函数来注册
支持的Param、DependParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam
```
try:
await asyncio.gather(*coros)
except IgnoredException:
logger.opt(colors=True).info(f"{matcher} running is cancelled")
return
except Exception as e:
logger.opt(colors=True, exception=e).error(
"Error when running RunPreProcessors. Running cancelled! "
)
return
exception = None
try:
logger.debug(f"Running {matcher}")
await matcher.run(bot, event, state, stack, dependency_cache) # 后文介绍
except Exception as e:
logger.opt(colors=True, exception=e).error(
f"Running {matcher} failed. "
)
exception = e
if coros := [
run_coro_with_catch(
proc(
matcher=matcher,
exception=exception,
bot=bot,
event=event,
state=matcher.state,
stack=stack,
dependency_cache=dependency_cache,
),
(SkippedException,),
)
for proc in _run_postprocessors
]:
```
run_postprocess,和event_preprocess几乎一样
用run_postprocess装饰函数来注册
但是,它可以传入exception,也就是Matcher运行时抛出的错误,因此可以使用ExceptionParam
支持的Param、DependParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam、ExceptionParam
```
try:
await asyncio.gather(*coros)
except Exception as e:
logger.opt(colors=True, exception=e).error(
"Error when running RunPostProcessors "
)
if matcher.block: # 如果matcher是block,就抛出StopPropagation异常,可以被捕获并阻止低优先度的Matcher运行
raise StopPropagation
return
Matcher.simple_run
Matcher.run会先调用Matcher.simple_run
class Matcher(metaclass=MatcherMeta):
async def simple_run(
self,
bot: Bot,
event: Event,
state: T_State,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
):
logger.trace(
f"{self} run with incoming args: "
f"bot={bot}, event={event!r}, state={state!r}"
)
b_t = current_bot.set(bot) # 将bot设为当前bot
e_t = current_event.set(event) # 将event设为当前event
m_t = current_matcher.set(self) # 将matcher设为当前matcher
try:
# Refresh preprocess state
self.state.update(state) # 合并全局state到self.state
while self.handlers: # 从第一个handler开始依次运行
handler = self.handlers.pop(0)
current_handler.set(handler) # 将handler设为当前handler
logger.debug(f"Running handler {handler}")
try:
await handler(
matcher=self,
bot=bot,
event=event,
state=self.state,
stack=stack,
dependency_cache=dependency_cache,
) # handler是Dependent,可以这样运行
except SkippedException: # 遇到SkippedException继续正常遍历handlers,对于其它异常会抛出simple_run
logger.debug(f"Handler {handler} skipped")
except StopPropagation: # 如果遇到StopPropagation,修改Matcher为block
self.block = True
finally:
```
结束后清空current_bot、current_event、current_matcher
注意保留了current_handler,因为后面会用到
```
logger.info(f"{self} running complete")
current_bot.reset(b_t)
current_event.reset(e_t)
current_matcher.reset(m_t)
Matcher.run
class User:
def __init__(
self, users: Tuple[str, ...], perm: Optional[Permission] = None
) -> None:
self.users = users # 白名单用户
self.perm = perm # 用户权限
async def __call__(self, bot: Bot, event: Event) -> bool:
try:
session = event.get_session_id()
except Exception:
return False
return bool(
session in self.users and (self.perm is None or await self.perm(bot, event))
) # 需要当前用户在白名单且通过其它权限校验
def USER(*users: str, perm: Optional[Permission] = None):
return Permission(User(users, perm))
class Matcher(metaclass=MatcherMeta):
@classmethod
def type_updater(cls, func: T_TypeUpdater) -> T_TypeUpdater:
cls._default_type_updater = Dependent[str].parse(
call=func, allow_types=cls.HANDLER_PARAM_TYPES
)
return func
# 注册_default_type_updater,会被转化为Dpendent,因此可以依赖注入
async def update_type(self, bot: Bot, event: Event) -> str:
updater = self.__class__._default_type_updater
return (
await updater(bot=bot, event=event, state=self.state, matcher=self)
if updater
else "message"
)
# 运行通过type_updater装饰器注册的updater,返回值为updater返回值或message,就是event的新type
@classmethod
def permission_updater(cls, func: T_PermissionUpdater) -> T_PermissionUpdater:
cls._default_permission_updater = Dependent[Permission].parse(
call=func, allow_types=cls.HANDLER_PARAM_TYPES
)
return func
# 注册permission_updater,会被转化为Dpendent,因此可以依赖注入
async def update_permission(self, bot: Bot, event: Event) -> Permission:
if updater := self.__class__._default_permission_updater:
```
如果有通过permission_updater装饰器注册的updater,就运行它,返回值是新的Permission
```
return await updater(bot=bot, event=event, state=self.state, matcher=self)
# 如果没有通过permission_updater装饰器注册的updater
permission = self.permission # 获取原先的permission
if len(permission.checkers) == 1 and isinstance(
user_perm := tuple(permission.checkers)[0].call, User
): # ,则
permission = user_perm.perm
return USER(event.get_session_id(), perm=permission)
```
将当前用户设为会话对象
如果如果有permission且permission的第一个checker不是User类型的Permisson,则保留原先permission
- 也就是说将任何人都能触发的Matcher变成只有一个人能触发的Matcher,并保留原先permission
否则只保留原先permission中关于User的permission
- 也就是说User permission对所有用户适用,其它permission只对单个用户有效,更新User后废除
```
async def run(
self,
bot: Bot,
event: Event,
state: T_State,
stack: Optional[AsyncExitStack] = None,
dependency_cache: Optional[T_DependencyCache] = None,
):
try:
await self.simple_run(bot, event, state, stack, dependency_cache) # 见上文
except RejectedException: # 如果handler抛出RejectedException,该情况将在下节具体介绍
await self.resolve_reject()
type_ = await self.update_type(bot, event) # 更新事件类型
permission = await self.update_permission(bot, event) # 更新permission
Matcher.new(
type_,
Rule(),
permission,
self.handlers,
temp=True, # 只允行一次
priority=0, # 优先级为0,所以说不建议把Matcher的priority设为负数,可能出现异常
block=True, # 是block,因此总是在Matcher第一个运行、运行后会阻止之后的Matcher,并且删除自己
plugin=self.plugin,
module=self.module,
expire_time=bot.config.session_expire_timeout,
default_state=self.state,
default_type_updater=self.__class__._default_type_updater,
default_permission_updater=self.__class__._default_permission_updater,
)
except PausedException: # 如果handler抛出PausedException
type_ = await self.update_type(bot, event) # 更新事件类型
permission = await self.update_permission(bot, event) # 更新permission
Matcher.new(
type_,
Rule(),
permission,
self.handlers,
temp=True, # 只允行一次
priority=0, # 优先级为0
block=True, # 是block,因此总是在Matcher第一个运行、运行后会阻止之后的Matcher,并且删除自己
plugin=self.plugin,
module=self.module,
expire_time=bot.config.session_expire_timeout,
default_state=self.state,
default_type_updater=self.__class__._default_type_updater,
default_permission_updater=self.__class__._default_permission_updater,
) # 创建新Matcher
except FinishedException: # 如果handler抛出FinishedException
pass # 无后续操作
# 不管什么异常都会导致handler和Matcher的运行结束
后两种异常很好理解,但对于RejectedException可能难理解,因为需要结合下节的内容才能知道为什么这么设计
event_postprocessors
没什么好说的,不过支持所有Param(DependParam、ExceptionParam、BotParam、EventParam、StateParam、ArgParam、MatcherParam、DefaultParam)
if coros := [
run_coro_with_catch(
proc(
bot=bot,
event=event,
state=state,
stack=stack,
dependency_cache=dependency_cache,
),
(SkippedException,),
)
for proc in _event_postprocessors
]:
try:
if show_log:
logger.debug("Running PostProcessors...")
await asyncio.gather(*coros)
except Exception as e:
logger.opt(colors=True, exception=e).error(
"Error when running EventPostProcessors "
)