mirror of
				https://github.com/LiteyukiStudio/LiteyukiBot.git
				synced 2025-10-31 04:26:23 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			94 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			94 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import time
 | |
| from typing import Any
 | |
| 
 | |
| from liteyuki.utils.message.html_tool import template2image
 | |
| from .common import MessageEventModel, msg_db
 | |
| from liteyuki.utils.base.language import Language
 | |
| from liteyuki.utils.base.resource import get_path
 | |
| from liteyuki.utils.message.npl import convert_seconds_to_time
 | |
| from contextvars import ContextVar
 | |
| 
 | |
| 
 | |
async def count_msg_by_bot_id(bot_id: str) -> int:
    """Count the stored message events recorded for one bot.

    Args:
        bot_id: Identifier of the bot whose message rows are counted.

    Returns:
        The number of rows in the message database whose ``bot_id`` matches;
        ``0`` when the lookup yields nothing.
    """
    # The condition has to be a complete predicate: a leading " AND" has no
    # preceding clause to attach to.
    # NOTE(review): assumes where_all places this text directly after WHERE,
    # as the "time > ?" query in this module implies -- confirm.
    condition = "bot_id = ?"
    condition_args = [bot_id]

    msg_rows = msg_db.where_all(
        MessageEventModel(),
        condition,
        *condition_args
    )

    # NOTE(review): where_all may hand back None instead of an empty list when
    # no row matches -- guard so the count is 0 rather than a TypeError.
    return len(msg_rows) if msg_rows else 0
 | |
| 
 | |
| 
 | |
async def get_stat_msg_image(duration: int, period: int, group_id: str = None, bot_id: str = None,
                             ulang: Language = Language()) -> bytes:
    """Render a chart image of message counts per time bucket.

    Messages newer than ``now - duration`` are fetched (optionally restricted
    to one group and/or one bot), grouped into consecutive buckets of
    ``period`` seconds, and handed to the ``templates/stat_msg.html`` template.

    Args:
        duration: Length of the statistics window, in seconds.
        period: Width of one bucket, in seconds.
        group_id: If truthy, only count messages with this ``group_id``.
        bot_id: If truthy, only count messages with this ``bot_id``.
        ulang: Language object used to look up the ``stat.message`` label.

    Returns:
        Whatever ``template2image`` produces for the filled template
        (annotated as image bytes).
    """
    now = int(time.time())
    start_time = now - duration

    condition = "time > ?"
    condition_args = [start_time]

    if group_id:
        condition += " AND group_id = ?"
        condition_args.append(group_id)
    if bot_id:
        condition += " AND bot_id = ?"
        condition_args.append(bot_id)

    # NOTE(review): where_all may return None when nothing matches; treat
    # "no result" as an empty list so an empty window renders instead of crashing.
    msg_rows = msg_db.where_all(
        MessageEventModel(),
        condition,
        *condition_args
    ) or []
    msg_times = sorted(row.time for row in msg_rows)

    end_time = now
    if msg_times:
        # Start the chart at the first message rather than at an empty lead-in.
        start_time = max(msg_times[0], start_time)
        # A message stamped at (or after) `now` must still land in a bucket,
        # so make sure the bucket range reaches past the newest message.
        end_time = max(now, msg_times[-1] + 1)

    # Each bucket is reported by its centre timestamp.
    timestamps = [bucket_start + period // 2 for bucket_start in range(start_time, end_time, period)]
    msg_count = [0] * len(timestamps)

    # The query filter keeps every message at or after start_time, so the
    # bucket index can be computed directly instead of searched for.
    for msg_time in msg_times:
        msg_count[(msg_time - start_time) // period] += 1

    templates = {
        "data": [
            {
                "name": ulang.get("stat.message")
                        + f"    Period {convert_seconds_to_time(period)}" + f"    Duration {convert_seconds_to_time(duration)}"
                        + (f"    Group {group_id}" if group_id else "") + (f"    Bot {bot_id}" if bot_id else ""),
                "times": timestamps,
                "counts": msg_count
            }
        ]
    }

    return await template2image(get_path("templates/stat_msg.html"), templates, debug=True)
 | |
| 
 | |
|     # if not timestamps or period_start_time != timestamps[-1]:
 | |
|     #     timestamps.append(period_start_time)
 | |
|     #     msg_count.append(1)
 | |
|     # else:
 | |
|     #     msg_count[-1] += 1
 | |
|     #
 |