feat(offline-download): allow using offline download tools in any storage (#7716)
* Feat(offline-download): allow using thunder offline download tool in any storage * Feat(offline-download): allow using 115 offline download tool in any storage * Feat(offline-download): allow using pikpak offline download tool in any storage * style(offline-download): unify offline download tool names * feat(offline-download): show available offline download tools only * Fix(offline-download): update unmodified tool names. --------- Co-authored-by: Andy Hsu <i@nn.ci>
This commit is contained in:
@ -1,11 +1,9 @@
|
||||
package tool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/alist-org/alist/v3/internal/driver"
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/internal/op"
|
||||
"github.com/alist-org/alist/v3/internal/stream"
|
||||
@ -14,80 +12,60 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/xhofe/tache"
|
||||
"net/http"
|
||||
"os"
|
||||
stdpath "path"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
type TransferTask struct {
|
||||
task.TaskExtension
|
||||
FileDir string `json:"file_dir"`
|
||||
DstDirPath string `json:"dst_dir_path"`
|
||||
TempDir string `json:"temp_dir"`
|
||||
DeletePolicy DeletePolicy `json:"delete_policy"`
|
||||
file File
|
||||
Status string `json:"-"` //don't save status to save space
|
||||
SrcObjPath string `json:"src_obj_path"`
|
||||
DstDirPath string `json:"dst_dir_path"`
|
||||
SrcStorage driver.Driver `json:"-"`
|
||||
DstStorage driver.Driver `json:"-"`
|
||||
SrcStorageMp string `json:"src_storage_mp"`
|
||||
DstStorageMp string `json:"dst_storage_mp"`
|
||||
DeletePolicy DeletePolicy `json:"delete_policy"`
|
||||
}
|
||||
|
||||
func (t *TransferTask) Run() error {
|
||||
t.ClearEndTime()
|
||||
t.SetStartTime(time.Now())
|
||||
defer func() { t.SetEndTime(time.Now()) }()
|
||||
// check dstDir again
|
||||
var err error
|
||||
if (t.file == File{}) {
|
||||
t.file, err = GetFile(t.FileDir)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to get file %s", t.FileDir)
|
||||
}
|
||||
if t.SrcStorage == nil {
|
||||
return transferStdPath(t)
|
||||
} else {
|
||||
return transferObjPath(t)
|
||||
}
|
||||
storage, dstDirActualPath, err := op.GetStorageAndActualPath(t.DstDirPath)
|
||||
if err != nil {
|
||||
return errors.WithMessage(err, "failed get storage")
|
||||
}
|
||||
mimetype := utils.GetMimeType(t.file.Path)
|
||||
rc, err := t.file.GetReadCloser()
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to open file %s", t.file.Path)
|
||||
}
|
||||
s := &stream.FileStream{
|
||||
Ctx: nil,
|
||||
Obj: &model.Object{
|
||||
Name: filepath.Base(t.file.Path),
|
||||
Size: t.file.Size,
|
||||
Modified: t.file.Modified,
|
||||
IsFolder: false,
|
||||
},
|
||||
Reader: rc,
|
||||
Mimetype: mimetype,
|
||||
Closers: utils.NewClosers(rc),
|
||||
}
|
||||
relDir, err := filepath.Rel(t.TempDir, filepath.Dir(t.file.Path))
|
||||
if err != nil {
|
||||
log.Errorf("find relation directory error: %v", err)
|
||||
}
|
||||
newDistDir := filepath.Join(dstDirActualPath, relDir)
|
||||
return op.Put(t.Ctx(), storage, newDistDir, s, t.SetProgress)
|
||||
}
|
||||
|
||||
func (t *TransferTask) GetName() string {
|
||||
return fmt.Sprintf("transfer %s to [%s]", t.file.Path, t.DstDirPath)
|
||||
return fmt.Sprintf("transfer [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
|
||||
}
|
||||
|
||||
func (t *TransferTask) GetStatus() string {
|
||||
return "transferring"
|
||||
return t.Status
|
||||
}
|
||||
|
||||
func (t *TransferTask) OnSucceeded() {
|
||||
if t.DeletePolicy == DeleteOnUploadSucceed || t.DeletePolicy == DeleteAlways {
|
||||
err := os.Remove(t.file.Path)
|
||||
if err != nil {
|
||||
log.Errorf("failed to delete file %s, error: %s", t.file.Path, err.Error())
|
||||
if t.SrcStorage == nil {
|
||||
removeStdTemp(t)
|
||||
} else {
|
||||
removeObjTemp(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TransferTask) OnFailed() {
|
||||
if t.DeletePolicy == DeleteOnUploadFailed || t.DeletePolicy == DeleteAlways {
|
||||
err := os.Remove(t.file.Path)
|
||||
if err != nil {
|
||||
log.Errorf("failed to delete file %s, error: %s", t.file.Path, err.Error())
|
||||
if t.SrcStorage == nil {
|
||||
removeStdTemp(t)
|
||||
} else {
|
||||
removeObjTemp(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -95,3 +73,202 @@ func (t *TransferTask) OnFailed() {
|
||||
var (
|
||||
TransferTaskManager *tache.Manager[*TransferTask]
|
||||
)
|
||||
|
||||
func transferStd(ctx context.Context, tempDir, dstDirPath string, deletePolicy DeletePolicy) error {
|
||||
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
|
||||
if err != nil {
|
||||
return errors.WithMessage(err, "failed get dst storage")
|
||||
}
|
||||
entries, err := os.ReadDir(tempDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
taskCreator, _ := ctx.Value("user").(*model.User)
|
||||
for _, entry := range entries {
|
||||
t := &TransferTask{
|
||||
TaskExtension: task.TaskExtension{
|
||||
Creator: taskCreator,
|
||||
},
|
||||
SrcObjPath: stdpath.Join(tempDir, entry.Name()),
|
||||
DstDirPath: dstDirActualPath,
|
||||
DstStorage: dstStorage,
|
||||
DstStorageMp: dstStorage.GetStorage().MountPath,
|
||||
DeletePolicy: deletePolicy,
|
||||
}
|
||||
TransferTaskManager.Add(t)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func transferStdPath(t *TransferTask) error {
|
||||
t.Status = "getting src object"
|
||||
info, err := os.Stat(t.SrcObjPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
t.Status = "src object is dir, listing objs"
|
||||
entries, err := os.ReadDir(t.SrcObjPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, entry := range entries {
|
||||
srcRawPath := stdpath.Join(t.SrcObjPath, entry.Name())
|
||||
dstObjPath := stdpath.Join(t.DstDirPath, info.Name())
|
||||
t := &TransferTask{
|
||||
TaskExtension: task.TaskExtension{
|
||||
Creator: t.Creator,
|
||||
},
|
||||
SrcObjPath: srcRawPath,
|
||||
DstDirPath: dstObjPath,
|
||||
DstStorage: t.DstStorage,
|
||||
SrcStorageMp: t.SrcStorageMp,
|
||||
DstStorageMp: t.DstStorageMp,
|
||||
DeletePolicy: t.DeletePolicy,
|
||||
}
|
||||
TransferTaskManager.Add(t)
|
||||
}
|
||||
t.Status = "src object is dir, added all transfer tasks of files"
|
||||
return nil
|
||||
}
|
||||
return transferStdFile(t)
|
||||
}
|
||||
|
||||
func transferStdFile(t *TransferTask) error {
|
||||
rc, err := os.Open(t.SrcObjPath)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to open file %s", t.SrcObjPath)
|
||||
}
|
||||
info, err := rc.Stat()
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to get file %s", t.SrcObjPath)
|
||||
}
|
||||
mimetype := utils.GetMimeType(t.SrcObjPath)
|
||||
s := &stream.FileStream{
|
||||
Ctx: nil,
|
||||
Obj: &model.Object{
|
||||
Name: filepath.Base(t.SrcObjPath),
|
||||
Size: info.Size(),
|
||||
Modified: info.ModTime(),
|
||||
IsFolder: false,
|
||||
},
|
||||
Reader: rc,
|
||||
Mimetype: mimetype,
|
||||
Closers: utils.NewClosers(rc),
|
||||
}
|
||||
t.SetTotalBytes(info.Size())
|
||||
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, s, t.SetProgress)
|
||||
}
|
||||
|
||||
func removeStdTemp(t *TransferTask) {
|
||||
info, err := os.Stat(t.SrcObjPath)
|
||||
if err != nil || info.IsDir() {
|
||||
return
|
||||
}
|
||||
if err := os.Remove(t.SrcObjPath); err != nil {
|
||||
log.Errorf("failed to delete temp file %s, error: %s", t.SrcObjPath, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func transferObj(ctx context.Context, tempDir, dstDirPath string, deletePolicy DeletePolicy) error {
|
||||
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(tempDir)
|
||||
if err != nil {
|
||||
return errors.WithMessage(err, "failed get src storage")
|
||||
}
|
||||
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
|
||||
if err != nil {
|
||||
return errors.WithMessage(err, "failed get dst storage")
|
||||
}
|
||||
objs, err := op.List(ctx, srcStorage, srcObjActualPath, model.ListArgs{})
|
||||
if err != nil {
|
||||
return errors.WithMessagef(err, "failed list src [%s] objs", tempDir)
|
||||
}
|
||||
taskCreator, _ := ctx.Value("user").(*model.User) // taskCreator is nil when convert failed
|
||||
for _, obj := range objs {
|
||||
t := &TransferTask{
|
||||
TaskExtension: task.TaskExtension{
|
||||
Creator: taskCreator,
|
||||
},
|
||||
SrcObjPath: stdpath.Join(srcObjActualPath, obj.GetName()),
|
||||
DstDirPath: dstDirActualPath,
|
||||
SrcStorage: srcStorage,
|
||||
DstStorage: dstStorage,
|
||||
SrcStorageMp: srcStorage.GetStorage().MountPath,
|
||||
DstStorageMp: dstStorage.GetStorage().MountPath,
|
||||
DeletePolicy: deletePolicy,
|
||||
}
|
||||
TransferTaskManager.Add(t)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func transferObjPath(t *TransferTask) error {
|
||||
t.Status = "getting src object"
|
||||
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
|
||||
if err != nil {
|
||||
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)
|
||||
}
|
||||
if srcObj.IsDir() {
|
||||
t.Status = "src object is dir, listing objs"
|
||||
objs, err := op.List(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.ListArgs{})
|
||||
if err != nil {
|
||||
return errors.WithMessagef(err, "failed list src [%s] objs", t.SrcObjPath)
|
||||
}
|
||||
for _, obj := range objs {
|
||||
if utils.IsCanceled(t.Ctx()) {
|
||||
return nil
|
||||
}
|
||||
srcObjPath := stdpath.Join(t.SrcObjPath, obj.GetName())
|
||||
dstObjPath := stdpath.Join(t.DstDirPath, srcObj.GetName())
|
||||
TransferTaskManager.Add(&TransferTask{
|
||||
TaskExtension: task.TaskExtension{
|
||||
Creator: t.Creator,
|
||||
},
|
||||
SrcObjPath: srcObjPath,
|
||||
DstDirPath: dstObjPath,
|
||||
SrcStorage: t.SrcStorage,
|
||||
DstStorage: t.DstStorage,
|
||||
SrcStorageMp: t.SrcStorageMp,
|
||||
DstStorageMp: t.DstStorageMp,
|
||||
DeletePolicy: t.DeletePolicy,
|
||||
})
|
||||
}
|
||||
t.Status = "src object is dir, added all transfer tasks of objs"
|
||||
return nil
|
||||
}
|
||||
return transferObjFile(t)
|
||||
}
|
||||
|
||||
func transferObjFile(t *TransferTask) error {
|
||||
srcFile, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
|
||||
if err != nil {
|
||||
return errors.WithMessagef(err, "failed get src [%s] file", t.SrcObjPath)
|
||||
}
|
||||
link, _, err := op.Link(t.Ctx(), t.SrcStorage, t.SrcObjPath, model.LinkArgs{
|
||||
Header: http.Header{},
|
||||
})
|
||||
if err != nil {
|
||||
return errors.WithMessagef(err, "failed get [%s] link", t.SrcObjPath)
|
||||
}
|
||||
fs := stream.FileStream{
|
||||
Obj: srcFile,
|
||||
Ctx: t.Ctx(),
|
||||
}
|
||||
// any link provided is seekable
|
||||
ss, err := stream.NewSeekableStream(fs, link)
|
||||
if err != nil {
|
||||
return errors.WithMessagef(err, "failed get [%s] stream", t.SrcObjPath)
|
||||
}
|
||||
t.SetTotalBytes(srcFile.GetSize())
|
||||
return op.Put(t.Ctx(), t.DstStorage, t.DstDirPath, ss, t.SetProgress)
|
||||
}
|
||||
|
||||
func removeObjTemp(t *TransferTask) {
|
||||
srcObj, err := op.Get(t.Ctx(), t.SrcStorage, t.SrcObjPath)
|
||||
if err != nil || srcObj.IsDir() {
|
||||
return
|
||||
}
|
||||
if err := op.Remove(t.Ctx(), t.SrcStorage, t.SrcObjPath); err != nil {
|
||||
log.Errorf("failed to delete temp obj %s, error: %s", t.SrcObjPath, err.Error())
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user