✨ 添加对lyfunction的支持
This commit is contained in:
@ -5,14 +5,17 @@ liteyuki function是一种类似于mcfunction的函数,用于在liteyuki中实
|
||||
可以用于一些轻量级插件的编写,无需Python代码
|
||||
SnowyKami
|
||||
"""
|
||||
import asyncio
|
||||
import functools
|
||||
# cmd *args **kwargs
|
||||
# api api_name **kwargs
|
||||
import os
|
||||
from typing import Any, Awaitable, Callable, Coroutine
|
||||
|
||||
import nonebot
|
||||
from nonebot import Bot
|
||||
from nonebot.adapters.satori import bot
|
||||
from nonebot.internal.matcher import Matcher
|
||||
|
||||
ly_function_extensions = (
|
||||
"lyf",
|
||||
@ -24,57 +27,115 @@ loaded_functions = dict()
|
||||
|
||||
|
||||
class LiteyukiFunction:
|
||||
def __init__(self, name: str, path: str):
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
self.path = path
|
||||
self.functions = list()
|
||||
|
||||
self.functions: list[str] = list()
|
||||
self.bot: Bot = None
|
||||
|
||||
self.var_data = dict()
|
||||
self.macro_data = dict()
|
||||
self.matcher: Matcher = None
|
||||
self.end = False
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
for _callable in self.functions:
|
||||
if _callable is not None:
|
||||
_callable(*args, **kwargs)
|
||||
self.sub_tasks: list[asyncio.Task] = list()
|
||||
|
||||
async def __call__(self, *args, **kwargs):
|
||||
for i, cmd in enumerate(self.functions):
|
||||
r = await self.execute_line(cmd, i, *args, **kwargs)
|
||||
if r == 0:
|
||||
msg = f"End function {self.name} by line {i}"
|
||||
nonebot.logger.debug(msg)
|
||||
for task in self.sub_tasks:
|
||||
task.cancel(msg)
|
||||
return
|
||||
|
||||
def __str__(self):
|
||||
return f"LiteyukiFunction({self.name}, {self.path})"
|
||||
return f"LiteyukiFunction({self.name})"
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
async def execute_line(self, line: str) -> Callable[[tuple, dict], Coroutine[Any, Any, Any] | Any] | None:
|
||||
async def execute_line(self, cmd: str, line: int = 0, *args, **kwargs) -> Any:
|
||||
"""
|
||||
解析一行轻雪函数
|
||||
Args:
|
||||
line:
|
||||
cmd: 命令
|
||||
line: 行数
|
||||
Returns:
|
||||
"""
|
||||
cmd = cmd.format(*args, **kwargs)
|
||||
no_head = cmd.split(" ", 1)[1] if len(cmd.split(" ")) > 1 else ""
|
||||
try:
|
||||
head, args, kwargs = self.get_args(cmd)
|
||||
except Exception as e:
|
||||
error_msg = f"Parsing error in {self.name} at line {line}: {e}"
|
||||
nonebot.logger.error(error_msg)
|
||||
await self.matcher.send(error_msg)
|
||||
return
|
||||
|
||||
args: list[str] = line.split(" ")
|
||||
head = args.pop(0)
|
||||
if head.startswith("#"):
|
||||
# 注释
|
||||
return None
|
||||
|
||||
elif head == "var":
|
||||
if head == "var":
|
||||
# 变量定义
|
||||
for arg in args:
|
||||
self.var_data[arg.split("=", 1)[0]] = eval(arg.split("=", 1)[1])
|
||||
self.var_data.update(kwargs)
|
||||
|
||||
elif head == "cmd":
|
||||
# 在当前计算机上执行命令
|
||||
os.system(line.split(" ", 1)[1])
|
||||
os.system(no_head)
|
||||
|
||||
elif head == "api":
|
||||
# 调用Bot API 需要Bot实例
|
||||
await self.bot.call_api(line.split(" ", 1)[1])
|
||||
await self.bot.call_api(args[1], **kwargs)
|
||||
|
||||
elif head == "function":
|
||||
# 调用轻雪函数
|
||||
return functools.partial(get_function, line.split(" ", 1)[1])
|
||||
func = get_function(args[1])
|
||||
func.bot = self.bot
|
||||
func.matcher = self.matcher
|
||||
await func(*args[2:], **kwargs)
|
||||
|
||||
elif head == "sleep":
|
||||
# 等待一段时间
|
||||
await asyncio.sleep(float(args[1]))
|
||||
|
||||
elif head == "nohup":
|
||||
# 挂起运行
|
||||
print("挂起运行")
|
||||
task = asyncio.create_task(self.execute_line(no_head))
|
||||
self.sub_tasks.append(task)
|
||||
|
||||
elif head == "end":
|
||||
# 结束所有函数
|
||||
self.end = True
|
||||
return 0
|
||||
|
||||
elif head == "await":
|
||||
# 等待所有协程执行完毕
|
||||
await asyncio.gather(*self.sub_tasks)
|
||||
|
||||
def get_args(self, line: str) -> tuple[str, tuple[str, ...], dict[str, Any]]:
|
||||
"""
|
||||
获取参数
|
||||
Args:
|
||||
line: 命令
|
||||
Returns:
|
||||
命令头 参数 关键字
|
||||
"""
|
||||
line = line.replace("\\=", "EQUAL_SIGN")
|
||||
head = ""
|
||||
args = list()
|
||||
kwargs = dict()
|
||||
for i, arg in enumerate(line.split(" ")):
|
||||
if "=" in arg:
|
||||
key, value = arg.split("=", 1)
|
||||
value = value.replace("EQUAL_SIGN", "=")
|
||||
try:
|
||||
value = eval(value)
|
||||
except:
|
||||
value = self.var_data.get(value, value)
|
||||
kwargs[key] = value
|
||||
else:
|
||||
if i == 0:
|
||||
head = arg
|
||||
args.append(arg)
|
||||
return head, tuple(args), kwargs
|
||||
|
||||
|
||||
def get_function(name: str) -> LiteyukiFunction | None:
|
||||
@ -89,7 +150,7 @@ def get_function(name: str) -> LiteyukiFunction | None:
|
||||
|
||||
def load_from_dir(path: str):
|
||||
"""
|
||||
从目录中加载轻雪函数,类似mcfunction
|
||||
从目录及其子目录中递归加载所有轻雪函数,类似mcfunction
|
||||
|
||||
Args:
|
||||
path: 目录路径
|
||||
@ -99,6 +160,8 @@ def load_from_dir(path: str):
|
||||
if os.path.isfile(f):
|
||||
if f.endswith(ly_function_extensions):
|
||||
load_from_file(f)
|
||||
if os.path.isdir(f):
|
||||
load_from_dir(f)
|
||||
|
||||
|
||||
def load_from_file(path: str):
|
||||
@ -108,4 +171,12 @@ def load_from_file(path: str):
|
||||
path:
|
||||
Returns:
|
||||
"""
|
||||
pass
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
name = ".".join(os.path.basename(path).split(".")[:-1])
|
||||
func = LiteyukiFunction(name)
|
||||
for i, line in enumerate(f.read().split("\n")):
|
||||
if line.startswith("#") or line.strip() == "":
|
||||
continue
|
||||
func.functions.append(line)
|
||||
loaded_functions[name] = func
|
||||
nonebot.logger.debug(f"Loaded function {name}")
|
||||
|
Reference in New Issue
Block a user