From 23bf69619bd4ff4a63b350daf8b09e3884e0a0f2 Mon Sep 17 00:00:00 2001 From: EillesWan Date: Fri, 11 Apr 2025 16:31:54 +0800 Subject: [PATCH] =?UTF-8?q?2.3.0=EF=BC=8C=E5=9F=BA=E4=BA=8EFSQ=E7=9A=84?= =?UTF-8?q?=E5=AE=8C=E6=95=B4=E6=B5=81=E5=BC=8F=E9=9F=B3=E7=AC=A6=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E4=BC=A0=E8=BE=93=E6=94=AF=E6=8C=81=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + Musicreater/__init__.py | 7 +- Musicreater/exceptions.py | 9 +- Musicreater/main.py | 530 ++++++++++++++----- Musicreater/subclass.py | 1 + Musicreater/utils.py | 50 +- docs/FSQ文件格式.md | 36 +- docs/MSQ文件格式.md | 10 +- example_fsq_opera.py | 39 ++ example_msq_opera.py | 7 +- resources/test/enfasted_list_merging_test.py | 259 +++++++++ resources/test/list_merging_text.py | 39 ++ 12 files changed, 847 insertions(+), 141 deletions(-) create mode 100644 example_fsq_opera.py create mode 100644 resources/test/enfasted_list_merging_test.py create mode 100644 resources/test/list_merging_text.py diff --git a/.gitignore b/.gitignore index 447aee5..341a149 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ /*.mcpack /*.bdx /*.msq +/*.fsq /*.json /*.mcstructure .mscbackup diff --git a/Musicreater/__init__.py b/Musicreater/__init__.py index 06d760c..e831591 100644 --- a/Musicreater/__init__.py +++ b/Musicreater/__init__.py @@ -22,8 +22,8 @@ The Licensor of Musicreater("this project") is Eilles Wan, bgArray. # 若需转载或借鉴 许可声明请查看仓库目录下的 License.md -__version__ = "2.2.4" -__vername__ = "MSQ 流式适配与校验增强,新增 NBS 音色表" +__version__ = "2.3.0" +__vername__ = "FSQ完全流式音符读写适配" __author__ = ( ("金羿", "Eilles"), ("诸葛亮与八卦阵", "bgArray"), @@ -53,8 +53,9 @@ __all__ = [ # 操作性函数 "natural_curve", "straight_line", - "load_decode_msq_metainfo", + "load_decode_musicsequence_metainfo", "load_decode_msq_flush_release", + "load_decode_fsq_flush_release", "guess_deviation", ] diff --git a/Musicreater/exceptions.py b/Musicreater/exceptions.py index 70965e9..a725da4 100644 --- a/Musicreater/exceptions.py +++ b/Musicreater/exceptions.py @@ -138,10 +138,17 @@ class MusicSequenceDecodeError(MSCTBaseException): super().__init__("解码音符序列文件时出现问题", *args) +class MusicSequenceTypeError(MSCTBaseException): + """音乐序列类型错误""" + + def __init__(self, *args): + """无法识别音符序列字节码的类型""" + super().__init__("错误的音符序列字节类型", *args) + + class MusicSequenceVerificationFailed(MusicSequenceDecodeError): """音乐序列校验失败""" def __init__(self, *args): """音符序列文件与其校验值不一致""" super().__init__("音符序列文件校验失败", *args) - diff --git a/Musicreater/main.py b/Musicreater/main.py index 2fdffda..6c2c65b 100644 --- a/Musicreater/main.py +++ b/Musicreater/main.py @@ -32,6 +32,7 @@ The Licensor of Musicreater("this project") is Eilles Wan, bgArray. import os import math +from itertools import chain import mido @@ -231,33 +232,197 @@ class MusicSequence: bytes_buffer_in: bytes, verify: bool = True, ): - """从字节码导入音乐序列""" + """ + 从字节码导入音乐序列,目前支持 MSQ 第二、三版和 FSQ 第一版。 - group_1 = int.from_bytes(bytes_buffer_in[4:6], "big") - group_2 = int.from_bytes(bytes_buffer_in[6:8], "big", signed=False) + Paramaters + ========== + bytes_buffer_in: bytes + 字节码 + verify: bool + 是否进行校验(仅支持第三版 MSQ 格式 及 第一版 FSQ 格式) - high_quantity = bool(group_2 & 0b1000000000000000) - # print(group_2, high_quantity) + """ - music_name_ = bytes_buffer_in[8 : (stt_index := 8 + (group_1 >> 10))].decode( - "GB18030" - ) + if bytes_buffer_in[:4] == b"MSQ!": - channels_: MineNoteChannelType = empty_midi_channels(default_staff=[]) - total_note_count = 0 - if verify: - _header_index = stt_index - _total_verify_code = 0 + group_1 = int.from_bytes(bytes_buffer_in[4:6], "big", signed=False) + group_2 = int.from_bytes(bytes_buffer_in[6:8], "big", signed=False) - for channel_index in channels_.keys(): - channel_note_count = 0 - _channel_start_index = stt_index + high_quantity = bool(group_2 & 0b1000000000000000) + # print(group_2, high_quantity) + + music_name_ = bytes_buffer_in[ + 8 : (stt_index := 8 + (group_1 >> 10)) + ].decode("GB18030") + + channels_: MineNoteChannelType = empty_midi_channels(default_staff=[]) + total_note_count = 0 + if verify: + _header_index = stt_index + _total_verify_code = 0 + + for channel_index in channels_.keys(): + channel_note_count = 0 + _channel_start_index = stt_index + + for i in range( + int.from_bytes( + bytes_buffer_in[stt_index : (stt_index := stt_index + 4)], "big" + ) + ): + try: + end_index = ( + stt_index + + 13 + + high_quantity + + (bytes_buffer_in[stt_index] >> 2) + ) + channels_[channel_index].append( + MineNote.decode( + code_buffer=bytes_buffer_in[stt_index:end_index], + is_high_time_precision=high_quantity, + ) + ) + channel_note_count += 1 + stt_index = end_index + except Exception as _err: + # print(channels_) + raise MusicSequenceDecodeError( + _err, "当前全部通道数据:", channels_ + ) + if verify: + if ( + _count_verify := xxh3_64( + channel_note_count.to_bytes(4, "big", signed=False), + seed=3, + ) + ).digest() != ( + _original_code := bytes_buffer_in[stt_index : stt_index + 8] + ): + raise MusicSequenceVerificationFailed( + "通道 {} 音符数量校验失败:{} -> `{}`;原始为 `{}`".format( + channel_index, + channel_note_count, + _count_verify.digest(), + _original_code, + ) + ) + if ( + _channel_verify := xxh3_64( + bytes_buffer_in[_channel_start_index:stt_index], + seed=channel_note_count, + ) + ).digest() != ( + _original_code := bytes_buffer_in[ + stt_index + 8 : stt_index + 16 + ] + ): + raise MusicSequenceVerificationFailed( + "通道 {} 音符数据校验失败:`{}`;原始为 `{}`".format( + channel_index, + _channel_verify.digest(), + _original_code, + ) + ) + _total_verify_code ^= ( + _count_verify.intdigest() ^ _channel_verify.intdigest() + ) + total_note_count += channel_note_count + stt_index += 16 + + if verify: + if ( + _total_verify_res := xxh3_128( + _total_verify := ( + xxh3_64( + bytes_buffer_in[0:_header_index], + seed=total_note_count, + ).intdigest() + ^ _total_verify_code + ).to_bytes(8, "big"), + seed=total_note_count, + ).digest() + ) != (_original_code := bytes_buffer_in[stt_index:]): + raise MusicSequenceVerificationFailed( + "全曲最终校验失败。全曲音符数:{},全曲校验码异或结果:`{}` -> `{}`;原始为 `{}`".format( + total_note_count, + _total_verify, + _total_verify_res, + _original_code, + ) + ) + + return cls( + name_of_music=music_name_, + channels_of_notes=channels_, + music_note_count=total_note_count, + minimum_volume_of_music=(group_1 & 0b1111111111) / 1000, + deviation_value=( + (-1 if group_2 & 0b100000000000000 else 1) + * (group_2 & 0b11111111111111) + / 1000 + ), + ) + + elif bytes_buffer_in[:4] == b"FSQ!": + + group_1 = int.from_bytes(bytes_buffer_in[4:6], "big", signed=False) + group_2 = int.from_bytes(bytes_buffer_in[6:8], "big", signed=False) + + high_quantity = bool(group_2 & 0b1000000000000000) + # print(group_2, high_quantity) + + music_name_ = bytes_buffer_in[ + 8 : (stt_index := 8 + (group_1 >> 10)) + ].decode("GB18030") + + total_note_count = int.from_bytes( + bytes_buffer_in[stt_index : (stt_index := stt_index + 5)], + "big", + signed=False, + ) + + if verify: + _total_verify_code = xxh3_64( + bytes_buffer_in[0:stt_index], + seed=total_note_count, + ).intdigest() + _t6_buffer = _t2_buffer = 0 + + _channel_inst_chart: Dict[str, Dict[str, int]] = {} + channels_: MineNoteChannelType = empty_midi_channels(default_staff=[]) + + for i in range(total_note_count): + if verify: + if ( + i % 100 == 0 + ) and i: # 每 100 个音符之后的。也就是 0~99 后的开始,100~199 后开始…… + if ( + _now_vf := xxh32( + _t6_buffer.to_bytes(1, "big", signed=False), + seed=_t2_buffer, + ) + ).digest() != ( + _original_code := bytes_buffer_in[ + stt_index : (stt_index := stt_index + 4) + ] + ): + raise MusicSequenceVerificationFailed( + "音符数据校验失败,当前进度: {} 当前校验为:`{}`;原始为 `{}`".format( + i, + _now_vf.digest(), + _original_code, + ) + ) + _total_verify_code ^= _now_vf.intdigest() + _t6_buffer = _t2_buffer = 0 + _t6_buffer ^= bytes_buffer_in[stt_index + 5] + _t2_buffer ^= bytes_buffer_in[stt_index + 1] + else: + if (i % 100 == 0) and i: + stt_index += 4 - for i in range( - int.from_bytes( - bytes_buffer_in[stt_index : (stt_index := stt_index + 4)], "big" - ) - ): try: end_index = ( stt_index @@ -265,96 +430,173 @@ class MusicSequence: + high_quantity + (bytes_buffer_in[stt_index] >> 2) ) - channels_[channel_index].append( - MineNote.decode( - code_buffer=bytes_buffer_in[stt_index:end_index], - is_high_time_precision=high_quantity, - ) + _read_note = MineNote.decode( + code_buffer=bytes_buffer_in[stt_index:end_index], + is_high_time_precision=high_quantity, ) - channel_note_count += 1 stt_index = end_index except Exception as _err: - print(channels_) - raise MusicSequenceDecodeError(_err) + # print(bytes_buffer_in[stt_index:end_index]) + raise MusicSequenceDecodeError( + _err, "所截取的音符码:", bytes_buffer_in[stt_index:end_index] + ) + + if _read_note.sound_name in _channel_inst_chart: + _channel_inst_chart[_read_note.sound_name]["CNT"] += 1 + else: + if len(_channel_inst_chart) >= 16: + _channel_inst_chart[_read_note.sound_name] = min( + _channel_inst_chart.values(), key=lambda x: x["CNT"] + ) # 此处是指针式内存引用 + _channel_inst_chart[_read_note.sound_name] = { + "CNT": 1, + "INDEX": len(_channel_inst_chart), + } + channels_[_channel_inst_chart[_read_note.sound_name]["INDEX"]].append( + _read_note + ) if verify: if ( - _count_verify := xxh3_64( - channel_note_count.to_bytes(4, "big", signed=False), - seed=3, - ) - ).digest() != ( - _original_code := bytes_buffer_in[stt_index : stt_index + 8] - ): + _total_verify_res := xxh3_128( + (_total_verify := _total_verify_code.to_bytes(8, "big")), + seed=total_note_count, + ).digest() + ) != (_original_code := bytes_buffer_in[stt_index:]): raise MusicSequenceVerificationFailed( - "通道 {} 音符数量校验失败:{} -> `{}`;原始为 `{}`".format( - channel_index, - channel_note_count, - _count_verify.digest(), + "全曲最终校验失败。全曲音符数:{},全曲校验码异或结果:`{}` -> `{}`;原始为 `{}`".format( + total_note_count, + _total_verify, + _total_verify_res, _original_code, ) ) - if ( - _channel_verify := xxh3_64( - bytes_buffer_in[_channel_start_index:stt_index], - seed=channel_note_count, + + return cls( + name_of_music=music_name_, + channels_of_notes=channels_, + music_note_count=total_note_count, + minimum_volume_of_music=(group_1 & 0b1111111111) / 1000, + deviation_value=( + (-1 if group_2 & 0b100000000000000 else 1) + * (group_2 & 0b11111111111111) + / 1000 + ), + ) + + elif bytes_buffer_in[:4] == b"MSQ@": + + group_1 = int.from_bytes(bytes_buffer_in[4:6], "big") + group_2 = int.from_bytes(bytes_buffer_in[6:8], "big", signed=False) + + high_quantity = bool(group_2 & 0b1000000000000000) + # print(group_2, high_quantity) + + music_name_ = bytes_buffer_in[ + 8 : (stt_index := 8 + (group_1 >> 10)) + ].decode("GB18030") + channels_: MineNoteChannelType = empty_midi_channels(default_staff=[]) + for channel_index in channels_.keys(): + for i in range( + int.from_bytes( + bytes_buffer_in[stt_index : (stt_index := stt_index + 4)], "big" ) - ).digest() != ( - _original_code := bytes_buffer_in[stt_index + 8 : stt_index + 16] ): - raise MusicSequenceVerificationFailed( - "通道 {} 音符数据校验失败:`{}`;原始为 `{}`".format( - channel_index, - _channel_verify.digest(), - _original_code, + try: + end_index = ( + stt_index + + 13 + + high_quantity + + (bytes_buffer_in[stt_index] >> 2) ) - ) - _total_verify_code ^= ( - _count_verify.intdigest() ^ _channel_verify.intdigest() - ) - total_note_count += channel_note_count - stt_index += 16 + channels_[channel_index].append( + MineNote.decode( + code_buffer=bytes_buffer_in[stt_index:end_index], + is_high_time_precision=high_quantity, + ) + ) + stt_index = end_index + except: + print(channels_) + raise - if verify: - if ( - _total_verify_res := xxh3_128( - _total_verify := ( - xxh3_64( - bytes_buffer_in[0:_header_index], - seed=total_note_count, - ).intdigest() - ^ _total_verify_code - ).to_bytes(8, "big"), - seed=total_note_count, - ).digest() - ) != (_original_code := bytes_buffer_in[stt_index:]): - raise MusicSequenceVerificationFailed( - "全曲最终校验失败。全曲音符数:{},全曲校验码异或和:`{}` -> `{}`;原始为 `{}`".format( - total_note_count, - _total_verify, - _total_verify_res, - _original_code, - ) - ) + return cls( + name_of_music=music_name_, + channels_of_notes=channels_, + minimum_volume_of_music=(group_1 & 0b1111111111) / 1000, + deviation_value=( + (-1 if group_2 & 0b100000000000000 else 1) + * (group_2 & 0b11111111111111) + / 1000 + ), + ) - return cls( - name_of_music=music_name_, - channels_of_notes=channels_, - minimum_volume_of_music=(group_1 & 0b1111111111) / 1000, - deviation_value=( - (-1 if group_2 & 0b100000000000000 else 1) - * (group_2 & 0b11111111111111) - / 1000 - ), - ) + elif bytes_buffer_in[:4] == b"MSQ#": + + group_1 = int.from_bytes(bytes_buffer_in[4:6], "big") + + music_name_ = bytes_buffer_in[ + 8 : (stt_index := 8 + (group_1 >> 10)) + ].decode("utf-8") + channels_: MineNoteChannelType = empty_midi_channels(default_staff=[]) + for channel_index in channels_.keys(): + for i in range( + int.from_bytes( + bytes_buffer_in[stt_index : (stt_index := stt_index + 4)], "big" + ) + ): + try: + end_index = stt_index + 14 + (bytes_buffer_in[stt_index] >> 2) + channels_[channel_index].append( + MineNote.decode(bytes_buffer_in[stt_index:end_index]) + ) + stt_index = end_index + except: + print(channels_) + raise + + return cls( + name_of_music=music_name_, + channels_of_notes=channels_, + minimum_volume_of_music=(group_1 & 0b1111111111) / 1000, + deviation_value=int.from_bytes(bytes_buffer_in[6:8], "big", signed=True) + / 1000, + ) + + else: + raise MusicSequenceTypeError( + "输入的二进制字节码不是合法的音符序列格式,无法解码,码头前 10 字节为:", + bytes_buffer_in[:10], + ) def encode_dump( self, + flowing_codec_support: bool = False, + include_displacement: bool = True, high_time_precision: bool = True, ) -> bytes: - """将音乐序列转为二进制字节码""" + """ + 将音乐序列转为二进制字节码 + + Parameters + ========== + + flowing_codec_support: bool + 流式编解码支持,默认为不启用(当启用时,其编码格式应为 FSQ 格式,否则应为 MSQ 格式) + 请注意,非对流式有特殊要求的情况下,请不要启用此格式项; + FSQ 格式会损失通道信息,不应作为 MusicSequence 的基本存储格式使用。 + include_displacement: bool + 是否包含声像位移,默认包含 + high_time_precision: bool + 是否使用高精度时间,默认使用 + + Returns + ======= + 转换的字节码数据: + bytes + """ # (已废弃) - # 第一版的码头: MSQ# 字串编码: UTF-8 + # 第一版 MSQ 的码头: MSQ# 字串编码: UTF-8 # 第一版格式 # 音乐名称长度 6 位 支持到 63 # 最小音量 minimum_volume 10 位 最大支持 1023 即三位小数 @@ -381,9 +623,10 @@ class MusicSequence: # bytes_buffer += note_.encode() # (已废弃) - # 第二版的码头: MSQ@ 字串编码: GB18030 + # 第二版 MSQ 的码头: MSQ@ 字串编码: GB18030 - # 第三版的码头: MSQ! 字串编码: GB18030 大端字节序 + # 第三版 MSQ 的码头: MSQ! 字串编码: GB18030 大端字节序 + # 第一版 FSQ 的码头: FSQ! # 音乐名称长度 6 位 支持到 63 # 最小音量 minimum_volume 10 位 最大支持 1023 即三位小数 @@ -396,11 +639,11 @@ class MusicSequence: # 音乐名称 music_name 长度最多 63 支持到 31 个中文字符 或 63 个西文字符 bytes_buffer = ( - b"MSQ!" + (b"FSQ!" if flowing_codec_support else b"MSQ!") + ( (len(r := self.music_name.encode("GB18030")) << 10) # 音乐名称长度 + round(self.minimum_volume * 1000) # 最小音量 - ).to_bytes(2, "big") + ).to_bytes(2, "big", signed=False) + ( ( ( @@ -416,44 +659,83 @@ class MusicSequence: + r ) - # 此上是音符序列的元信息,接下来是多通道的音符序列 + if flowing_codec_support: + # FSQ 在 MSQ 第三版的基础上增加了一个占 5 字节的全曲音符总数 + bytes_buffer += self.total_note_count.to_bytes(5, "big", signed=False) - # 每个通道的开头是 32 位的 序列长度 共 4 字节 - # 接下来根据这个序列的长度来读取音符数据 - - # 若启用“高精度”,则每个音符皆添加一个字节,用于存储音符时间控制精度偏移 - # 此值每增加 1,则音符向后播放时长增加 1/1250 秒 - # 高精度功能在 MineNote 类实现 - - # (第三版新增)每个通道结尾包含一个 128 位的 XXHASH 校验值,用以标识该通道结束 - # 在这 128 位里,前 64 位是该通道音符数的 XXHASH64 校验值,以 3 作为种子值 - # 后 64 位是整个通道全部字节串的 XXHASH64 校验值(包括通道开头的音符数),以 该通道音符数 作为种子值 _final_hash_codec = xxh3_64( bytes_buffer, seed=self.total_note_count ).intdigest() - for channel_index, note_list in self.channels.items(): - channel_buffer = len_buffer = len(note_list).to_bytes( - 4, "big", signed=False - ) - for note_ in note_list: - channel_buffer += note_.encode( - is_high_time_precision=high_time_precision + if flowing_codec_support: + # 此上是音符序列的元信息,接下来是音符序列 + __counting = 0 + _t6_buffer = 0 + _t2_buffer = 0 + + # 流式音符序列,单通道序列,FSQ 文件格式 + for _note in sorted( + chain(*self.channels.values()), key=lambda x: x.start_tick + ): + if __counting >= 100: + _now_hash_codec_verifier = xxh32( + _t6_buffer.to_bytes(1, "big", signed=False), + seed=_t2_buffer, + ) + bytes_buffer += _now_hash_codec_verifier.digest() + _final_hash_codec ^= _now_hash_codec_verifier.intdigest() + + _t6_buffer = 0 + _t2_buffer = 0 + __counting = 0 + + bytes_buffer += ( + __single_buffer := _note.encode( + is_displacement_included=include_displacement, + is_high_time_precision=high_time_precision, + ) ) - _now_hash_codec_spliter = xxh3_64(len_buffer, seed=3) - _now_hash_codec_verifier = xxh3_64( - channel_buffer, seed=int.from_bytes(len_buffer, "big", signed=False) - ) + _t6_buffer ^= __single_buffer[5] + _t2_buffer ^= __single_buffer[1] + __counting += 1 - bytes_buffer += channel_buffer - bytes_buffer += ( - _now_hash_codec_spliter.digest() + _now_hash_codec_verifier.digest() - ) + else: - _final_hash_codec ^= ( - _now_hash_codec_spliter.intdigest() - ^ _now_hash_codec_verifier.intdigest() - ) + # 常规序列,多通道,MSQ 文件格式 + + # 每个通道的开头是 32 位的 序列长度 共 4 字节 + # 接下来根据这个序列的长度来读取音符数据 + + # 若启用“高精度”,则每个音符皆添加一个字节,用于存储音符时间控制精度偏移 + # 此值每增加 1,则音符向后播放时长增加 1/1250 秒 + # 高精度功能在 MineNote 类实现 + + # (第三版新增)每个通道结尾包含一个 128 位的 XXHASH 校验值,用以标识该通道结束 + # 在这 128 位里,前 64 位是该通道音符数的 XXHASH64 校验值,以 3 作为种子值 + # 后 64 位是整个通道全部字节串的 XXHASH64 校验值(包括通道开头的音符数),以 该通道音符数 作为种子值 + for channel_index, note_list in self.channels.items(): + channel_buffer = len_buffer = len(note_list).to_bytes( + 4, "big", signed=False + ) + for note_ in note_list: + channel_buffer += note_.encode( + is_displacement_included=include_displacement, + is_high_time_precision=high_time_precision, + ) + _now_hash_codec_spliter = xxh3_64(len_buffer, seed=3) + _now_hash_codec_verifier = xxh3_64( + channel_buffer, seed=int.from_bytes(len_buffer, "big", signed=False) + ) + + bytes_buffer += channel_buffer + bytes_buffer += ( + _now_hash_codec_spliter.digest() + _now_hash_codec_verifier.digest() + ) + + _final_hash_codec ^= ( + _now_hash_codec_spliter.intdigest() + ^ _now_hash_codec_verifier.intdigest() + ) # 在所有音符通道表示完毕之后,由一个 128 位的 XXHASH 校验值,用以标识文件结束并校验 # 该 128 位的校验值是对于前述所有校验值的异或所得值之 XXHASH128 校验值,以 全曲音符总数 作为种子值 diff --git a/Musicreater/subclass.py b/Musicreater/subclass.py index bb126e4..b61c3bb 100644 --- a/Musicreater/subclass.py +++ b/Musicreater/subclass.py @@ -177,6 +177,7 @@ class MineNote: 将数据打包为字节码 :param is_displacement_included:`bool` 是否包含声像偏移数据,默认为**是** + :param is_high_time_precision:`bool` 是否启用高精度,默认为**是** :return bytes 打包好的字节码 """ diff --git a/Musicreater/utils.py b/Musicreater/utils.py index 466d7a8..cdeb600 100644 --- a/Musicreater/utils.py +++ b/Musicreater/utils.py @@ -32,7 +32,7 @@ from typing import ( BinaryIO, ) -from xxhash import xxh3_64, xxh3_128 +from xxhash import xxh32, xxh3_64, xxh3_128 from .constants import ( MC_INSTRUMENT_BLOCKS_TABLE, @@ -390,11 +390,11 @@ def soundID_to_blockID( return MC_INSTRUMENT_BLOCKS_TABLE.get(sound_id, (default_block,))[0] -def load_decode_msq_metainfo( +def load_decode_musicsequence_metainfo( buffer_in: BinaryIO, ) -> Tuple[str, float, float, bool, int]: """ - 以流的方式解码MSQ音乐序列元信息 + 以流的方式解码音乐序列元信息 Parameters ---------- @@ -429,6 +429,44 @@ def load_decode_msq_metainfo( ) +def load_decode_fsq_flush_release( + buffer_in: BinaryIO, + starter_index: int, + high_quantity_note: bool, +) -> Generator[MineNote, Any, None]: + """ """ + + if buffer_in.tell() != starter_index: + buffer_in.seek(starter_index, 0) + + total_note_count = int.from_bytes( + buffer_in.read(5), + "big", + signed=False, + ) + + for i in range(total_note_count): + if (i % 100 == 0) and i: + buffer_in.read(4) + + try: + _note_bytes_length = ( + 12 + high_quantity_note + ((_first_byte := (buffer_in.read(1)))[0] >> 2) + ) + + yield MineNote.decode( + code_buffer=_first_byte + buffer_in.read(_note_bytes_length), + is_high_time_precision=high_quantity_note, + ) + except Exception as _err: + # print(bytes_buffer_in[stt_index:end_index]) + raise MusicSequenceDecodeError( + _err, + "所截取的音符码之首个字节:", + _first_byte, + ) + + def load_decode_msq_flush_release( buffer_in: BinaryIO, starter_index: int, @@ -448,7 +486,8 @@ def load_decode_msq_flush_release( Returns ------- Generator[Tuple[int, MineNote], Any, None] - 以流的方式返回解码后的音符序列 + 以流的方式返回解码后的音符序列,每次返回一个元组 + 元组中包含两个元素,第一个元素为音符所在通道的索引,第二个元素为音符对象 Raises ------ @@ -460,7 +499,8 @@ def load_decode_msq_flush_release( # _total_verify = xxh3_64(buffer_in.read(starter_index), seed=total_note_count) # buffer_in.seek(starter_index, 0) - buffer_in.seek(starter_index) + if buffer_in.tell() != starter_index: + buffer_in.seek(starter_index, 0) _bytes_buffer_in = buffer_in.read() # int.from_bytes(_bytes_buffer_in[0 : 4], "big") diff --git a/docs/FSQ文件格式.md b/docs/FSQ文件格式.md index 92212d3..16738da 100644 --- a/docs/FSQ文件格式.md +++ b/docs/FSQ文件格式.md @@ -1,3 +1,37 @@ # FSQ 文件格式 -还在设计 \ No newline at end of file +FSQ 文件是 音·创 库存储音符序列的一种格式,取自 **F**lowing Music **S**e**q**uence 之名。 + +现在 音·创 库及其上游软件使用的是在 MSQ 第三版 的基础上进行流式的兼容性变动。 + +## FSQ 第一版 + +第一版的码头是 `FSQ!` ,这一版中,所有的**字符串**皆以 _**GB18030**_ 编码进行编解码,**数值**皆是以 _**大端字节序**_ 存储的无符号整数。 + +码头是文件前四个字节的内容,这一部分内容是可读的 ASCII 字串。因此,这一版的文件前四个字节的内容必为 `FSQ!`。 + +因为这一版本是在 MSQ 第三版的基础上演变而来的,因此取自 MSQ 码头的 `MSQ!` 而改作 `FSQ!`。 + +### 文件头 + +FSQ 第一版的文件头是在 MSQ 第三版的文件头的基础上增加了一个占 5 字节的全曲音符总数。 + +也就是说,与 MSQ 第三版一致的,在这一元信息中,**音乐名称长度**和**最小音量**合计共 2 字节;**高精度音符时间控制启用**和**总音调偏移**合计共 2 字节;因此,除**音乐名称**为任意长度,前四字节内容均为固定。而最后增加 5 字节作为全曲音符总数。 + +### 音符序列 + +FSQ 格式不包含音符的通道信息,在读取处理时默认将同一乐器的音符视为同一通道。也就是说,仅存在一个序列。其中每个音符的信息存储方式与 MSQ 第三版一致。 + +音符序列的存储顺序是按照音符的**开始时间**进行排序的。 + +但是注意!有可能一个较长的音符的开始到结束时间内还包含有音符,此时如有要适配的读取器,还请继续读取直到下一个音符的开始时间大于此较长音符的结束时间。 + +在每 100 个音符后,插入一段 32 位的 XXHASH32 校验码,其所校验的内容为这一百个音符中每个音符的第 6 个字节彼此异或之结果,种子值为这一百个音符中每个音符的第 2 个字节的彼此异或结果。 + +若最后不满足 100 个音符,则不插入上述校验码。 + +### 文件校验 + +在所有有效数据之后,包含一个 128 位的校验值,用以标识整个文件结束的同时,验证整个文件的完整性。 + +该 128 位的校验值是 包括码头在内的元信息的 XXHASH64 校验值(种子值是全曲音符数) 对于前述所有 XXHASH32 校验值彼此异或的异或 所得值之 XXHASH128 校验值,以 全曲音符总数 作为种子值。 diff --git a/docs/MSQ文件格式.md b/docs/MSQ文件格式.md index 95f48b0..099eeb2 100644 --- a/docs/MSQ文件格式.md +++ b/docs/MSQ文件格式.md @@ -1,12 +1,12 @@ # MSQ 文件格式 -MSQ 文件是 音·创 存储音符序列的一种格式,取自 MusicSeQuence。 +MSQ 文件是 音·创 库存储音符序列的一种格式,取自 **M**usic**S**e**Q**uence 类之名。 -现在 音·创 及其上游软件使用的是在 第二版 的基础上增设验证功能的 MSQ 第三版。 +现在 音·创 库及其上游软件使用的是在 第二版 的基础上增设校验功能的 MSQ 第三版。 ## MSQ 第三版 -第二版的码头是 `MSQ@` ,这一版中,所有的**字符串**以 _**GB18030**_ 编码进行编解码,**数值**以**_大端序_**存储。 +第二版的码头是 `MSQ@` ,这一版中,所有的**字符串**皆以 _**GB18030**_ 编码进行编解码,**数值**皆是以 _**大端字节序**_ 存储的无符号整数。 MSQ 第三版的码头是 `MSQ!`。 @@ -48,7 +48,7 @@ MSQ 第三版的码头是 `MSQ!`。 | **乐器名称** | sound_name | 依据先前定义 | 最多可支持 31 个中文字符 或 63 个西文字符,其长度取决于先前获知的 “乐器名称长度” 的定义 | | **声像位移**(非必含) | position_displacement | 共三个值,每个值 16 位 共 48 位 | 若前述**是否启用声像位移**已启用,则此值启用;三个值分别代表 x、y、z 轴上的偏移,每个值支持数值 0~65535,注意,这里每个 1 代表最小音量的 0.001 个单位,即取值是此处表示数字的千分倍 | -### 序列验证 +### 序列校验 _第三版新增_ @@ -56,7 +56,7 @@ _第三版新增_ 在这 128 位里,前 64 位是该通道音符数的 XXHASH64 校验值,以 3 作为种子值。 后 64 位是整个通道全部字节串的 XXHASH64 校验值(包括通道开头的音符数),以 该通道音符数 作为种子值。 -### 文件验证 +### 文件校验 _第三版新增_ diff --git a/example_fsq_opera.py b/example_fsq_opera.py new file mode 100644 index 0000000..16a8c0a --- /dev/null +++ b/example_fsq_opera.py @@ -0,0 +1,39 @@ +import Musicreater +from Musicreater.utils import ( + load_decode_fsq_flush_release, + load_decode_musicsequence_metainfo, +) + +from rich.pretty import pprint + +msc_seq = Musicreater.MusicSequence.from_mido( + Musicreater.mido.MidiFile( + "./resource/测试片段.mid", + ), + "TEST-测试片段", +) + +pprint("音乐源取入成功:") +pprint(msc_seq) + +with open("test.fsq", "wb") as f: + f.write(fsq_bytes := msc_seq.encode_dump(flowing_codec_support=True)) + +with open("test.fsq", "rb") as f: + msc_seq_r = Musicreater.MusicSequence.load_decode(f.read(), verify=True) + +pprint("FSQ 传入类成功:") +pprint(msc_seq_r) + + +with open("test.fsq", "rb") as f: + pprint("流式 FSQ 元数据:") + pprint(metas := load_decode_musicsequence_metainfo(f)) + pprint("流式 FSQ 音符序列:") + cnt = 0 + for i in load_decode_fsq_flush_release(f, metas[-1], metas[-2]): + pprint( + i, + ) + cnt += 1 + pprint(f"共 {cnt} 个音符") diff --git a/example_msq_opera.py b/example_msq_opera.py index 55cafb7..7a410b8 100644 --- a/example_msq_opera.py +++ b/example_msq_opera.py @@ -1,5 +1,8 @@ import Musicreater -from Musicreater.utils import load_decode_msq_flush_release, load_decode_msq_metainfo +from Musicreater.utils import ( + load_decode_msq_flush_release, + load_decode_musicsequence_metainfo, +) from rich.pretty import pprint @@ -25,7 +28,7 @@ pprint(msc_seq_r) with open("test.msq", "rb") as f: pprint("流式 MSQ 元数据:") - pprint(metas := load_decode_msq_metainfo(f)) + pprint(metas := load_decode_musicsequence_metainfo(f)) pprint("流式 MSQ 音符序列:") for i in load_decode_msq_flush_release(f, metas[-1], metas[-2]): pprint(i) diff --git a/resources/test/enfasted_list_merging_test.py b/resources/test/enfasted_list_merging_test.py new file mode 100644 index 0000000..ad5c231 --- /dev/null +++ b/resources/test/enfasted_list_merging_test.py @@ -0,0 +1,259 @@ +import time +from itertools import chain +import random +from rich.console import Console +from rich.progress import Progress +from rich.table import Table +from multiprocessing import freeze_support, Pool, Process + +console = Console() + +# gening_stst = {"NOWIDX": 0, "DATA": {}} + + +# 生成单个字典的函数(用于多进程) +def generate_single_dict(args): + dict_id, dict_size = args + # if dict_id: + # console.print( + # f"字典 {dict_id + 1} 大小 {dict_size} 生成中...", + # ) + # else: + # console.print( + # f"\n字典 {dict_id + 1} 大小 {dict_size} 生成中...", + # ) + # final_d = {} + # gening_stst["DATA"][dict_id] = 0 + # for i in range(dict_size): + # final_d[i] = [random.randint(0, 1000) for _ in range(random.randint(10000, 99999))] + # gening_stst["DATA"][dict_id] += 1 + return dict_id, { + i: [random.randint(0, 1000) for _ in range(random.randint(10000, 90000))] + for i in range(dict_size) + } + # return dict_id, final_d + + +# 合并函数定义 +def chain_merging(dict_info: dict): + return sorted(chain(*dict_info.values())) + + +def seq_merging(dict_info: dict): + return sorted([i for sub in dict_info.values() for i in sub]) + + +def summing(*_): + k = [] + for i in _: + k += i + return k + + +def plus_merging(dict_info: dict): + return sorted(summing(*dict_info.values())) + + +if __name__ == "__main__": + freeze_support() # Windows系统需要这个调用 + + # 测试配置 + dict_size = 50 # 每个字典的键值对数量 + num_tests = 50 # 测试次数 + + function_list = [chain_merging, seq_merging, plus_merging] + # dict_list = [] + results = {func.__name__: [] for func in function_list} + + # 多进程生成多个字典 + with Progress() as progress: + task = progress.add_task("[green]进行速度测试...", total=num_tests) + # gen_task = progress.add_task("[cyan] - 生成测试数据...", total=num_tests) + with Pool() as pool: + args_list = [ + ( + i, + dict_size, + ) + for i in range(num_tests) + ] + + # def disp_work(): + # while gening_stst["NOWIDX"] < num_tests: + # progress.update( + # gen_task, + # advance=1, + # description=f"[cyan]正在生成 {gening_stst['DATA']['NOWIDX']}/{dict_size -1}", + # # description="正在生成..."+console._render_buffer( + # # console.render(table,), + # # ), + # ) + + # Process(target=disp_work).start() + + for result in pool.imap_unordered(generate_single_dict, args_list): + # dict_list.append(result) + progress.update( + task, + advance=1, + description=f"[cyan]正在测试 {result[0] + 1}/{num_tests}", + # description="正在生成..."+console._render_buffer( + # console.render(table,), + # ), + # refresh=True, + ) + + # gening_stst["NOWIDX"] += 1 + + # for _ in range(num_tests): + # 随机选择字典和打乱函数顺序 + # current_dict = generate_single_dict((_, dict_size)) + # progress.update( + # test_task, + # advance=1, + # # description=f"[cyan]正在测试 {_}/{num_tests -1}", + # # description="正在测试..."+console._render_buffer( + # # console.render(table,progress.console.options), + # # ), + # # refresh=True, + # ) + + # rangen_task = progress.add_task( + # "[green]正在生成测试数据...", + # total=dict_size, + # ) + # current_dict = {} + # desc = "正在生成序列 {}/{}".format("{}",dict_size-1) + + # for i in range(dict_size): + # # print("正在生成第", i, "个序列",end="\r",flush=True) + # progress.update(rangen_task, advance=1, description=desc.format(i)) + # current_dict[i] = [random.randint(0, 1000) for _ in range(random.randint(10000, 99999))] + + shuffled_funcs = random.sample(function_list, len(function_list)) + # table.rows + # table.columns = fine_column + # progress.live + # progress.console._buffer.extend(progress.console.render(table)) + # for j in progress.console.render(table,progress.console.options): + # progress.console._buffer.insert(0,j) + + for i, func in enumerate(shuffled_funcs): + + start = time.perf_counter() + func(result[1]) + elapsed = time.perf_counter() - start + results[func.__name__].append(elapsed) + # gening_stst["NOWIDX"] = num_tests + + # fine_column = table.columns.copy() + + # for func in function_list: + # name = func.__name__ + + # table.add_row( + # name, + # f"-", + # f"-", + # f"-", + # f"-", + # ) + + # # proc_pool = [] + + # 测试执行部分(保持顺序执行) + # with Progress() as progress: + # # progress.live.update(table, refresh=True) + # # progress.live.process_renderables([table],) + # # print([console._render_buffer( + # # console.render(table,), + # # )]) + # # progress.console._buffer.extend(progress.console.render(table)) + # test_task = progress.add_task("[cyan]进行速度测试...", total=num_tests) + + # for _ in range(num_tests): + # # 随机选择字典和打乱函数顺序 + # # current_dict = generate_single_dict((_, dict_size)) + # progress.update( + # test_task, + # advance=1, + # description=f"[cyan]正在测试 {_}/{num_tests -1}", + # # description="正在测试..."+console._render_buffer( + # # console.render(table,progress.console.options), + # # ), + # # refresh=True, + # ) + + # rangen_task = progress.add_task( + # "[green]正在生成测试数据...", + # total=dict_size, + # ) + # current_dict = {} + # desc = "正在生成序列 {}/{}".format("{}",dict_size-1) + + # for i in range(dict_size): + # # print("正在生成第", i, "个序列",end="\r",flush=True) + # progress.update(rangen_task, advance=1, description=desc.format(i)) + # current_dict[i] = [random.randint(0, 1000) for _ in range(random.randint(10000, 99999))] + + # shuffled_funcs = random.sample(function_list, len(function_list)) + # # table.rows + # # table.columns = fine_column + # # progress.live + # # progress.console._buffer.extend(progress.console.render(table)) + # # for j in progress.console.render(table,progress.console.options): + # # progress.console._buffer.insert(0,j) + + # for i, func in enumerate(shuffled_funcs): + + # start = time.perf_counter() + # func(current_dict) + # elapsed = time.perf_counter() - start + # results[func.__name__].append(elapsed) + + # times = results[func.__name__] + # avg_time = sum(times) / len(times) + # min_time = min(times) + # max_time = max(times) + + # table.columns[0] + + # table.columns[0]._cells[i] = func.__name__ + # table.columns[1]._cells[i] = f"{avg_time:.5f}" + # table.columns[2]._cells[i] = f"{min_time:.5f}" + # table.columns[3]._cells[i] = f"{max_time:.5f}" + # table.columns[4]._cells[i] = str(len(times)) + + # progress.update(test_task, advance=0.5) + + # 结果展示部分 + + # 结果表格 + table = Table(title="\n[cyan]性能测试结果", show_header=True, header_style="bold") + table.add_column("函数名称", style="dim", width=15) + table.add_column("平均耗时 (秒)", justify="right") + table.add_column("最小耗时 (秒)", justify="right") + table.add_column("最大耗时 (秒)", justify="right") + table.add_column("测试次数", justify="right") + + for i, func in enumerate(function_list): + name = func.__name__ + times = results[name] + avg_time = sum(times) / len(times) + min_time = min(times) + max_time = max(times) + + table.add_row( + name, + f"{avg_time:.5f}", + f"{min_time:.5f}", + f"{max_time:.5f}", + str(len(times)), + ) + # table.columns[0]._cells[i] = name + # table.columns[1]._cells[i] = f"{avg_time:.5f}" + # table.columns[2]._cells[i] = f"{min_time:.5f}" + # table.columns[3]._cells[i] = f"{max_time:.5f}" + # table.columns[4]._cells[i] = str(len(times)) + + console.print(table) diff --git a/resources/test/list_merging_text.py b/resources/test/list_merging_text.py new file mode 100644 index 0000000..f816efb --- /dev/null +++ b/resources/test/list_merging_text.py @@ -0,0 +1,39 @@ +import time +from itertools import chain +import random + +print("生成序列中") + +fine_dict = {} + +for i in range(50): + print("正在生成第", i, "个序列",end="\r",flush=True) + fine_dict[i] = [random.randint(0, 1000) for _ in range(random.randint(10000, 99999))] + +print("序列生成完成") + +def chain_merging(dict_info: dict): + return sorted(chain(*dict_info.values())) + +def seq_merging(dict_info: dict): + return sorted([i for sub in dict_info.values() for i in sub]) + +def summing(*_): + k = [] + for i in _: + k += i + return k + +def plus_merging(dict_info: dict): + return sorted(summing(*dict_info.values())) + +function_list = [chain_merging, seq_merging, plus_merging] + + +for func in function_list: + print("正在使用",func.__name__,"函数",) + start = time.time() + func(fine_dict) + print("耗时",time.time() - start) + +print("结束") \ No newline at end of file