Rename package to "nonebot"

This commit is contained in:
Richard Chien
2018-12-27 20:23:45 +08:00
parent b8cb8e440b
commit 663797219a
71 changed files with 221 additions and 220 deletions

158
nonebot/natural_language.py Normal file
View File

@ -0,0 +1,158 @@
import asyncio
import re
from typing import Iterable, Optional, Callable, Union, NamedTuple
from . import NoneBot, permission as perm
from .command import call_command
from .log import logger
from .message import Message
from .session import BaseSession
from .typing import Context_T, CommandName_T, CommandArgs_T
_nl_processors = set()
class NLProcessor:
__slots__ = ('func', 'keywords', 'permission',
'only_to_me', 'only_short_message',
'allow_empty_message')
def __init__(self, *, func: Callable, keywords: Optional[Iterable],
permission: int, only_to_me: bool, only_short_message: bool,
allow_empty_message: bool):
self.func = func
self.keywords = keywords
self.permission = permission
self.only_to_me = only_to_me
self.only_short_message = only_short_message
self.allow_empty_message = allow_empty_message
def on_natural_language(keywords: Union[Optional[Iterable], Callable] = None,
*, permission: int = perm.EVERYBODY,
only_to_me: bool = True,
only_short_message: bool = True,
allow_empty_message: bool = False) -> Callable:
"""
Decorator to register a function as a natural language processor.
:param keywords: keywords to respond to, if None, respond to all messages
:param permission: permission required by the processor
:param only_to_me: only handle messages to me
:param only_short_message: only handle short messages
:param allow_empty_message: handle empty messages
"""
def deco(func: Callable) -> Callable:
nl_processor = NLProcessor(func=func, keywords=keywords,
permission=permission,
only_to_me=only_to_me,
only_short_message=only_short_message,
allow_empty_message=allow_empty_message)
_nl_processors.add(nl_processor)
return func
if isinstance(keywords, Callable):
# here "keywords" is the function to be decorated
return on_natural_language()(keywords)
else:
return deco
class NLPSession(BaseSession):
__slots__ = ('msg', 'msg_text', 'msg_images')
def __init__(self, bot: NoneBot, ctx: Context_T, msg: str):
super().__init__(bot, ctx)
self.msg = msg
tmp_msg = Message(msg)
self.msg_text = tmp_msg.extract_plain_text()
self.msg_images = [s.data['url'] for s in tmp_msg
if s.type == 'image' and 'url' in s.data]
class NLPResult(NamedTuple):
confidence: float
cmd_name: Union[str, CommandName_T]
cmd_args: Optional[CommandArgs_T] = None
async def handle_natural_language(bot: NoneBot, ctx: Context_T) -> bool:
"""
Handle a message as natural language.
This function is typically called by "handle_message".
:param bot: NoneBot instance
:param ctx: message context
:return: the message is handled as natural language
"""
msg = str(ctx['message'])
if bot.config.NICKNAME:
# check if the user is calling me with my nickname
if isinstance(bot.config.NICKNAME, str) or \
not isinstance(bot.config.NICKNAME, Iterable):
nicknames = (bot.config.NICKNAME,)
else:
nicknames = filter(lambda n: n, bot.config.NICKNAME)
nickname_regex = '|'.join(nicknames)
m = re.search(rf'^({nickname_regex})([\s,]|$)', msg, re.IGNORECASE)
if m:
nickname = m.group(1)
logger.debug(f'User is calling me {nickname}')
ctx['to_me'] = True
msg = msg[m.end():]
session = NLPSession(bot, ctx, msg)
# use msg_text here because CQ code "share" may be very long,
# at the same time some plugins may want to handle it
msg_text_length = len(session.msg_text)
futures = []
for p in _nl_processors:
if not p.allow_empty_message and not session.msg:
# don't allow empty msg, but it is one, so skip to next
continue
if p.only_short_message and \
msg_text_length > bot.config.SHORT_MESSAGE_MAX_LENGTH:
continue
if p.only_to_me and not ctx['to_me']:
continue
should_run = await perm.check_permission(bot, ctx, p.permission)
if should_run and p.keywords:
for kw in p.keywords:
if kw in session.msg_text:
break
else:
# no keyword matches
should_run = False
if should_run:
futures.append(asyncio.ensure_future(p.func(session)))
if futures:
# wait for possible results, and sort them by confidence
results = []
for fut in futures:
try:
results.append(await fut)
except Exception as e:
logger.error('An exception occurred while running '
'some natural language processor:')
logger.exception(e)
results = sorted(filter(lambda r: r, results),
key=lambda r: r.confidence, reverse=True)
logger.debug(f'NLP results: {results}')
if results and results[0].confidence >= 60.0:
# choose the result with highest confidence
logger.debug(f'NLP result with highest confidence: {results[0]}')
return await call_command(bot, ctx, results[0].cmd_name,
args=results[0].cmd_args,
check_perm=False)
else:
logger.debug('No NLP result having enough confidence')
return False