feat: misc improvements about upload/copy/hash (#5045)
general: add createTime/updateTime support in webdav and some drivers
general: add hash support in some drivers
general: cross-storage rapid-upload support
general: enhance upload to avoid local temp file if possible
general: replace ReadSeekCloser with the File interface to speed up upstream operations
feat(aliyun_open): same as above
feat(crypt): add hack for 139cloud

Close #4934
Close #4819

baidu_netdisk still needs upload-code improvements to support rapid-upload.
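For orientation before the diffs: the new upload path wraps any io.Reader in a stream.FileStream, promotes it to a SeekableStream, and hands it to op.Put. A minimal sketch assembled from the aria2/qbittorrent hunks below; f is an *os.File, and size, mimetype, storage, dstDir, ctx and progress are assumed to be in scope:

    s := stream.FileStream{
        Obj: &model.Object{
            Name:     path.Base(f.Name()),
            Size:     size,
            Modified: time.Now(),
        },
        Reader:   f,                   // raw reader, no local temp copy required
        Closers:  utils.NewClosers(f), // closed via FileStreamer.Close in op.Put
        Mimetype: mimetype,
    }
    // with a nil link, the Reader must already implement model.File (os.File does)
    ss, err := stream.NewSeekableStream(s, nil)
    if err != nil {
        return err
    }
    return op.Put(ctx, storage, dstDir, ss, progress)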
@@ -2,6 +2,7 @@ package aria2
 import (
     "fmt"
+    "github.com/alist-org/alist/v3/internal/stream"
     "os"
     "path"
     "path/filepath"
@@ -162,22 +163,27 @@ func (m *Monitor) Complete() error {
     if err != nil {
         return errors.Wrapf(err, "failed to open file %s", file.Path)
     }
-    stream := &model.FileStream{
+    s := stream.FileStream{
         Obj: &model.Object{
             Name:     path.Base(file.Path),
             Size:     size,
             Modified: time.Now(),
             IsFolder: false,
         },
-        ReadCloser: f,
-        Mimetype:   mimetype,
+        Reader:   f,
+        Closers:  utils.NewClosers(f),
+        Mimetype: mimetype,
     }
+    ss, err := stream.NewSeekableStream(s, nil)
+    if err != nil {
+        return err
+    }
     relDir, err := filepath.Rel(m.tempDir, filepath.Dir(file.Path))
     if err != nil {
         log.Errorf("find relation directory error: %v", err)
     }
     newDistDir := filepath.Join(dstDirActualPath, relDir)
-    return op.Put(tsk.Ctx, storage, newDistDir, stream, tsk.SetProgress)
+    return op.Put(tsk.Ctx, storage, newDistDir, ss, tsk.SetProgress)
     },
 }))
 }
@@ -17,6 +17,7 @@ var (
     MetaNotFound    = errors.New("meta not found")
     StorageNotFound = errors.New("storage not found")
+    StreamIncomplete = errors.New("upload/download stream incomplete, possible network issue")
+    StreamPeekFail   = errors.New("StreamPeekFail")
 )

 // NewErr wrap constant error with an extra message
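NewErr (referenced just above) wraps one of these sentinels with extra context. A usage sketch, assuming NewErr keeps the sentinel reachable for errors.Is; n and expected are illustrative:

    if n != expected {
        // callers can still match errors.Is(err, errs.StreamIncomplete)
        return errs.NewErr(errs.StreamIncomplete, "read %d of %d bytes", n, expected)
    }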
@@ -10,6 +10,7 @@ import (
     "github.com/alist-org/alist/v3/internal/driver"
     "github.com/alist-org/alist/v3/internal/model"
     "github.com/alist-org/alist/v3/internal/op"
+    "github.com/alist-org/alist/v3/internal/stream"
     "github.com/alist-org/alist/v3/pkg/task"
     "github.com/alist-org/alist/v3/pkg/utils"
     "github.com/pkg/errors"
@@ -94,9 +95,14 @@ func copyFileBetween2Storages(tsk *task.Task[uint64], srcStorage, dstStorage dri
     if err != nil {
         return errors.WithMessagef(err, "failed get [%s] link", srcFilePath)
     }
-    stream, err := getFileStreamFromLink(tsk.Ctx, srcFile, link)
+    fs := stream.FileStream{
+        Obj: srcFile,
+        Ctx: tsk.Ctx,
+    }
+    // any link provided is seekable
+    ss, err := stream.NewSeekableStream(fs, link)
     if err != nil {
         return errors.WithMessagef(err, "failed get [%s] stream", srcFilePath)
     }
-    return op.Put(tsk.Ctx, dstStorage, dstDirPath, stream, tsk.SetProgress, true)
+    return op.Put(tsk.Ctx, dstStorage, dstDirPath, ss, tsk.SetProgress, true)
 }
@@ -2,7 +2,6 @@ package fs

 import (
     "context"

     "github.com/alist-org/alist/v3/internal/driver"
     "github.com/alist-org/alist/v3/internal/model"
     "github.com/alist-org/alist/v3/internal/op"
@@ -93,7 +92,7 @@ func Remove(ctx context.Context, path string) error {
     return err
 }

-func PutDirectly(ctx context.Context, dstDirPath string, file *model.FileStream, lazyCache ...bool) error {
+func PutDirectly(ctx context.Context, dstDirPath string, file model.FileStreamer, lazyCache ...bool) error {
     err := putDirectly(ctx, dstDirPath, file, lazyCache...)
     if err != nil {
         log.Errorf("failed put %s: %+v", dstDirPath, err)
@@ -101,7 +100,7 @@ func PutDirectly(ctx context.Context, dstDirPath string, file *model.FileStream,
     return err
 }

-func PutAsTask(dstDirPath string, file *model.FileStream) error {
+func PutAsTask(dstDirPath string, file model.FileStreamer) error {
     err := putAsTask(dstDirPath, file)
     if err != nil {
         log.Errorf("failed put %s: %+v", dstDirPath, err)
@@ -3,13 +3,12 @@ package fs
 import (
     "context"
     "fmt"
-    "github.com/alist-org/alist/v3/internal/model"
     "sync/atomic"

     "github.com/alist-org/alist/v3/internal/errs"
+    "github.com/alist-org/alist/v3/internal/model"
     "github.com/alist-org/alist/v3/internal/op"
     "github.com/alist-org/alist/v3/pkg/task"
     "github.com/alist-org/alist/v3/pkg/utils"
     "github.com/pkg/errors"
 )
@@ -18,7 +17,7 @@ var UploadTaskManager = task.NewTaskManager(3, func(tid *uint64) {
 })

 // putAsTask add as a put task and return immediately
-func putAsTask(dstDirPath string, file *model.FileStream) error {
+func putAsTask(dstDirPath string, file model.FileStreamer) error {
     storage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
     if err != nil {
         return errors.WithMessage(err, "failed get storage")
@@ -27,11 +26,12 @@ func putAsTask(dstDirPath string, file *model.FileStream) error {
         return errors.WithStack(errs.UploadNotSupported)
     }
     if file.NeedStore() {
-        tempFile, err := utils.CreateTempFile(file, file.GetSize())
+        _, err := file.CacheFullInTempFile()
         if err != nil {
             return errors.Wrapf(err, "failed to create temp file")
         }
-        file.SetReadCloser(tempFile)
+        //file.SetReader(tempFile)
+        //file.SetTmpFile(tempFile)
     }
     UploadTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
         Name: fmt.Sprintf("upload %s to [%s](%s)", file.GetName(), storage.GetStorage().MountPath, dstDirActualPath),
@@ -43,7 +43,7 @@ func putAsTask(dstDirPath string, file *model.FileStream) error {
 }

 // putDirect put the file and return after finish
-func putDirectly(ctx context.Context, dstDirPath string, file *model.FileStream, lazyCache ...bool) error {
+func putDirectly(ctx context.Context, dstDirPath string, file model.FileStreamer, lazyCache ...bool) error {
     storage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
     if err != nil {
         return errors.WithMessage(err, "failed get storage")
@@ -1,73 +0,0 @@
-package fs
-
-import (
-    "context"
-    "io"
-    "net/http"
-    "strings"
-
-    "github.com/alist-org/alist/v3/internal/net"
-    "github.com/alist-org/alist/v3/pkg/http_range"
-
-    "github.com/alist-org/alist/v3/internal/model"
-    "github.com/alist-org/alist/v3/pkg/utils"
-    "github.com/alist-org/alist/v3/server/common"
-    "github.com/pkg/errors"
-)
-
-func getFileStreamFromLink(ctx context.Context, file model.Obj, link *model.Link) (*model.FileStream, error) {
-    var rc io.ReadCloser
-    var err error
-    mimetype := utils.GetMimeType(file.GetName())
-    if link.RangeReadCloser.RangeReader != nil {
-        rc, err = link.RangeReadCloser.RangeReader(http_range.Range{Length: -1})
-        if err != nil {
-            return nil, err
-        }
-    } else if link.ReadSeekCloser != nil {
-        rc = link.ReadSeekCloser
-    } else if link.Concurrency != 0 || link.PartSize != 0 {
-        down := net.NewDownloader(func(d *net.Downloader) {
-            d.Concurrency = link.Concurrency
-            d.PartSize = link.PartSize
-        })
-        req := &net.HttpRequestParams{
-            URL:       link.URL,
-            Range:     http_range.Range{Length: -1},
-            Size:      file.GetSize(),
-            HeaderRef: link.Header,
-        }
-        rc, err = down.Download(ctx, req)
-        if err != nil {
-            return nil, err
-        }
-    } else {
-        //TODO: add accelerator
-        req, err := http.NewRequest(http.MethodGet, link.URL, nil)
-        if err != nil {
-            return nil, errors.Wrapf(err, "failed to create request for %s", link.URL)
-        }
-        for h, val := range link.Header {
-            req.Header[h] = val
-        }
-        res, err := common.HttpClient().Do(req)
-        if err != nil {
-            return nil, errors.Wrapf(err, "failed to get response for %s", link.URL)
-        }
-        mt := res.Header.Get("Content-Type")
-        if mt != "" && strings.ToLower(mt) != "application/octet-stream" {
-            mimetype = mt
-        }
-        rc = res.Body
-    }
-    // if can't get mimetype, use default application/octet-stream
-    if mimetype == "" {
-        mimetype = "application/octet-stream"
-    }
-    stream := &model.FileStream{
-        Obj:        file,
-        ReadCloser: rc,
-        Mimetype:   mimetype,
-    }
-    return stream, nil
-}
@@ -1,6 +1,7 @@
 package model

 import (
+    "context"
     "io"
     "net/http"
     "time"
@@ -22,13 +23,14 @@ type LinkArgs struct {
 }

 type Link struct {
-    URL             string            `json:"url"`
-    Header          http.Header       `json:"header"` // needed header (for url) or response header(for data or writer)
-    RangeReadCloser RangeReadCloser   `json:"-"`      // recommended way
-    ReadSeekCloser  io.ReadSeekCloser `json:"-"`      // best for local,smb... file system, which exposes ReadSeekCloser
+    URL             string            `json:"url"`    // most common way
+    Header          http.Header       `json:"header"` // needed header (for url)
+    RangeReadCloser RangeReadCloserIF `json:"-"`      // recommended way if can't use URL
+    MFile           File              `json:"-"`      // best for local,smb... file system, which exposes MFile

     Expiration *time.Duration // local cache expire Duration
     IPCacheKey bool           `json:"-"` // add ip to cache key

     //for accelerating request, use multi-thread downloading
     Concurrency int `json:"concurrency"`
     PartSize    int `json:"part_size"`
@@ -45,10 +47,23 @@ type FsOtherArgs struct {
     Method string      `json:"method" form:"method"`
     Data   interface{} `json:"data" form:"data"`
 }
-type RangeReadCloser struct {
-    RangeReader RangeReaderFunc
-    Closers     *utils.Closers
+
+type RangeReadCloserIF interface {
+    RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error)
+    utils.ClosersIF
 }

-type WriterFunc func(w io.Writer) error
-type RangeReaderFunc func(httpRange http_range.Range) (io.ReadCloser, error)
+var _ RangeReadCloserIF = (*RangeReadCloser)(nil)
+
+type RangeReadCloser struct {
+    RangeReader RangeReaderFunc
+    utils.Closers
+}
+
+func (r RangeReadCloser) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
+    rc, err := r.RangeReader(ctx, httpRange)
+    r.Closers.Add(rc)
+    return rc, err
+}
+
+// type WriterFunc func(w io.Writer) error
+type RangeReaderFunc func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error)
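A driver that can serve arbitrary byte ranges satisfies the new RangeReadCloserIF by filling model.RangeReadCloser with a RangeReaderFunc. A sketch over a local file; the path and the rangeBody wrapper are illustrative:

    type rangeBody struct {
        io.Reader
        io.Closer
    }

    rrc := &model.RangeReadCloser{
        RangeReader: func(ctx context.Context, r http_range.Range) (io.ReadCloser, error) {
            f, err := os.Open("/tmp/example.bin") // hypothetical source
            if err != nil {
                return nil, err
            }
            // serve exactly the requested window; RangeRead() registers the body in Closers
            return rangeBody{Reader: io.NewSectionReader(f, r.Start, r.Length), Closer: f}, nil
        },
    }
    link := &model.Link{RangeReadCloser: rrc}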
internal/model/file.go (new file, 25 lines)
@@ -0,0 +1,25 @@
+package model
+
+import "io"
+
+// File is basic file level accessing interface
+type File interface {
+    io.Reader
+    io.ReaderAt
+    io.Seeker
+    io.Closer
+}
+
+type NopMFileIF interface {
+    io.Reader
+    io.ReaderAt
+    io.Seeker
+}
+type NopMFile struct {
+    NopMFileIF
+}
+
+func (NopMFile) Close() error { return nil }
+func NewNopMFile(r NopMFileIF) File {
+    return NopMFile{r}
+}
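NewNopMFile adapts anything with Read/ReadAt/Seek but no Close into a model.File, e.g. to expose an in-memory blob through Link.MFile; a small sketch:

    data := []byte("hello world")
    // bytes.Reader already implements io.Reader, io.ReaderAt and io.Seeker,
    // so it satisfies NopMFileIF; NewNopMFile supplies the no-op Close.
    var mf model.File = model.NewNopMFile(bytes.NewReader(data))
    link := &model.Link{MFile: mf}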
@@ -1,6 +1,8 @@
 package model

 import (
+    "github.com/alist-org/alist/v3/pkg/http_range"
+    "github.com/alist-org/alist/v3/pkg/utils"
     "io"
     "regexp"
     "sort"
@@ -20,8 +22,9 @@ type Obj interface {
     GetSize() int64
     GetName() string
     ModTime() time.Time
+    CreateTime() time.Time
     IsDir() bool
-    GetHash() (string, string)
+    //GetHash() (string, string)
+    GetHash() utils.HashInfo

     // The internal information of the driver.
     // If you want to use it, please understand what it means
@@ -29,14 +32,20 @@ type Obj interface {
     GetPath() string
 }

+// FileStreamer ->check FileStream for more comments
 type FileStreamer interface {
-    io.ReadCloser
+    io.Reader
+    io.Closer
     Obj
     GetMimetype() string
-    SetReadCloser(io.ReadCloser)
+    //SetReader(io.Reader)
     NeedStore() bool
-    GetReadCloser() io.ReadCloser
-    GetOld() Obj
+    GetExist() Obj
+    SetExist(Obj)
+    //for a non-seekable Stream, RangeRead supports peeking some data, and CacheFullInTempFile still works
+    RangeRead(http_range.Range) (io.Reader, error)
+    //for a non-seekable Stream, if Read is called, this function won't work
+    CacheFullInTempFile() (File, error)
 }

 type URL interface {
@@ -50,9 +59,6 @@ type Thumb interface {
 type SetPath interface {
     SetPath(path string)
 }
-type SetHash interface {
-    SetHash(hash string, hashType string)
-}

 func SortFiles(objs []Obj, orderBy, orderDirection string) {
     if orderBy == "" {
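A driver adopts the widened Obj interface simply by filling the new model.Object fields (next hunk). A hedged sketch; utils.NewHashInfo and utils.MD5 are assumed helpers for building a HashInfo from a single digest, and the right-hand values stand in for a remote API response:

    obj := &model.Object{
        Name:     remote.Name,
        Size:     remote.Size,
        Modified: remote.UpdatedAt,
        Ctime:    remote.CreatedAt, // CreateTime() falls back to ModTime() when zero
        HashInfo: utils.NewHashInfo(utils.MD5, remote.Md5), // assumed constructor
    }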
@@ -28,9 +28,9 @@ type Object struct {
     Name     string
     Size     int64
     Modified time.Time
+    Ctime    time.Time // file create time
     IsFolder bool
-    Hash     string
-    HashType string
+    HashInfo utils.HashInfo
 }

 func (o *Object) GetName() string {
@@ -44,6 +44,12 @@ func (o *Object) GetSize() int64 {
 func (o *Object) ModTime() time.Time {
     return o.Modified
 }
+func (o *Object) CreateTime() time.Time {
+    if o.Ctime.IsZero() {
+        return o.ModTime()
+    }
+    return o.Ctime
+}

 func (o *Object) IsDir() bool {
     return o.IsFolder
@@ -61,13 +67,8 @@ func (o *Object) SetPath(path string) {
     o.Path = path
 }

-func (o *Object) SetHash(hash string, hashType string) {
-    o.Hash = hash
-    o.HashType = hashType
-}
-
-func (o *Object) GetHash() (string, string) {
-    return o.Hash, o.HashType
+func (o *Object) GetHash() utils.HashInfo {
+    return o.HashInfo
 }

 type Thumbnail struct {
@@ -1,33 +0,0 @@
-package model
-
-import (
-    "io"
-)
-
-type FileStream struct {
-    Obj
-    io.ReadCloser
-    Mimetype     string
-    WebPutAsTask bool
-    Old          Obj
-}
-
-func (f *FileStream) GetMimetype() string {
-    return f.Mimetype
-}
-
-func (f *FileStream) NeedStore() bool {
-    return f.WebPutAsTask
-}
-
-func (f *FileStream) GetReadCloser() io.ReadCloser {
-    return f.ReadCloser
-}
-
-func (f *FileStream) SetReadCloser(rc io.ReadCloser) {
-    f.ReadCloser = rc
-}
-
-func (f *FileStream) GetOld() Obj {
-    return f.Old
-}
@@ -124,11 +124,11 @@ func (u *User) JoinPath(reqPath string) (string, error) {
 }

 func StaticHash(password string) string {
-    return utils.GetSHA256Encode([]byte(fmt.Sprintf("%s-%s", password, StaticHashSalt)))
+    return utils.HashData(utils.SHA256, []byte(fmt.Sprintf("%s-%s", password, StaticHashSalt)))
 }

 func HashPwd(static string, salt string) string {
-    return utils.GetSHA256Encode([]byte(fmt.Sprintf("%s-%s", static, salt)))
+    return utils.HashData(utils.SHA256, []byte(fmt.Sprintf("%s-%s", static, salt)))
 }

 func TwoHashPwd(password string, salt string) string {
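utils.HashData replaces the per-algorithm helpers like GetSHA256Encode: the hash type is now a parameter. Same pattern as the hunk above; the digest encoding (presumably hex) matches the old helper:

    salted := fmt.Sprintf("%s-%s", password, salt)
    digest := utils.HashData(utils.SHA256, []byte(salted))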
@@ -43,7 +43,7 @@ type Downloader struct {
     //RequestParam HttpRequestParams
     HttpClient HttpRequestFunc
 }
-type HttpRequestFunc func(params *HttpRequestParams) (*http.Response, error)
+type HttpRequestFunc func(ctx context.Context, params *HttpRequestParams) (*http.Response, error)

 func NewDownloader(options ...func(*Downloader)) *Downloader {
     d := &Downloader{
@@ -131,7 +131,7 @@ func (d *downloader) download() (io.ReadCloser, error) {
     }

     if d.cfg.Concurrency == 1 {
-        resp, err := d.cfg.HttpClient(d.params)
+        resp, err := d.cfg.HttpClient(d.ctx, d.params)
         if err != nil {
             return nil, err
         }
@@ -258,7 +258,7 @@ func (d *downloader) downloadChunk(ch *chunk) error {

 func (d *downloader) tryDownloadChunk(params *HttpRequestParams, ch *chunk) (int64, error) {

-    resp, err := d.cfg.HttpClient(params)
+    resp, err := d.cfg.HttpClient(d.ctx, params)
     if err != nil {
         return 0, err
     }
@@ -371,10 +371,10 @@ type chunk struct {
     //boundary http_range.Range
 }

-func DefaultHttpRequestFunc(params *HttpRequestParams) (*http.Response, error) {
+func DefaultHttpRequestFunc(ctx context.Context, params *HttpRequestParams) (*http.Response, error) {
     header := http_range.ApplyRangeToHttpHeader(params.Range, params.HeaderRef)

-    res, err := RequestHttp("GET", header, params.URL)
+    res, err := RequestHttp(ctx, "GET", header, params.URL)
     if err != nil {
         return nil, err
     }
@@ -456,7 +456,7 @@ type Buf struct {
 // NewBuf is a buffer that can have 1 read & 1 write at the same time.
 // when read is faster than write, immediately feed data to read after written
 func NewBuf(ctx context.Context, maxSize int, id int) *Buf {
-    d := make([]byte, maxSize)
+    d := make([]byte, 0, maxSize)
     return &Buf{ctx: ctx, buffer: bytes.NewBuffer(d), size: maxSize, notify: make(chan struct{})}
 }
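With ctx threaded through HttpRequestFunc, a ranged multi-thread download looks like this (mirroring the call site in internal/stream/util.go at the end of this diff; link, size, header and ctx are assumed to be in scope):

    down := net.NewDownloader(func(d *net.Downloader) {
        d.Concurrency = 4            // split the range into 4 concurrent parts
        d.PartSize = 8 * 1024 * 1024 // 8MB per part
    })
    req := &net.HttpRequestParams{
        URL:       link.URL,
        Range:     http_range.Range{Start: 0, Length: size},
        Size:      size,
        HeaderRef: header,
    }
    rc, err := down.Download(ctx, req) // ctx now reaches every chunk request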
@@ -143,7 +143,7 @@ type downloadCaptureClient struct {
     lock sync.Mutex
 }

-func (c *downloadCaptureClient) HttpRequest(params *HttpRequestParams) (*http.Response, error) {
+func (c *downloadCaptureClient) HttpRequest(ctx context.Context, params *HttpRequestParams) (*http.Response, error) {
     c.lock.Lock()
     defer c.lock.Unlock()
@@ -1,6 +1,7 @@
 package net

 import (
+    "context"
     "fmt"
     "io"
     "mime"
@@ -110,7 +111,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
     }
     switch {
     case len(ranges) == 0:
-        reader, err := RangeReaderFunc(http_range.Range{Length: -1})
+        reader, err := RangeReaderFunc(context.Background(), http_range.Range{Length: -1})
         if err != nil {
             http.Error(w, err.Error(), http.StatusInternalServerError)
             return
@@ -129,7 +130,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
         // does not request multiple parts might not support
         // multipart responses."
         ra := ranges[0]
-        sendContent, err = RangeReaderFunc(ra)
+        sendContent, err = RangeReaderFunc(context.Background(), ra)
         if err != nil {
             http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
             return
@@ -156,7 +157,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time
                 pw.CloseWithError(err)
                 return
             }
-            reader, err := RangeReaderFunc(ra)
+            reader, err := RangeReaderFunc(context.Background(), ra)
             if err != nil {
                 pw.CloseWithError(err)
                 return
@@ -209,8 +210,8 @@ func ProcessHeader(origin, override http.Header) http.Header {
 }

 // RequestHttp deal with Header properly then send the request
-func RequestHttp(httpMethod string, headerOverride http.Header, URL string) (*http.Response, error) {
-    req, err := http.NewRequest(httpMethod, URL, nil)
+func RequestHttp(ctx context.Context, httpMethod string, headerOverride http.Header, URL string) (*http.Response, error) {
+    req, err := http.NewRequestWithContext(ctx, httpMethod, URL, nil)
     if err != nil {
         return nil, err
     }
@@ -2,7 +2,6 @@ package op

 import (
     "context"
-    "os"
     stdpath "path"
     "time"

@@ -481,18 +480,10 @@ func Remove(ctx context.Context, storage driver.Driver, path string) error {
     return errors.WithStack(err)
 }

-func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file *model.FileStream, up driver.UpdateProgress, lazyCache ...bool) error {
+func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file model.FileStreamer, up driver.UpdateProgress, lazyCache ...bool) error {
     if storage.Config().CheckStatus && storage.GetStorage().Status != WORK {
         return errors.Errorf("storage not init: %s", storage.GetStorage().Status)
     }
-    defer func() {
-        if f, ok := file.GetReadCloser().(*os.File); ok {
-            err := os.RemoveAll(f.Name())
-            if err != nil {
-                log.Errorf("failed to remove file [%s]", f.Name())
-            }
-        }
-    }()
     defer func() {
         if err := file.Close(); err != nil {
             log.Errorf("failed to close file streamer, %v", err)
@@ -508,7 +499,7 @@ func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file *mo
     if fi.GetSize() == 0 {
         err = Remove(ctx, storage, dstPath)
         if err != nil {
-            return errors.WithMessagef(err, "failed remove file that exist and have size 0")
+            return errors.WithMessagef(err, "while uploading, failed remove existing file which size = 0")
         }
     } else if storage.Config().NoOverwriteUpload {
         // try to rename old obj
@@ -517,7 +508,7 @@ func Put(ctx context.Context, storage driver.Driver, dstDirPath string, file *mo
             return err
         }
     } else {
-        file.Old = fi
+        file.SetExist(fi)
     }
     err = MakeDir(ctx, storage, dstDirPath)
@@ -2,7 +2,7 @@ package qbittorrent

 import (
     "fmt"
-    "io"
+    "github.com/alist-org/alist/v3/internal/stream"
     "os"
     "path/filepath"
     "sync"
@@ -157,17 +157,22 @@ func (m *Monitor) complete() error {
     if err != nil {
         return errors.Wrapf(err, "failed to open file %s", tempPath)
     }
-    stream := &model.FileStream{
+    s := stream.FileStream{
         Obj: &model.Object{
             Name:     fileName,
             Size:     size,
             Modified: time.Now(),
             IsFolder: false,
         },
-        ReadCloser: struct{ io.ReadSeekCloser }{f},
-        Mimetype:   mimetype,
+        Reader:   f,
+        Closers:  utils.NewClosers(f),
+        Mimetype: mimetype,
     }
-    return op.Put(tsk.Ctx, storage, dstDir, stream, tsk.SetProgress)
+    ss, err := stream.NewSeekableStream(s, nil)
+    if err != nil {
+        return err
+    }
+    return op.Put(tsk.Ctx, storage, dstDir, ss, tsk.SetProgress)
     },
 }))
 }
internal/stream/stream.go (new file, 278 lines)
@@ -0,0 +1,278 @@
+package stream
+
+import (
+    "bytes"
+    "context"
+    "errors"
+    "fmt"
+    "github.com/alist-org/alist/v3/internal/errs"
+    "github.com/alist-org/alist/v3/internal/model"
+    "github.com/alist-org/alist/v3/pkg/http_range"
+    "github.com/alist-org/alist/v3/pkg/utils"
+    "io"
+    "os"
+)
+
+type FileStream struct {
+    Ctx context.Context
+    model.Obj
+    io.Reader
+    Mimetype     string
+    WebPutAsTask bool
+    Exist        model.Obj // the file that already exists at the destination; we can reuse some info since we will overwrite it
+    utils.Closers
+    tmpFile  *os.File // if present, tmpFile has the full content and will be deleted on Close
+    peekBuff *bytes.Reader
+}
+
+func (f *FileStream) GetMimetype() string {
+    return f.Mimetype
+}
+
+func (f *FileStream) NeedStore() bool {
+    return f.WebPutAsTask
+}
+func (f *FileStream) Close() error {
+    var err1, err2 error
+    err1 = f.Closers.Close()
+    if f.tmpFile != nil {
+        err2 = os.RemoveAll(f.tmpFile.Name())
+        if err2 != nil {
+            err2 = errs.NewErr(err2, "failed to remove tmpFile [%s]", f.tmpFile.Name())
+        }
+    }
+
+    return errors.Join(err1, err2)
+}
+
+func (f *FileStream) GetExist() model.Obj {
+    return f.Exist
+}
+func (f *FileStream) SetExist(obj model.Obj) {
+    f.Exist = obj
+}
+
+// CacheFullInTempFile saves all data into tmpFile. Not recommended since it wears the disk,
+// and the upload can't start until the file is fully written. It's not thread-safe!
+func (f *FileStream) CacheFullInTempFile() (model.File, error) {
+    if f.tmpFile != nil {
+        return f.tmpFile, nil
+    }
+    if file, ok := f.Reader.(model.File); ok {
+        return file, nil
+    }
+    tmpF, err := utils.CreateTempFile(f.Reader, f.GetSize())
+    if err != nil {
+        return nil, err
+    }
+    f.tmpFile = tmpF
+    f.Reader = tmpF
+    return f.tmpFile, nil
+}
+
+const InMemoryBufMaxSize = 10 // Megabytes
+const InMemoryBufMaxSizeBytes = InMemoryBufMaxSize * 1024 * 1024
+
+// RangeRead has to cache all data first since only a Reader is provided.
+// It also supports a peeking RangeRead at the very start, but won't buffer more than 10MB of data in memory.
+func (f *FileStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
+    if httpRange.Length == -1 {
+        httpRange.Length = f.GetSize()
+    }
+    if f.peekBuff != nil && httpRange.Start < int64(f.peekBuff.Len()) && httpRange.Start+httpRange.Length-1 < int64(f.peekBuff.Len()) {
+        return io.NewSectionReader(f.peekBuff, httpRange.Start, httpRange.Length), nil
+    }
+    if httpRange.Start == 0 && httpRange.Length <= InMemoryBufMaxSizeBytes && f.peekBuff == nil {
+        bufSize := utils.Min(httpRange.Length, f.GetSize())
+        newBuf := bytes.NewBuffer(make([]byte, 0, bufSize))
+        n, err := io.CopyN(newBuf, f.Reader, bufSize)
+        if err != nil {
+            return nil, err
+        }
+        if n != bufSize {
+            return nil, fmt.Errorf("stream RangeRead did not get all data in peek, expect =%d ,actual =%d", bufSize, n)
+        }
+        f.peekBuff = bytes.NewReader(newBuf.Bytes())
+        f.Reader = io.MultiReader(f.peekBuff, f.Reader)
+        return io.NewSectionReader(f.peekBuff, httpRange.Start, httpRange.Length), nil
+    }
+    if f.tmpFile == nil {
+        _, err := f.CacheFullInTempFile()
+        if err != nil {
+            return nil, err
+        }
+    }
+    return io.NewSectionReader(f.tmpFile, httpRange.Start, httpRange.Length), nil
+}
+
+var _ model.FileStreamer = (*SeekableStream)(nil)
+var _ model.FileStreamer = (*FileStream)(nil)
+
+//var _ seekableStream = (*FileStream)(nil)
+
+// SeekableStream is for most internal streams, which are backed by either a RangeReadCloser or an MFile.
+type SeekableStream struct {
+    FileStream
+    Link *model.Link
+    // should have one of belows to support rangeRead
+    rangeReadCloser model.RangeReadCloserIF
+    mFile           model.File
+}
+
+func NewSeekableStream(fs FileStream, link *model.Link) (*SeekableStream, error) {
+    if len(fs.Mimetype) == 0 {
+        fs.Mimetype = utils.GetMimeType(fs.Obj.GetName())
+    }
+    ss := SeekableStream{FileStream: fs, Link: link}
+    if ss.Reader != nil {
+        result, ok := ss.Reader.(model.File)
+        if ok {
+            ss.mFile = result
+            ss.Closers.Add(result)
+            return &ss, nil
+        }
+    }
+    if ss.Link != nil {
+        if ss.Link.MFile != nil {
+            ss.mFile = ss.Link.MFile
+            ss.Reader = ss.Link.MFile
+            ss.Closers.Add(ss.Link.MFile)
+            return &ss, nil
+        }
+
+        if ss.Link.RangeReadCloser != nil {
+            ss.rangeReadCloser = ss.Link.RangeReadCloser
+            return &ss, nil
+        }
+        if len(ss.Link.URL) > 0 {
+            rrc, err := GetRangeReadCloserFromLink(ss.GetSize(), link)
+            if err != nil {
+                return nil, err
+            }
+            ss.rangeReadCloser = rrc
+            return &ss, nil
+        }
+    }
+
+    return nil, fmt.Errorf("illegal seekableStream")
+}
+
+//func (ss *SeekableStream) Peek(length int) {
+//
+//}
+
+// RangeRead is not thread-safe, please use it in a single thread only.
+func (ss *SeekableStream) RangeRead(httpRange http_range.Range) (io.Reader, error) {
+    if httpRange.Length == -1 {
+        httpRange.Length = ss.GetSize()
+    }
+    if ss.mFile != nil {
+        return io.NewSectionReader(ss.mFile, httpRange.Start, httpRange.Length), nil
+    }
+    if ss.tmpFile != nil {
+        return io.NewSectionReader(ss.tmpFile, httpRange.Start, httpRange.Length), nil
+    }
+    if ss.rangeReadCloser != nil {
+        rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, httpRange)
+        if err != nil {
+            return nil, err
+        }
+        return rc, nil
+    }
+    return nil, fmt.Errorf("can't find mFile or rangeReadCloser")
+}
+
+//func (f *FileStream) GetReader() io.Reader {
+//    return f.Reader
+//}
+
+// Read only provides the full stream when it's demanded; in rapid-upload we can skip it to save memory.
+func (ss *SeekableStream) Read(p []byte) (n int, err error) {
+    if ss.Reader == nil {
+        if ss.rangeReadCloser == nil {
+            return 0, fmt.Errorf("illegal seekableStream")
+        }
+        rc, err := ss.rangeReadCloser.RangeRead(ss.Ctx, http_range.Range{Length: -1})
+        if err != nil {
+            return 0, err
+        }
+        ss.Reader = io.NopCloser(rc)
+        ss.Closers.Add(rc)
+    }
+    return ss.Reader.Read(p)
+}
+
+func (ss *SeekableStream) CacheFullInTempFile() (model.File, error) {
+    if ss.tmpFile != nil {
+        return ss.tmpFile, nil
+    }
+    if ss.mFile != nil {
+        return ss.mFile, nil
+    }
+    tmpF, err := utils.CreateTempFile(ss, ss.GetSize())
+    if err != nil {
+        return nil, err
+    }
+    ss.tmpFile = tmpF
+    ss.Reader = tmpF
+    return ss.tmpFile, nil
+}
+
+//func (f *FileStream) SetReader(r io.Reader) {
+//    f.Reader = r
+//}
+
+/*
+// RangePeek allow once peek at start of the data, since most drives check first XX bytes for rapid-upload
+func (f *FileStream) RangePeek(length int64) (*bytes.Buffer, error) {
+    if length > InMemoryBufMaxSize*1024*1024 {
+        return nil, errs.NewErr(errs.StreamPeekFail, "can't peek size > %d MB", InMemoryBufMaxSize)
+    }
+    httpRange := &http_range.Range{Length: length}
+    bufSize := utils.Min(httpRange.Length, f.GetSize())
+    buf := bytes.NewBuffer(make([]byte, 0, bufSize))
+    if f.link == nil && f.tmpFile == nil {
+        if !f.peekedOnce {
+            f.mu.Lock()
+            f.peekedOnce = true
+            _, err := io.CopyN(buf, f.Reader, bufSize)
+            if err != nil {
+                f.mu.Unlock()
+                return nil, errs.NewErr(errs.StreamPeekFail, "failed to copyN %d bytes data", bufSize)
+            }
+            f.Reader = io.MultiReader(buf, f.Reader)
+            f.mu.Unlock()
+            return buf, nil
+        }
+        return nil, errs.NewErr(errs.StreamPeekFail, "link and tmpFile both are null")
+    }
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    rc, _, err := GetReadCloserFromLink(f.Obj, f.link, httpRange)
+    if err != nil {
+        return nil, err
+    }
+    _, err = io.CopyN(buf, rc, bufSize)
+    if err != nil {
+        return nil, err
+    }
+    return buf, nil
+}*/
+
+//func (f *FileStream) SetTmpFile(r *os.File) {
+//    f.mu.Lock()
+//    //f.readDisabled = true
+//    f.tmpFile = r
+//    f.Reader = r
+//    f.mu.Unlock()
+//}
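The peek buffer above is what makes cross-storage rapid-upload cheap: a driver can inspect the head of a stream without consuming it, since FileStream.RangeRead keeps up to 10MB in memory and re-chains it in front of the reader (and SeekableStream serves ranges straight from the link). A driver-side sketch; the 1KB probe size is illustrative:

    // peek the first 1KB for a rapid-upload hash check
    peek, err := file.RangeRead(http_range.Range{Start: 0, Length: 1024})
    if err != nil {
        return err
    }
    probe, err := io.ReadAll(peek)
    if err != nil {
        return err
    }
    _ = probe // hash it, try server-side rapid-upload, fall back to a full upload on miss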
internal/stream/util.go (new file, 84 lines)
@@ -0,0 +1,84 @@
+package stream
+
+import (
+    "context"
+    "fmt"
+    "github.com/alist-org/alist/v3/internal/errs"
+    "github.com/alist-org/alist/v3/internal/model"
+    "github.com/alist-org/alist/v3/internal/net"
+    "github.com/alist-org/alist/v3/pkg/http_range"
+    log "github.com/sirupsen/logrus"
+    "io"
+    "net/http"
+)
+
+func GetRangeReadCloserFromLink(size int64, link *model.Link) (model.RangeReadCloserIF, error) {
+    if len(link.URL) == 0 {
+        return nil, fmt.Errorf("can't create RangeReadCloser since URL is empty in link")
+    }
+    //remoteClosers := utils.EmptyClosers()
+    rangeReaderFunc := func(ctx context.Context, r http_range.Range) (io.ReadCloser, error) {
+        if link.Concurrency != 0 || link.PartSize != 0 {
+            header := net.ProcessHeader(http.Header{}, link.Header)
+            down := net.NewDownloader(func(d *net.Downloader) {
+                d.Concurrency = link.Concurrency
+                d.PartSize = link.PartSize
+            })
+            req := &net.HttpRequestParams{
+                URL:       link.URL,
+                Range:     r,
+                Size:      size,
+                HeaderRef: header,
+            }
+            rc, err := down.Download(ctx, req)
+            if err != nil {
+                return nil, errs.NewErr(err, "GetReadCloserFromLink failed")
+            }
+            return rc, nil
+        }
+        if len(link.URL) > 0 {
+            response, err := RequestRangedHttp(ctx, link, r.Start, r.Length)
+            if err != nil {
+                return nil, fmt.Errorf("http request failure, err: %s", err)
+            }
+            if r.Start == 0 && (r.Length == -1 || r.Length == size) || response.StatusCode == http.StatusPartialContent ||
+                checkContentRange(&response.Header, size, r.Start) {
+                return response.Body, nil
+            } else if response.StatusCode == http.StatusOK {
+                log.Warnf("remote http server not supporting range request, expect low performance!")
+                readCloser, err := net.GetRangedHttpReader(response.Body, r.Start, r.Length)
+                if err != nil {
+                    return nil, err
+                }
+                return readCloser, nil
+            }
+
+            return response.Body, nil
+        }
+        return nil, errs.NotSupport
+    }
+    resultRangeReadCloser := model.RangeReadCloser{RangeReader: rangeReaderFunc}
+    return &resultRangeReadCloser, nil
+}
+
+func RequestRangedHttp(ctx context.Context, link *model.Link, offset, length int64) (*http.Response, error) {
+    header := net.ProcessHeader(http.Header{}, link.Header)
+    header = http_range.ApplyRangeToHttpHeader(http_range.Range{Start: offset, Length: length}, header)
+
+    return net.RequestHttp(ctx, "GET", header, link.URL)
+}
+
+// 139 cloud does not properly return the 206 http status code, add a hack here
+func checkContentRange(header *http.Header, size, offset int64) bool {
+    r, err2 := http_range.ParseRange(header.Get("Content-Range"), size)
+    if err2 != nil {
+        log.Warnf("exception trying to parse Content-Range, will ignore, err=%s", err2)
+    }
+    if len(r) == 1 && r[0].Start == offset {
+        return true
+    }
+    return false
+}