mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-09-07 04:26:45 +00:00
plugin load
This commit is contained in:
@ -1,76 +1,70 @@
|
||||
import re
|
||||
import copy
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from functools import wraps
|
||||
from typing import Type, Union, Optional, Callable
|
||||
from collections import defaultdict
|
||||
from typing import Type, List, Dict, Optional, Callable
|
||||
|
||||
from .event import Event
|
||||
from .typing import Scope, Handler
|
||||
from .rule import Rule, startswith, regex, user
|
||||
from .typing import Handler
|
||||
from .rule import Rule, user
|
||||
from .exception import PausedException, RejectedException, FinishedException
|
||||
|
||||
matchers: Dict[int, List[Type["Matcher"]]] = defaultdict(list)
|
||||
|
||||
|
||||
class Matcher:
|
||||
|
||||
rule: Rule = Rule()
|
||||
scope: Scope = "ALL"
|
||||
permission: str = "ALL"
|
||||
block: bool = True
|
||||
handlers: list = []
|
||||
handlers: List[Handler] = []
|
||||
temp: bool = False
|
||||
priority: int = 1
|
||||
|
||||
_default_state: dict = {}
|
||||
_default_parser: Optional[Callable[[Event, dict], None]] = None
|
||||
_args_parser: Optional[Callable[[Event, dict], None]] = None
|
||||
|
||||
# _default_parser: Optional[Callable[[Event, dict], None]] = None
|
||||
# _args_parser: Optional[Callable[[Event, dict], None]] = None
|
||||
|
||||
def __init__(self):
|
||||
self.handlers = self.handlers.copy()
|
||||
self.state = self._default_state.copy()
|
||||
self.parser = self._args_parser or self._default_parser
|
||||
# self.parser = self._args_parser or self._default_parser
|
||||
|
||||
@classmethod
|
||||
def new(
|
||||
cls,
|
||||
rule: Rule = Rule(),
|
||||
scope: Scope = "ALL",
|
||||
permission: str = "ALL",
|
||||
block: bool = True,
|
||||
handlers: list = [],
|
||||
temp: bool = False,
|
||||
*,
|
||||
default_state: dict = {},
|
||||
default_parser: Optional[Callable[[Event, dict], None]] = None,
|
||||
args_parser: Optional[Callable[[Event, dict], None]] = None
|
||||
) -> Type["Matcher"]:
|
||||
|
||||
# class NewMatcher(cls):
|
||||
# rule: Rule = rule
|
||||
# scope: Scope = scope
|
||||
# permission: str = permission
|
||||
# block: bool = block
|
||||
# handlers: list = handlers
|
||||
# temp: bool = temp
|
||||
|
||||
# _default_state = default_state
|
||||
def new(cls,
|
||||
rule: Rule = Rule(),
|
||||
handlers: list = [],
|
||||
temp: bool = False,
|
||||
priority: int = 1,
|
||||
*,
|
||||
default_state: dict = {}) -> Type["Matcher"]:
|
||||
|
||||
NewMatcher = type(
|
||||
"Matcher", (Matcher,), {
|
||||
"rule": rule,
|
||||
"scope": scope,
|
||||
"permission": permission,
|
||||
"block": block,
|
||||
"handlers": handlers,
|
||||
"temp": temp,
|
||||
"_default_state": default_state,
|
||||
"_default_parser": default_parser,
|
||||
"_args_parser": args_parser,
|
||||
"priority": priority,
|
||||
"_default_state": default_state
|
||||
})
|
||||
|
||||
matchers[priority].append(NewMatcher)
|
||||
|
||||
return NewMatcher
|
||||
|
||||
# @classmethod
|
||||
# def args_parser(cls, func: Callable[[Event, dict], None]):
|
||||
# cls._default_parser = func
|
||||
# return func
|
||||
|
||||
@classmethod
|
||||
def args_parser(cls, func: Callable[[Event, dict], None]):
|
||||
cls._default_parser = func
|
||||
return func
|
||||
def handle(cls):
|
||||
|
||||
def _decorator(func: Handler) -> Handler:
|
||||
cls.handlers.append(func)
|
||||
return func
|
||||
|
||||
return _decorator
|
||||
|
||||
@classmethod
|
||||
def receive(cls):
|
||||
@ -78,121 +72,78 @@ class Matcher:
|
||||
def _decorator(func: Handler) -> Handler:
|
||||
|
||||
@wraps(func)
|
||||
def _handler(event: Event, state: dict):
|
||||
async def _handler(bot, event: Event, state: dict):
|
||||
raise PausedException
|
||||
|
||||
cls.handlers.append(_handler)
|
||||
cls.handlers.append(func)
|
||||
|
||||
return func
|
||||
|
||||
return _decorator
|
||||
|
||||
@classmethod
|
||||
def got(cls,
|
||||
key: str,
|
||||
prompt: Optional[str] = None,
|
||||
args_parser: Optional[Callable[[Event, dict], None]] = None):
|
||||
# @classmethod
|
||||
# def got(cls,
|
||||
# key: str,
|
||||
# prompt: Optional[str] = None,
|
||||
# args_parser: Optional[Callable[[Event, dict], None]] = None):
|
||||
|
||||
def _decorator(func: Handler) -> Handler:
|
||||
# def _decorator(func: Handler) -> Handler:
|
||||
|
||||
@wraps(func)
|
||||
def _handler(event: Event, state: dict):
|
||||
if key not in state:
|
||||
if state.get("__current_arg__", None) == key:
|
||||
state[key] = event.message
|
||||
del state["__current_arg__"]
|
||||
return func(event, state)
|
||||
state["__current_arg__"] = key
|
||||
cls._args_parser = args_parser
|
||||
raise RejectedException
|
||||
# @wraps(func)
|
||||
# def _handler(event: Event, state: dict):
|
||||
# if key not in state:
|
||||
# if state.get("__current_arg__", None) == key:
|
||||
# state[key] = event.message
|
||||
# del state["__current_arg__"]
|
||||
# return func(event, state)
|
||||
# state["__current_arg__"] = key
|
||||
# cls._args_parser = args_parser
|
||||
# raise RejectedException
|
||||
|
||||
return func(event, state)
|
||||
# return func(event, state)
|
||||
|
||||
cls.handlers.append(_handler)
|
||||
# cls.handlers.append(_handler)
|
||||
|
||||
return func
|
||||
# return func
|
||||
|
||||
return _decorator
|
||||
# return _decorator
|
||||
|
||||
@classmethod
|
||||
def finish(cls, prompt: Optional[str] = None):
|
||||
raise FinishedException
|
||||
# @classmethod
|
||||
# def finish(cls, prompt: Optional[str] = None):
|
||||
# raise FinishedException
|
||||
|
||||
@classmethod
|
||||
def reject(cls, prompt: Optional[str] = None):
|
||||
raise RejectedException
|
||||
# @classmethod
|
||||
# def reject(cls, prompt: Optional[str] = None):
|
||||
# raise RejectedException
|
||||
|
||||
async def run(self, event):
|
||||
async def run(self, bot, event):
|
||||
if not self.rule(event):
|
||||
return
|
||||
|
||||
try:
|
||||
if self.parser:
|
||||
await self.parser(event, state) # type: ignore
|
||||
# if self.parser:
|
||||
# await self.parser(event, state) # type: ignore
|
||||
|
||||
for _ in range(len(self.handlers)):
|
||||
handler = self.handlers.pop(0)
|
||||
await handler(event, self.state)
|
||||
await handler(bot, event, self.state)
|
||||
except RejectedException:
|
||||
# TODO: add tmp matcher to matcher tree
|
||||
self.handlers.insert(handler, 0)
|
||||
matcher = Matcher.new(self.rule,
|
||||
self.scope,
|
||||
self.permission,
|
||||
self.block,
|
||||
self.handlers.insert(0, handler) # type: ignore
|
||||
matcher = Matcher.new(user(event.user_id) & self.rule,
|
||||
self.handlers,
|
||||
temp=True,
|
||||
default_state=self.state,
|
||||
default_parser=self._default_parser,
|
||||
args_parser=self._args_parser)
|
||||
priority=0,
|
||||
default_state=self.state)
|
||||
matchers[0].append(matcher)
|
||||
return
|
||||
except PausedException:
|
||||
# TODO: add tmp matcher to matcher tree
|
||||
matcher = Matcher.new(self.rule,
|
||||
self.scope,
|
||||
self.permission,
|
||||
self.block,
|
||||
matcher = Matcher.new(user(event.user_id) & self.rule,
|
||||
self.handlers,
|
||||
temp=True,
|
||||
default_state=self.state,
|
||||
default_parser=self._default_parser,
|
||||
args_parser=self._args_parser)
|
||||
priority=0,
|
||||
default_state=self.state)
|
||||
matchers[0].append(matcher)
|
||||
return
|
||||
except FinishedException:
|
||||
return
|
||||
|
||||
|
||||
def on_message(rule: Rule,
|
||||
scope="ALL",
|
||||
permission="ALL",
|
||||
block=True,
|
||||
*,
|
||||
handlers=[],
|
||||
temp=False,
|
||||
state={}) -> Type[Matcher]:
|
||||
# TODO: add matcher to matcher tree
|
||||
return Matcher.new(rule,
|
||||
scope,
|
||||
permission,
|
||||
block,
|
||||
handlers=handlers,
|
||||
temp=temp,
|
||||
default_state=state)
|
||||
|
||||
|
||||
# def on_startswith(msg,
|
||||
# start: int = None,
|
||||
# end: int = None,
|
||||
# rule: Optional[Rule] = None,
|
||||
# **kwargs) -> Type[Matcher]:
|
||||
# return on_message(startswith(msg, start, end) &
|
||||
# rule, **kwargs) if rule else on_message(
|
||||
# startswith(msg, start, end), **kwargs)
|
||||
|
||||
# def on_regex(pattern,
|
||||
# flags: Union[int, re.RegexFlag] = 0,
|
||||
# rule: Optional[Rule] = None,
|
||||
# **kwargs) -> Type[Matcher]:
|
||||
# return on_message(regex(pattern, flags) &
|
||||
# rule, **kwargs) if rule else on_message(
|
||||
# regex(pattern, flags), **kwargs)
|
||||
|
Reference in New Issue
Block a user