mirror of
				https://github.com/nonebot/nonebot2.git
				synced 2025-10-25 12:06:39 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			164 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			164 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import pytest
 | |
| from nonebug import App
 | |
| 
 | |
| from utils import make_fake_event, make_fake_message
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| async def test_matcher(app: App, load_plugin):
 | |
|     from plugins.matcher.matcher_process import (
 | |
|         test_got,
 | |
|         test_handle,
 | |
|         test_preset,
 | |
|         test_combine,
 | |
|         test_receive,
 | |
|         test_overload,
 | |
|     )
 | |
| 
 | |
|     message = make_fake_message()("text")
 | |
|     event = make_fake_event(_message=message)()
 | |
|     message_next = make_fake_message()("text_next")
 | |
|     event_next = make_fake_event(_message=message_next)()
 | |
| 
 | |
|     assert len(test_handle.handlers) == 1
 | |
|     async with app.test_matcher(test_handle) as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.should_call_send(event, "send", "result", at_sender=True)
 | |
|         ctx.should_finished()
 | |
| 
 | |
|     assert len(test_got.handlers) == 1
 | |
|     async with app.test_matcher(test_got) as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.should_call_send(event, "prompt key1", "result1")
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.should_call_send(event, "prompt key2", "result2")
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.should_call_send(event, "reject", "result3", at_sender=True)
 | |
|         ctx.should_rejected()
 | |
|         ctx.receive_event(bot, event_next)
 | |
| 
 | |
|     assert len(test_receive.handlers) == 1
 | |
|     async with app.test_matcher(test_receive) as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.should_call_send(event, "pause", "result", at_sender=True)
 | |
|         ctx.should_paused()
 | |
| 
 | |
|     assert len(test_receive.handlers) == 1
 | |
|     async with app.test_matcher(test_combine) as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.should_rejected()
 | |
|         ctx.receive_event(bot, event_next)
 | |
|         ctx.should_rejected()
 | |
|         ctx.receive_event(bot, event_next)
 | |
|         ctx.should_rejected()
 | |
|         ctx.receive_event(bot, event_next)
 | |
| 
 | |
|     assert len(test_preset.handlers) == 2
 | |
|     async with app.test_matcher(test_preset) as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.should_rejected()
 | |
|         ctx.receive_event(bot, event_next)
 | |
| 
 | |
|     assert len(test_overload.handlers) == 2
 | |
|     async with app.test_matcher(test_overload) as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         ctx.receive_event(bot, event)
 | |
|         ctx.should_finished()
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| async def test_type_updater(app: App, load_plugin):
 | |
|     from plugins.matcher.matcher_type import (
 | |
|         test_type_updater,
 | |
|         test_custom_updater,
 | |
|     )
 | |
| 
 | |
|     event = make_fake_event()()
 | |
| 
 | |
|     assert test_type_updater.type == "test"
 | |
|     async with app.test_api() as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         matcher = test_type_updater()
 | |
|         new_type = await matcher.update_type(bot, event)
 | |
|         assert new_type == "message"
 | |
| 
 | |
|     assert test_custom_updater.type == "test"
 | |
|     async with app.test_api() as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         matcher = test_custom_updater()
 | |
|         new_type = await matcher.update_type(bot, event)
 | |
|         assert new_type == "custom"
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| async def test_permission_updater(app: App, load_plugin):
 | |
|     from nonebot.permission import User
 | |
| 
 | |
|     from plugins.matcher.matcher_permission import (
 | |
|         default_permission,
 | |
|         test_custom_updater,
 | |
|         test_permission_updater,
 | |
|     )
 | |
| 
 | |
|     event = make_fake_event(_session_id="test")()
 | |
| 
 | |
|     assert test_permission_updater.permission is default_permission
 | |
|     async with app.test_api() as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         matcher = test_permission_updater()
 | |
|         new_perm = await matcher.update_permission(bot, event)
 | |
|         assert len(new_perm.checkers) == 1
 | |
|         checker = list(new_perm.checkers)[0].call
 | |
|         assert isinstance(checker, User)
 | |
|         assert checker.users == ("test",)
 | |
|         assert checker.perm is default_permission
 | |
| 
 | |
|     assert test_custom_updater.permission is default_permission
 | |
|     async with app.test_api() as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         matcher = test_custom_updater()
 | |
|         new_perm = await matcher.update_permission(bot, event)
 | |
|         assert new_perm is default_permission
 | |
| 
 | |
| 
 | |
| @pytest.mark.asyncio
 | |
| async def test_run(app: App):
 | |
|     from nonebot.matcher import Matcher, matchers
 | |
| 
 | |
|     assert not matchers
 | |
|     event = make_fake_event()()
 | |
| 
 | |
|     async def reject():
 | |
|         await Matcher.reject()
 | |
| 
 | |
|     test_reject = Matcher.new(handlers=[reject])
 | |
| 
 | |
|     async with app.test_api() as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         await test_reject().run(bot, event, {})
 | |
|         assert len(matchers[0]) == 1
 | |
|         assert len(matchers[0][0].handlers) == 1
 | |
| 
 | |
|     del matchers[0]
 | |
| 
 | |
|     async def pause():
 | |
|         await Matcher.pause()
 | |
| 
 | |
|     test_pause = Matcher.new(handlers=[pause])
 | |
| 
 | |
|     async with app.test_api() as ctx:
 | |
|         bot = ctx.create_bot()
 | |
|         await test_pause().run(bot, event, {})
 | |
|         assert len(matchers[0]) == 1
 | |
|         assert len(matchers[0][0].handlers) == 0
 |