feat(archive): archive manage (#7817)
* feat(archive): archive management * fix(ftp-server): remove duplicate ReadAtSeeker realization * fix(archive): bad seeking of SeekableStream * fix(archive): split internal and driver extraction api * feat(archive): patch * fix(shutdown): clear decompress upload tasks * chore * feat(archive): support .iso format * chore
This commit is contained in:
395
internal/fs/archive.go
Normal file
395
internal/fs/archive.go
Normal file
@ -0,0 +1,395 @@
|
||||
package fs
|
||||
|
||||
import (
|
||||
"context"
|
||||
stderrors "errors"
|
||||
"fmt"
|
||||
"github.com/alist-org/alist/v3/internal/archive/tool"
|
||||
"github.com/alist-org/alist/v3/internal/conf"
|
||||
"github.com/alist-org/alist/v3/internal/driver"
|
||||
"github.com/alist-org/alist/v3/internal/errs"
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/internal/op"
|
||||
"github.com/alist-org/alist/v3/internal/stream"
|
||||
"github.com/alist-org/alist/v3/internal/task"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/xhofe/tache"
|
||||
"io"
|
||||
"math/rand"
|
||||
"mime"
|
||||
"net/http"
|
||||
"os"
|
||||
stdpath "path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ArchiveDownloadTask struct {
|
||||
task.TaskExtension
|
||||
model.ArchiveDecompressArgs
|
||||
status string
|
||||
SrcObjPath string
|
||||
DstDirPath string
|
||||
srcStorage driver.Driver
|
||||
dstStorage driver.Driver
|
||||
SrcStorageMp string
|
||||
DstStorageMp string
|
||||
Tool tool.Tool
|
||||
}
|
||||
|
||||
func (t *ArchiveDownloadTask) GetName() string {
|
||||
return fmt.Sprintf("decompress [%s](%s)[%s] to [%s](%s) with password <%s>", t.SrcStorageMp, t.SrcObjPath,
|
||||
t.InnerPath, t.DstStorageMp, t.DstDirPath, t.Password)
|
||||
}
|
||||
|
||||
func (t *ArchiveDownloadTask) GetStatus() string {
|
||||
return t.status
|
||||
}
|
||||
|
||||
func (t *ArchiveDownloadTask) Run() error {
|
||||
t.ClearEndTime()
|
||||
t.SetStartTime(time.Now())
|
||||
defer func() { t.SetEndTime(time.Now()) }()
|
||||
uploadTask, err := t.RunWithoutPushUploadTask()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ArchiveContentUploadTaskManager.Add(uploadTask)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *ArchiveDownloadTask) RunWithoutPushUploadTask() (*ArchiveContentUploadTask, error) {
|
||||
var err error
|
||||
if t.srcStorage == nil {
|
||||
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
|
||||
}
|
||||
l, srcObj, err := op.Link(t.Ctx(), t.srcStorage, t.SrcObjPath, model.LinkArgs{
|
||||
Header: http.Header{},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs := stream.FileStream{
|
||||
Obj: srcObj,
|
||||
Ctx: t.Ctx(),
|
||||
}
|
||||
ss, err := stream.NewSeekableStream(fs, l)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
if err := ss.Close(); err != nil {
|
||||
log.Errorf("failed to close file streamer, %v", err)
|
||||
}
|
||||
}()
|
||||
var decompressUp model.UpdateProgress
|
||||
if t.CacheFull {
|
||||
t.SetTotalBytes(srcObj.GetSize())
|
||||
t.status = "getting src object"
|
||||
_, err = ss.CacheFullInTempFileAndUpdateProgress(t.SetProgress)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
decompressUp = func(_ float64) {}
|
||||
} else {
|
||||
decompressUp = t.SetProgress
|
||||
}
|
||||
t.status = "walking and decompressing"
|
||||
dir, err := os.MkdirTemp(conf.Conf.TempDir, "dir-*")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = t.Tool.Decompress(ss, dir, t.ArchiveInnerArgs, decompressUp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
baseName := strings.TrimSuffix(srcObj.GetName(), stdpath.Ext(srcObj.GetName()))
|
||||
uploadTask := &ArchiveContentUploadTask{
|
||||
TaskExtension: task.TaskExtension{
|
||||
Creator: t.GetCreator(),
|
||||
},
|
||||
ObjName: baseName,
|
||||
InPlace: !t.PutIntoNewDir,
|
||||
FilePath: dir,
|
||||
DstDirPath: t.DstDirPath,
|
||||
dstStorage: t.dstStorage,
|
||||
DstStorageMp: t.DstStorageMp,
|
||||
}
|
||||
return uploadTask, nil
|
||||
}
|
||||
|
||||
var ArchiveDownloadTaskManager *tache.Manager[*ArchiveDownloadTask]
|
||||
|
||||
type ArchiveContentUploadTask struct {
|
||||
task.TaskExtension
|
||||
status string
|
||||
ObjName string
|
||||
InPlace bool
|
||||
FilePath string
|
||||
DstDirPath string
|
||||
dstStorage driver.Driver
|
||||
DstStorageMp string
|
||||
finalized bool
|
||||
}
|
||||
|
||||
func (t *ArchiveContentUploadTask) GetName() string {
|
||||
return fmt.Sprintf("upload %s to [%s](%s)", t.ObjName, t.DstStorageMp, t.DstDirPath)
|
||||
}
|
||||
|
||||
func (t *ArchiveContentUploadTask) GetStatus() string {
|
||||
return t.status
|
||||
}
|
||||
|
||||
func (t *ArchiveContentUploadTask) Run() error {
|
||||
t.ClearEndTime()
|
||||
t.SetStartTime(time.Now())
|
||||
defer func() { t.SetEndTime(time.Now()) }()
|
||||
return t.RunWithNextTaskCallback(func(nextTsk *ArchiveContentUploadTask) error {
|
||||
ArchiveContentUploadTaskManager.Add(nextTsk)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (t *ArchiveContentUploadTask) RunWithNextTaskCallback(f func(nextTsk *ArchiveContentUploadTask) error) error {
|
||||
var err error
|
||||
if t.dstStorage == nil {
|
||||
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
|
||||
}
|
||||
info, err := os.Stat(t.FilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.IsDir() {
|
||||
t.status = "src object is dir, listing objs"
|
||||
nextDstPath := t.DstDirPath
|
||||
if !t.InPlace {
|
||||
nextDstPath = stdpath.Join(nextDstPath, t.ObjName)
|
||||
err = op.MakeDir(t.Ctx(), t.dstStorage, nextDstPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
entries, err := os.ReadDir(t.FilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var es error
|
||||
for _, entry := range entries {
|
||||
var nextFilePath string
|
||||
if entry.IsDir() {
|
||||
nextFilePath, err = moveToTempPath(stdpath.Join(t.FilePath, entry.Name()), "dir-")
|
||||
} else {
|
||||
nextFilePath, err = moveToTempPath(stdpath.Join(t.FilePath, entry.Name()), "file-")
|
||||
}
|
||||
if err != nil {
|
||||
es = stderrors.Join(es, err)
|
||||
continue
|
||||
}
|
||||
err = f(&ArchiveContentUploadTask{
|
||||
TaskExtension: task.TaskExtension{
|
||||
Creator: t.GetCreator(),
|
||||
},
|
||||
ObjName: entry.Name(),
|
||||
InPlace: false,
|
||||
FilePath: nextFilePath,
|
||||
DstDirPath: nextDstPath,
|
||||
dstStorage: t.dstStorage,
|
||||
DstStorageMp: t.DstStorageMp,
|
||||
})
|
||||
if err != nil {
|
||||
es = stderrors.Join(es, err)
|
||||
}
|
||||
}
|
||||
if es != nil {
|
||||
return es
|
||||
}
|
||||
} else {
|
||||
t.SetTotalBytes(info.Size())
|
||||
file, err := os.Open(t.FilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fs := &stream.FileStream{
|
||||
Obj: &model.Object{
|
||||
Name: t.ObjName,
|
||||
Size: info.Size(),
|
||||
Modified: time.Now(),
|
||||
},
|
||||
Mimetype: mime.TypeByExtension(filepath.Ext(t.ObjName)),
|
||||
WebPutAsTask: true,
|
||||
Reader: file,
|
||||
}
|
||||
fs.Closers.Add(file)
|
||||
t.status = "uploading"
|
||||
err = op.Put(t.Ctx(), t.dstStorage, t.DstDirPath, fs, t.SetProgress, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
t.deleteSrcFile()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *ArchiveContentUploadTask) Cancel() {
|
||||
t.TaskExtension.Cancel()
|
||||
t.deleteSrcFile()
|
||||
}
|
||||
|
||||
func (t *ArchiveContentUploadTask) deleteSrcFile() {
|
||||
if !t.finalized {
|
||||
_ = os.RemoveAll(t.FilePath)
|
||||
t.finalized = true
|
||||
}
|
||||
}
|
||||
|
||||
func moveToTempPath(path, prefix string) (string, error) {
|
||||
newPath, err := genTempFileName(prefix)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
err = os.Rename(path, newPath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return newPath, nil
|
||||
}
|
||||
|
||||
func genTempFileName(prefix string) (string, error) {
|
||||
retry := 0
|
||||
for retry < 10000 {
|
||||
newPath := stdpath.Join(conf.Conf.TempDir, prefix+strconv.FormatUint(uint64(rand.Uint32()), 10))
|
||||
if _, err := os.Stat(newPath); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return newPath, nil
|
||||
} else {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
retry++
|
||||
}
|
||||
return "", errors.New("failed to generate temp-file name: too many retries")
|
||||
}
|
||||
|
||||
type archiveContentUploadTaskManagerType struct {
|
||||
*tache.Manager[*ArchiveContentUploadTask]
|
||||
}
|
||||
|
||||
func (m *archiveContentUploadTaskManagerType) Remove(id string) {
|
||||
if t, ok := m.GetByID(id); ok {
|
||||
t.deleteSrcFile()
|
||||
m.Manager.Remove(id)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *archiveContentUploadTaskManagerType) RemoveAll() {
|
||||
tasks := m.GetAll()
|
||||
for _, t := range tasks {
|
||||
m.Remove(t.GetID())
|
||||
}
|
||||
}
|
||||
|
||||
func (m *archiveContentUploadTaskManagerType) RemoveByState(state ...tache.State) {
|
||||
tasks := m.GetByState(state...)
|
||||
for _, t := range tasks {
|
||||
m.Remove(t.GetID())
|
||||
}
|
||||
}
|
||||
|
||||
func (m *archiveContentUploadTaskManagerType) RemoveByCondition(condition func(task *ArchiveContentUploadTask) bool) {
|
||||
tasks := m.GetByCondition(condition)
|
||||
for _, t := range tasks {
|
||||
m.Remove(t.GetID())
|
||||
}
|
||||
}
|
||||
|
||||
var ArchiveContentUploadTaskManager = &archiveContentUploadTaskManagerType{
|
||||
Manager: nil,
|
||||
}
|
||||
|
||||
func archiveMeta(ctx context.Context, path string, args model.ArchiveMetaArgs) (*model.ArchiveMetaProvider, error) {
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(path)
|
||||
if err != nil {
|
||||
return nil, errors.WithMessage(err, "failed get storage")
|
||||
}
|
||||
return op.GetArchiveMeta(ctx, storage, actualPath, args)
|
||||
}
|
||||
|
||||
func archiveList(ctx context.Context, path string, args model.ArchiveListArgs) ([]model.Obj, error) {
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(path)
|
||||
if err != nil {
|
||||
return nil, errors.WithMessage(err, "failed get storage")
|
||||
}
|
||||
return op.ListArchive(ctx, storage, actualPath, args)
|
||||
}
|
||||
|
||||
func archiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args model.ArchiveDecompressArgs, lazyCache ...bool) (task.TaskExtensionInfo, error) {
|
||||
srcStorage, srcObjActualPath, err := op.GetStorageAndActualPath(srcObjPath)
|
||||
if err != nil {
|
||||
return nil, errors.WithMessage(err, "failed get src storage")
|
||||
}
|
||||
dstStorage, dstDirActualPath, err := op.GetStorageAndActualPath(dstDirPath)
|
||||
if err != nil {
|
||||
return nil, errors.WithMessage(err, "failed get dst storage")
|
||||
}
|
||||
if srcStorage.GetStorage() == dstStorage.GetStorage() {
|
||||
err = op.ArchiveDecompress(ctx, srcStorage, srcObjActualPath, dstDirActualPath, args, lazyCache...)
|
||||
if !errors.Is(err, errs.NotImplement) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
ext := stdpath.Ext(srcObjActualPath)
|
||||
t, err := tool.GetArchiveTool(ext)
|
||||
if err != nil {
|
||||
return nil, errors.WithMessagef(err, "failed get [%s] archive tool", ext)
|
||||
}
|
||||
taskCreator, _ := ctx.Value("user").(*model.User)
|
||||
tsk := &ArchiveDownloadTask{
|
||||
TaskExtension: task.TaskExtension{
|
||||
Creator: taskCreator,
|
||||
},
|
||||
ArchiveDecompressArgs: args,
|
||||
srcStorage: srcStorage,
|
||||
dstStorage: dstStorage,
|
||||
SrcObjPath: srcObjActualPath,
|
||||
DstDirPath: dstDirActualPath,
|
||||
SrcStorageMp: srcStorage.GetStorage().MountPath,
|
||||
DstStorageMp: dstStorage.GetStorage().MountPath,
|
||||
Tool: t,
|
||||
}
|
||||
if ctx.Value(conf.NoTaskKey) != nil {
|
||||
uploadTask, err := tsk.RunWithoutPushUploadTask()
|
||||
if err != nil {
|
||||
return nil, errors.WithMessagef(err, "failed download [%s]", srcObjPath)
|
||||
}
|
||||
defer uploadTask.deleteSrcFile()
|
||||
var callback func(t *ArchiveContentUploadTask) error
|
||||
callback = func(t *ArchiveContentUploadTask) error {
|
||||
e := t.RunWithNextTaskCallback(callback)
|
||||
t.deleteSrcFile()
|
||||
return e
|
||||
}
|
||||
return nil, uploadTask.RunWithNextTaskCallback(callback)
|
||||
} else {
|
||||
ArchiveDownloadTaskManager.Add(tsk)
|
||||
return tsk, nil
|
||||
}
|
||||
}
|
||||
|
||||
func archiveDriverExtract(ctx context.Context, path string, args model.ArchiveInnerArgs) (*model.Link, model.Obj, error) {
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(path)
|
||||
if err != nil {
|
||||
return nil, nil, errors.WithMessage(err, "failed get storage")
|
||||
}
|
||||
return op.DriverExtract(ctx, storage, actualPath, args)
|
||||
}
|
||||
|
||||
func archiveInternalExtract(ctx context.Context, path string, args model.ArchiveInnerArgs) (io.ReadCloser, int64, error) {
|
||||
storage, actualPath, err := op.GetStorageAndActualPath(path)
|
||||
if err != nil {
|
||||
return nil, 0, errors.WithMessage(err, "failed get storage")
|
||||
}
|
||||
return op.InternalExtract(ctx, storage, actualPath, args)
|
||||
}
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/alist-org/alist/v3/internal/op"
|
||||
"github.com/alist-org/alist/v3/internal/task"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"io"
|
||||
)
|
||||
|
||||
// the param named path of functions in this package is a mount path
|
||||
@ -109,6 +110,46 @@ func PutAsTask(ctx context.Context, dstDirPath string, file model.FileStreamer)
|
||||
return t, err
|
||||
}
|
||||
|
||||
func ArchiveMeta(ctx context.Context, path string, args model.ArchiveMetaArgs) (*model.ArchiveMetaProvider, error) {
|
||||
meta, err := archiveMeta(ctx, path, args)
|
||||
if err != nil {
|
||||
log.Errorf("failed get archive meta %s: %+v", path, err)
|
||||
}
|
||||
return meta, err
|
||||
}
|
||||
|
||||
func ArchiveList(ctx context.Context, path string, args model.ArchiveListArgs) ([]model.Obj, error) {
|
||||
objs, err := archiveList(ctx, path, args)
|
||||
if err != nil {
|
||||
log.Errorf("failed list archive [%s]%s: %+v", path, args.InnerPath, err)
|
||||
}
|
||||
return objs, err
|
||||
}
|
||||
|
||||
func ArchiveDecompress(ctx context.Context, srcObjPath, dstDirPath string, args model.ArchiveDecompressArgs, lazyCache ...bool) (task.TaskExtensionInfo, error) {
|
||||
t, err := archiveDecompress(ctx, srcObjPath, dstDirPath, args, lazyCache...)
|
||||
if err != nil {
|
||||
log.Errorf("failed decompress [%s]%s: %+v", srcObjPath, args.InnerPath, err)
|
||||
}
|
||||
return t, err
|
||||
}
|
||||
|
||||
func ArchiveDriverExtract(ctx context.Context, path string, args model.ArchiveInnerArgs) (*model.Link, model.Obj, error) {
|
||||
l, obj, err := archiveDriverExtract(ctx, path, args)
|
||||
if err != nil {
|
||||
log.Errorf("failed extract [%s]%s: %+v", path, args.InnerPath, err)
|
||||
}
|
||||
return l, obj, err
|
||||
}
|
||||
|
||||
func ArchiveInternalExtract(ctx context.Context, path string, args model.ArchiveInnerArgs) (io.ReadCloser, int64, error) {
|
||||
l, obj, err := archiveInternalExtract(ctx, path, args)
|
||||
if err != nil {
|
||||
log.Errorf("failed extract [%s]%s: %+v", path, args.InnerPath, err)
|
||||
}
|
||||
return l, obj, err
|
||||
}
|
||||
|
||||
type GetStoragesArgs struct {
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user