mirror of
https://github.com/LiteyukiStudio/LiteyukiBot.git
synced 2025-07-27 05:10:55 +00:00
✨ 轻雪天气更新
This commit is contained in:
@ -66,7 +66,7 @@ async def _(result: Arparma):
|
||||
target=Node(bot_id=target[0], session_type=target[1], session_id=target[2]),
|
||||
inde=len(pushes_db.all(Push(), default=[]))
|
||||
)
|
||||
pushes_db.upsert(push1)
|
||||
pushes_db.save(push1)
|
||||
|
||||
if result.subcommands["add"].args.get("bidirectional"):
|
||||
push2 = Push(
|
||||
@ -74,7 +74,7 @@ async def _(result: Arparma):
|
||||
target=Node(bot_id=source[0], session_type=source[1], session_id=source[2]),
|
||||
inde=len(pushes_db.all(Push(), default=[]))
|
||||
)
|
||||
pushes_db.upsert(push2)
|
||||
pushes_db.save(push2)
|
||||
await add_push.finish("添加成功")
|
||||
else:
|
||||
await add_push.finish("参数缺失")
|
||||
|
@ -150,10 +150,10 @@ def set_plugin_session_enable(event: T_MessageEvent, plugin_name: str, enable: b
|
||||
if event.message_type == "group":
|
||||
__group_data[str(event.group_id)] = session
|
||||
print(session)
|
||||
group_db.upsert(session)
|
||||
group_db.save(session)
|
||||
else:
|
||||
__user_data[str(event.user_id)] = session
|
||||
user_db.upsert(session)
|
||||
user_db.save(session)
|
||||
|
||||
|
||||
def get_plugin_global_enable(plugin_name: str) -> bool:
|
||||
@ -193,7 +193,7 @@ def set_plugin_global_enable(plugin_name: str, enable: bool):
|
||||
default=GlobalPlugin(module_name=plugin_name, enabled=True))
|
||||
plugin.enabled = enable
|
||||
|
||||
plugin_db.upsert(plugin)
|
||||
plugin_db.save(plugin)
|
||||
__global_enable[plugin_name] = enable
|
||||
|
||||
|
||||
@ -242,4 +242,4 @@ def set_group_enable(group_id: str, enable: bool):
|
||||
group.enable = enable
|
||||
|
||||
__group_data[group_id] = group
|
||||
group_db.upsert(group)
|
||||
group_db.save(group)
|
||||
|
@ -225,7 +225,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot, npm: Matcher):
|
||||
found_in_db_plugin = plugin_db.first(InstalledPlugin(), "module_name = ?", plugin_name) # 查询数据库中是否已经安装
|
||||
if r_load:
|
||||
if found_in_db_plugin is None:
|
||||
plugin_db.upsert(installed_plugin)
|
||||
plugin_db.save(installed_plugin)
|
||||
info = md.escape(ulang.get("npm.install_success", NAME=store_plugin.name)) # markdown转义
|
||||
await md.send_md(
|
||||
f"{info}\n\n"
|
||||
|
@ -55,18 +55,22 @@ data
|
||||
"""
|
||||
|
||||
|
||||
async def generate_status_card(bot: dict, hardware: dict, liteyuki: dict, lang="zh-CN", bot_id="0") -> bytes:
|
||||
return await template2image(
|
||||
get_path("templates/status.html", abs_path=True),
|
||||
{
|
||||
"data": {
|
||||
"bot" : bot,
|
||||
"hardware" : hardware,
|
||||
"liteyuki" : liteyuki,
|
||||
"localization": get_local_data(lang)
|
||||
}
|
||||
}
|
||||
)
|
||||
async def generate_status_card(bot: dict, hardware: dict, liteyuki: dict, lang="zh-CN", bot_id="0", use_cache=False) -> bytes:
|
||||
if not use_cache:
|
||||
return await template2image(
|
||||
get_path("templates/status.html", abs_path=True),
|
||||
{
|
||||
"data": {
|
||||
"bot" : bot,
|
||||
"hardware" : hardware,
|
||||
"liteyuki" : liteyuki,
|
||||
"localization": get_local_data(lang)
|
||||
}
|
||||
},
|
||||
debug=True
|
||||
)
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
def get_local_data(lang_code) -> dict:
|
||||
@ -97,7 +101,7 @@ def get_local_data(lang_code) -> dict:
|
||||
"cores" : lang.get("status.cores"),
|
||||
"process" : lang.get("status.process"),
|
||||
"resources" : lang.get("status.resources"),
|
||||
|
||||
"description" : lang.get("status.description"),
|
||||
}
|
||||
|
||||
|
||||
|
@ -49,7 +49,7 @@ async def _(result: Arparma, event: T_MessageEvent, bot: T_Bot):
|
||||
r = set_profile(result.args["key"], result.args["value"], str(event.user_id))
|
||||
if r:
|
||||
user.profile[result.args["key"]] = result.args["value"]
|
||||
user_db.upsert(user) # 数据库保存
|
||||
user_db.save(user) # 数据库保存
|
||||
await profile_alc.finish(
|
||||
ulang.get(
|
||||
"user.profile.set_success",
|
||||
|
@ -1,7 +1,7 @@
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot import get_driver
|
||||
from .qweather import *
|
||||
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="轻雪天气",
|
||||
description="基于和风天气api的天气插件",
|
||||
@ -9,9 +9,19 @@ __plugin_meta__ = PluginMetadata(
|
||||
type="application",
|
||||
homepage="https://github.com/snowykami/LiteyukiBot",
|
||||
extra={
|
||||
"liteyuki": True,
|
||||
"toggleable" : True,
|
||||
"default_enable" : True,
|
||||
"liteyuki" : True,
|
||||
"toggleable" : True,
|
||||
"default_enable": True,
|
||||
}
|
||||
)
|
||||
|
||||
from ...utils.base.data_manager import set_memory_data
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
# 检查是否为开发者模式
|
||||
is_dev = await check_key_dev(get_config("weather_key", ""))
|
||||
set_memory_data("weather.is_dev", is_dev)
|
||||
|
@ -1,6 +1,8 @@
|
||||
from .qw_models import *
|
||||
import httpx
|
||||
|
||||
from ...utils.base.data_manager import get_memory_data
|
||||
from ...utils.base.language import Language
|
||||
|
||||
dev_url = "https://devapi.qweather.com/" # 开发HBa
|
||||
com_url = "https://api.qweather.com/" # 正式环境
|
||||
@ -17,6 +19,42 @@ def get_qw_lang(lang: str) -> str:
|
||||
return lang
|
||||
|
||||
|
||||
async def check_key_dev(key: str) -> bool:
|
||||
url = "https://api.qweather.com/v7/weather/now?"
|
||||
params = {
|
||||
"location": "101010100",
|
||||
"key" : key,
|
||||
}
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.get(url, params=params)
|
||||
return resp.json().get("code") != "200" # 查询不到付费数据为开发版
|
||||
|
||||
|
||||
def get_local_data(ulang_code: str) -> dict:
|
||||
"""
|
||||
获取本地化数据
|
||||
Args:
|
||||
ulang_code:
|
||||
|
||||
Returns:
|
||||
|
||||
"""
|
||||
ulang = Language(ulang_code)
|
||||
return {
|
||||
"monday" : ulang.get("weather.monday"),
|
||||
"tuesday" : ulang.get("weather.tuesday"),
|
||||
"wednesday": ulang.get("weather.wednesday"),
|
||||
"thursday" : ulang.get("weather.thursday"),
|
||||
"friday" : ulang.get("weather.friday"),
|
||||
"saturday" : ulang.get("weather.saturday"),
|
||||
"sunday" : ulang.get("weather.sunday"),
|
||||
"today" : ulang.get("weather.today"),
|
||||
"tomorrow" : ulang.get("weather.tomorrow"),
|
||||
"day" : ulang.get("weather.day"),
|
||||
"night" : ulang.get("weather.night"),
|
||||
}
|
||||
|
||||
|
||||
async def city_lookup(
|
||||
location: str,
|
||||
key: str,
|
||||
@ -54,7 +92,7 @@ async def get_weather_now(
|
||||
location: str,
|
||||
lang: str = "zh",
|
||||
unit: str = "m",
|
||||
dev: bool = True,
|
||||
dev: bool = get_memory_data("is_dev", True),
|
||||
) -> dict:
|
||||
url_path = "v7/weather/now?"
|
||||
url = dev_url + url_path if dev else com_url + url_path
|
||||
@ -74,7 +112,7 @@ async def get_weather_daily(
|
||||
location: str,
|
||||
lang: str = "zh",
|
||||
unit: str = "m",
|
||||
dev: bool = True,
|
||||
dev: bool = get_memory_data("is_dev", True),
|
||||
) -> dict:
|
||||
url_path = "v7/weather/%dd?" % (7 if dev else 30)
|
||||
url = dev_url + url_path if dev else com_url + url_path
|
||||
@ -94,7 +132,7 @@ async def get_weather_hourly(
|
||||
location: str,
|
||||
lang: str = "zh",
|
||||
unit: str = "m",
|
||||
dev: bool = True,
|
||||
dev: bool = get_memory_data("is_dev", True),
|
||||
) -> dict:
|
||||
url_path = "v7/weather/%dh?" % (24 if dev else 168)
|
||||
url = dev_url + url_path if dev else com_url + url_path
|
||||
@ -115,7 +153,7 @@ async def get_airquality(
|
||||
lang: str,
|
||||
pollutant: bool = False,
|
||||
station: bool = False,
|
||||
dev: bool = True
|
||||
dev: bool = get_memory_data("is_dev", True),
|
||||
) -> dict:
|
||||
url_path = f"airquality/v1/now/{location}?"
|
||||
url = dev_url + url_path if dev else com_url + url_path
|
||||
|
@ -1,4 +1,4 @@
|
||||
from nonebot import require
|
||||
from nonebot import require, on_endswith
|
||||
from nonebot.adapters.onebot.v11 import MessageSegment
|
||||
from nonebot.internal.matcher import Matcher
|
||||
|
||||
@ -7,7 +7,7 @@ from liteyuki.utils.base.ly_typing import T_MessageEvent
|
||||
|
||||
from .qw_api import *
|
||||
from liteyuki.utils.base.data_manager import User, user_db
|
||||
from liteyuki.utils.base.language import get_user_lang
|
||||
from liteyuki.utils.base.language import Language, get_user_lang
|
||||
from liteyuki.utils.base.resource import get_path
|
||||
from liteyuki.utils.message.html_tool import template2image
|
||||
|
||||
@ -24,40 +24,50 @@ from nonebot_plugin_alconna import on_alconna, Alconna, Args, MultiVar, Arparma
|
||||
).handle()
|
||||
async def _(result: Arparma, event: T_MessageEvent, matcher: Matcher):
|
||||
"""await alconna.send("weather", city)"""
|
||||
ulang = get_user_lang(str(event.user_id))
|
||||
kws = result.main_args.get("keywords")
|
||||
image = await get_weather_now_card(matcher, event, kws)
|
||||
await matcher.finish(MessageSegment.image(image))
|
||||
|
||||
|
||||
@on_endswith(("天气", "weather")).handle()
|
||||
async def _(event: T_MessageEvent, matcher: Matcher):
|
||||
"""await alconna.send("weather", city)"""
|
||||
kws = event.message.extract_plain_text()
|
||||
image = await get_weather_now_card(matcher, event, [kws.replace("天气", "").replace("weather", "")], False)
|
||||
await matcher.finish(MessageSegment.image(image))
|
||||
|
||||
|
||||
async def get_weather_now_card(matcher: Matcher, event: T_MessageEvent, keyword: list[str], tip: bool = True):
|
||||
ulang = get_user_lang(event.user_id)
|
||||
qw_lang = get_qw_lang(ulang.lang_code)
|
||||
key = get_config("weather_key")
|
||||
is_dev = get_config("weather_dev")
|
||||
|
||||
user: User = user_db.first(User(), "user_id = ?", str(event.user_id), default=User())
|
||||
|
||||
is_dev = get_memory_data("weather.is_dev", True)
|
||||
user: User = user_db.first(User(), "user_id = ?", event.user_id, default=User())
|
||||
# params
|
||||
unit = user.profile.get("unit", "m")
|
||||
stored_location = user.profile.get("location", None)
|
||||
|
||||
if not key:
|
||||
await matcher.finish(ulang.get("weather.no_key"))
|
||||
await matcher.finish(ulang.get("weather.no_key") if tip else None)
|
||||
|
||||
kws = result.main_args.get("keywords")
|
||||
if kws:
|
||||
if len(kws) >= 2:
|
||||
adm = kws[0]
|
||||
city = kws[-1]
|
||||
if keyword:
|
||||
if len(keyword) >= 2:
|
||||
adm = keyword[0]
|
||||
city = keyword[-1]
|
||||
else:
|
||||
adm = ""
|
||||
city = kws[0]
|
||||
city = keyword[0]
|
||||
city_info = await city_lookup(city, key, adm=adm, lang=qw_lang)
|
||||
city_name = " ".join(kws)
|
||||
city_name = " ".join(keyword)
|
||||
else:
|
||||
if not stored_location:
|
||||
await matcher.finish(ulang.get("liteyuki.invalid_command", TEXT="location"))
|
||||
await matcher.finish(ulang.get("liteyuki.invalid_command", TEXT="location") if tip else None)
|
||||
city_info = await city_lookup(stored_location, key, lang=qw_lang)
|
||||
city_name = stored_location
|
||||
if city_info.code == "200":
|
||||
location_data = city_info.location[0]
|
||||
else:
|
||||
await matcher.finish(ulang.get("weather.city_not_found", CITY=city_name))
|
||||
|
||||
await matcher.finish(ulang.get("weather.city_not_found", CITY=city_name) if tip else None)
|
||||
weather_now = await get_weather_now(key, location_data.id, lang=qw_lang, unit=unit, dev=is_dev)
|
||||
weather_daily = await get_weather_daily(key, location_data.id, lang=qw_lang, unit=unit, dev=is_dev)
|
||||
weather_hourly = await get_weather_hourly(key, location_data.id, lang=qw_lang, unit=unit, dev=is_dev)
|
||||
@ -76,9 +86,10 @@ async def _(result: Arparma, event: T_MessageEvent, matcher: Matcher):
|
||||
"weatherHourly": weather_hourly,
|
||||
"aqi" : aqi,
|
||||
"location" : location_data.dump(),
|
||||
"localization" : get_local_data(ulang.lang_code)
|
||||
}
|
||||
},
|
||||
debug=True,
|
||||
wait=1
|
||||
)
|
||||
await matcher.finish(MessageSegment.image(image))
|
||||
return image
|
||||
|
Reference in New Issue
Block a user