2.3.0,基于FSQ的完整流式音符信息传输支持。

This commit is contained in:
2025-04-11 16:31:54 +08:00
parent b4e16353ec
commit 23bf69619b
12 changed files with 847 additions and 141 deletions

1
.gitignore vendored
View File

@ -11,6 +11,7 @@
/*.mcpack
/*.bdx
/*.msq
/*.fsq
/*.json
/*.mcstructure
.mscbackup

View File

@ -22,8 +22,8 @@ The Licensor of Musicreater("this project") is Eilles Wan, bgArray.
# 若需转载或借鉴 许可声明请查看仓库目录下的 License.md
__version__ = "2.2.4"
__vername__ = "MSQ 流式适配与校验增强,新增 NBS 音色表"
__version__ = "2.3.0"
__vername__ = "FSQ完全流式音符读写适配"
__author__ = (
("金羿", "Eilles"),
("诸葛亮与八卦阵", "bgArray"),
@ -53,8 +53,9 @@ __all__ = [
# 操作性函数
"natural_curve",
"straight_line",
"load_decode_msq_metainfo",
"load_decode_musicsequence_metainfo",
"load_decode_msq_flush_release",
"load_decode_fsq_flush_release",
"guess_deviation",
]

View File

@ -138,10 +138,17 @@ class MusicSequenceDecodeError(MSCTBaseException):
super().__init__("解码音符序列文件时出现问题", *args)
class MusicSequenceTypeError(MSCTBaseException):
"""音乐序列类型错误"""
def __init__(self, *args):
"""无法识别音符序列字节码的类型"""
super().__init__("错误的音符序列字节类型", *args)
class MusicSequenceVerificationFailed(MusicSequenceDecodeError):
"""音乐序列校验失败"""
def __init__(self, *args):
"""音符序列文件与其校验值不一致"""
super().__init__("音符序列文件校验失败", *args)

View File

@ -32,6 +32,7 @@ The Licensor of Musicreater("this project") is Eilles Wan, bgArray.
import os
import math
from itertools import chain
import mido
@ -231,33 +232,197 @@ class MusicSequence:
bytes_buffer_in: bytes,
verify: bool = True,
):
"""从字节码导入音乐序列"""
"""
从字节码导入音乐序列,目前支持 MSQ 第二、三版和 FSQ 第一版。
group_1 = int.from_bytes(bytes_buffer_in[4:6], "big")
group_2 = int.from_bytes(bytes_buffer_in[6:8], "big", signed=False)
Paramaters
==========
bytes_buffer_in: bytes
字节码
verify: bool
是否进行校验(仅支持第三版 MSQ 格式 及 第一版 FSQ 格式)
high_quantity = bool(group_2 & 0b1000000000000000)
# print(group_2, high_quantity)
"""
music_name_ = bytes_buffer_in[8 : (stt_index := 8 + (group_1 >> 10))].decode(
"GB18030"
)
if bytes_buffer_in[:4] == b"MSQ!":
channels_: MineNoteChannelType = empty_midi_channels(default_staff=[])
total_note_count = 0
if verify:
_header_index = stt_index
_total_verify_code = 0
group_1 = int.from_bytes(bytes_buffer_in[4:6], "big", signed=False)
group_2 = int.from_bytes(bytes_buffer_in[6:8], "big", signed=False)
for channel_index in channels_.keys():
channel_note_count = 0
_channel_start_index = stt_index
high_quantity = bool(group_2 & 0b1000000000000000)
# print(group_2, high_quantity)
music_name_ = bytes_buffer_in[
8 : (stt_index := 8 + (group_1 >> 10))
].decode("GB18030")
channels_: MineNoteChannelType = empty_midi_channels(default_staff=[])
total_note_count = 0
if verify:
_header_index = stt_index
_total_verify_code = 0
for channel_index in channels_.keys():
channel_note_count = 0
_channel_start_index = stt_index
for i in range(
int.from_bytes(
bytes_buffer_in[stt_index : (stt_index := stt_index + 4)], "big"
)
):
try:
end_index = (
stt_index
+ 13
+ high_quantity
+ (bytes_buffer_in[stt_index] >> 2)
)
channels_[channel_index].append(
MineNote.decode(
code_buffer=bytes_buffer_in[stt_index:end_index],
is_high_time_precision=high_quantity,
)
)
channel_note_count += 1
stt_index = end_index
except Exception as _err:
# print(channels_)
raise MusicSequenceDecodeError(
_err, "当前全部通道数据:", channels_
)
if verify:
if (
_count_verify := xxh3_64(
channel_note_count.to_bytes(4, "big", signed=False),
seed=3,
)
).digest() != (
_original_code := bytes_buffer_in[stt_index : stt_index + 8]
):
raise MusicSequenceVerificationFailed(
"通道 {} 音符数量校验失败:{} -> `{}`;原始为 `{}`".format(
channel_index,
channel_note_count,
_count_verify.digest(),
_original_code,
)
)
if (
_channel_verify := xxh3_64(
bytes_buffer_in[_channel_start_index:stt_index],
seed=channel_note_count,
)
).digest() != (
_original_code := bytes_buffer_in[
stt_index + 8 : stt_index + 16
]
):
raise MusicSequenceVerificationFailed(
"通道 {} 音符数据校验失败:`{}`;原始为 `{}`".format(
channel_index,
_channel_verify.digest(),
_original_code,
)
)
_total_verify_code ^= (
_count_verify.intdigest() ^ _channel_verify.intdigest()
)
total_note_count += channel_note_count
stt_index += 16
if verify:
if (
_total_verify_res := xxh3_128(
_total_verify := (
xxh3_64(
bytes_buffer_in[0:_header_index],
seed=total_note_count,
).intdigest()
^ _total_verify_code
).to_bytes(8, "big"),
seed=total_note_count,
).digest()
) != (_original_code := bytes_buffer_in[stt_index:]):
raise MusicSequenceVerificationFailed(
"全曲最终校验失败。全曲音符数:{},全曲校验码异或结果:`{}` -> `{}`;原始为 `{}`".format(
total_note_count,
_total_verify,
_total_verify_res,
_original_code,
)
)
return cls(
name_of_music=music_name_,
channels_of_notes=channels_,
music_note_count=total_note_count,
minimum_volume_of_music=(group_1 & 0b1111111111) / 1000,
deviation_value=(
(-1 if group_2 & 0b100000000000000 else 1)
* (group_2 & 0b11111111111111)
/ 1000
),
)
elif bytes_buffer_in[:4] == b"FSQ!":
group_1 = int.from_bytes(bytes_buffer_in[4:6], "big", signed=False)
group_2 = int.from_bytes(bytes_buffer_in[6:8], "big", signed=False)
high_quantity = bool(group_2 & 0b1000000000000000)
# print(group_2, high_quantity)
music_name_ = bytes_buffer_in[
8 : (stt_index := 8 + (group_1 >> 10))
].decode("GB18030")
total_note_count = int.from_bytes(
bytes_buffer_in[stt_index : (stt_index := stt_index + 5)],
"big",
signed=False,
)
if verify:
_total_verify_code = xxh3_64(
bytes_buffer_in[0:stt_index],
seed=total_note_count,
).intdigest()
_t6_buffer = _t2_buffer = 0
_channel_inst_chart: Dict[str, Dict[str, int]] = {}
channels_: MineNoteChannelType = empty_midi_channels(default_staff=[])
for i in range(total_note_count):
if verify:
if (
i % 100 == 0
) and i: # 每 100 个音符之后的。也就是 0~99 后的开始100~199 后开始……
if (
_now_vf := xxh32(
_t6_buffer.to_bytes(1, "big", signed=False),
seed=_t2_buffer,
)
).digest() != (
_original_code := bytes_buffer_in[
stt_index : (stt_index := stt_index + 4)
]
):
raise MusicSequenceVerificationFailed(
"音符数据校验失败,当前进度: {} 当前校验为:`{}`;原始为 `{}`".format(
i,
_now_vf.digest(),
_original_code,
)
)
_total_verify_code ^= _now_vf.intdigest()
_t6_buffer = _t2_buffer = 0
_t6_buffer ^= bytes_buffer_in[stt_index + 5]
_t2_buffer ^= bytes_buffer_in[stt_index + 1]
else:
if (i % 100 == 0) and i:
stt_index += 4
for i in range(
int.from_bytes(
bytes_buffer_in[stt_index : (stt_index := stt_index + 4)], "big"
)
):
try:
end_index = (
stt_index
@ -265,96 +430,173 @@ class MusicSequence:
+ high_quantity
+ (bytes_buffer_in[stt_index] >> 2)
)
channels_[channel_index].append(
MineNote.decode(
code_buffer=bytes_buffer_in[stt_index:end_index],
is_high_time_precision=high_quantity,
)
_read_note = MineNote.decode(
code_buffer=bytes_buffer_in[stt_index:end_index],
is_high_time_precision=high_quantity,
)
channel_note_count += 1
stt_index = end_index
except Exception as _err:
print(channels_)
raise MusicSequenceDecodeError(_err)
# print(bytes_buffer_in[stt_index:end_index])
raise MusicSequenceDecodeError(
_err, "所截取的音符码:", bytes_buffer_in[stt_index:end_index]
)
if _read_note.sound_name in _channel_inst_chart:
_channel_inst_chart[_read_note.sound_name]["CNT"] += 1
else:
if len(_channel_inst_chart) >= 16:
_channel_inst_chart[_read_note.sound_name] = min(
_channel_inst_chart.values(), key=lambda x: x["CNT"]
) # 此处是指针式内存引用
_channel_inst_chart[_read_note.sound_name] = {
"CNT": 1,
"INDEX": len(_channel_inst_chart),
}
channels_[_channel_inst_chart[_read_note.sound_name]["INDEX"]].append(
_read_note
)
if verify:
if (
_count_verify := xxh3_64(
channel_note_count.to_bytes(4, "big", signed=False),
seed=3,
)
).digest() != (
_original_code := bytes_buffer_in[stt_index : stt_index + 8]
):
_total_verify_res := xxh3_128(
(_total_verify := _total_verify_code.to_bytes(8, "big")),
seed=total_note_count,
).digest()
) != (_original_code := bytes_buffer_in[stt_index:]):
raise MusicSequenceVerificationFailed(
"通道 {} 音符数量校验失败:{} -> `{}`;原始为 `{}`".format(
channel_index,
channel_note_count,
_count_verify.digest(),
"全曲最终校验失败。全曲音符数:{},全曲校验码异或结果:`{}` -> `{}`;原始为 `{}`".format(
total_note_count,
_total_verify,
_total_verify_res,
_original_code,
)
)
if (
_channel_verify := xxh3_64(
bytes_buffer_in[_channel_start_index:stt_index],
seed=channel_note_count,
return cls(
name_of_music=music_name_,
channels_of_notes=channels_,
music_note_count=total_note_count,
minimum_volume_of_music=(group_1 & 0b1111111111) / 1000,
deviation_value=(
(-1 if group_2 & 0b100000000000000 else 1)
* (group_2 & 0b11111111111111)
/ 1000
),
)
elif bytes_buffer_in[:4] == b"MSQ@":
group_1 = int.from_bytes(bytes_buffer_in[4:6], "big")
group_2 = int.from_bytes(bytes_buffer_in[6:8], "big", signed=False)
high_quantity = bool(group_2 & 0b1000000000000000)
# print(group_2, high_quantity)
music_name_ = bytes_buffer_in[
8 : (stt_index := 8 + (group_1 >> 10))
].decode("GB18030")
channels_: MineNoteChannelType = empty_midi_channels(default_staff=[])
for channel_index in channels_.keys():
for i in range(
int.from_bytes(
bytes_buffer_in[stt_index : (stt_index := stt_index + 4)], "big"
)
).digest() != (
_original_code := bytes_buffer_in[stt_index + 8 : stt_index + 16]
):
raise MusicSequenceVerificationFailed(
"通道 {} 音符数据校验失败:`{}`;原始为 `{}`".format(
channel_index,
_channel_verify.digest(),
_original_code,
try:
end_index = (
stt_index
+ 13
+ high_quantity
+ (bytes_buffer_in[stt_index] >> 2)
)
)
_total_verify_code ^= (
_count_verify.intdigest() ^ _channel_verify.intdigest()
)
total_note_count += channel_note_count
stt_index += 16
channels_[channel_index].append(
MineNote.decode(
code_buffer=bytes_buffer_in[stt_index:end_index],
is_high_time_precision=high_quantity,
)
)
stt_index = end_index
except:
print(channels_)
raise
if verify:
if (
_total_verify_res := xxh3_128(
_total_verify := (
xxh3_64(
bytes_buffer_in[0:_header_index],
seed=total_note_count,
).intdigest()
^ _total_verify_code
).to_bytes(8, "big"),
seed=total_note_count,
).digest()
) != (_original_code := bytes_buffer_in[stt_index:]):
raise MusicSequenceVerificationFailed(
"全曲最终校验失败。全曲音符数:{},全曲校验码异或和:`{}` -> `{}`;原始为 `{}`".format(
total_note_count,
_total_verify,
_total_verify_res,
_original_code,
)
)
return cls(
name_of_music=music_name_,
channels_of_notes=channels_,
minimum_volume_of_music=(group_1 & 0b1111111111) / 1000,
deviation_value=(
(-1 if group_2 & 0b100000000000000 else 1)
* (group_2 & 0b11111111111111)
/ 1000
),
)
return cls(
name_of_music=music_name_,
channels_of_notes=channels_,
minimum_volume_of_music=(group_1 & 0b1111111111) / 1000,
deviation_value=(
(-1 if group_2 & 0b100000000000000 else 1)
* (group_2 & 0b11111111111111)
/ 1000
),
)
elif bytes_buffer_in[:4] == b"MSQ#":
group_1 = int.from_bytes(bytes_buffer_in[4:6], "big")
music_name_ = bytes_buffer_in[
8 : (stt_index := 8 + (group_1 >> 10))
].decode("utf-8")
channels_: MineNoteChannelType = empty_midi_channels(default_staff=[])
for channel_index in channels_.keys():
for i in range(
int.from_bytes(
bytes_buffer_in[stt_index : (stt_index := stt_index + 4)], "big"
)
):
try:
end_index = stt_index + 14 + (bytes_buffer_in[stt_index] >> 2)
channels_[channel_index].append(
MineNote.decode(bytes_buffer_in[stt_index:end_index])
)
stt_index = end_index
except:
print(channels_)
raise
return cls(
name_of_music=music_name_,
channels_of_notes=channels_,
minimum_volume_of_music=(group_1 & 0b1111111111) / 1000,
deviation_value=int.from_bytes(bytes_buffer_in[6:8], "big", signed=True)
/ 1000,
)
else:
raise MusicSequenceTypeError(
"输入的二进制字节码不是合法的音符序列格式,无法解码,码头前 10 字节为:",
bytes_buffer_in[:10],
)
def encode_dump(
self,
flowing_codec_support: bool = False,
include_displacement: bool = True,
high_time_precision: bool = True,
) -> bytes:
"""将音乐序列转为二进制字节码"""
"""
将音乐序列转为二进制字节码
Parameters
==========
flowing_codec_support: bool
流式编解码支持,默认为不启用(当启用时,其编码格式应为 FSQ 格式,否则应为 MSQ 格式)
请注意,非对流式有特殊要求的情况下,请不要启用此格式项;
FSQ 格式会损失通道信息,不应作为 MusicSequence 的基本存储格式使用。
include_displacement: bool
是否包含声像位移,默认包含
high_time_precision: bool
是否使用高精度时间,默认使用
Returns
=======
转换的字节码数据:
bytes
"""
# (已废弃)
# 第一版的码头: MSQ# 字串编码: UTF-8
# 第一版 MSQ 的码头: MSQ# 字串编码: UTF-8
# 第一版格式
# 音乐名称长度 6 位 支持到 63
# 最小音量 minimum_volume 10 位 最大支持 1023 即三位小数
@ -381,9 +623,10 @@ class MusicSequence:
# bytes_buffer += note_.encode()
# (已废弃)
# 第二版的码头: MSQ@ 字串编码: GB18030
# 第二版 MSQ 的码头: MSQ@ 字串编码: GB18030
# 第三版的码头: MSQ! 字串编码: GB18030 大端字节序
# 第三版 MSQ 的码头: MSQ! 字串编码: GB18030 大端字节序
# 第一版 FSQ 的码头: FSQ!
# 音乐名称长度 6 位 支持到 63
# 最小音量 minimum_volume 10 位 最大支持 1023 即三位小数
@ -396,11 +639,11 @@ class MusicSequence:
# 音乐名称 music_name 长度最多 63 支持到 31 个中文字符 或 63 个西文字符
bytes_buffer = (
b"MSQ!"
(b"FSQ!" if flowing_codec_support else b"MSQ!")
+ (
(len(r := self.music_name.encode("GB18030")) << 10) # 音乐名称长度
+ round(self.minimum_volume * 1000) # 最小音量
).to_bytes(2, "big")
).to_bytes(2, "big", signed=False)
+ (
(
(
@ -416,44 +659,83 @@ class MusicSequence:
+ r
)
# 此上是音符序列的元信息,接下来是多通道的音符序列
if flowing_codec_support:
# FSQ 在 MSQ 第三版的基础上增加了一个占 5 字节的全曲音符总数
bytes_buffer += self.total_note_count.to_bytes(5, "big", signed=False)
# 每个通道的开头是 32 位的 序列长度 共 4 字节
# 接下来根据这个序列的长度来读取音符数据
# 若启用“高精度”,则每个音符皆添加一个字节,用于存储音符时间控制精度偏移
# 此值每增加 1则音符向后播放时长增加 1/1250 秒
# 高精度功能在 MineNote 类实现
# (第三版新增)每个通道结尾包含一个 128 位的 XXHASH 校验值,用以标识该通道结束
# 在这 128 位里,前 64 位是该通道音符数的 XXHASH64 校验值,以 3 作为种子值
# 后 64 位是整个通道全部字节串的 XXHASH64 校验值(包括通道开头的音符数),以 该通道音符数 作为种子值
_final_hash_codec = xxh3_64(
bytes_buffer, seed=self.total_note_count
).intdigest()
for channel_index, note_list in self.channels.items():
channel_buffer = len_buffer = len(note_list).to_bytes(
4, "big", signed=False
)
for note_ in note_list:
channel_buffer += note_.encode(
is_high_time_precision=high_time_precision
if flowing_codec_support:
# 此上是音符序列的元信息,接下来是音符序列
__counting = 0
_t6_buffer = 0
_t2_buffer = 0
# 流式音符序列单通道序列FSQ 文件格式
for _note in sorted(
chain(*self.channels.values()), key=lambda x: x.start_tick
):
if __counting >= 100:
_now_hash_codec_verifier = xxh32(
_t6_buffer.to_bytes(1, "big", signed=False),
seed=_t2_buffer,
)
bytes_buffer += _now_hash_codec_verifier.digest()
_final_hash_codec ^= _now_hash_codec_verifier.intdigest()
_t6_buffer = 0
_t2_buffer = 0
__counting = 0
bytes_buffer += (
__single_buffer := _note.encode(
is_displacement_included=include_displacement,
is_high_time_precision=high_time_precision,
)
)
_now_hash_codec_spliter = xxh3_64(len_buffer, seed=3)
_now_hash_codec_verifier = xxh3_64(
channel_buffer, seed=int.from_bytes(len_buffer, "big", signed=False)
)
_t6_buffer ^= __single_buffer[5]
_t2_buffer ^= __single_buffer[1]
__counting += 1
bytes_buffer += channel_buffer
bytes_buffer += (
_now_hash_codec_spliter.digest() + _now_hash_codec_verifier.digest()
)
else:
_final_hash_codec ^= (
_now_hash_codec_spliter.intdigest()
^ _now_hash_codec_verifier.intdigest()
)
# 常规序列多通道MSQ 文件格式
# 每个通道的开头是 32 位的 序列长度 共 4 字节
# 接下来根据这个序列的长度来读取音符数据
# 若启用“高精度”,则每个音符皆添加一个字节,用于存储音符时间控制精度偏移
# 此值每增加 1则音符向后播放时长增加 1/1250 秒
# 高精度功能在 MineNote 类实现
# (第三版新增)每个通道结尾包含一个 128 位的 XXHASH 校验值,用以标识该通道结束
# 在这 128 位里,前 64 位是该通道音符数的 XXHASH64 校验值,以 3 作为种子值
# 后 64 位是整个通道全部字节串的 XXHASH64 校验值(包括通道开头的音符数),以 该通道音符数 作为种子值
for channel_index, note_list in self.channels.items():
channel_buffer = len_buffer = len(note_list).to_bytes(
4, "big", signed=False
)
for note_ in note_list:
channel_buffer += note_.encode(
is_displacement_included=include_displacement,
is_high_time_precision=high_time_precision,
)
_now_hash_codec_spliter = xxh3_64(len_buffer, seed=3)
_now_hash_codec_verifier = xxh3_64(
channel_buffer, seed=int.from_bytes(len_buffer, "big", signed=False)
)
bytes_buffer += channel_buffer
bytes_buffer += (
_now_hash_codec_spliter.digest() + _now_hash_codec_verifier.digest()
)
_final_hash_codec ^= (
_now_hash_codec_spliter.intdigest()
^ _now_hash_codec_verifier.intdigest()
)
# 在所有音符通道表示完毕之后,由一个 128 位的 XXHASH 校验值,用以标识文件结束并校验
# 该 128 位的校验值是对于前述所有校验值的异或所得值之 XXHASH128 校验值,以 全曲音符总数 作为种子值

View File

@ -177,6 +177,7 @@ class MineNote:
将数据打包为字节码
:param is_displacement_included:`bool` 是否包含声像偏移数据,默认为**是**
:param is_high_time_precision:`bool` 是否启用高精度,默认为**是**
:return bytes 打包好的字节码
"""

View File

@ -32,7 +32,7 @@ from typing import (
BinaryIO,
)
from xxhash import xxh3_64, xxh3_128
from xxhash import xxh32, xxh3_64, xxh3_128
from .constants import (
MC_INSTRUMENT_BLOCKS_TABLE,
@ -390,11 +390,11 @@ def soundID_to_blockID(
return MC_INSTRUMENT_BLOCKS_TABLE.get(sound_id, (default_block,))[0]
def load_decode_msq_metainfo(
def load_decode_musicsequence_metainfo(
buffer_in: BinaryIO,
) -> Tuple[str, float, float, bool, int]:
"""
以流的方式解码MSQ音乐序列元信息
以流的方式解码音乐序列元信息
Parameters
----------
@ -429,6 +429,44 @@ def load_decode_msq_metainfo(
)
def load_decode_fsq_flush_release(
buffer_in: BinaryIO,
starter_index: int,
high_quantity_note: bool,
) -> Generator[MineNote, Any, None]:
""" """
if buffer_in.tell() != starter_index:
buffer_in.seek(starter_index, 0)
total_note_count = int.from_bytes(
buffer_in.read(5),
"big",
signed=False,
)
for i in range(total_note_count):
if (i % 100 == 0) and i:
buffer_in.read(4)
try:
_note_bytes_length = (
12 + high_quantity_note + ((_first_byte := (buffer_in.read(1)))[0] >> 2)
)
yield MineNote.decode(
code_buffer=_first_byte + buffer_in.read(_note_bytes_length),
is_high_time_precision=high_quantity_note,
)
except Exception as _err:
# print(bytes_buffer_in[stt_index:end_index])
raise MusicSequenceDecodeError(
_err,
"所截取的音符码之首个字节:",
_first_byte,
)
def load_decode_msq_flush_release(
buffer_in: BinaryIO,
starter_index: int,
@ -448,7 +486,8 @@ def load_decode_msq_flush_release(
Returns
-------
Generator[Tuple[int, MineNote], Any, None]
以流的方式返回解码后的音符序列
以流的方式返回解码后的音符序列,每次返回一个元组
元组中包含两个元素,第一个元素为音符所在通道的索引,第二个元素为音符对象
Raises
------
@ -460,7 +499,8 @@ def load_decode_msq_flush_release(
# _total_verify = xxh3_64(buffer_in.read(starter_index), seed=total_note_count)
# buffer_in.seek(starter_index, 0)
buffer_in.seek(starter_index)
if buffer_in.tell() != starter_index:
buffer_in.seek(starter_index, 0)
_bytes_buffer_in = buffer_in.read()
# int.from_bytes(_bytes_buffer_in[0 : 4], "big")

View File

@ -1,3 +1,37 @@
# FSQ 文件格式
还在设计
FSQ 文件是 · 库存储音符序列的一种格式取自 **F**lowing Music **S**e**q**uence 之名
现在 · 库及其上游软件使用的是在 MSQ 第三版 的基础上进行流式的兼容性变动
## FSQ 第一版
第一版的码头是 `FSQ!` 这一版中所有的**字符串**皆以 _**GB18030**_ 编码进行编解码**数值**皆是以 _**大端字节序**_ 存储的无符号整数
码头是文件前四个字节的内容这一部分内容是可读的 ASCII 字串因此这一版的文件前四个字节的内容必为 `FSQ!`
因为这一版本是在 MSQ 第三版的基础上演变而来的因此取自 MSQ 码头的 `MSQ!` 而改作 `FSQ!`
### 文件头
FSQ 第一版的文件头是在 MSQ 第三版的文件头的基础上增加了一个占 5 字节的全曲音符总数
也就是说 MSQ 第三版一致的在这一元信息中**音乐名称长度****最小音量**合计共 2 字节**高精度音符时间控制启用****总音调偏移**合计共 2 字节因此**音乐名称**为任意长度前四字节内容均为固定而最后增加 5 字节作为全曲音符总数
### 音符序列
FSQ 格式不包含音符的通道信息在读取处理时默认将同一乐器的音符视为同一通道也就是说仅存在一个序列其中每个音符的信息存储方式与 MSQ 第三版一致
音符序列的存储顺序是按照音符的**开始时间**进行排序的
但是注意有可能一个较长的音符的开始到结束时间内还包含有音符此时如有要适配的读取器还请继续读取直到下一个音符的开始时间大于此较长音符的结束时间
在每 100 个音符后插入一段 32 位的 XXHASH32 校验码其所校验的内容为这一百个音符中每个音符的第 6 个字节彼此异或之结果种子值为这一百个音符中每个音符的第 2 个字节的彼此异或结果
若最后不满足 100 个音符则不插入上述校验码
### 文件校验
在所有有效数据之后包含一个 128 位的校验值用以标识整个文件结束的同时验证整个文件的完整性
128 位的校验值是 包括码头在内的元信息的 XXHASH64 校验值种子值是全曲音符数 对于前述所有 XXHASH32 校验值彼此异或的异或 所得值之 XXHASH128 校验值 全曲音符总数 作为种子值

View File

@ -1,12 +1,12 @@
# MSQ 文件格式
MSQ 文件是 · 存储音符序列的一种格式取自 MusicSeQuence
MSQ 文件是 · 存储音符序列的一种格式取自 **M**usic**S**e**Q**uence 类之名
现在 · 及其上游软件使用的是在 第二版 的基础上增设验功能的 MSQ 第三版
现在 · 及其上游软件使用的是在 第二版 的基础上增设验功能的 MSQ 第三版
## MSQ 第三版
第二版的码头是 `MSQ@` 这一版中所有的**字符串** _**GB18030**_ 编码进行编解码**数值****_大端序_**存储
第二版的码头是 `MSQ@` 这一版中所有的**字符串** _**GB18030**_ 编码进行编解码**数值**皆是以 _**大端字节**_ 存储的无符号整数
MSQ 第三版的码头是 `MSQ!`
@ -48,7 +48,7 @@ MSQ 第三版的码头是 `MSQ!`。
| **乐器名称** | sound_name | 依据先前定义 | 最多可支持 31 个中文字符 63 个西文字符其长度取决于先前获知的 乐器名称长度 的定义 |
| **声像位移**非必含 | position_displacement | 共三个值每个值 16 48 | 若前述**是否启用声像位移**已启用则此值启用三个值分别代表 xyz 轴上的偏移每个值支持数值 0~65535注意这里每个 1 代表最小音量的 0.001 个单位即取值是此处表示数字的千分倍 |
### 序列验
### 序列
_第三版新增_
@ -56,7 +56,7 @@ _第三版新增_
在这 128 位里 64 位是该通道音符数的 XXHASH64 校验值 3 作为种子值
64 位是整个通道全部字节串的 XXHASH64 校验值包括通道开头的音符数 该通道音符数 作为种子值
### 文件验
### 文件
_第三版新增_

39
example_fsq_opera.py Normal file
View File

@ -0,0 +1,39 @@
import Musicreater
from Musicreater.utils import (
load_decode_fsq_flush_release,
load_decode_musicsequence_metainfo,
)
from rich.pretty import pprint
msc_seq = Musicreater.MusicSequence.from_mido(
Musicreater.mido.MidiFile(
"./resource/测试片段.mid",
),
"TEST-测试片段",
)
pprint("音乐源取入成功:")
pprint(msc_seq)
with open("test.fsq", "wb") as f:
f.write(fsq_bytes := msc_seq.encode_dump(flowing_codec_support=True))
with open("test.fsq", "rb") as f:
msc_seq_r = Musicreater.MusicSequence.load_decode(f.read(), verify=True)
pprint("FSQ 传入类成功:")
pprint(msc_seq_r)
with open("test.fsq", "rb") as f:
pprint("流式 FSQ 元数据:")
pprint(metas := load_decode_musicsequence_metainfo(f))
pprint("流式 FSQ 音符序列:")
cnt = 0
for i in load_decode_fsq_flush_release(f, metas[-1], metas[-2]):
pprint(
i,
)
cnt += 1
pprint(f"{cnt} 个音符")

View File

@ -1,5 +1,8 @@
import Musicreater
from Musicreater.utils import load_decode_msq_flush_release, load_decode_msq_metainfo
from Musicreater.utils import (
load_decode_msq_flush_release,
load_decode_musicsequence_metainfo,
)
from rich.pretty import pprint
@ -25,7 +28,7 @@ pprint(msc_seq_r)
with open("test.msq", "rb") as f:
pprint("流式 MSQ 元数据:")
pprint(metas := load_decode_msq_metainfo(f))
pprint(metas := load_decode_musicsequence_metainfo(f))
pprint("流式 MSQ 音符序列:")
for i in load_decode_msq_flush_release(f, metas[-1], metas[-2]):
pprint(i)

View File

@ -0,0 +1,259 @@
import time
from itertools import chain
import random
from rich.console import Console
from rich.progress import Progress
from rich.table import Table
from multiprocessing import freeze_support, Pool, Process
console = Console()
# gening_stst = {"NOWIDX": 0, "DATA": {}}
# 生成单个字典的函数(用于多进程)
def generate_single_dict(args):
dict_id, dict_size = args
# if dict_id:
# console.print(
# f"字典 {dict_id + 1} 大小 {dict_size} 生成中...",
# )
# else:
# console.print(
# f"\n字典 {dict_id + 1} 大小 {dict_size} 生成中...",
# )
# final_d = {}
# gening_stst["DATA"][dict_id] = 0
# for i in range(dict_size):
# final_d[i] = [random.randint(0, 1000) for _ in range(random.randint(10000, 99999))]
# gening_stst["DATA"][dict_id] += 1
return dict_id, {
i: [random.randint(0, 1000) for _ in range(random.randint(10000, 90000))]
for i in range(dict_size)
}
# return dict_id, final_d
# 合并函数定义
def chain_merging(dict_info: dict):
return sorted(chain(*dict_info.values()))
def seq_merging(dict_info: dict):
return sorted([i for sub in dict_info.values() for i in sub])
def summing(*_):
k = []
for i in _:
k += i
return k
def plus_merging(dict_info: dict):
return sorted(summing(*dict_info.values()))
if __name__ == "__main__":
freeze_support() # Windows系统需要这个调用
# 测试配置
dict_size = 50 # 每个字典的键值对数量
num_tests = 50 # 测试次数
function_list = [chain_merging, seq_merging, plus_merging]
# dict_list = []
results = {func.__name__: [] for func in function_list}
# 多进程生成多个字典
with Progress() as progress:
task = progress.add_task("[green]进行速度测试...", total=num_tests)
# gen_task = progress.add_task("[cyan] - 生成测试数据...", total=num_tests)
with Pool() as pool:
args_list = [
(
i,
dict_size,
)
for i in range(num_tests)
]
# def disp_work():
# while gening_stst["NOWIDX"] < num_tests:
# progress.update(
# gen_task,
# advance=1,
# description=f"[cyan]正在生成 {gening_stst['DATA']['NOWIDX']}/{dict_size -1}",
# # description="正在生成..."+console._render_buffer(
# # console.render(table,),
# # ),
# )
# Process(target=disp_work).start()
for result in pool.imap_unordered(generate_single_dict, args_list):
# dict_list.append(result)
progress.update(
task,
advance=1,
description=f"[cyan]正在测试 {result[0] + 1}/{num_tests}",
# description="正在生成..."+console._render_buffer(
# console.render(table,),
# ),
# refresh=True,
)
# gening_stst["NOWIDX"] += 1
# for _ in range(num_tests):
# 随机选择字典和打乱函数顺序
# current_dict = generate_single_dict((_, dict_size))
# progress.update(
# test_task,
# advance=1,
# # description=f"[cyan]正在测试 {_}/{num_tests -1}",
# # description="正在测试..."+console._render_buffer(
# # console.render(table,progress.console.options),
# # ),
# # refresh=True,
# )
# rangen_task = progress.add_task(
# "[green]正在生成测试数据...",
# total=dict_size,
# )
# current_dict = {}
# desc = "正在生成序列 {}/{}".format("{}",dict_size-1)
# for i in range(dict_size):
# # print("正在生成第", i, "个序列",end="\r",flush=True)
# progress.update(rangen_task, advance=1, description=desc.format(i))
# current_dict[i] = [random.randint(0, 1000) for _ in range(random.randint(10000, 99999))]
shuffled_funcs = random.sample(function_list, len(function_list))
# table.rows
# table.columns = fine_column
# progress.live
# progress.console._buffer.extend(progress.console.render(table))
# for j in progress.console.render(table,progress.console.options):
# progress.console._buffer.insert(0,j)
for i, func in enumerate(shuffled_funcs):
start = time.perf_counter()
func(result[1])
elapsed = time.perf_counter() - start
results[func.__name__].append(elapsed)
# gening_stst["NOWIDX"] = num_tests
# fine_column = table.columns.copy()
# for func in function_list:
# name = func.__name__
# table.add_row(
# name,
# f"-",
# f"-",
# f"-",
# f"-",
# )
# # proc_pool = []
# 测试执行部分(保持顺序执行)
# with Progress() as progress:
# # progress.live.update(table, refresh=True)
# # progress.live.process_renderables([table],)
# # print([console._render_buffer(
# # console.render(table,),
# # )])
# # progress.console._buffer.extend(progress.console.render(table))
# test_task = progress.add_task("[cyan]进行速度测试...", total=num_tests)
# for _ in range(num_tests):
# # 随机选择字典和打乱函数顺序
# # current_dict = generate_single_dict((_, dict_size))
# progress.update(
# test_task,
# advance=1,
# description=f"[cyan]正在测试 {_}/{num_tests -1}",
# # description="正在测试..."+console._render_buffer(
# # console.render(table,progress.console.options),
# # ),
# # refresh=True,
# )
# rangen_task = progress.add_task(
# "[green]正在生成测试数据...",
# total=dict_size,
# )
# current_dict = {}
# desc = "正在生成序列 {}/{}".format("{}",dict_size-1)
# for i in range(dict_size):
# # print("正在生成第", i, "个序列",end="\r",flush=True)
# progress.update(rangen_task, advance=1, description=desc.format(i))
# current_dict[i] = [random.randint(0, 1000) for _ in range(random.randint(10000, 99999))]
# shuffled_funcs = random.sample(function_list, len(function_list))
# # table.rows
# # table.columns = fine_column
# # progress.live
# # progress.console._buffer.extend(progress.console.render(table))
# # for j in progress.console.render(table,progress.console.options):
# # progress.console._buffer.insert(0,j)
# for i, func in enumerate(shuffled_funcs):
# start = time.perf_counter()
# func(current_dict)
# elapsed = time.perf_counter() - start
# results[func.__name__].append(elapsed)
# times = results[func.__name__]
# avg_time = sum(times) / len(times)
# min_time = min(times)
# max_time = max(times)
# table.columns[0]
# table.columns[0]._cells[i] = func.__name__
# table.columns[1]._cells[i] = f"{avg_time:.5f}"
# table.columns[2]._cells[i] = f"{min_time:.5f}"
# table.columns[3]._cells[i] = f"{max_time:.5f}"
# table.columns[4]._cells[i] = str(len(times))
# progress.update(test_task, advance=0.5)
# 结果展示部分
# 结果表格
table = Table(title="\n[cyan]性能测试结果", show_header=True, header_style="bold")
table.add_column("函数名称", style="dim", width=15)
table.add_column("平均耗时 (秒)", justify="right")
table.add_column("最小耗时 (秒)", justify="right")
table.add_column("最大耗时 (秒)", justify="right")
table.add_column("测试次数", justify="right")
for i, func in enumerate(function_list):
name = func.__name__
times = results[name]
avg_time = sum(times) / len(times)
min_time = min(times)
max_time = max(times)
table.add_row(
name,
f"{avg_time:.5f}",
f"{min_time:.5f}",
f"{max_time:.5f}",
str(len(times)),
)
# table.columns[0]._cells[i] = name
# table.columns[1]._cells[i] = f"{avg_time:.5f}"
# table.columns[2]._cells[i] = f"{min_time:.5f}"
# table.columns[3]._cells[i] = f"{max_time:.5f}"
# table.columns[4]._cells[i] = str(len(times))
console.print(table)

View File

@ -0,0 +1,39 @@
import time
from itertools import chain
import random
print("生成序列中")
fine_dict = {}
for i in range(50):
print("正在生成第", i, "个序列",end="\r",flush=True)
fine_dict[i] = [random.randint(0, 1000) for _ in range(random.randint(10000, 99999))]
print("序列生成完成")
def chain_merging(dict_info: dict):
return sorted(chain(*dict_info.values()))
def seq_merging(dict_info: dict):
return sorted([i for sub in dict_info.values() for i in sub])
def summing(*_):
k = []
for i in _:
k += i
return k
def plus_merging(dict_info: dict):
return sorted(summing(*dict_info.values()))
function_list = [chain_merging, seq_merging, plus_merging]
for func in function_list:
print("正在使用",func.__name__,"函数",)
start = time.time()
func(fine_dict)
print("耗时",time.time() - start)
print("结束")