mirror of
https://github.com/ChenXu233/nonebot_plugin_dialectlist.git
synced 2025-09-06 20:46:25 +00:00
🔥 ♻️ 暂时停止图片支持+依照词云重写
This commit is contained in:
@ -1,185 +1,265 @@
|
||||
import re
|
||||
import time
|
||||
from typing import Tuple, Union
|
||||
from datetime import datetime, timedelta
|
||||
from nonebot import require
|
||||
|
||||
from nonebot import on_command, require
|
||||
require("nonebot_plugin_chatrecorder")
|
||||
require("nonebot_plugin_apscheduler")
|
||||
require("nonebot_plugin_userinfo")
|
||||
require("nonebot_plugin_alconna")
|
||||
require("nonebot_plugin_cesaa")
|
||||
|
||||
import re
|
||||
import os
|
||||
import time
|
||||
|
||||
import nonebot_plugin_saa as saa
|
||||
|
||||
from typing import Tuple, Union, Optional, List
|
||||
from datetime import datetime, timedelta
|
||||
from arclet.alconna import ArparmaBehavior
|
||||
from arclet.alconna.arparma import Arparma
|
||||
|
||||
from nonebot import on_command, get_driver
|
||||
from nonebot.log import logger
|
||||
from nonebot.params import Command, CommandArg, Arg, Depends
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.adapters.onebot import V11Bot, V12Bot, V11Event, V12Event, V11Message, V12Message # type: ignore
|
||||
from nonebot import get_driver
|
||||
from nonebot.adapters import Bot, Event, Message
|
||||
from nonebot.params import Arg, Depends
|
||||
from nonebot.permission import SUPERUSER
|
||||
from nonebot.plugin import PluginMetadata, inherit_supported_adapters
|
||||
from nonebot.typing import T_State
|
||||
from nonebot_plugin_alconna import (
|
||||
Alconna,
|
||||
AlconnaMatch,
|
||||
AlconnaMatcher,
|
||||
AlconnaQuery,
|
||||
Args,
|
||||
Match,
|
||||
Option,
|
||||
Query,
|
||||
image_fetch,
|
||||
on_alconna,
|
||||
store_true,
|
||||
)
|
||||
|
||||
try:
|
||||
from zoneinfo import ZoneInfo
|
||||
except ImportError:
|
||||
from backports.zoneinfo import ZoneInfo # type: ignore
|
||||
|
||||
require("nonebot_plugin_chatrecorder")
|
||||
from nonebot_plugin_chatrecorder import get_message_records
|
||||
|
||||
from .function import *
|
||||
from .config import plugin_config
|
||||
from nonebot_plugin_userinfo import EventUserInfo, UserInfo, get_user_info
|
||||
from nonebot_plugin_session import Session, SessionIdType, extract_session
|
||||
from nonebot_plugin_cesaa import get_messages_plain_text
|
||||
|
||||
|
||||
ranks = on_command(
|
||||
"群话痨排行榜",
|
||||
aliases={
|
||||
"今日群话痨排行榜",
|
||||
"昨日群话痨排行榜",
|
||||
"本周群话痨排行榜",
|
||||
"上周群话痨排行榜",
|
||||
"本月群话痨排行榜",
|
||||
"年度群话痨排行榜",
|
||||
"历史群话痨排行榜",
|
||||
},
|
||||
priority=6,
|
||||
block=True,
|
||||
# from . import migrations #抄词云的部分代码,还不知道这有什么用
|
||||
# from .function import *
|
||||
from .config import Config, plugin_config
|
||||
from .utils import (
|
||||
get_datetime_fromisoformat_with_timezone,
|
||||
get_datetime_now_with_timezone,
|
||||
got_rank,
|
||||
msg_counter,
|
||||
persist_id2user_id,
|
||||
)
|
||||
|
||||
with open(os.path.dirname(__file__) + "/usage.md") as f:
|
||||
usage = f.read()
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="B话排行榜",
|
||||
description="调查群U的B话数量,以一定的顺序排序后排序出来。",
|
||||
usage=usage,
|
||||
homepage="https://github.com/ChenXu233/nonebot_plugin_dialectlist",
|
||||
type="application",
|
||||
supported_adapters=inherit_supported_adapters(
|
||||
"nonebot_plugin_chatrecorder", "nonebot_plugin_saa", "nonebot_plugin_alconna"
|
||||
),
|
||||
config=Config,
|
||||
# extra={"orm_version_location": migrations},
|
||||
)
|
||||
|
||||
|
||||
@ranks.handle()
|
||||
# 抄的词云,不过真的很适合B话榜。
|
||||
class SameTime(ArparmaBehavior):
|
||||
def operate(self, interface: Arparma):
|
||||
type = interface.query("type")
|
||||
time = interface.query("time")
|
||||
if type is None and time:
|
||||
interface.behave_fail()
|
||||
|
||||
|
||||
rank_cmd = on_alconna(
|
||||
Alconna(
|
||||
"B话榜",
|
||||
Args["type?", ["今日", "昨日", "本周", "上周", "本月", "上月", "年度", "历史"]][
|
||||
"time?", str
|
||||
],
|
||||
behaviors=[SameTime()],
|
||||
),
|
||||
use_cmd_start=True,
|
||||
)
|
||||
|
||||
|
||||
def wrapper(slot: Union[int, str], content: Optional[str]) -> str:
|
||||
if slot == "type" and content:
|
||||
return content
|
||||
return "" # pragma: no cover
|
||||
|
||||
|
||||
rank_cmd.shortcut(
|
||||
r"(?P<type>今日|昨日|本周|上周|本月|上月|年度|历史)B话榜",
|
||||
{
|
||||
"prefix": True,
|
||||
"command": "B话榜 ",
|
||||
"wrapper": wrapper,
|
||||
"args": ["{type}"],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def parse_datetime(key: str):
|
||||
"""解析数字,并将结果存入 state 中"""
|
||||
|
||||
async def _key_parser(
|
||||
matcher: AlconnaMatcher,
|
||||
state: T_State,
|
||||
input: Union[datetime, Message] = Arg(key),
|
||||
):
|
||||
if isinstance(input, datetime):
|
||||
return
|
||||
|
||||
plaintext = input.extract_plain_text()
|
||||
try:
|
||||
state[key] = get_datetime_fromisoformat_with_timezone(plaintext)
|
||||
except ValueError:
|
||||
await matcher.reject_arg(key, "请输入正确的日期,不然我没法理解呢!")
|
||||
|
||||
return _key_parser
|
||||
|
||||
|
||||
# TODO 处理函数更新
|
||||
# 参考词云
|
||||
|
||||
|
||||
# 这段函数完全抄的词云
|
||||
@rank_cmd.handle()
|
||||
async def _group_message(
|
||||
matcher: Matcher,
|
||||
event: Union[
|
||||
V11Event.GroupMessageEvent,
|
||||
V12Event.GroupMessageEvent,
|
||||
V12Event.ChannelMessageEvent,
|
||||
],
|
||||
state: T_State,
|
||||
commands: Tuple[str, ...] = Command(),
|
||||
args: Union[V11Message, V11Message] = CommandArg(),
|
||||
type: Optional[str] = None,
|
||||
time: Optional[str] = None,
|
||||
):
|
||||
if isinstance(event, V11Event.GroupMessageEvent):
|
||||
logger.debug("handle command from onebotV11 adapter(qq)")
|
||||
elif isinstance(event, V12Event.GroupMessageEvent):
|
||||
logger.debug("handle command from onebotV12 adapter")
|
||||
|
||||
dt = get_datetime_now_with_timezone()
|
||||
command = commands[0]
|
||||
|
||||
if command == "群话痨排行榜":
|
||||
state["start"] = dt.replace(
|
||||
year=2000, month=1, day=1, hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
state["stop"] = dt
|
||||
elif command == "今日群话痨排行榜":
|
||||
if not type:
|
||||
await rank_cmd.finish(__plugin_meta__.usage)
|
||||
|
||||
dt = get_datetime_now_with_timezone()
|
||||
|
||||
if type == "今日":
|
||||
state["start"] = dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
state["stop"] = dt
|
||||
elif command == "昨日群话痨排行榜":
|
||||
elif type == "昨日":
|
||||
state["stop"] = dt.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
state["start"] = state["stop"] - timedelta(days=1)
|
||||
elif command == "前日群话痨排行榜":
|
||||
state["stop"] = dt.replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
) - timedelta(days=1)
|
||||
state["start"] = state["stop"] - timedelta(days=1)
|
||||
elif command == "本周群话痨排行榜":
|
||||
elif type == "本周":
|
||||
state["start"] = dt.replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
) - timedelta(days=dt.weekday())
|
||||
state["stop"] = dt
|
||||
elif command == "上周群话痨排行榜":
|
||||
state["start"] = dt.replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
) - timedelta(days=dt.weekday() + 7)
|
||||
elif type == "上周":
|
||||
state["stop"] = dt.replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
) - timedelta(days=dt.weekday())
|
||||
elif command == "本月群话痨排行榜":
|
||||
state["start"] = state["stop"] - timedelta(days=7)
|
||||
elif type == "本月":
|
||||
state["start"] = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
|
||||
state["stop"] = dt
|
||||
elif command == "年度群话痨排行榜":
|
||||
elif type == "上月":
|
||||
state["stop"] = dt.replace(
|
||||
day=1, hour=0, minute=0, second=0, microsecond=0
|
||||
) - timedelta(microseconds=1)
|
||||
state["start"] = state["stop"].replace(
|
||||
day=1, hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
elif type == "年度":
|
||||
state["start"] = dt.replace(
|
||||
month=1, day=1, hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
state["stop"] = dt
|
||||
elif command == "历史群话痨排行榜":
|
||||
plaintext = args.extract_plain_text().strip()
|
||||
match = re.match(r"^(.+?)(?:~(.+))?$", plaintext)
|
||||
if match:
|
||||
start = match.group(1)
|
||||
stop = match.group(2)
|
||||
try:
|
||||
state["start"] = get_datetime_fromisoformat_with_timezone(start)
|
||||
if stop:
|
||||
state["stop"] = get_datetime_fromisoformat_with_timezone(stop)
|
||||
else:
|
||||
# 如果没有指定结束日期,则认为是指查询这一天的数据
|
||||
state["start"] = state["start"].replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
state["stop"] = state["start"] + timedelta(days=1)
|
||||
except ValueError:
|
||||
await matcher.finish("请输入正确的日期,不然我没法理解呢!")
|
||||
else:
|
||||
pass
|
||||
elif type == "历史":
|
||||
if time:
|
||||
plaintext = time
|
||||
if match := re.match(r"^(.+?)(?:~(.+))?$", plaintext):
|
||||
start = match[1]
|
||||
stop = match[2]
|
||||
try:
|
||||
state["start"] = get_datetime_fromisoformat_with_timezone(start)
|
||||
if stop:
|
||||
state["stop"] = get_datetime_fromisoformat_with_timezone(stop)
|
||||
else:
|
||||
# 如果没有指定结束日期,则认为是所给日期的当天的词云
|
||||
state["start"] = state["start"].replace(
|
||||
hour=0, minute=0, second=0, microsecond=0
|
||||
)
|
||||
state["stop"] = state["start"] + timedelta(days=1)
|
||||
except ValueError:
|
||||
await rank_cmd.finish("请输入正确的日期,不然我没法理解呢!")
|
||||
|
||||
|
||||
@ranks.handle()
|
||||
async def _private_message(
|
||||
matcher: Matcher,
|
||||
event: Union[V11Event.PrivateMessageEvent, V12Event.PrivateMessageEvent],
|
||||
state: T_State,
|
||||
commands: Tuple[str, ...] = Command(),
|
||||
args: Union[V11Message, V12Message] = CommandArg(),
|
||||
):
|
||||
# TODO:支持私聊的查询
|
||||
await matcher.finish("暂不支持私聊查询,今后可能会添加这一项功能")
|
||||
|
||||
|
||||
@ranks.got(
|
||||
@rank_cmd.got(
|
||||
"start",
|
||||
prompt="请输入你要查询的起始日期(如 2022-01-01)",
|
||||
parameterless=[Depends(parse_datetime("start"))],
|
||||
)
|
||||
@ranks.got(
|
||||
@rank_cmd.got(
|
||||
"stop",
|
||||
prompt="请输入你要查询的结束日期(如 2022-02-22)",
|
||||
parameterless=[Depends(parse_datetime("stop"))],
|
||||
)
|
||||
async def handle_message(
|
||||
matcher: Matcher,
|
||||
bot: Union[V11Bot, V12Bot],
|
||||
event: Union[
|
||||
V11Event.GroupMessageEvent,
|
||||
V12Event.GroupMessageEvent,
|
||||
V12Event.ChannelMessageEvent,
|
||||
],
|
||||
stop: datetime = Arg(),
|
||||
async def handle_rank(
|
||||
bot: Bot,
|
||||
event: Event,
|
||||
session: Session = Depends(extract_session),
|
||||
start: datetime = Arg(),
|
||||
stop: datetime = Arg(),
|
||||
):
|
||||
st = time.time()
|
||||
|
||||
if plugin_config.dialectlist_excluded_self:
|
||||
bot_id = await bot.call_api("get_login_info")
|
||||
plugin_config.dialectlist_excluded_people.append(bot_id["user_id"])
|
||||
msg_list = await get_message_records(
|
||||
bot_ids=[str(bot.self_id)],
|
||||
platforms=['qq']
|
||||
if isinstance(event, V11Event.GroupMessageEvent)
|
||||
else [str(bot.platform)],
|
||||
group_ids=[str(event.group_id)]
|
||||
if isinstance(event, (V11Event.GroupMessageEvent, V12Event.GroupMessageEvent))
|
||||
else None,
|
||||
guild_ids=[str(event.guild_id)]
|
||||
if isinstance(event, V12Event.ChannelMessageEvent)
|
||||
else None,
|
||||
exclude_user_ids=plugin_config.dialectlist_excluded_people,
|
||||
time_start=start.astimezone(ZoneInfo("UTC")),
|
||||
time_stop=stop.astimezone(ZoneInfo("UTC")),
|
||||
"""生成词云"""
|
||||
messages = await get_message_records(
|
||||
session=session,
|
||||
id_type=SessionIdType.GROUP,
|
||||
include_bot_id=False,
|
||||
include_bot_type=False,
|
||||
types=["message"], # 排除机器人自己发的消息
|
||||
time_start=start,
|
||||
time_stop=stop,
|
||||
exclude_id1s=plugin_config.excluded_people,
|
||||
)
|
||||
for i in msg_list:
|
||||
logger.debug(i.plain_text)
|
||||
|
||||
if isinstance(event, V11Event.GroupMessageEvent):
|
||||
processer = V11GroupMsgProcesser(bot=bot, gid=str(event.group_id), msg_list=msg_list) # type: ignore
|
||||
elif isinstance(event, V12Event.GroupMessageEvent):
|
||||
processer = V12GroupMsgProcesser(bot=bot, gid=str(event.group_id), msg_list=msg_list) # type: ignore
|
||||
elif isinstance(event, V12Event.ChannelMessageEvent):
|
||||
processer = V12GuildMsgProcesser(bot=bot, gid=str(event.guild_id), msg_list=msg_list) # type: ignore
|
||||
else:
|
||||
raise NotImplementedError("没支持呢(())")
|
||||
rank = got_rank(msg_counter(messages))
|
||||
ids = await persist_id2user_id([int(i[0]) for i in rank])
|
||||
for i in range(len(rank)):
|
||||
rank[i][0] = str(ids[i])
|
||||
|
||||
msg = await processer.get_send_msg() # type: ignore
|
||||
await matcher.send(msg)
|
||||
string: str = ""
|
||||
nicknames: List = []
|
||||
for i in rank:
|
||||
if user_info := await get_user_info(bot, event, user_id=str(i[0])):
|
||||
(
|
||||
nicknames.append(user_info.user_displayname)
|
||||
if user_info.user_displayname
|
||||
else (
|
||||
nicknames.append(user_info.user_name)
|
||||
if user_info.user_name
|
||||
else nicknames.append(user_info.user_id)
|
||||
)
|
||||
)
|
||||
else:
|
||||
nicknames.append(None)
|
||||
logger.debug(nicknames)
|
||||
for i in range(len(rank)):
|
||||
index = i + 1
|
||||
nickname, chatdatanum = nicknames[i], rank[i][1]
|
||||
str_example = plugin_config.string_format.format(
|
||||
index=index, nickname=nickname, chatdatanum=chatdatanum
|
||||
)
|
||||
string += str_example
|
||||
|
||||
await saa.Text(string).finish(reply=True)
|
||||
|
@ -1,18 +1,29 @@
|
||||
from typing import Optional, Literal, List
|
||||
from nonebot import get_driver
|
||||
from pydantic import BaseModel, Extra
|
||||
from nonebot import get_driver, get_plugin_config
|
||||
from pydantic import BaseModel, field_validator
|
||||
|
||||
|
||||
class Config(BaseModel, extra=Extra.ignore):
|
||||
timezone: Optional[str]
|
||||
dialectlist_string_format: str = "第{index}名:\n{nickname},{chatdatanum}条消息\n" # 消息格式
|
||||
dialectlist_get_num: int = 5 # 获取人数数量
|
||||
dialectlist_visualization: bool = True # 是否可视化
|
||||
dialectlist_visualization_type: Literal["饼图", "圆环图", "柱状图"] = "圆环图" # 可视化方案
|
||||
dialectlist_font: str = "SimHei" # 字体格式
|
||||
dialectlist_excluded_people: List[str] = [] # 排除的人的QQ号
|
||||
dialectlist_excluded_self: bool = True
|
||||
class ScopedConfig(BaseModel):
|
||||
font: str = "SimHei" # 字体格式
|
||||
get_num: int = 5 # 获取人数数量
|
||||
timezone: Optional[str] = "Asia/Shanghai"
|
||||
excluded_self: bool = True
|
||||
string_format: str = "第{index}名:\n{nickname},{chatdatanum}条消息\n" # 消息格式
|
||||
visualization: bool = True # 是否可视化
|
||||
excluded_people: List[str] = [] # 排除的人的QQ号
|
||||
visualization_type: Literal["饼图", "圆环图", "柱状图"] = "圆环图" # 可视化方案
|
||||
|
||||
@field_validator("get_num")
|
||||
@classmethod
|
||||
def check_priority(cls, v: int) -> int:
|
||||
if v >= 1:
|
||||
return v
|
||||
raise ValueError("表中的人数必须大于一")
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
dialectlist: ScopedConfig = ScopedConfig()
|
||||
|
||||
|
||||
global_config = get_driver().config
|
||||
plugin_config = Config(**global_config.dict())
|
||||
plugin_config = get_plugin_config(Config).dialectlist
|
||||
|
@ -1,294 +0,0 @@
|
||||
import abc
|
||||
import pygal
|
||||
import unicodedata
|
||||
import requests
|
||||
from datetime import datetime
|
||||
|
||||
from typing import List, Dict, Union
|
||||
from pygal.style import Style
|
||||
|
||||
from nonebot import require
|
||||
from nonebot.log import logger
|
||||
from nonebot.params import Arg
|
||||
from nonebot.typing import T_State
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.adapters import Bot, Message
|
||||
from nonebot.adapters.onebot import V11Bot, V12Bot, V11Message, V12Message, V11MessageSegment, V12MessageSegment # type: ignore
|
||||
from nonebot.exception import ActionFailed
|
||||
|
||||
try:
|
||||
from zoneinfo import ZoneInfo
|
||||
except ImportError:
|
||||
from backports.zoneinfo import ZoneInfo # type: ignore
|
||||
|
||||
require("nonebot_plugin_chatrecorder")
|
||||
from nonebot_plugin_chatrecorder import get_message_records
|
||||
from nonebot_plugin_chatrecorder.model import MessageRecord
|
||||
|
||||
from .config import plugin_config
|
||||
|
||||
style = Style(font_family=plugin_config.dialectlist_font)
|
||||
|
||||
|
||||
def remove_control_characters(string: str) -> str:
|
||||
"""### 将字符串中的控制符去除
|
||||
|
||||
Args:
|
||||
string (str): 需要去除的字符串
|
||||
|
||||
Returns:
|
||||
(str): 经过处理的字符串
|
||||
"""
|
||||
return "".join(ch for ch in string if unicodedata.category(ch)[0] != "C")
|
||||
|
||||
|
||||
def parse_datetime(key: str):
|
||||
"""解析数字,并将结果存入 state 中"""
|
||||
|
||||
async def _key_parser(
|
||||
matcher: Matcher,
|
||||
state: T_State,
|
||||
input: Union[datetime, Union[V11Message, V12Message]] = Arg(key),
|
||||
):
|
||||
if isinstance(input, datetime):
|
||||
return
|
||||
|
||||
plaintext = input.extract_plain_text()
|
||||
try:
|
||||
state[key] = get_datetime_fromisoformat_with_timezone(plaintext)
|
||||
except ValueError:
|
||||
await matcher.reject_arg(key, "请输入正确的日期,不然我没法理解呢!")
|
||||
|
||||
return _key_parser
|
||||
|
||||
|
||||
def get_datetime_now_with_timezone() -> datetime:
|
||||
"""获取当前时间,并包含时区信息"""
|
||||
if plugin_config.timezone:
|
||||
return datetime.now(ZoneInfo(plugin_config.timezone))
|
||||
else:
|
||||
return datetime.now().astimezone()
|
||||
|
||||
|
||||
def get_datetime_fromisoformat_with_timezone(date_string: str) -> datetime:
|
||||
"""从 iso8601 格式字符串中获取时间,并包含时区信息"""
|
||||
if plugin_config.timezone:
|
||||
return datetime.fromisoformat(date_string).astimezone(
|
||||
ZoneInfo(plugin_config.timezone)
|
||||
)
|
||||
else:
|
||||
return datetime.fromisoformat(date_string).astimezone()
|
||||
|
||||
|
||||
def msg_counter(msg_list: List[MessageRecord]) -> Dict[str, int]:
|
||||
"""### 计算每个人的消息量
|
||||
|
||||
Args:
|
||||
msg_list (list[MessageRecord]): 需要处理的消息列表
|
||||
|
||||
Returns:
|
||||
(dict[str,int]): 处理后的消息数量字典,键为用户,值为消息数量
|
||||
"""
|
||||
|
||||
lst: Dict[str, int] = {}
|
||||
msg_len = len(msg_list)
|
||||
logger.info("wow , there are {} msgs to count !!!".format(msg_len))
|
||||
|
||||
for i in msg_list:
|
||||
try:
|
||||
lst[i.user_id] += 1
|
||||
except KeyError:
|
||||
lst[i.user_id] = 1
|
||||
|
||||
logger.debug(lst)
|
||||
|
||||
return lst
|
||||
|
||||
|
||||
def got_rank(msg_dict: Dict[str, int]) -> List[List[Union[str, int]]]:
|
||||
"""### 获得排行榜
|
||||
|
||||
Args:
|
||||
msg_dict (Dict[str,int]): 要处理的字典
|
||||
|
||||
Returns:
|
||||
List[Tuple[str,int]]: 排行榜列表(已排序)
|
||||
"""
|
||||
rank = []
|
||||
while len(rank) < plugin_config.dialectlist_get_num:
|
||||
try:
|
||||
max_key = max(msg_dict.items(), key=lambda x: x[1])
|
||||
rank.append(list(max_key))
|
||||
msg_dict.pop(max_key[0])
|
||||
except ValueError:
|
||||
rank.append(["null", 0])
|
||||
continue
|
||||
|
||||
return rank
|
||||
|
||||
|
||||
class MsgProcesser(abc.ABC):
|
||||
def __init__(self, bot: Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
if isinstance(bot, Bot):
|
||||
self.bot = bot
|
||||
else:
|
||||
self.bot = None
|
||||
self.gid = gid
|
||||
self.rank = got_rank(msg_counter(msg_list))
|
||||
|
||||
@abc.abstractmethod
|
||||
async def get_nickname_list(self) -> List:
|
||||
"""
|
||||
### 获得昵称
|
||||
#### 抽象原因
|
||||
要对onebot协议不同版本进行适配
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_head_portrait_urls(self) -> List:
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
async def get_send_msg(self) -> Message:
|
||||
raise NotImplementedError
|
||||
|
||||
async def get_msg(self) -> List[Union[str, bytes, None]]:
|
||||
str_msg = await self.render_template_msg()
|
||||
pic_msg = None
|
||||
if plugin_config.dialectlist_visualization:
|
||||
try:
|
||||
pic_msg = await self.render_template_pic()
|
||||
except OSError:
|
||||
plugin_config.dialectlist_visualization = False
|
||||
str_msg += "\n\n无法发送可视化图片,请检查是否安装GTK+,详细安装教程可见github\nhttps://github.com/tschoonj/GTK-for-Windows-Runtime-Environment-Installer \n若不想安装这个软件,再次使用这个指令不会显示这个提示"
|
||||
return [str_msg, pic_msg]
|
||||
|
||||
async def render_template_msg(self) -> str:
|
||||
"""渲染文字"""
|
||||
string: str = ""
|
||||
rank: List = self.rank
|
||||
nicknames: List = await self.get_nickname_list()
|
||||
for i in range(len(rank)):
|
||||
index = i + 1
|
||||
nickname, chatdatanum = nicknames[i], rank[i][1]
|
||||
str_example = plugin_config.dialectlist_string_format.format(
|
||||
index=index, nickname=nickname, chatdatanum=chatdatanum
|
||||
)
|
||||
string += str_example
|
||||
|
||||
return string
|
||||
|
||||
async def render_template_pic(self) -> bytes:
|
||||
if plugin_config.dialectlist_visualization_type == "圆环图":
|
||||
view = pygal.Pie(inner_radius=0.6, style=style)
|
||||
elif plugin_config.dialectlist_visualization_type == "饼图":
|
||||
view = pygal.Pie(style=style)
|
||||
else:
|
||||
view = pygal.Bar(style=style)
|
||||
|
||||
view.title = "消息可视化"
|
||||
for i, j in zip(self.rank, await self.get_nickname_list()): # type: ignore
|
||||
view.add(str(j), int(i[1]))
|
||||
|
||||
png: bytes = view.render_to_png() # type: ignore
|
||||
self.img = png
|
||||
return png
|
||||
|
||||
|
||||
class V11GroupMsgProcesser(MsgProcesser):
|
||||
def __init__(self, bot: V11Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
super().__init__(bot, gid, msg_list)
|
||||
self.bot = bot
|
||||
|
||||
async def get_nickname_list(self) -> List:
|
||||
nicknames = []
|
||||
for i in range(len(self.rank)):
|
||||
try:
|
||||
member_info = await self.bot.get_group_member_info(
|
||||
group_id=int(self.gid), user_id=int(self.rank[i][0]), no_cache=True
|
||||
)
|
||||
nickname=(
|
||||
member_info["nickname"]
|
||||
if not member_info["card"]
|
||||
else member_info["card"]
|
||||
)
|
||||
nicknames.append(remove_control_characters(nickname))
|
||||
except (ActionFailed,ValueError) as e:
|
||||
nicknames.append("{}这家伙不在群里了".format(self.rank[i][0]))
|
||||
|
||||
return nicknames
|
||||
|
||||
def get_head_portrait_urls(self) -> List:
|
||||
self.portrait_urls = [
|
||||
"http://q2.qlogo.cn/headimg_dl?dst_uin={}&spec=640".format(i[0])
|
||||
for i in self.rank
|
||||
]
|
||||
return self.portrait_urls
|
||||
|
||||
async def get_send_msg(self) -> V11Message:
|
||||
msgs: List = await self.get_msg()
|
||||
msg = V11Message()
|
||||
msg += V11MessageSegment.text(msgs[0]) # type: ignore
|
||||
msg += V11MessageSegment.image(msgs[1]) # type: ignore
|
||||
return msg
|
||||
|
||||
|
||||
class V12MsgProcesser(MsgProcesser):
|
||||
def __init__(self, bot: V12Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
super().__init__(bot, gid, msg_list)
|
||||
self.bot = bot
|
||||
|
||||
async def get_send_msg(self) -> V12Message:
|
||||
msgs: List = await self.get_msg()
|
||||
msg = V12Message()
|
||||
msg += V12MessageSegment.text(msgs[0]) # type: ignore
|
||||
msg += V12MessageSegment.image(msgs[1]) # type: ignore
|
||||
return msg
|
||||
|
||||
def get_head_portrait_urls(self) -> List:
|
||||
return super().get_head_portrait_urls()
|
||||
|
||||
|
||||
class V12GroupMsgProcesser(V12MsgProcesser):
|
||||
def __init__(self, bot: V12Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
super().__init__(bot, gid, msg_list)
|
||||
|
||||
async def get_nickname_list(self) -> List:
|
||||
nicknames = []
|
||||
for i in range(len(self.rank)):
|
||||
try:
|
||||
member_info = await self.bot.get_group_member_info(
|
||||
group_id=str(self.gid), user_id=str(self.rank[i][0]), no_cache=True
|
||||
)
|
||||
nickname=(
|
||||
member_info["user_displayname"]
|
||||
if member_info["user_displayname"]
|
||||
else member_info["user_name"]
|
||||
)
|
||||
nicknames.append(remove_control_characters(nickname))
|
||||
except ActionFailed as e:
|
||||
nicknames.append("{}这家伙不在群里了".format(self.rank[i][0]))
|
||||
return nicknames
|
||||
|
||||
|
||||
class V12GuildMsgProcesser(V12MsgProcesser):
|
||||
def __init__(self, bot: V12Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
super().__init__(bot, gid, msg_list)
|
||||
|
||||
async def get_nickname_list(self) -> List:
|
||||
nicknames = []
|
||||
for i in range(len(self.rank)):
|
||||
try:
|
||||
member_info = await self.bot.get_guild_member_info(
|
||||
guild_id=str(self.gid), user_id=str(self.rank[i][0]), no_cache=True
|
||||
)
|
||||
nickname=(
|
||||
member_info["user_displayname"]
|
||||
if member_info["user_displayname"]
|
||||
else member_info["user_name"]
|
||||
)
|
||||
nicknames.append(remove_control_characters(nickname))
|
||||
except ActionFailed as e:
|
||||
nicknames.append("{}这家伙不在群里了".format(self.rank[i][0]))
|
||||
return nicknames
|
0
nonebot_plugin_dialectlist/usage.md
Normal file
0
nonebot_plugin_dialectlist/usage.md
Normal file
442
nonebot_plugin_dialectlist/utils.py
Normal file
442
nonebot_plugin_dialectlist/utils.py
Normal file
@ -0,0 +1,442 @@
|
||||
import contextlib
|
||||
from datetime import datetime, time, tzinfo
|
||||
from typing import Optional, Dict, List, Union
|
||||
from zoneinfo import ZoneInfo
|
||||
from sqlalchemy import or_, select
|
||||
from sqlalchemy.sql import ColumnElement
|
||||
|
||||
from nonebot.log import logger
|
||||
from nonebot.params import Depends
|
||||
from nonebot.compat import model_dump
|
||||
from nonebot.matcher import Matcher
|
||||
|
||||
# from nonebot.permission import SUPERUSER
|
||||
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_saa import PlatformTarget, get_target
|
||||
from nonebot_plugin_session import Session, SessionLevel, extract_session
|
||||
from nonebot_plugin_session_orm import SessionModel
|
||||
from nonebot_plugin_userinfo import EventUserInfo, UserInfo
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
from nonebot_plugin_chatrecorder import MessageRecord
|
||||
|
||||
|
||||
from .config import plugin_config
|
||||
|
||||
|
||||
def get_datetime_now_with_timezone() -> datetime:
|
||||
"""获取当前时间,并包含时区信息"""
|
||||
if plugin_config.timezone:
|
||||
return datetime.now(ZoneInfo(plugin_config.timezone))
|
||||
else:
|
||||
return datetime.now().astimezone()
|
||||
|
||||
|
||||
def get_datetime_fromisoformat_with_timezone(date_string: str) -> datetime:
|
||||
"""从 ISO-8601 格式字符串中获取时间,并包含时区信息"""
|
||||
if not plugin_config.timezone:
|
||||
return datetime.fromisoformat(date_string).astimezone()
|
||||
raw = datetime.fromisoformat(date_string)
|
||||
return (
|
||||
raw.astimezone(ZoneInfo(plugin_config.timezone))
|
||||
if raw.tzinfo
|
||||
else raw.replace(tzinfo=ZoneInfo(plugin_config.timezone))
|
||||
)
|
||||
|
||||
|
||||
def time_astimezone(time: time, tz: Optional[tzinfo] = None) -> time:
|
||||
"""将 time 对象转换为指定时区的 time 对象
|
||||
|
||||
如果 tz 为 None,则转换为本地时区
|
||||
"""
|
||||
local_time = datetime.combine(datetime.today(), time)
|
||||
return local_time.astimezone(tz).timetz()
|
||||
|
||||
|
||||
def get_time_fromisoformat_with_timezone(time_string: str) -> time:
|
||||
"""从 iso8601 格式字符串中获取时间,并包含时区信息"""
|
||||
if not plugin_config.timezone:
|
||||
return time_astimezone(time.fromisoformat(time_string))
|
||||
raw = time.fromisoformat(time_string)
|
||||
return (
|
||||
time_astimezone(raw, ZoneInfo(plugin_config.timezone))
|
||||
if raw.tzinfo
|
||||
else raw.replace(tzinfo=ZoneInfo(plugin_config.timezone))
|
||||
)
|
||||
|
||||
|
||||
def get_time_with_scheduler_timezone(time: time) -> time:
|
||||
"""获取转换到 APScheduler 时区的时间"""
|
||||
return time_astimezone(time, scheduler.timezone)
|
||||
|
||||
|
||||
# 暂时不做考虑
|
||||
# def admin_permission():
|
||||
# permission = SUPERUSER
|
||||
# with contextlib.suppress(ImportError):
|
||||
# from nonebot.adapters.onebot.v11.permission import GROUP_ADMIN, GROUP_OWNER
|
||||
|
||||
# permission = permission | GROUP_ADMIN | GROUP_OWNER
|
||||
|
||||
# return permission
|
||||
|
||||
|
||||
async def ensure_group(matcher: Matcher, session: Session = Depends(extract_session)):
|
||||
"""确保在群组中使用"""
|
||||
if session.level not in [SessionLevel.LEVEL2, SessionLevel.LEVEL3]:
|
||||
await matcher.finish("请在群组中使用!")
|
||||
|
||||
|
||||
async def persist_id2user_id(ids: List) -> List[str]:
|
||||
whereclause: List[ColumnElement[bool]] = []
|
||||
whereclause.append(or_(*[SessionModel.id == id for id in ids]))
|
||||
statement = (
|
||||
select(SessionModel).where(*whereclause)
|
||||
# .join(SessionModel, SessionModel.id == MessageRecord.session_persist_id)
|
||||
)
|
||||
async with get_session() as db_session:
|
||||
records = (await db_session.scalars(statement)).all()
|
||||
return [i.id1 for i in records]
|
||||
|
||||
|
||||
def msg_counter(msg_list: List[MessageRecord]) -> Dict[str, int]:
|
||||
"""### 计算每个人的消息量
|
||||
|
||||
Args:
|
||||
msg_list (list[MessageRecord]): 需要处理的消息列表
|
||||
|
||||
Returns:
|
||||
(dict[str,int]): 处理后的消息数量字典,键为用户,值为消息数量
|
||||
"""
|
||||
|
||||
lst: Dict[str, int] = {}
|
||||
msg_len = len(msg_list)
|
||||
logger.info("wow , there are {} msgs to count !!!".format(msg_len))
|
||||
|
||||
for i in msg_list:
|
||||
logger.debug(i.session_persist_id)
|
||||
try:
|
||||
lst[str(i.session_persist_id)] += 1
|
||||
except KeyError:
|
||||
lst[str(i.session_persist_id)] = 1
|
||||
|
||||
logger.debug(lst)
|
||||
|
||||
return lst
|
||||
|
||||
|
||||
def got_rank(msg_dict: Dict[str, int]) -> List[List[Union[str, int]]]:
|
||||
"""### 获得排行榜
|
||||
|
||||
Args:
|
||||
msg_dict (Dict[str,int]): 要处理的字典
|
||||
|
||||
Returns:
|
||||
List[Tuple[str,int]]: 排行榜列表(已排序)
|
||||
"""
|
||||
rank = []
|
||||
while len(rank) < plugin_config.get_num:
|
||||
try:
|
||||
max_key = max(msg_dict.items(), key=lambda x: x[1])
|
||||
rank.append(list(max_key))
|
||||
msg_dict.pop(max_key[0])
|
||||
except ValueError:
|
||||
break
|
||||
|
||||
return rank
|
||||
|
||||
|
||||
# import abc
|
||||
# import pygal
|
||||
# import unicodedata
|
||||
# import requests
|
||||
# from datetime import datetime
|
||||
|
||||
# from typing import List, Dict, Union
|
||||
# from pygal.style import Style
|
||||
|
||||
# from nonebot import require
|
||||
# from nonebot.log import logger
|
||||
# from nonebot.params import Arg
|
||||
# from nonebot.typing import T_State
|
||||
# from nonebot.matcher import Matcher
|
||||
# from nonebot.adapters import Bot, Message
|
||||
# from nonebot.adapters.onebot import V11Bot, V12Bot, V11Message, V12Message, V11MessageSegment, V12MessageSegment # type: ignore
|
||||
# from nonebot.exception import ActionFailed
|
||||
|
||||
# try:
|
||||
# from zoneinfo import ZoneInfo
|
||||
# except ImportError:
|
||||
# from backports.zoneinfo import ZoneInfo # type: ignore
|
||||
|
||||
# require("nonebot_plugin_chatrecorder")
|
||||
# from nonebot_plugin_chatrecorder import get_message_records
|
||||
# from nonebot_plugin_chatrecorder.model import MessageRecord
|
||||
|
||||
# from .config import plugin_config
|
||||
|
||||
# style = Style(font_family=plugin_config.dialectlist_font)
|
||||
|
||||
|
||||
# def remove_control_characters(string: str) -> str:
|
||||
# """### 将字符串中的控制符去除
|
||||
|
||||
# Args:
|
||||
# string (str): 需要去除的字符串
|
||||
|
||||
# Returns:
|
||||
# (str): 经过处理的字符串
|
||||
# """
|
||||
# return "".join(ch for ch in string if unicodedata.category(ch)[0] != "C")
|
||||
|
||||
|
||||
# def parse_datetime(key: str):
|
||||
# """解析数字,并将结果存入 state 中"""
|
||||
|
||||
# async def _key_parser(
|
||||
# matcher: Matcher,
|
||||
# state: T_State,
|
||||
# input: Union[datetime, Union[V11Message, V12Message]] = Arg(key),
|
||||
# ):
|
||||
# if isinstance(input, datetime):
|
||||
# return
|
||||
|
||||
# plaintext = input.extract_plain_text()
|
||||
# try:
|
||||
# state[key] = get_datetime_fromisoformat_with_timezone(plaintext)
|
||||
# except ValueError:
|
||||
# await matcher.reject_arg(key, "请输入正确的日期,不然我没法理解呢!")
|
||||
|
||||
# return _key_parser
|
||||
|
||||
|
||||
# def get_datetime_now_with_timezone() -> datetime:
|
||||
# """获取当前时间,并包含时区信息"""
|
||||
# if plugin_config.timezone:
|
||||
# return datetime.now(ZoneInfo(plugin_config.timezone))
|
||||
# else:
|
||||
# return datetime.now().astimezone()
|
||||
|
||||
|
||||
# def get_datetime_fromisoformat_with_timezone(date_string: str) -> datetime:
|
||||
# """从 iso8601 格式字符串中获取时间,并包含时区信息"""
|
||||
# if plugin_config.timezone:
|
||||
# return datetime.fromisoformat(date_string).astimezone(
|
||||
# ZoneInfo(plugin_config.timezone)
|
||||
# )
|
||||
# else:
|
||||
# return datetime.fromisoformat(date_string).astimezone()
|
||||
|
||||
|
||||
# def msg_counter(msg_list: List[MessageRecord]) -> Dict[str, int]:
|
||||
# """### 计算每个人的消息量
|
||||
|
||||
# Args:
|
||||
# msg_list (list[MessageRecord]): 需要处理的消息列表
|
||||
|
||||
# Returns:
|
||||
# (dict[str,int]): 处理后的消息数量字典,键为用户,值为消息数量
|
||||
# """
|
||||
|
||||
# lst: Dict[str, int] = {}
|
||||
# msg_len = len(msg_list)
|
||||
# logger.info("wow , there are {} msgs to count !!!".format(msg_len))
|
||||
|
||||
# for i in msg_list:
|
||||
# try:
|
||||
# lst[i.user_id] += 1
|
||||
# except KeyError:
|
||||
# lst[i.user_id] = 1
|
||||
|
||||
# logger.debug(lst)
|
||||
|
||||
# return lst
|
||||
|
||||
|
||||
# def got_rank(msg_dict: Dict[str, int]) -> List[List[Union[str, int]]]:
|
||||
# """### 获得排行榜
|
||||
|
||||
# Args:
|
||||
# msg_dict (Dict[str,int]): 要处理的字典
|
||||
|
||||
# Returns:
|
||||
# List[Tuple[str,int]]: 排行榜列表(已排序)
|
||||
# """
|
||||
# rank = []
|
||||
# while len(rank) < plugin_config.dialectlist_get_num:
|
||||
# try:
|
||||
# max_key = max(msg_dict.items(), key=lambda x: x[1])
|
||||
# rank.append(list(max_key))
|
||||
# msg_dict.pop(max_key[0])
|
||||
# except ValueError:
|
||||
# rank.append(["null", 0])
|
||||
# continue
|
||||
|
||||
# return rank
|
||||
|
||||
|
||||
# class MsgProcesser(abc.ABC):
|
||||
# def __init__(self, bot: Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
# if isinstance(bot, Bot):
|
||||
# self.bot = bot
|
||||
# else:
|
||||
# self.bot = None
|
||||
# self.gid = gid
|
||||
# self.rank = got_rank(msg_counter(msg_list))
|
||||
|
||||
# @abc.abstractmethod
|
||||
# async def get_nickname_list(self) -> List:
|
||||
# """
|
||||
# ### 获得昵称
|
||||
# #### 抽象原因
|
||||
# 要对onebot协议不同版本进行适配
|
||||
# """
|
||||
# raise NotImplementedError
|
||||
|
||||
# @abc.abstractmethod
|
||||
# def get_head_portrait_urls(self) -> List:
|
||||
# raise NotImplementedError
|
||||
|
||||
# @abc.abstractmethod
|
||||
# async def get_send_msg(self) -> Message:
|
||||
# raise NotImplementedError
|
||||
|
||||
# async def get_msg(self) -> List[Union[str, bytes, None]]:
|
||||
# str_msg = await self.render_template_msg()
|
||||
# pic_msg = None
|
||||
# if plugin_config.dialectlist_visualization:
|
||||
# try:
|
||||
# pic_msg = await self.render_template_pic()
|
||||
# except OSError:
|
||||
# plugin_config.dialectlist_visualization = False
|
||||
# str_msg += "\n\n无法发送可视化图片,请检查是否安装GTK+,详细安装教程可见github\nhttps://github.com/tschoonj/GTK-for-Windows-Runtime-Environment-Installer \n若不想安装这个软件,再次使用这个指令不会显示这个提示"
|
||||
# return [str_msg, pic_msg]
|
||||
|
||||
# async def render_template_msg(self) -> str:
|
||||
# """渲染文字"""
|
||||
# string: str = ""
|
||||
# rank: List = self.rank
|
||||
# nicknames: List = await self.get_nickname_list()
|
||||
# for i in range(len(rank)):
|
||||
# index = i + 1
|
||||
# nickname, chatdatanum = nicknames[i], rank[i][1]
|
||||
# str_example = plugin_config.dialectlist_string_format.format(
|
||||
# index=index, nickname=nickname, chatdatanum=chatdatanum
|
||||
# )
|
||||
# string += str_example
|
||||
|
||||
# return string
|
||||
|
||||
# async def render_template_pic(self) -> bytes:
|
||||
# if plugin_config.dialectlist_visualization_type == "圆环图":
|
||||
# view = pygal.Pie(inner_radius=0.6, style=style)
|
||||
# elif plugin_config.dialectlist_visualization_type == "饼图":
|
||||
# view = pygal.Pie(style=style)
|
||||
# else:
|
||||
# view = pygal.Bar(style=style)
|
||||
|
||||
# view.title = "消息可视化"
|
||||
# for i, j in zip(self.rank, await self.get_nickname_list()): # type: ignore
|
||||
# view.add(str(j), int(i[1]))
|
||||
|
||||
# png: bytes = view.render_to_png() # type: ignore
|
||||
# self.img = png
|
||||
# return png
|
||||
|
||||
|
||||
# class V11GroupMsgProcesser(MsgProcesser):
|
||||
# def __init__(self, bot: V11Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
# super().__init__(bot, gid, msg_list)
|
||||
# self.bot = bot
|
||||
|
||||
# async def get_nickname_list(self) -> List:
|
||||
# nicknames = []
|
||||
# for i in range(len(self.rank)):
|
||||
# try:
|
||||
# member_info = await self.bot.get_group_member_info(
|
||||
# group_id=int(self.gid), user_id=int(self.rank[i][0]), no_cache=True
|
||||
# )
|
||||
# nickname=(
|
||||
# member_info["nickname"]
|
||||
# if not member_info["card"]
|
||||
# else member_info["card"]
|
||||
# )
|
||||
# nicknames.append(remove_control_characters(nickname))
|
||||
# except (ActionFailed,ValueError) as e:
|
||||
# nicknames.append("{}这家伙不在群里了".format(self.rank[i][0]))
|
||||
|
||||
# return nicknames
|
||||
|
||||
# def get_head_portrait_urls(self) -> List:
|
||||
# self.portrait_urls = [
|
||||
# "http://q2.qlogo.cn/headimg_dl?dst_uin={}&spec=640".format(i[0])
|
||||
# for i in self.rank
|
||||
# ]
|
||||
# return self.portrait_urls
|
||||
|
||||
# async def get_send_msg(self) -> V11Message:
|
||||
# msgs: List = await self.get_msg()
|
||||
# msg = V11Message()
|
||||
# msg += V11MessageSegment.text(msgs[0]) # type: ignore
|
||||
# msg += V11MessageSegment.image(msgs[1]) # type: ignore
|
||||
# return msg
|
||||
|
||||
|
||||
# class V12MsgProcesser(MsgProcesser):
|
||||
# def __init__(self, bot: V12Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
# super().__init__(bot, gid, msg_list)
|
||||
# self.bot = bot
|
||||
|
||||
# async def get_send_msg(self) -> V12Message:
|
||||
# msgs: List = await self.get_msg()
|
||||
# msg = V12Message()
|
||||
# msg += V12MessageSegment.text(msgs[0]) # type: ignore
|
||||
# msg += V12MessageSegment.image(msgs[1]) # type: ignore
|
||||
# return msg
|
||||
|
||||
# def get_head_portrait_urls(self) -> List:
|
||||
# return super().get_head_portrait_urls()
|
||||
|
||||
|
||||
# class V12GroupMsgProcesser(V12MsgProcesser):
|
||||
# def __init__(self, bot: V12Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
# super().__init__(bot, gid, msg_list)
|
||||
|
||||
# async def get_nickname_list(self) -> List:
|
||||
# nicknames = []
|
||||
# for i in range(len(self.rank)):
|
||||
# try:
|
||||
# member_info = await self.bot.get_group_member_info(
|
||||
# group_id=str(self.gid), user_id=str(self.rank[i][0]), no_cache=True
|
||||
# )
|
||||
# nickname=(
|
||||
# member_info["user_displayname"]
|
||||
# if member_info["user_displayname"]
|
||||
# else member_info["user_name"]
|
||||
# )
|
||||
# nicknames.append(remove_control_characters(nickname))
|
||||
# except ActionFailed as e:
|
||||
# nicknames.append("{}这家伙不在群里了".format(self.rank[i][0]))
|
||||
# return nicknames
|
||||
|
||||
|
||||
# class V12GuildMsgProcesser(V12MsgProcesser):
|
||||
# def __init__(self, bot: V12Bot, gid: str, msg_list: List[MessageRecord]) -> None:
|
||||
# super().__init__(bot, gid, msg_list)
|
||||
|
||||
# async def get_nickname_list(self) -> List:
|
||||
# nicknames = []
|
||||
# for i in range(len(self.rank)):
|
||||
# try:
|
||||
# member_info = await self.bot.get_guild_member_info(
|
||||
# guild_id=str(self.gid), user_id=str(self.rank[i][0]), no_cache=True
|
||||
# )
|
||||
# nickname=(
|
||||
# member_info["user_displayname"]
|
||||
# if member_info["user_displayname"]
|
||||
# else member_info["user_name"]
|
||||
# )
|
||||
# nicknames.append(remove_control_characters(nickname))
|
||||
# except ActionFailed as e:
|
||||
# nicknames.append("{}这家伙不在群里了".format(self.rank[i][0]))
|
||||
# return nicknames
|
Reference in New Issue
Block a user