mirror of
https://github.com/LiteyukiStudio/nonebot-plugin-marshoai.git
synced 2025-07-31 08:59:51 +00:00
@ -1,14 +1,12 @@
|
||||
import base64
|
||||
import json
|
||||
import mimetypes
|
||||
import os
|
||||
import uuid
|
||||
from typing import Any, Optional
|
||||
|
||||
import aiofiles # type: ignore
|
||||
import httpx
|
||||
import nonebot_plugin_localstore as store
|
||||
|
||||
# from zhDateTime import DateTime
|
||||
from azure.ai.inference.aio import ChatCompletionsClient
|
||||
from azure.ai.inference.models import SystemMessage
|
||||
from nonebot import get_driver
|
||||
@ -27,10 +25,33 @@ nickname_json = None # 记录昵称
|
||||
praises_json = None # 记录夸赞名单
|
||||
loaded_target_list = [] # 记录已恢复备份的上下文的列表
|
||||
|
||||
# 时间参数相关
|
||||
if config.marshoai_enable_time_prompt:
|
||||
_weekdays = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
|
||||
_time_prompt = "现在的时间是{date_time},{weekday_name},农历{lunar_date}。"
|
||||
|
||||
|
||||
# noinspection LongLine
|
||||
chromium_headers = {
|
||||
_chromium_headers = {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:134.0) Gecko/20100101 Firefox/134.0"
|
||||
}
|
||||
"""
|
||||
最新的火狐用户代理头
|
||||
"""
|
||||
|
||||
|
||||
# noinspection LongLine
|
||||
_praises_init_data = {
|
||||
"like": [
|
||||
{
|
||||
"name": "Asankilp",
|
||||
"advantages": "赋予了Marsho猫娘人格,使用手机,在vim与vscode的加持下为Marsho写了许多代码,使Marsho更加可爱",
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
初始夸赞名单之数据
|
||||
"""
|
||||
|
||||
|
||||
async def get_image_raw_and_type(
|
||||
@ -45,11 +66,10 @@ async def get_image_raw_and_type(
|
||||
|
||||
return:
|
||||
tuple[bytes, str]: 图片二进制数据, 图片MIME格式
|
||||
|
||||
"""
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(url, headers=chromium_headers, timeout=timeout)
|
||||
response = await client.get(url, headers=_chromium_headers, timeout=timeout)
|
||||
if response.status_code == 200:
|
||||
# 获取图片数据
|
||||
content_type = response.headers.get("Content-Type")
|
||||
@ -87,13 +107,15 @@ async def make_chat(
|
||||
model_name: str,
|
||||
tools: Optional[list] = None,
|
||||
):
|
||||
"""调用ai获取回复
|
||||
"""
|
||||
调用ai获取回复
|
||||
|
||||
参数:
|
||||
client: 用于与AI模型进行通信
|
||||
msg: 消息内容
|
||||
model_name: 指定AI模型名
|
||||
tools: 工具列表"""
|
||||
tools: 工具列表
|
||||
"""
|
||||
return await client.complete(
|
||||
messages=msg,
|
||||
model=model_name,
|
||||
@ -110,13 +132,15 @@ async def make_chat_openai(
|
||||
model_name: str,
|
||||
tools: Optional[list] = None,
|
||||
):
|
||||
"""使用 Openai SDK 调用ai获取回复
|
||||
"""
|
||||
使用 Openai SDK 调用ai获取回复
|
||||
|
||||
参数:
|
||||
client: 用于与AI模型进行通信
|
||||
msg: 消息内容
|
||||
model_name: 指定AI模型名
|
||||
tools: 工具列表"""
|
||||
tools: 工具列表
|
||||
"""
|
||||
return await client.chat.completions.create(
|
||||
messages=msg,
|
||||
model=model_name,
|
||||
@ -134,17 +158,9 @@ def get_praises():
|
||||
praises_file = store.get_plugin_data_file(
|
||||
"praises.json"
|
||||
) # 夸赞名单文件使用localstore存储
|
||||
if not os.path.exists(praises_file):
|
||||
init_data = {
|
||||
"like": [
|
||||
{
|
||||
"name": "Asankilp",
|
||||
"advantages": "赋予了Marsho猫娘人格,使用vim与vscode为Marsho写了许多代码,使Marsho更加可爱",
|
||||
}
|
||||
]
|
||||
}
|
||||
if not praises_file.exists():
|
||||
with open(praises_file, "w", encoding="utf-8") as f:
|
||||
json.dump(init_data, f, ensure_ascii=False, indent=4)
|
||||
json.dump(_praises_init_data, f, ensure_ascii=False, indent=4)
|
||||
with open(praises_file, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
praises_json = data
|
||||
@ -154,19 +170,11 @@ def get_praises():
|
||||
async def refresh_praises_json():
|
||||
global praises_json
|
||||
praises_file = store.get_plugin_data_file("praises.json")
|
||||
if not os.path.exists(praises_file):
|
||||
init_data = {
|
||||
"like": [
|
||||
{
|
||||
"name": "Asankilp",
|
||||
"advantages": "赋予了Marsho猫娘人格,使用vim与vscode为Marsho写了许多代码,使Marsho更加可爱",
|
||||
}
|
||||
]
|
||||
}
|
||||
if not praises_file.exists():
|
||||
with open(praises_file, "w", encoding="utf-8") as f:
|
||||
json.dump(init_data, f, ensure_ascii=False, indent=4)
|
||||
with open(praises_file, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
json.dump(_praises_init_data, f, ensure_ascii=False, indent=4) # 异步?
|
||||
async with aiofiles.open(praises_file, "r", encoding="utf-8") as f:
|
||||
data = json.loads(await f.read())
|
||||
praises_json = data
|
||||
|
||||
|
||||
@ -179,33 +187,48 @@ def build_praises():
|
||||
|
||||
|
||||
async def save_context_to_json(name: str, context: Any, path: str):
|
||||
context_dir = store.get_plugin_data_dir() / path
|
||||
os.makedirs(context_dir, exist_ok=True)
|
||||
file_path = os.path.join(context_dir, f"{name}.json")
|
||||
with open(file_path, "w", encoding="utf-8") as json_file:
|
||||
(context_dir := store.get_plugin_data_dir() / path).mkdir(
|
||||
parents=True, exist_ok=True
|
||||
)
|
||||
# os.makedirs(context_dir, exist_ok=True)
|
||||
with open(context_dir / f"{name}.json", "w", encoding="utf-8") as json_file:
|
||||
json.dump(context, json_file, ensure_ascii=False, indent=4)
|
||||
|
||||
|
||||
async def load_context_from_json(name: str, path: str) -> list:
|
||||
"""从指定路径加载历史记录"""
|
||||
context_dir = store.get_plugin_data_dir() / path
|
||||
os.makedirs(context_dir, exist_ok=True)
|
||||
file_path = os.path.join(context_dir, f"{name}.json")
|
||||
try:
|
||||
with open(file_path, "r", encoding="utf-8") as json_file:
|
||||
return json.load(json_file)
|
||||
except FileNotFoundError:
|
||||
(context_dir := store.get_plugin_data_dir() / path).mkdir(
|
||||
parents=True, exist_ok=True
|
||||
)
|
||||
if (file_path := context_dir / f"{name}.json").exists():
|
||||
async with aiofiles.open(file_path, "r", encoding="utf-8") as json_file:
|
||||
return json.loads(await json_file.read())
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
async def get_nicknames():
|
||||
"""获取nickname_json, 优先来源于全局变量"""
|
||||
global nickname_json
|
||||
if nickname_json is None:
|
||||
filename = store.get_plugin_data_file("nickname.json")
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
async with aiofiles.open(filename, "r", encoding="utf-8") as f:
|
||||
nickname_json = json.loads(await f.read())
|
||||
except Exception:
|
||||
nickname_json = {}
|
||||
return nickname_json
|
||||
|
||||
|
||||
async def set_nickname(user_id: str, name: str):
|
||||
global nickname_json
|
||||
filename = store.get_plugin_data_file("nickname.json")
|
||||
if not os.path.exists(filename):
|
||||
if not filename.exists():
|
||||
data = {}
|
||||
else:
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
async with aiofiles.open(filename, "r", encoding="utf-8") as f:
|
||||
data = json.loads(await f.read())
|
||||
data[user_id] = name
|
||||
if name == "" and user_id in data:
|
||||
del data[user_id]
|
||||
@ -214,30 +237,17 @@ async def set_nickname(user_id: str, name: str):
|
||||
nickname_json = data
|
||||
|
||||
|
||||
# noinspection PyBroadException
|
||||
async def get_nicknames():
|
||||
"""获取nickname_json, 优先来源于全局变量"""
|
||||
global nickname_json
|
||||
if nickname_json is None:
|
||||
filename = store.get_plugin_data_file("nickname.json")
|
||||
try:
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
nickname_json = json.load(f)
|
||||
except Exception:
|
||||
nickname_json = {}
|
||||
return nickname_json
|
||||
|
||||
|
||||
async def refresh_nickname_json():
|
||||
"""强制刷新nickname_json, 刷新全局变量"""
|
||||
global nickname_json
|
||||
filename = store.get_plugin_data_file("nickname.json")
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
nickname_json = json.load(f)
|
||||
async with aiofiles.open(
|
||||
store.get_plugin_data_file("nickname.json"), "r", encoding="utf-8"
|
||||
) as f:
|
||||
nickname_json = json.loads(await f.read())
|
||||
except Exception:
|
||||
logger.error("Error loading nickname.json")
|
||||
logger.error("刷新 nickname_json 表错误:无法载入 nickname.json 文件")
|
||||
|
||||
|
||||
def get_prompt():
|
||||
@ -247,13 +257,18 @@ def get_prompt():
|
||||
if config.marshoai_enable_praises:
|
||||
praises_prompt = build_praises()
|
||||
prompts += praises_prompt
|
||||
|
||||
if config.marshoai_enable_time_prompt:
|
||||
current_time = DateTime.now().strftime("%Y.%m.%d %H:%M:%S")
|
||||
current_lunar_date = (
|
||||
DateTime.now().to_lunar().date_hanzify()[5:]
|
||||
) # 库更新之前使用切片
|
||||
time_prompt = f"现在的时间是{current_time},农历{current_lunar_date}。"
|
||||
prompts += time_prompt
|
||||
prompts += _time_prompt.format(
|
||||
date_time=(current_time := DateTime.now()).strftime(
|
||||
"%Y年%m月%d日 %H:%M:%S"
|
||||
),
|
||||
weekday_name=_weekdays[current_time.weekday()],
|
||||
lunar_date=current_time.to_lunar().date_hanzify(
|
||||
"{干支年}{生肖}年{月份}月{日期}日"
|
||||
),
|
||||
)
|
||||
|
||||
marsho_prompt = config.marshoai_prompt
|
||||
spell = SystemMessage(content=marsho_prompt + prompts).as_dict()
|
||||
return spell
|
||||
|
Reference in New Issue
Block a user