猜成语:修复拼音v的bug、同步主仓库的代码

This commit is contained in:
Eilles
2026-05-10 19:11:28 +08:00
parent 226a2a336c
commit d23007343d
4 changed files with 178 additions and 155 deletions
@@ -1,23 +1,25 @@
import asyncio import asyncio
from asyncio import TimerHandle from asyncio import TimerHandle
from typing import Any, Dict from typing import Any, Dict, Annotated
from typing_extensions import Annotated
import nonebot
from nonebot import on_regex, require from nonebot import on_regex, require
from nonebot.log import logger
from nonebot.matcher import Matcher from nonebot.matcher import Matcher
from nonebot.params import RegexDict from nonebot.params import EventToMe, Depends, RegexDict
from nonebot.plugin import PluginMetadata, inherit_supported_adapters from nonebot.plugin import PluginMetadata, inherit_supported_adapters
from nonebot.rule import to_me from nonebot.rule import to_me
from nonebot.utils import run_sync from nonebot.utils import run_sync
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
from nonebot.internal.adapter import Bot
from typing_extensions import Annotated
require("nonebot_plugin_alconna") require("nonebot_plugin_alconna")
require("nonebot_plugin_session") require("nonebot_plugin_uninfo")
from nonebot_plugin_alconna import ( from nonebot_plugin_alconna import (
AlcMatches,
Alconna, Alconna,
At,
Image, Image,
Option, Option,
Text, Text,
@@ -27,11 +29,12 @@ from nonebot_plugin_alconna import (
Args, Args,
Arparma, Arparma,
) )
from nonebot_plugin_session import SessionId, SessionIdType from nonebot_plugin_uninfo import Uninfo
from .config import Config, handle_config from .config import Config, handle_config
from .data_source import GuessResult, Handle from .data_source import GuessResult, Handle
from .utils import ( from .utils import (
v_to_u,
random_idiom, random_idiom,
wordbase_updater, wordbase_updater,
HANDLE_COMMON_PHRASES, HANDLE_COMMON_PHRASES,
@@ -40,7 +43,7 @@ from .utils import (
__plugin_meta__ = PluginMetadata( __plugin_meta__ = PluginMetadata(
name="猜成语", name="猜成语",
description="猜成语-睿乐特别版", description="汉字 Wordle 猜成语",
usage=( usage=(
"@我 + “猜成语”开始游戏;\n" "@我 + “猜成语”开始游戏;\n"
"你有十次的机会猜一个四字词语;\n" "你有十次的机会猜一个四字词语;\n"
@@ -50,29 +53,32 @@ __plugin_meta__ = PluginMetadata(
"每个格子的 汉字、声母、韵母、声调 都会独立进行颜色的指示。\n" "每个格子的 汉字、声母、韵母、声调 都会独立进行颜色的指示。\n"
"当四个格子都为青色时,你便赢得了游戏!\n" "当四个格子都为青色时,你便赢得了游戏!\n"
"可发送“结束”结束游戏;可发送“提示”查看提示。\n" "可发送“结束”结束游戏;可发送“提示”查看提示。\n"
"使用 --strict 选项开启非默认的成语检查,即猜测的短语必须是成语,\n" "使用 --strict|-s 选项开启非默认的成语检查,即猜测的短语必须是成语,\n"
"如:@我 猜成语 --strict\n" "如:猜成语 --strict\n"
"使用 --hard 选项开启困难词库\n" "使用 --difficult|-d 选项开启困难词库\n"
"管理员可以使用 新成语、成语答案 两个命令进行成语词库添加和答案查看" "管理员可以使用 新成语、成语答案 两个命令进行成语词库添加和答案查看"
), ),
type="application", type="application",
homepage="https://github.com/noneplugin/nonebot-plugin-handle", homepage="https://github.com/noneplugin/nonebot-plugin-handle",
config=Config, config=Config,
supported_adapters=inherit_supported_adapters( supported_adapters=inherit_supported_adapters(
"nonebot_plugin_alconna", "nonebot_plugin_session" "nonebot_plugin_alconna", "nonebot_plugin_uninfo"
), ),
extra={
"example": "@小Q 猜成语",
},
) )
games: Dict[str, Handle] = {} games: Dict[str, Handle] = {}
timers: Dict[str, TimerHandle] = {} timers: Dict[str, TimerHandle] = {}
UserId = Annotated[str, SessionId(SessionIdType.GROUP)]
def get_user_id(uninfo: Uninfo) -> str:
return f"{uninfo.scope}_{uninfo.self_id}_{uninfo.scene_path}"
UserId = Annotated[str, Depends(get_user_id)]
# 游戏运行判断规则
def game_is_running(user_id: UserId) -> bool: def game_is_running(user_id: UserId) -> bool:
return user_id in games return user_id in games
@@ -81,53 +87,7 @@ def game_not_running(user_id: UserId) -> bool:
return user_id not in games return user_id not in games
handle = on_alconna( # 游戏过程中的功能实现
Alconna(
"handle",
Option("-s|--strict", default=False, action=store_true),
Option("-d|--difficult", default=False, action=store_true),
),
aliases=("猜成语",),
rule=to_me() & game_not_running,
use_cmd_start=True,
block=True,
priority=13,
)
handle_hint = on_alconna(
"提示",
rule=game_is_running,
use_cmd_start=True,
block=True,
priority=13,
)
handle_stop = on_alconna(
"结束",
aliases=("结束游戏", "结束猜成语"),
rule=game_is_running,
use_cmd_start=True,
block=True,
priority=13,
)
# handle_update = on_alconna(
# "更新词库",
# aliases=("刷新词库", "猜成语刷新词库"),
# rule=to_me(),
# use_cmd_start=True,
# block=True,
# priority=13,
# )
handle_idiom = on_regex(
r"^(?P<idiom>[\u4e00-\u9fa5]{4})$",
rule=game_is_running,
block=True,
priority=14,
)
def stop_game(user_id: str): def stop_game(user_id: str):
if timer := timers.pop(user_id, None): if timer := timers.pop(user_id, None):
timer.cancel() timer.cancel()
@@ -154,13 +114,46 @@ def set_timeout(matcher: Matcher, user_id: str, timeout: float = 300):
timers[user_id] = timer timers[user_id] = timer
@handle.handle() # ————
handle_alc = Alconna(
"handle",
Option("-s|--strict", default=False, action=store_true),
Option("-d|--difficult", default=False, action=store_true),
)
handle_matcher = on_alconna(
handle_alc,
aliases=("猜成语",),
rule=(
(to_me() & game_not_running)
if handle_config.handle_require_tome
else (game_not_running)
),
use_cmd_start=True,
block=True,
priority=13,
)
@handle_matcher.handle()
async def _( async def _(
result: Arparma, result: Arparma,
matcher: Matcher, matcher: Matcher,
user_id: UserId, user_id: UserId,
alc_matches: AlcMatches,
to_me: bool = EventToMe(),
): ):
nonebot.logger.info(result.options)
header_match = str(alc_matches.header_match.result)
command = str(handle_alc.command)
if not (to_me or bool(header_match.rstrip(command))):
# 既不是对机器人说话,也不是以命令开头……
logger.debug("非 To me 命令,忽略")
matcher.block = False
await matcher.finish()
# nonebot.logger.info(result.options)
is_strict = handle_config.handle_strict_mode or result.options["strict"].value is_strict = handle_config.handle_strict_mode or result.options["strict"].value
idiom, explanation = random_idiom(result.options["difficult"].value) idiom, explanation = random_idiom(result.options["difficult"].value)
game = Handle(idiom, explanation, strict=is_strict) game = Handle(idiom, explanation, strict=is_strict)
@@ -175,7 +168,19 @@ async def _(
await msg.send() await msg.send()
@handle_hint.handle() # ————
handle_hint_matcher = on_alconna(
"handle_hint",
aliases=("提示", "猜成语提示"),
rule=game_is_running,
use_cmd_start=True,
block=True,
priority=13,
)
@handle_hint_matcher.handle()
async def _(matcher: Matcher, user_id: UserId): async def _(matcher: Matcher, user_id: UserId):
game = games[user_id] game = games[user_id]
set_timeout(matcher, user_id) set_timeout(matcher, user_id)
@@ -183,7 +188,19 @@ async def _(matcher: Matcher, user_id: UserId):
await UniMessage.image(raw=await run_sync(game.draw_hint)()).send() await UniMessage.image(raw=await run_sync(game.draw_hint)()).send()
@handle_stop.handle() # ————
handle_stop_matcher = on_alconna(
"handle_stop",
aliases=("结束", "结束游戏", "结束猜成语"),
rule=game_is_running,
use_cmd_start=True,
block=True,
priority=13,
)
@handle_stop_matcher.handle()
async def _(matcher: Matcher, user_id: UserId): async def _(matcher: Matcher, user_id: UserId):
game = games[user_id] game = games[user_id]
stop_game(user_id) stop_game(user_id)
@@ -194,11 +211,23 @@ async def _(matcher: Matcher, user_id: UserId):
await matcher.finish(msg) await matcher.finish(msg)
# @handle_update.handle() # ————
handle_idiom_matcher = on_regex(
r"^(?P<idiom>[\u4e00-\u9fa5]{4})$",
rule=game_is_running,
block=True,
priority=14,
)
@handle_idiom.handle() @handle_idiom_matcher.handle()
async def _(matcher: Matcher, user_id: UserId, matched: Dict[str, Any] = RegexDict()): async def _(
matcher: Matcher,
uninfo: Uninfo,
user_id: UserId,
matched: Dict[str, Any] = RegexDict(),
):
game = games[user_id] game = games[user_id]
set_timeout(matcher, user_id) set_timeout(matcher, user_id)
@@ -207,15 +236,18 @@ async def _(matcher: Matcher, user_id: UserId, matched: Dict[str, Any] = RegexDi
if result in [GuessResult.WIN, GuessResult.LOSS]: if result in [GuessResult.WIN, GuessResult.LOSS]:
stop_game(user_id) stop_game(user_id)
msg = Text( await UniMessage.template(
( (
"恭喜猜出了成语!" "恭喜{user}猜出了成语!"
if result == GuessResult.WIN if result == GuessResult.WIN
else "很遗憾,没有人猜出来呢" else "很遗憾,没有人猜出来呢"
) )
+ "\n{}".format(game.result) + "\n{result}\n{image}"
) + Image(raw=await run_sync(game.draw)()) ).format(
await msg.send() user="" if uninfo.scene.is_private else At("user", uninfo.user.id),
result=game.result,
image=Image(raw=await run_sync(game.draw)()),
).send()
elif result == GuessResult.DUPLICATE: elif result == GuessResult.DUPLICATE:
await matcher.finish("你已经猜过这个成语了呢") await matcher.finish("你已经猜过这个成语了呢")
@@ -227,9 +259,11 @@ async def _(matcher: Matcher, user_id: UserId, matched: Dict[str, Any] = RegexDi
await UniMessage.image(raw=await run_sync(game.draw)()).send() await UniMessage.image(raw=await run_sync(game.draw)()).send()
handle_update_pinyin = on_alconna( # ————
handle_update_pinyin_matcher = on_alconna(
Alconna( Alconna(
"更正成语拼音", "handle_update_pinyin",
Args["idiom", str, ""], Args["idiom", str, ""],
Args["pinyin1", str, ""], Args["pinyin1", str, ""],
Args["pinyin2", str, ""], Args["pinyin2", str, ""],
@@ -237,6 +271,8 @@ handle_update_pinyin = on_alconna(
Args["pinyin4", str, ""], Args["pinyin4", str, ""],
), ),
aliases=( aliases=(
"更正拼音",
"更正成语拼音",
"更新猜成语词库拼音", "更新猜成语词库拼音",
"猜成语更新拼音", "猜成语更新拼音",
"更新猜成语拼音", "更新猜成语拼音",
@@ -250,7 +286,7 @@ handle_update_pinyin = on_alconna(
) )
@handle_update_pinyin.handle() @handle_update_pinyin_matcher.handle()
async def _( async def _(
result: Arparma, result: Arparma,
): ):
@@ -262,28 +298,23 @@ async def _(
and (pinyin3 := result.main_args["pinyin3"]) and (pinyin3 := result.main_args["pinyin3"])
and (pinyin4 := result.main_args["pinyin4"]) and (pinyin4 := result.main_args["pinyin4"])
): ):
await handle_update_pinyin.finish( await handle_update_pinyin_matcher.finish(
"用法:更正成语拼音 <成语> <拼音1> <拼音2> <拼音3> <拼音4>" "用法:更正拼音 <成语> <拼音1> <拼音2> <拼音3> <拼音4>"
) )
if idiom not in HANDLE_LEGAL_PHRASES: if idiom not in HANDLE_LEGAL_PHRASES:
await handle_update_pinyin.finish( await handle_update_pinyin_matcher.finish(
"未在词库中找到该成语,请使用 `新成语 <成语>` 来添加成语" "未在词库中找到该成语,请使用 `新成语 <成语>` 来添加成语"
) )
_, explanation, pinyin_now = wordbase_updater( _, explanation, pinyin_now = wordbase_updater(
idiom, idiom,
explanation=None, explanation=None,
pinyin=[ pinyin=v_to_u([pinyin1, pinyin2, pinyin3, pinyin4]),
pinyin1,
pinyin2,
pinyin3,
pinyin4,
],
hard=None, hard=None,
) )
await handle_update_idiom.finish( await handle_update_pinyin_matcher.finish(
"成功修改:{}\n当前词库总数:{}个,普通模式成语:{}\n当前成语信息如下:{}".format( "成功修改:{}\n当前词库总数:{}个,普通模式成语:{}\n当前成语信息如下:{}".format(
idiom, idiom,
len(HANDLE_LEGAL_PHRASES), len(HANDLE_LEGAL_PHRASES),
@@ -293,9 +324,11 @@ async def _(
) )
handle_update_idiom = on_alconna( # ————
handle_update_idiom_matcher = on_alconna(
Alconna( Alconna(
"新成语", "handle_update_idiom",
Option( Option(
"-e|--explanation", "-e|--explanation",
default="", default="",
@@ -308,7 +341,7 @@ handle_update_idiom = on_alconna(
), ),
Args["idiom", str, ""], Args["idiom", str, ""],
), ),
aliases=("新增成语", "猜成语新增成语"), aliases=("成语", "增成语", "猜成语新增成语"),
use_cmd_start=True, use_cmd_start=True,
permission=SUPERUSER, permission=SUPERUSER,
block=True, block=True,
@@ -316,13 +349,13 @@ handle_update_idiom = on_alconna(
) )
@handle_update_idiom.handle() @handle_update_idiom_matcher.handle()
async def _( async def _(
result: Arparma, result: Arparma,
): ):
if not (idiom := result.main_args["idiom"]): if not (idiom := result.main_args["idiom"]):
await handle_update_idiom.finish( await handle_update_idiom_matcher.finish(
"用法:新成语 <成语> [-e|--explanation <释义>] [-d|--difficult]" "用法:新成语 <成语> [-e|--explanation <释义>] [-d|--difficult]"
) )
@@ -333,7 +366,7 @@ async def _(
hard=(hard := result.options["difficult"].value), hard=(hard := result.options["difficult"].value),
) )
await handle_update_idiom.finish( await handle_update_idiom_matcher.finish(
"成功{}[{}词汇]{}\n当前词库总数:{}个,普通模式成语:{}\n当前成语信息如下:{}".format( "成功{}[{}词汇]{}\n当前词库总数:{}个,普通模式成语:{}\n当前成语信息如下:{}".format(
"修改" if existance else "新增", "修改" if existance else "新增",
"困难" if hard else "普通", "困难" if hard else "普通",
@@ -345,9 +378,11 @@ async def _(
) )
handle_answer = on_alconna( # ————
handle_answer_matcher = on_alconna(
Alconna( Alconna(
"成语答案", "handle_answer",
Option( Option(
"-g|--group", "-g|--group",
default="Now", default="Now",
@@ -359,7 +394,7 @@ handle_answer = on_alconna(
action=store_true, action=store_true,
), ),
), ),
aliases=("handle-answer", "猜成语答案"), aliases=("成语答案", "猜成语答案"),
# rule=game_is_running, # rule=game_is_running,
use_cmd_start=True, use_cmd_start=True,
permission=SUPERUSER, permission=SUPERUSER,
@@ -368,37 +403,43 @@ handle_answer = on_alconna(
) )
@handle_answer.handle() @handle_answer_matcher.handle()
async def _( async def _(
result: Arparma, result: Arparma,
user_id: UserId, user_id: UserId,
bot: Bot,
): ):
if result.options["list"].value: if handle_config.handle_superuser_get_answer:
await handle_answer.finish(
UniMessage.text( if result.options["list"].value:
"\n".join( await handle_answer_matcher.finish(
"{},答案“{}".format(i.split("_")[-1], j.idiom) UniMessage.text(
for i, j in games.items() "\n".join(
"`{}`,答案“{}".format(i, j.idiom) for i, j in games.items()
)
) )
) )
) return
return
try: try:
if result.options["group"].args["group"] == "Now": if result.options["group"].args["group"] == "Now":
session_numstr = user_id
else:
await handle_answer_matcher.finish(
UniMessage.text("暂不支持指定群组") # TODO: 指定群组获取答案
)
except:
session_numstr = user_id session_numstr = user_id
else:
session_numstr = "qq_OneBot V11_{}_{}".format(
bot.self_id, result.options["group"].args["group"]
)
except:
session_numstr = user_id
if session_numstr in games.keys(): if session_numstr in games.keys():
await handle_answer.finish(UniMessage.text(games[session_numstr].idiom)) await handle_answer_matcher.finish(
UniMessage.text(games[session_numstr].idiom)
)
else:
await handle_answer_matcher.finish(
UniMessage.text("{} 不存在开局的游戏".format(session_numstr))
)
else: else:
await handle_answer.finish( await handle_answer_matcher.finish(UniMessage.text("本机器人不许可超管作弊"))
UniMessage.text("{} 不存在开局的游戏".format(session_numstr))
)
@@ -4,7 +4,13 @@ from pydantic import BaseModel
class Config(BaseModel): class Config(BaseModel):
handle_strict_mode: bool = False handle_strict_mode: bool = False
"""开启严格模式,强制校验成语是否在词库中"""
handle_color_enhance: bool = False handle_color_enhance: bool = False
"""生成的图片采用更鲜艳的颜色"""
handle_superuser_get_answer: bool = True
"""超级用户可以查看当局答案"""
handle_require_tome: bool = True
"""玩家必须@机器人"""
handle_config = get_plugin_config(Config) handle_config = get_plugin_config(Config)
@@ -2718,7 +2718,7 @@
] ]
}, },
"八方呼应": { "八方呼应": {
"explanation": "呼应彼此声气相通。泛指周围、各地。形容各方面互通声气,互相配合。", "explanation": "呼应彼此声气相通。八方:泛指周围、各地。形容各方面互通声气,互相配合。",
"pinyin": [ "pinyin": [
"ba1", "ba1",
"fang1", "fang1",
@@ -9189,7 +9189,7 @@
] ]
}, },
"奔走呼号": { "奔走呼号": {
"explanation": "奔走奔跑号叫喊。一面奔跑,一面呼唤。形容处于困境而求援。", "explanation": "奔走奔跑号叫喊。一面奔跑,一面呼唤。形容处于困境而求援。",
"pinyin": [ "pinyin": [
"ben1", "ben1",
"zou3", "zou3",
@@ -81081,7 +81081,7 @@
] ]
}, },
"呼庚呼癸": { "呼庚呼癸": {
"explanation": "庚、癸军粮的隐语。原是军中乞粮的隐语指向人借钱。", "explanation": "庚、癸军粮的隐语。原是军中乞粮的隐语,现指向人借钱。",
"pinyin": [ "pinyin": [
"hu1", "hu1",
"geng1", "geng1",
@@ -81090,7 +81090,7 @@
] ]
}, },
"呼来喝去": { "呼来喝去": {
"explanation": "呼、喝大声喊叫之即来,喝之即去。形容随意驱使。", "explanation": "呼、喝大声喊叫,呼之即来,喝之即去。形容随意驱使。",
"pinyin": [ "pinyin": [
"hu1", "hu1",
"lai2", "lai2",
@@ -205353,7 +205353,7 @@
] ]
}, },
"息息相关": { "息息相关": {
"explanation": "息呼吸时进出的气◆吸也相互关联。形容彼此的关系非常密切。", "explanation": "息呼吸时进出的气,相关:相互关联。形容彼此的关系非常密切。",
"pinyin": [ "pinyin": [
"xi1", "xi1",
"xi1", "xi1",
@@ -4,9 +4,6 @@ from io import BytesIO
from pathlib import Path from pathlib import Path
from typing import Dict, List, Tuple, TypedDict, Optional from typing import Dict, List, Tuple, TypedDict, Optional
# from watchdog.observers import Observer
# from watchdog.events import FileSystemEventHandler, FileModifiedEvent
from PIL import ImageFont from PIL import ImageFont
from PIL.Image import Image as IMG from PIL.Image import Image as IMG
from PIL.ImageFont import FreeTypeFont from PIL.ImageFont import FreeTypeFont
@@ -39,35 +36,13 @@ HANDLE_ANSWER_PHRASES: Dict[str, IdiomEntry] = json.load(
handle_answer_path.open("r", encoding="utf-8") handle_answer_path.open("r", encoding="utf-8")
) )
# class LegalPhrasesModifiedHandler(FileSystemEventHandler):
# """
# Handler for resource file changes
# """
# def on_modified(self, event): def v_to_u(v_strings: List[str]) -> List[str]:
# print(f"{event.src_path} modified, reloading resource...") """
# if "idioms.txt" in event.src_path: 将韵母的v转为ü
# global LEGAL_PHRASES """
# LEGAL_PHRASES = [
# idiom.strip()
# for idiom in idiom_path.open("r", encoding="utf-8").readlines()
# ]
# elif "answers.json" in event.src_path:
# global ANSWER_PHRASES
# ANSWER_PHRASES = json.load(
# answer_path.open("r", encoding="utf-8")
# )
return [v_str.replace("v", "ü") for v_str in v_strings]
# Observer().schedule(
# LegalPhrasesModifiedHandler(),
# data_dir,
# recursive=False,
# event_filter=FileModifiedEvent,
# )
# 答案转换器
# json.dump({ i["word"]:{"explanation":i["explanation"], "pinyin":(lambda x : [i for j in pinyin(x, style=Style.TONE3, v_to_u=True) for i in j])(i["word"])} for i in json.load(open("answers_hard-old.json",encoding="utf-8"))},open("answers.json", "w",encoding="utf-8"), ensure_ascii=False, indent=4)
def wordbase_updater( def wordbase_updater(
@@ -152,6 +127,7 @@ def wordbase_updater(
def legal_idiom(word: str) -> bool: def legal_idiom(word: str) -> bool:
return word in HANDLE_LEGAL_PHRASES return word in HANDLE_LEGAL_PHRASES
def random_idiom(is_hard: bool = False) -> Tuple[str, str]: def random_idiom(is_hard: bool = False) -> Tuple[str, str]:
answer = random.choice(HANDLE_LEGAL_PHRASES if is_hard else HANDLE_COMMON_PHRASES) answer = random.choice(HANDLE_LEGAL_PHRASES if is_hard else HANDLE_COMMON_PHRASES)
return answer, HANDLE_ANSWER_PHRASES[answer]["explanation"] return answer, HANDLE_ANSWER_PHRASES[answer]["explanation"]