fix(archive): unable to preview (#7843)
* fix(archive): unrecognition zip * feat(archive): add tree for zip meta * fix bug * refactor(archive): meta cache time use Link Expiration first * feat(archive): return sort policy in meta (#2) * refactor * perf(archive): reduce new network requests --------- Co-authored-by: KirCute_ECT <951206789@qq.com>
This commit is contained in:
@ -13,6 +13,7 @@ import (
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/pkg/http_range"
|
||||
"github.com/alist-org/alist/v3/pkg/utils"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type FileStream struct {
|
||||
@ -189,6 +190,7 @@ func NewSeekableStream(fs FileStream, link *model.Link) (*SeekableStream, error)
|
||||
|
||||
if ss.Link.RangeReadCloser != nil {
|
||||
ss.rangeReadCloser = ss.Link.RangeReadCloser
|
||||
ss.Add(ss.rangeReadCloser)
|
||||
return &ss, nil
|
||||
}
|
||||
if len(ss.Link.URL) > 0 {
|
||||
@ -197,6 +199,7 @@ func NewSeekableStream(fs FileStream, link *model.Link) (*SeekableStream, error)
|
||||
return nil, err
|
||||
}
|
||||
ss.rangeReadCloser = rrc
|
||||
ss.Add(rrc)
|
||||
return &ss, nil
|
||||
}
|
||||
}
|
||||
@ -248,8 +251,6 @@ func (ss *SeekableStream) Read(p []byte) (n int, err error) {
|
||||
return 0, nil
|
||||
}
|
||||
ss.Reader = io.NopCloser(rc)
|
||||
ss.Closers.Add(rc)
|
||||
|
||||
}
|
||||
return ss.Reader.Read(p)
|
||||
}
|
||||
@ -337,10 +338,62 @@ type RangeReadReadAtSeeker struct {
|
||||
ss *SeekableStream
|
||||
masterOff int64
|
||||
readers []*readerCur
|
||||
*headCache
|
||||
}
|
||||
|
||||
type FileReadAtSeeker struct {
|
||||
ss *SeekableStream
|
||||
type headCache struct {
|
||||
*readerCur
|
||||
bufs [][]byte
|
||||
}
|
||||
|
||||
func (c *headCache) read(p []byte) (n int, err error) {
|
||||
pL := len(p)
|
||||
logrus.Debugf("headCache read_%d", pL)
|
||||
if c.cur < int64(pL) {
|
||||
bufL := int64(pL) - c.cur
|
||||
buf := make([]byte, bufL)
|
||||
lr := io.LimitReader(c.reader, bufL)
|
||||
off := 0
|
||||
for c.cur < int64(pL) {
|
||||
n, err = lr.Read(buf[off:])
|
||||
off += n
|
||||
c.cur += int64(n)
|
||||
if err == io.EOF && n == int(bufL) {
|
||||
err = nil
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
c.bufs = append(c.bufs, buf)
|
||||
}
|
||||
n = 0
|
||||
if c.cur >= int64(pL) {
|
||||
for i := 0; n < pL; i++ {
|
||||
buf := c.bufs[i]
|
||||
r := len(buf)
|
||||
if n+r > pL {
|
||||
r = pL - n
|
||||
}
|
||||
n += copy(p[n:], buf[:r])
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
func (r *headCache) close() error {
|
||||
for i := range r.bufs {
|
||||
r.bufs[i] = nil
|
||||
}
|
||||
r.bufs = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RangeReadReadAtSeeker) InitHeadCache() {
|
||||
if r.ss.Link.MFile == nil && r.masterOff == 0 {
|
||||
reader := r.readers[0]
|
||||
r.readers = r.readers[1:]
|
||||
r.headCache = &headCache{readerCur: reader}
|
||||
}
|
||||
}
|
||||
|
||||
func NewReadAtSeeker(ss *SeekableStream, offset int64, forceRange ...bool) (SStreamReadAtSeeker, error) {
|
||||
@ -351,27 +404,23 @@ func NewReadAtSeeker(ss *SeekableStream, offset int64, forceRange ...bool) (SStr
|
||||
}
|
||||
return &FileReadAtSeeker{ss: ss}, nil
|
||||
}
|
||||
var r io.Reader
|
||||
var err error
|
||||
r := &RangeReadReadAtSeeker{
|
||||
ss: ss,
|
||||
masterOff: offset,
|
||||
}
|
||||
if offset != 0 || utils.IsBool(forceRange...) {
|
||||
if offset < 0 || offset > ss.GetSize() {
|
||||
return nil, errors.New("offset out of range")
|
||||
}
|
||||
r, err = ss.RangeRead(http_range.Range{Start: offset, Length: -1})
|
||||
_, err := r.getReaderAtOffset(offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if rc, ok := r.(io.Closer); ok {
|
||||
ss.Closers.Add(rc)
|
||||
}
|
||||
} else {
|
||||
r = ss
|
||||
rc := &readerCur{reader: ss, cur: offset}
|
||||
r.readers = append(r.readers, rc)
|
||||
}
|
||||
return &RangeReadReadAtSeeker{
|
||||
ss: ss,
|
||||
masterOff: offset,
|
||||
readers: []*readerCur{{reader: r, cur: offset}},
|
||||
}, nil
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (r *RangeReadReadAtSeeker) GetRawStream() *SeekableStream {
|
||||
@ -379,38 +428,71 @@ func (r *RangeReadReadAtSeeker) GetRawStream() *SeekableStream {
|
||||
}
|
||||
|
||||
func (r *RangeReadReadAtSeeker) getReaderAtOffset(off int64) (*readerCur, error) {
|
||||
var rc *readerCur
|
||||
for _, reader := range r.readers {
|
||||
if reader.cur == -1 {
|
||||
continue
|
||||
}
|
||||
if reader.cur == off {
|
||||
// logrus.Debugf("getReaderAtOffset match_%d", off)
|
||||
return reader, nil
|
||||
}
|
||||
if reader.cur > 0 && off >= reader.cur && (rc == nil || reader.cur < rc.cur) {
|
||||
rc = reader
|
||||
}
|
||||
}
|
||||
reader, err := r.ss.RangeRead(http_range.Range{Start: off, Length: -1})
|
||||
if rc != nil && off-rc.cur <= utils.MB {
|
||||
n, err := utils.CopyWithBufferN(utils.NullWriter{}, rc.reader, off-rc.cur)
|
||||
rc.cur += n
|
||||
if err == io.EOF && rc.cur == off {
|
||||
err = nil
|
||||
}
|
||||
if err == nil {
|
||||
logrus.Debugf("getReaderAtOffset old_%d", off)
|
||||
return rc, nil
|
||||
}
|
||||
rc.cur = -1
|
||||
}
|
||||
logrus.Debugf("getReaderAtOffset new_%d", off)
|
||||
|
||||
// Range请求不能超过文件大小,有些云盘处理不了就会返回整个文件
|
||||
reader, err := r.ss.RangeRead(http_range.Range{Start: off, Length: r.ss.GetSize() - off})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c, ok := reader.(io.Closer); ok {
|
||||
r.ss.Closers.Add(c)
|
||||
}
|
||||
rc := &readerCur{reader: reader, cur: off}
|
||||
rc = &readerCur{reader: reader, cur: off}
|
||||
r.readers = append(r.readers, rc)
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
func (r *RangeReadReadAtSeeker) ReadAt(p []byte, off int64) (int, error) {
|
||||
if off == 0 && r.headCache != nil {
|
||||
return r.headCache.read(p)
|
||||
}
|
||||
rc, err := r.getReaderAtOffset(off)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
num := 0
|
||||
n, num := 0, 0
|
||||
for num < len(p) {
|
||||
n, err := rc.reader.Read(p[num:])
|
||||
n, err = rc.reader.Read(p[num:])
|
||||
rc.cur += int64(n)
|
||||
num += n
|
||||
if err != nil {
|
||||
return num, err
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if err == io.EOF {
|
||||
// io.EOF是reader读取完了
|
||||
rc.cur = -1
|
||||
// yeka/zip包 没有处理EOF,我们要兼容
|
||||
// https://github.com/yeka/zip/blob/03d6312748a9d6e0bc0c9a7275385c09f06d9c14/reader.go#L433
|
||||
if num == len(p) {
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
return num, nil
|
||||
return num, err
|
||||
}
|
||||
|
||||
func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
@ -437,6 +519,9 @@ func (r *RangeReadReadAtSeeker) Seek(offset int64, whence int) (int64, error) {
|
||||
}
|
||||
|
||||
func (r *RangeReadReadAtSeeker) Read(p []byte) (n int, err error) {
|
||||
if r.masterOff == 0 && r.headCache != nil {
|
||||
return r.headCache.read(p)
|
||||
}
|
||||
rc, err := r.getReaderAtOffset(r.masterOff)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -448,9 +533,16 @@ func (r *RangeReadReadAtSeeker) Read(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
func (r *RangeReadReadAtSeeker) Close() error {
|
||||
if r.headCache != nil {
|
||||
r.headCache.close()
|
||||
}
|
||||
return r.ss.Close()
|
||||
}
|
||||
|
||||
type FileReadAtSeeker struct {
|
||||
ss *SeekableStream
|
||||
}
|
||||
|
||||
func (f *FileReadAtSeeker) GetRawStream() *SeekableStream {
|
||||
return f.ss
|
||||
}
|
||||
|
Reference in New Issue
Block a user