mirror of
https://github.com/nonebot/nonebot2.git
synced 2025-07-28 00:31:14 +00:00
✨ finish typing change
This commit is contained in:
@ -1,24 +1,24 @@
|
||||
import asyncio
|
||||
|
||||
from nonebot import on_message
|
||||
from nonebot.typing import State
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.permission import USER
|
||||
from nonebot.adapters import Bot, Event
|
||||
|
||||
a = on_message(priority=0, permission=USER(123123123), temp=True)
|
||||
a = on_message(priority=0, permission=USER("123123123"), temp=True)
|
||||
|
||||
|
||||
@a.handle()
|
||||
async def test_a(bot: Bot, event: Event, state: State):
|
||||
async def test_a(bot: Bot, event: Event, state: T_State):
|
||||
print("======== A Received ========")
|
||||
print("======== A Running Completed ========")
|
||||
|
||||
|
||||
b = on_message(priority=0, permission=USER(123456789), temp=True)
|
||||
b = on_message(priority=0, permission=USER("123456789"), temp=True)
|
||||
|
||||
|
||||
@b.handle()
|
||||
async def test_b(bot: Bot, event: Event, state: State):
|
||||
async def test_b(bot: Bot, event: Event, state: T_State):
|
||||
print("======== B Received ========")
|
||||
await asyncio.sleep(10)
|
||||
print("======== B Running Completed ========")
|
||||
@ -28,5 +28,5 @@ c = on_message(priority=0, permission=USER(1111111111))
|
||||
|
||||
|
||||
@c.handle()
|
||||
async def test_c(bot: Bot, event: Event, state: State):
|
||||
async def test_c(bot: Bot, event: Event, state: T_State):
|
||||
print("======== C Received ========")
|
||||
|
@ -1,12 +1,10 @@
|
||||
from nonebot.typing import State
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.permission import GROUP_OWNER
|
||||
|
||||
from . import cmd
|
||||
|
||||
test_1 = cmd.command("1", aliases={"test"}, permission=GROUP_OWNER)
|
||||
test_1 = cmd.command("1", aliases={"test"})
|
||||
|
||||
|
||||
@test_1.handle()
|
||||
async def test1(bot: Bot, event: Event, state: State):
|
||||
await test_1.finish(event.raw_message)
|
||||
async def test1(bot: Bot, event: Event):
|
||||
await test_1.finish(event.get_message())
|
||||
|
@ -1,16 +1,17 @@
|
||||
from nonebot.typing import State
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.adapters.cqhttp import HeartbeatMetaEvent
|
||||
|
||||
from . import match
|
||||
|
||||
|
||||
async def heartbeat(bot: Bot, event: Event, state: State) -> bool:
|
||||
return event.detail_type == "heartbeat"
|
||||
async def heartbeat(bot: Bot, event: Event, state: T_State) -> bool:
|
||||
return isinstance(event, HeartbeatMetaEvent)
|
||||
|
||||
|
||||
test = match.on_metaevent(rule=heartbeat)
|
||||
|
||||
|
||||
@test.receive()
|
||||
async def handle_heartbeat(bot: Bot, event: Event, state: State):
|
||||
async def handle_heartbeat(bot: Bot):
|
||||
print("[i] Heartbeat")
|
||||
|
@ -1,15 +1,16 @@
|
||||
from nonebot.typing import State
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.plugin import on_metaevent
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.adapters.cqhttp import HeartbeatMetaEvent
|
||||
|
||||
|
||||
async def heartbeat(bot: Bot, event: Event, state: State) -> bool:
|
||||
return event.detail_type == "heartbeat"
|
||||
async def heartbeat(bot: Bot, event: Event, state: T_State) -> bool:
|
||||
return isinstance(event, HeartbeatMetaEvent)
|
||||
|
||||
|
||||
test_matcher = on_metaevent(heartbeat)
|
||||
|
||||
|
||||
@test_matcher.receive()
|
||||
async def handle_heartbeat(bot: Bot, event: Event, state: dict):
|
||||
async def handle_heartbeat(bot: Bot, event: Event, state: T_State):
|
||||
print("[i] Heartbeat")
|
||||
|
@ -1,14 +1,14 @@
|
||||
from nonebot.rule import to_me
|
||||
from nonebot.typing import State
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.plugin import on_command
|
||||
from nonebot.adapters.cqhttp import Bot, Event
|
||||
from nonebot.adapters import Bot, Event
|
||||
|
||||
test_command = on_command("帮助", to_me())
|
||||
|
||||
|
||||
@test_command.handle()
|
||||
async def test_handler(bot: Bot, event: Event, state: State):
|
||||
args = str(event.message).strip()
|
||||
async def test_handler(bot: Bot, event: Event, state: T_State):
|
||||
args = str(event.get_message()).strip()
|
||||
print("[!] Command:", state["_prefix"], "Args:", args)
|
||||
if args:
|
||||
state["help"] = args
|
||||
@ -17,7 +17,7 @@ async def test_handler(bot: Bot, event: Event, state: State):
|
||||
|
||||
|
||||
@test_command.got("help", prompt="你要帮助的命令是?")
|
||||
async def test_handler(bot: Bot, event: Event, state: State):
|
||||
async def test_handler(bot: Bot, event: Event, state: T_State):
|
||||
print("[!] Command 帮助:", state["help"])
|
||||
if state["help"] not in ["test1", "test2"]:
|
||||
await test_command.reject(f"{state['help']} 不支持,请重新输入!")
|
||||
|
@ -1,18 +1,18 @@
|
||||
from nonebot.rule import to_me
|
||||
from nonebot.typing import State
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.plugin import on_startswith
|
||||
from nonebot.permission import GROUP_ADMIN
|
||||
from nonebot.adapters.ding import Bot as DingBot, Event as DingEvent
|
||||
from nonebot.adapters.cqhttp import Bot as CQHTTPBot, Event as CQHTTPEvent
|
||||
from nonebot.permission import SUPERUSER
|
||||
from nonebot.adapters.ding import Bot as DingBot
|
||||
from nonebot.adapters.cqhttp import Bot as CQHTTPBot
|
||||
|
||||
test_command = on_startswith("hello", to_me(), permission=GROUP_ADMIN)
|
||||
test_command = on_startswith("hello", to_me(), permission=SUPERUSER)
|
||||
|
||||
|
||||
@test_command.handle()
|
||||
async def test_handler(bot: CQHTTPBot, event: CQHTTPEvent, state: State):
|
||||
async def test_handler(bot: CQHTTPBot):
|
||||
await test_command.finish("cqhttp hello")
|
||||
|
||||
|
||||
@test_command.handle()
|
||||
async def test_handler(bot: DingBot, event: DingEvent, state: State):
|
||||
async def test_handler(bot: DingBot):
|
||||
await test_command.finish("ding hello")
|
||||
|
@ -1,15 +1,15 @@
|
||||
from nonebot.typing import State
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.message import event_preprocessor, run_preprocessor
|
||||
|
||||
|
||||
@event_preprocessor
|
||||
async def handle(bot: Bot, event: Event, state: State):
|
||||
async def handle(bot: Bot, event: Event, state: T_State):
|
||||
state["preprocessed"] = True
|
||||
print(type(event), event)
|
||||
|
||||
|
||||
@run_preprocessor
|
||||
async def run(matcher: Matcher, bot: Bot, event: Event, state: State):
|
||||
async def run(matcher: Matcher, bot: Bot, event: Event, state: T_State):
|
||||
print(matcher)
|
||||
|
@ -1,13 +1,13 @@
|
||||
from nonebot import on_command
|
||||
from nonebot.rule import to_me
|
||||
from nonebot.typing import State
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.adapters import Bot, Event
|
||||
|
||||
weather = on_command("天气", rule=to_me(), priority=1)
|
||||
|
||||
|
||||
@weather.handle()
|
||||
async def handle_first_receive(bot: Bot, event: Event, state: State):
|
||||
async def handle_first_receive(bot: Bot, event: Event, state: T_State):
|
||||
args = str(event.get_message()).strip() # 首次发送命令时跟随的参数,例:/天气 上海,则args为上海
|
||||
print(f"==={args}===")
|
||||
if args:
|
||||
@ -15,7 +15,7 @@ async def handle_first_receive(bot: Bot, event: Event, state: State):
|
||||
|
||||
|
||||
@weather.got("city", prompt="你想查询哪个城市的天气呢?")
|
||||
async def handle_city(bot: Bot, state: State):
|
||||
async def handle_city(bot: Bot, state: T_State):
|
||||
city = state["city"]
|
||||
if city not in ["上海", "北京"]:
|
||||
await weather.reject("你想查询的城市暂不支持,请重新输入!")
|
||||
|
Reference in New Issue
Block a user