mirror of
https://github.com/LiteyukiStudio/LiteyukiBot.git
synced 2025-07-27 23:50:56 +00:00
✨ 智障回复功能
This commit is contained in:
@ -18,7 +18,7 @@ memory_database = {
|
||||
|
||||
|
||||
class User(LiteModel):
|
||||
TABLE_NAME:str = "user"
|
||||
TABLE_NAME: str = "user"
|
||||
user_id: str = Field(str(), alias="user_id")
|
||||
username: str = Field(str(), alias="username")
|
||||
profile: dict[str, str] = Field(dict(), alias="profile")
|
||||
@ -34,6 +34,7 @@ class Group(LiteModel):
|
||||
enabled_plugins: list[str] = Field([], alias="enabled_plugins")
|
||||
disabled_plugins: list[str] = Field([], alias="disabled_plugins")
|
||||
enable: bool = Field(True, alias="enable") # 群聊全局机器人是否启用
|
||||
config: dict = Field({}, alias="config")
|
||||
|
||||
|
||||
class InstalledPlugin(LiteModel):
|
||||
@ -50,7 +51,7 @@ class GlobalPlugin(LiteModel):
|
||||
|
||||
|
||||
class StoredConfig(LiteModel):
|
||||
TABLE_NAME :str= "stored_config"
|
||||
TABLE_NAME: str = "stored_config"
|
||||
config: dict = {}
|
||||
|
||||
|
||||
@ -78,8 +79,8 @@ def set_memory_data(key: str, value) -> None:
|
||||
|
||||
"""
|
||||
return memory_database.update({
|
||||
key: value
|
||||
})
|
||||
key: value
|
||||
})
|
||||
|
||||
|
||||
def get_memory_data(key: str, default=None) -> any:
|
||||
|
@ -64,9 +64,23 @@ class LiteyukiFunction:
|
||||
line: 行数
|
||||
Returns:
|
||||
"""
|
||||
|
||||
try:
|
||||
if "${" in cmd:
|
||||
# 此种情况下,{}内容不用管,只对${}内的内容进行format
|
||||
for i in range(len(cmd) - 1):
|
||||
if cmd[i] == "$" and cmd[i + 1] == "{":
|
||||
end = cmd.find("}", i)
|
||||
key = cmd[i + 2:end]
|
||||
cmd = cmd.replace(f"${{{key}}}", str(self.kwargs_data.get(key, "")))
|
||||
else:
|
||||
cmd = cmd.format(*self.args_data, **self.kwargs_data)
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
no_head = cmd.split(" ", 1)[1] if len(cmd.split(" ")) > 1 else ""
|
||||
try:
|
||||
head, args, kwargs = self.get_args(cmd)
|
||||
head, cmd_args, cmd_kwargs = self.get_args(cmd)
|
||||
except Exception as e:
|
||||
error_msg = f"Parsing error in {self.name} at line {line}: {e}"
|
||||
nonebot.logger.error(error_msg)
|
||||
@ -75,7 +89,7 @@ class LiteyukiFunction:
|
||||
|
||||
if head == "var":
|
||||
# 变量定义
|
||||
self.kwargs_data.update(kwargs)
|
||||
self.kwargs_data.update(cmd_kwargs)
|
||||
|
||||
elif head == "cmd":
|
||||
# 在当前计算机上执行命令
|
||||
@ -83,18 +97,18 @@ class LiteyukiFunction:
|
||||
|
||||
elif head == "api":
|
||||
# 调用Bot API 需要Bot实例
|
||||
await self.bot.call_api(args[1], **kwargs)
|
||||
await self.bot.call_api(cmd_args[1], **cmd_kwargs)
|
||||
|
||||
elif head == "function":
|
||||
# 调用轻雪函数
|
||||
func = get_function(args[1])
|
||||
func = get_function(cmd_args[1])
|
||||
func.bot = self.bot
|
||||
func.matcher = self.matcher
|
||||
await func(*args[2:], **kwargs)
|
||||
await func(*cmd_args[2:], **cmd_kwargs)
|
||||
|
||||
elif head == "sleep":
|
||||
# 等待一段时间
|
||||
await asyncio.sleep(float(args[1]))
|
||||
await asyncio.sleep(float(cmd_args[1]))
|
||||
|
||||
elif head == "nohup":
|
||||
# 挂起运行
|
||||
@ -106,6 +120,7 @@ class LiteyukiFunction:
|
||||
self.end = True
|
||||
return 0
|
||||
|
||||
|
||||
elif head == "await":
|
||||
# 等待所有协程执行完毕
|
||||
await asyncio.gather(*self.sub_tasks)
|
||||
|
@ -64,6 +64,11 @@ def load_resource_from_dir(path: str):
|
||||
from liteyuki.utils.base.ly_function import load_from_dir
|
||||
load_from_dir(os.path.join(path, "functions"))
|
||||
|
||||
if os.path.exists(os.path.join(path, "word_bank")):
|
||||
# 加载词库
|
||||
from liteyuki.utils.base.word_bank import load_from_dir
|
||||
load_from_dir(os.path.join(path, "word_bank"))
|
||||
|
||||
_loaded_resource_packs.insert(0, ResourceMetadata(**metadata))
|
||||
|
||||
|
||||
|
57
liteyuki/utils/base/word_bank.py
Normal file
57
liteyuki/utils/base/word_bank.py
Normal file
@ -0,0 +1,57 @@
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
from typing import Iterable
|
||||
|
||||
import nonebot
|
||||
|
||||
word_bank: dict[str, set[str]] = {}
|
||||
|
||||
|
||||
def load_from_file(file_path: str):
|
||||
"""
|
||||
从json文件中加载词库
|
||||
|
||||
Args:
|
||||
file_path: 文件路径
|
||||
"""
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
data = json.load(file)
|
||||
for key, value_list in data.items():
|
||||
if key not in word_bank:
|
||||
word_bank[key] = set()
|
||||
word_bank[key].update(value_list)
|
||||
|
||||
nonebot.logger.debug(f"Loaded word bank from {file_path}")
|
||||
|
||||
|
||||
def load_from_dir(dir_path: str):
|
||||
"""
|
||||
从目录中加载词库
|
||||
|
||||
Args:
|
||||
dir_path: 目录路径
|
||||
"""
|
||||
for file in os.listdir(dir_path):
|
||||
try:
|
||||
file_path = os.path.join(dir_path, file)
|
||||
if os.path.isfile(file_path):
|
||||
if file.endswith(".json"):
|
||||
load_from_file(file_path)
|
||||
except Exception as e:
|
||||
nonebot.logger.error(f"Failed to load language data from {file}: {e}")
|
||||
continue
|
||||
|
||||
|
||||
def get_reply(kws: Iterable[str]) -> str | None:
|
||||
"""
|
||||
获取回复
|
||||
Args:
|
||||
kws: 关键词
|
||||
Returns:
|
||||
"""
|
||||
for kw in kws:
|
||||
if kw in word_bank:
|
||||
return random.choice(list(word_bank[kw]))
|
||||
|
||||
return None
|
Reference in New Issue
Block a user