🎨 从同步改为异步

This commit is contained in:
Nanaloveyuki
2025-07-30 16:19:49 +08:00
parent 71d5dcaff3
commit 02d3f8a5ba
8 changed files with 315 additions and 269 deletions

View File

@ -50,20 +50,19 @@ DEFAULT_CONFIG = {
"file_level": "DEBUG",
"file_name": "latest.log",
"file_path": "./logs",
"file_format": "{asctime} {levelname} | {prefix}{message}",
"file_format": "{asctime} {levelname} | {prefix}{message}{suffix}",
"file_encoding": "utf-8",
"enable_console": True,
"enable_file": True,
"console_color": True,
"console_level": "DEBUG",
"console_format": "{time} {levelname} | {prefix}{message}",
"console_prefix": "Auto",
"console_format": "{set_color(time, await get_config('time_color'))} {levelname} | {prefix}{message}{suffix}",
"console_encoding": "utf-8",
"asctime_format": "%Y-%m-%d %H:%M:%S",
"time_format": "%H:%M:%S",
"date_format": "%Y-%m-%d",
"weekday_format": "%A",
"level_name": {"DEBUG": "DEBUG", "INFO": "INFO", "WARN": "WARN", "ERRO": "ERRO", "CRIT": "CRIT"},
"level_nickname": {"DEBUG": "DEBUG", "INFO": "INFO", "WARN": "WARN", "ERRO": "ERRO", "CRIT": "CRIT"},
"level_color": {"DEBUG": "#c1d5ff", "INFO": "#c1ffff", "WARN": "#fff600", "ERRO": "#ffa000", "CRIT": "#ff8181"},
"time_color": "#28ffb6",
}

View File

View File

@ -1,180 +0,0 @@
"""
py-logiliteal的格式化工具,用于格式化日志输出
py-logiliteal's formatter, used to format log output
"""
# encoding = utf-8
# python 3.13.5
from .configs import get_config
from typing import Any, Optional
from .time import get_asctime, get_time, get_weekday, get_date
from .styles import set_color, set_bg_color
from .regex import (
process_links,
process_markdown_formats,
process_html_styles,
process_special_tags,
process_color_formatting
)
from .placeholder import process_placeholder, SafeDict
if get_config("time_color") is None:
time_color = "#28ffb6"
else:
time_color = get_config("time_color")
def fmt_level(level: str) -> int:
"""
格式化日志级别
Format log level
:param level: 日志级别 Log level
:return: 格式化后的日志级别 Formatted log level
"""
level_map = {
"DEBUG": 0,
"INFO": 10,
"WARN": 20,
"ERRO": 30,
"CRIT": 40,
"UNKN": 50
}
return level_map.get(level.upper(), 50)
def fmt_level_number(level: int) -> str:
"""
格式化日志级别数字
Format log level number
:param level: 日志级别数字 Log level number
:return: 格式化后的日志级别 Formatted log level
"""
if level < 10:
return "DEBUG"
elif level < 20:
return "INFO"
elif level < 30:
return "WARN"
elif level < 40:
return "ERRO"
elif level < 50:
return "CRIT"
else:
return "UNKN"
def fmt_placeholder(message: Any, use_date_color: bool = True) -> str:
"""
格式化占位符
Format placeholder
:param message: 消息内容 Message content
:return: 格式化后的消息 Formatted message
"""
if not isinstance(message, str):
message = str(message)
context = {
"asctime": set_color(get_asctime(), time_color) if use_date_color else get_asctime(),
"time": set_color(get_time(), time_color) if use_date_color else get_time(),
"weekday": set_color(get_weekday(), time_color) if use_date_color else get_weekday(),
"date": set_color(get_date(), time_color) if use_date_color else get_date(),
}
return process_placeholder(message, context)
def fmt_message(
message: Any,
no_placeholder: bool = False,
no_style: bool = False,
no_process: bool = False,
no_tags: bool = False,
no_links: bool = False,
no_markdown: bool = False,
no_html: bool = False,
) -> str:
"""
格式化消息内容
Format message content
:param message: 消息内容 Message content
:return: 格式化后的消息 Formatted message
"""
def process_color_tags(msg: str, no_process: bool = False) -> str:
processed = process_color_formatting(
process_special_tags(
process_html_styles(
process_markdown_formats(
process_links(msg, no_links),
no_markdown
),
no_html
),
no_tags
),
no_process
)
return processed
processed_message = str(message)
if not no_placeholder:
processed_message = fmt_placeholder(processed_message)
if not no_style:
processed_message = process_color_tags(processed_message)
if no_process:
return message
return processed_message
def fmt_level_name(level_name: str) -> str:
if get_config("console_color") != True:
return level_name
level_name_nick_map = get_config("level_name")
if level_name in level_name_nick_map:
_lnn = level_name_nick_map[level_name]
level_color_map = get_config("level_color")
if level_name in level_color_map:
if level_name == "DEBUG":
return set_bg_color(set_color(_lnn, level_color_map[level_name]), "#34495e")
return set_color(_lnn, level_color_map[level_name])
return set_color(_lnn)
return "UNKN"
def fmt_console(level: int, message: Any, prefix: str | None = None) -> Optional[str]:
"""
格式化控制台输出
Format console output
:param level: 日志级别 Log level
:param message: 消息内容 Message content
:return: 格式化后的消息 Formatted message
"""
console_level = get_config("console_level")
if level != -1 and level < fmt_level(console_level):
return None
fmt = get_config("console_format")
prefix = prefix or ""
return fmt_placeholder(fmt).format(
levelname = fmt_level_name(fmt_level_number(level)),
prefix = fmt_message(prefix, no_placeholder=True),
message = fmt_message(message, no_placeholder=True)
)
def fmt_file(level: int, message: Any, prefix: str | None = None) -> Optional[str]:
"""
格式化文件输出
Format file output
:param level: 日志级别 Log level
:param message: 消息内容 Message content
:return: 格式化后的消息 Formatted message
"""
fl = get_config("file_level")
fmt = get_config("file_format")
if level != -1 and level < fmt_level(fl):
return None
if prefix is None:
prefix = ""
fmt = fmt_placeholder(fmt, use_date_color=False)
return f"{fmt.format(
levelname = fmt_level_number(level),
prefix = fmt_message(prefix, no_placeholder=True, no_style=True),
message = fmt_message(message, no_placeholder=True, no_style=True)
)}\n"

View File

@ -0,0 +1,283 @@
"""
格式化工具
Formatting tools
"""
from .time import get_asctime, get_time, get_weekday, get_date
from .styles import set_color
from .configs import get_config
from .regex import process_color_formatting, process_html_styles, process_links, process_markdown_formats, process_special_tags
# 占位符
_placeholder: dict[str, str | bytes] = {
"asctime": "{get_asctime()}",
"time": "{get_time()}",
"weekday": "{get_weekday()}",
"date": "{get_date()}",
"levelname": "{await fmt_level_number_to_name(level_number)}",
"level_number": "{await fmt_level_name_to_number(levelname)}",
"message": "{message}",
"prefix": "{prefix}",
"suffix": "{suffix}",
}
# 日志级别映射
_level_name_map: dict[str, int | float] = {
"DEBUG": 0,
"INFO": 10,
"WARN": 20,
"ERRO": 30,
"CRIT": 40,
"UNKN": -1,
}
async def fmt_level_number_to_name(level_number: int | float) -> str:
"""
格式化日志级别数字为名称
Format log level number to name\n
Args:
level_number (int | float): 日志级别数字
Returns:
str: 日志级别名称
"""
if not level_number in range(0, 50):
return _level_name_map.get(-1)
if 0 <= level_number <= 9:
return _level_name_map.get(0)
elif 10 <= level_number <= 19:
return _level_name_map.get(10)
elif 20 <= level_number <= 29:
return _level_name_map.get(20)
elif 30 <= level_number <= 39:
return _level_name_map.get(30)
elif 40 <= level_number <= 49:
return _level_name_map.get(40)
return _level_name_map.get(-1)
async def fmt_level_name_to_number(level_name: str) -> int:
"""
格式化日志级别名称为数字
Format log level name to number\n
Args:
level_name (str): 日志级别名称
Returns:
int: 日志级别数字
"""
return _level_name_map.get(level_name.upper(), -1)
async def fmt_level_list(level_number: int | float,
level_name: str in _level_name_map.keys(),
) -> dict:
"""
格式化日志级别数据字典
Format log level data dict\n
Args:
level_number (int | float): 日志级别数字
level_name (str): 日志级别名称
Returns:
dict: 日志级别数据字典
"""
return {
"level_number": level_number,
"level_nickname": get_config("level_nickname").get(level_name.upper(), level_name),
"level_name": level_name.upper(),
}
async def get_level_number(level_list: dict) -> int | float:
"""
获取日志级别数字
Get log level number
Args:
level_list (dict[str, int | float]): 日志级别数据字典
Returns:
int | float: 日志级别数字
"""
return level_list.get("level_number")
async def get_level_name(level_list: dict) -> str:
"""
获取日志级别名称
Get log level name
Args:
level_list (dict[str, int | float]): 日志级别数据字典
Returns:
str: 日志级别名称
"""
return level_list.get("level_name", "UNKN")
async def get_level_nickname(level_list: dict) -> str:
"""
获取日志级别昵称
Get log level nickname
Args:
level_list (dict[str, int | float]): 日志级别数据字典
Returns:
str: 日志级别昵称
"""
return level_list.get("level_nickname", "UNKN")
async def fmt_placeholder(
message: str = "",
no_placeholder: bool = False,
**kwargs
) -> str:
"""
格式化占位符
Format placeholder\n
Args:
message (str, optional): 日志消息. Defaults to "".
no_placeholder (bool, optional): 不使用占位符. Defaults to False.
Returns:
str: 格式化后的日志消息或原始消息
"""
if no_placeholder:
return message
for key, value in kwargs.items():
if key in _placeholder:
message = message.replace(_placeholder[key], str(value))
else:
message = message.replace(f"{{{key}}}", str(value))
return message
async def add_placeholder(key: str, value: str | bytes) -> None:
"""
添加新的占位符
Add new placeholder\n
Args:
key (str): 占位符键
value (str | bytes): 占位符值
"""
global _placeholder
_placeholder[key] = value
async def get_placeholder(key: str | None = None) -> str | dict[str, str | bytes]:
"""
获取占位符
Get placeholder\n
Args:
key (str | None, optional): 占位符键. Defaults to None.
Returns:
str | dict[str, str | bytes]: 占位符值或占位符字典
"""
if key is None:
return _placeholder
return _placeholder.get(key, "None") if key in _placeholder else "None"
async def remove_placeholder(key: str) -> bool:
"""
删除占位符
Remove placeholder\n
Args:
key (str): 占位符键
Returns:
bool: 删除成功返回True, 删除失败返回False
"""
try:
if key in _placeholder:
del _placeholder[key]
return True
else:
return False
except Exception:
return False
async def fmt_regex(message: str,
no_regex: bool = False,
no_color: bool = False,
no_html: bool = False,
no_link: bool = False,
no_markdown: bool = False,
no_special_tags: bool = False,
) -> str:
"""
格式化正则表达式
Format regex\n
Args:
message (str): 日志消息
Returns:
str: 格式化后的日志消息
"""
if no_regex | (no_color and no_html and no_link and no_markdown and no_special_tags):
return message
return await process_color_formatting(
await process_html_styles(
await process_links(
await process_markdown_formats(
await process_special_tags(message, no_process=no_special_tags),
no_process=no_markdown,
),
no_process=no_link
),
no_process=no_html
),
no_process=no_color
)
async def fmt_content(
message: str = "",
no_placeholder: bool = False,
no_regex: bool = False,
no_color: bool = False,
no_html: bool = False,
no_link: bool = False,
no_markdown: bool = False,
no_special_tags: bool = False,
) -> str:
"""
格式化内容
Format content\n
Args:
message (str): 日志消息
Returns:
str: 格式化后的日志消息
"""
message = await fmt_placeholder(message, no_placeholder=no_placeholder)
message = await fmt_regex(message, no_regex=no_regex, no_color=no_color, no_html=no_html, no_link=no_link, no_markdown=no_markdown, no_special_tags=no_special_tags)
return message
async def fmt_console(
message: str = "",
no_placeholder: bool = False,
no_regex: bool = False,
no_color: bool = False,
no_html: bool = False,
no_link: bool = False,
no_markdown: bool = False,
no_special_tags: bool = False,
) -> str:
"""
格式化控制台内容
Format console content\n
Args:
message (str): 日志消息
Returns:
str: 格式化后的日志消息
"""
message = await fmt_placeholder(message, no_placeholder=no_placeholder)
message = await fmt_regex(message, no_regex=no_regex, no_color=no_color, no_html=no_html, no_link=no_link, no_markdown=no_markdown, no_special_tags=no_special_tags)
return message
async def fmt_file(
message: str = "",
no_placeholder: bool = False,
no_regex: bool = True,
no_color: bool = False,
no_html: bool = False,
no_link: bool = False,
no_markdown: bool = False,
no_special_tags: bool = False,
) -> str:
"""
格式化文件内容
Format file content\n
Args:
message (str): 日志消息
Returns:
str: 格式化后的日志消息
"""
message = await fmt_placeholder(message, no_placeholder=no_placeholder)
message = await fmt_regex(message, no_regex=no_regex, no_color=no_color, no_html=no_html, no_link=no_link, no_markdown=no_markdown, no_special_tags=no_special_tags)
return message

View File

@ -1,62 +0,0 @@
"""
占位符处理工具
"""
from typing import Dict, Any, Optional
import re
class SafeDict(Dict[str, Any]):
"""安全的字典类,用于处理缺失键的占位符替换"""
def __missing__(self, key: str) -> str:
return f"{{{key}}}"
def process_placeholder(text: str, context: Optional[Dict[str, Any]] = None) -> str:
"""处理文本中的占位符替换
Args:
text: 包含占位符的原始文本
context: 用于替换占位符的上下文字典
Returns:
替换后的文本
"""
if not context:
return text
# 处理简单占位符 {key}
safe_context = SafeDict(**context)
text = text.format_map(safe_context)
# 处理条件占位符 {{if condition}}content{{endif}}
def replace_condition(match):
condition = match.group(1).strip()
content = match.group(2).strip()
# 简单条件解析仅支持key存在性检查
if condition.startswith('!'):
key = condition[1:].strip()
return '' if key in context else content
return content if condition in context else ''
text = re.sub(r'\{\{if\s+(.*?)\}\}([\s\S]*?)\{\{endif\}\}', replace_condition, text, flags=re.IGNORECASE)
# 处理循环占位符 {{for item in list}}content{{endfor}}
def replace_loop(match):
items = match.group(1).strip().split(' in ')
if len(items) != 2:
return match.group(0)
item_name, list_name = items
content = match.group(2).strip()
if list_name not in context or not isinstance(context[list_name], list):
return ''
result = []
for item in context[list_name]:
item_context = {item_name: item}
result.append(process_placeholder(content, item_context))
return ''.join(result)
text = re.sub(r'\{\{for\s+(.*?)\}\}([\s\S]*?)\{\{endfor\}\}', replace_loop, text, flags=re.IGNORECASE)
return text

View File

@ -6,7 +6,7 @@ from collections import deque
from .styles import set_color, set_bg_color
def process_links(text: str, no_process: bool = False) -> str:
async def process_links(text: str, no_process: bool = False) -> str:
"""处理链接标签(HTML和Markdown格式)"""
if no_process:
return text
@ -36,11 +36,11 @@ def process_links(text: str, no_process: bool = False) -> str:
for placeholder, (url, text_content) in placeholders.items():
ansi_link = f'\033]8;;{url}\033\\{set_color("\033[4m" + text_content, "#5f93ff")}\033]8;;\033\\'
text = text.replace(placeholder, ansi_link)
return text
def process_markdown_formats(text: str, no_process: bool = False) -> str:
async def process_markdown_formats(text: str, no_process: bool = False) -> str:
"""处理Markdown格式"""
if no_process:
return text
@ -55,7 +55,7 @@ def process_markdown_formats(text: str, no_process: bool = False) -> str:
return text
def process_html_styles(text: str, no_process: bool = False) -> str:
async def process_html_styles(text: str, no_process: bool = False) -> str:
"""处理HTML样式标签"""
if no_process:
return text
@ -74,7 +74,7 @@ def process_html_styles(text: str, no_process: bool = False) -> str:
return text
def process_special_tags(text: str, no_process: bool = False) -> str:
async def process_special_tags(text: str, no_process: bool = False) -> str:
"""处理特殊标签(换行、重置、段落)"""
if no_process:
return text
@ -87,7 +87,7 @@ def process_special_tags(text: str, no_process: bool = False) -> str:
return text
def process_color_formatting(text: str, no_process: bool = False) -> str:
async def process_color_formatting(text: str, no_process: bool = False) -> str:
"""处理颜色标签"""
if no_process:
return text

View File

@ -8,7 +8,7 @@ py-logiliteal's style tools, used to format log output
from typing import Union, Optional
def _get_hex_to_ansi(hex_color: str|None) -> Union[Optional[str], None]:
async def _get_hex_to_ansi(hex_color: str | None) -> Union[Optional[str], None]:
"""
将16进制颜色值转换为ANSI转义序列
Convert hex color value to ANSI escape sequence
@ -22,7 +22,7 @@ def _get_hex_to_ansi(hex_color: str|None) -> Union[Optional[str], None]:
r, g, b = int(hex_color[1:3], 16), int(hex_color[3:5], 16), int(hex_color[5:7], 16)
return f"\033[38;2;{r};{g};{b}m"
def set_color(text: str|None, color: str|None) -> str:
async def set_color(text: str | None, color: str | None) -> str:
"""
设置文本颜色
Set text color
@ -34,12 +34,12 @@ def set_color(text: str|None, color: str|None) -> str:
return ""
if not color:
return text
ansi = _get_hex_to_ansi(color)
ansi = await _get_hex_to_ansi(color)
if not ansi:
return text
return f"{ansi}{text}\033[0m"
def set_bg_color(text: str|None, color: str|None) -> str:
async def set_bg_color(text: str | None, color: str | None) -> str:
"""
设置文本背景颜色
Set text background color
@ -51,13 +51,20 @@ def set_bg_color(text: str|None, color: str|None) -> str:
return ""
if not color:
return text
ansi = _get_hex_to_ansi(color)
ansi = await _get_hex_to_ansi(color)
if not ansi:
return text
ansi = ansi.replace("38;", "48;")
return f"{ansi}{text}\033[0m"
def set_style(text: str|None, bold: bool = False, italic: bool = False, underline: bool = False, strikethrough: bool = False, reverse: bool = False) -> str:
async def set_style(
text: str | None,
bold: bool = False,
italic: bool = False,
underline: bool = False,
strikethrough: bool = False,
reverse: bool = False
) -> str:
"""
设置文本样式
Set text style

View File

@ -9,44 +9,43 @@ Time utility module, used for time formatting and output, caching time formattin
from datetime import datetime
from .configs import get_config
def get_asctime() -> str:
async def get_asctime() -> str:
"""
获取当前时间(YYYY-MM-DD HH:MM:SS),并缓存格式化结果
Get current time(YYYY-MM-DD HH:MM:SS) and cache formatted result
:return: 格式化后的时间 Formatted time
"""
return _get_time(get_config("asctime_format"))
return await _get_time(get_config("asctime_format"))
def get_time() -> str:
async def get_time() -> str:
"""
获取当前时间(HH:MM:SS),并缓存格式化结果
Get current time(HH:MM:SS) and cache formatted result
:return: 格式化后的时间 Formatted time
"""
return _get_time(get_config("time_format"))
return await _get_time(get_config("time_format"))
def get_date() -> str:
async def get_date() -> str:
"""
获取当前日期(YYYY-MM-DD),并缓存格式化结果
Get current date(YYYY-MM-DD) and cache formatted result
:return: 格式化后的日期 Formatted date
"""
return _get_time(get_config("date_format"))
return await _get_time(get_config("date_format"))
def get_weekday() -> str:
async def get_weekday() -> str:
"""
获取当前日期(星期几),并缓存格式化结果
Get current date(weekday) and cache formatted result
:return: 格式化后的星期几 Formatted weekday
"""
return _get_time(get_config("weekday_format"))
return await _get_time(get_config("weekday_format"))
def _get_time(fmt: str) -> str:
async def _get_time(fmt: str) -> str:
"""
获取当前时间(根据指定格式),并缓存格式化结果
Get current time(based on specified format) and cache formatted result
:param fmt: 时间格式 Time format
:return: 格式化后的时间 Formatted time
"""
cache_time = datetime.now().strftime(fmt)
return cache_time
return datetime.now().strftime(fmt)