feat: refactor task module

This commit is contained in:
Andy Hsu
2023-11-20 18:01:51 +08:00
parent de9647a5fa
commit 11a30c5044
14 changed files with 405 additions and 377 deletions

View File

@ -2,21 +2,27 @@ package tool
import (
"context"
"fmt"
"path/filepath"
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/task"
"github.com/google/uuid"
"github.com/pkg/errors"
"path/filepath"
)
type DeletePolicy string
const (
DeleteOnUploadSucceed DeletePolicy = "delete_on_upload_succeed"
DeleteOnUploadFailed DeletePolicy = "delete_on_upload_failed"
DeleteNever DeletePolicy = "delete_never"
)
type AddURLArgs struct {
URL string
DstDirPath string
Tool string
URL string
DstDirPath string
Tool string
DeletePolicy DeletePolicy
}
func AddURL(ctx context.Context, args *AddURLArgs) error {
@ -56,29 +62,13 @@ func AddURL(ctx context.Context, args *AddURLArgs) error {
uid := uuid.NewString()
tempDir := filepath.Join(conf.Conf.TempDir, args.Tool, uid)
signal := make(chan int)
gid, err := tool.AddURL(&AddUrlArgs{
Url: args.URL,
UID: uid,
TempDir: tempDir,
Signal: signal,
})
if err != nil {
return errors.Wrapf(err, "[%s] failed to add uri %s", args.Tool, args.URL)
t := &DownloadTask{
Url: args.URL,
DstDirPath: args.DstDirPath,
TempDir: tempDir,
DeletePolicy: args.DeletePolicy,
tool: tool,
}
DownTaskManager.Submit(task.WithCancelCtx(&task.Task[string]{
ID: gid,
Name: fmt.Sprintf("download %s to [%s](%s)", args.URL, storage.GetStorage().MountPath, dstDirActualPath),
Func: func(tsk *task.Task[string]) error {
m := &Monitor{
tool: tool,
tsk: tsk,
tempDir: tempDir,
dstDirPath: args.DstDirPath,
signal: signal,
}
return m.Loop()
},
}))
DownloadTaskManager.Add(t)
return nil
}

View File

@ -17,13 +17,14 @@ type AddUrlArgs struct {
type Status struct {
Progress float64
NewTID string
NewGID string
Completed bool
Status string
Err error
}
type Tool interface {
Name() string
// Items return the setting items the tool need
Items() []model.SettingItem
Init() (string, error)
@ -31,20 +32,23 @@ type Tool interface {
// AddURL add an uri to download, return the task id
AddURL(args *AddUrlArgs) (string, error)
// Remove the download if task been canceled
Remove(tid string) error
Remove(task *DownloadTask) error
// Status return the status of the download task, if an error occurred, return the error in Status.Err
Status(tid string) (*Status, error)
Status(task *DownloadTask) (*Status, error)
}
type GetFileser interface {
// GetFiles return the files of the download task, if nil, means walk the temp dir to get the files
GetFiles(tid string) []File
GetFiles(task *DownloadTask) []File
}
type File struct {
// ReadCloser for http client
io.ReadCloser
Name string
Size int64
Path string
Modified time.Time
ReadCloser io.ReadCloser
Name string
Size int64
Path string
Modified time.Time
}
func (f *File) GetReadCloser() (io.ReadCloser, error) {

View File

@ -0,0 +1,147 @@
package tool
import (
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/xhofe/tache"
"sync"
"time"
)
type DownloadTask struct {
tache.Base
Url string `json:"url"`
DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"`
DeletePolicy DeletePolicy `json:"delete_policy"`
Status string `json:"status"`
Signal chan int `json:"-"`
GID string `json:"-"`
finish chan struct{}
tool Tool
callStatusRetried int
}
func (t *DownloadTask) Run() error {
t.Signal = make(chan int)
t.finish = make(chan struct{})
defer func() {
t.Signal = nil
t.finish = nil
}()
gid, err := t.tool.AddURL(&AddUrlArgs{
Url: t.Url,
UID: t.ID,
TempDir: t.TempDir,
Signal: t.Signal,
})
if err != nil {
return err
}
t.GID = gid
var (
ok bool
)
outer:
for {
select {
case <-t.CtxDone():
err := t.tool.Remove(t)
return err
case <-t.Signal:
ok, err = t.Update()
if ok {
break outer
}
case <-time.After(time.Second * 3):
ok, err = t.Update()
if ok {
break outer
}
}
}
if err != nil {
return err
}
t.Status = "aria2 download completed, maybe transferring"
t.finish <- struct{}{}
t.Status = "offline download completed"
return nil
}
// Update download status, return true if download completed
func (t *DownloadTask) Update() (bool, error) {
info, err := t.tool.Status(t)
if err != nil {
t.callStatusRetried++
log.Errorf("failed to get status of %s, retried %d times", t.ID, t.callStatusRetried)
return false, nil
}
if t.callStatusRetried > 5 {
return true, errors.Errorf("failed to get status of %s, retried %d times", t.ID, t.callStatusRetried)
}
t.callStatusRetried = 0
t.SetProgress(info.Progress)
t.Status = fmt.Sprintf("[%s]: %s", t.tool.Name(), info.Status)
if info.NewGID != "" {
log.Debugf("followen by: %+v", info.NewGID)
t.GID = info.NewGID
return false, nil
}
// if download completed
if info.Completed {
err := t.Complete()
return true, errors.WithMessage(err, "failed to transfer file")
}
// if download failed
if info.Err != nil {
return true, errors.Errorf("failed to download %s, error: %s", t.ID, info.Err.Error())
}
return false, nil
}
func (t *DownloadTask) Complete() error {
var (
files []File
err error
)
if getFileser, ok := t.tool.(GetFileser); ok {
files = getFileser.GetFiles(t)
} else {
files, err = GetFiles(t.TempDir)
if err != nil {
return errors.Wrapf(err, "failed to get files")
}
}
// upload files
var wg sync.WaitGroup
wg.Add(len(files))
go func() {
wg.Wait()
t.finish <- struct{}{}
}()
for i, _ := range files {
file := files[i]
TransferTaskManager.Add(&TransferTask{
file: file,
dstDirPath: t.DstDirPath,
wg: &wg,
tempDir: t.TempDir,
})
}
return nil
}
func (t *DownloadTask) GetName() string {
return fmt.Sprintf("download %s to (%s)", t.Url, t.DstDirPath)
}
func (t *DownloadTask) GetStatus() string {
return t.Status
}
var (
DownloadTaskManager *tache.Manager[*DownloadTask] = tache.NewManager[*DownloadTask]()
)

View File

@ -1,159 +0,0 @@
package tool
import (
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/stream"
"github.com/alist-org/alist/v3/pkg/task"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
type Monitor struct {
tool Tool
tsk *task.Task[string]
tempDir string
retried int
dstDirPath string
finish chan struct{}
signal chan int
}
func (m *Monitor) Loop() error {
m.finish = make(chan struct{})
var (
err error
ok bool
)
outer:
for {
select {
case <-m.tsk.Ctx.Done():
err := m.tool.Remove(m.tsk.ID)
return err
case <-m.signal:
ok, err = m.Update()
if ok {
break outer
}
case <-time.After(time.Second * 2):
ok, err = m.Update()
if ok {
break outer
}
}
}
if err != nil {
return err
}
m.tsk.SetStatus("aria2 download completed, transferring")
<-m.finish
m.tsk.SetStatus("completed")
return nil
}
// Update download status, return true if download completed
func (m *Monitor) Update() (bool, error) {
info, err := m.tool.Status(m.tsk.ID)
if err != nil {
m.retried++
log.Errorf("failed to get status of %s, retried %d times", m.tsk.ID, m.retried)
return false, nil
}
if m.retried > 5 {
return true, errors.Errorf("failed to get status of %s, retried %d times", m.tsk.ID, m.retried)
}
m.retried = 0
m.tsk.SetProgress(info.Progress)
m.tsk.SetStatus("tool: " + info.Status)
if info.NewTID != "" {
log.Debugf("followen by: %+v", info.NewTID)
DownTaskManager.RawTasks().Delete(m.tsk.ID)
m.tsk.ID = info.NewTID
DownTaskManager.RawTasks().Store(m.tsk.ID, m.tsk)
return false, nil
}
// if download completed
if info.Completed {
err := m.Complete()
return true, errors.WithMessage(err, "failed to transfer file")
}
// if download failed
if info.Err != nil {
return true, errors.Errorf("failed to download %s, error: %s", m.tsk.ID, info.Err.Error())
}
return false, nil
}
var TransferTaskManager = task.NewTaskManager(3, func(k *uint64) {
atomic.AddUint64(k, 1)
})
func (m *Monitor) Complete() error {
// check dstDir again
storage, dstDirActualPath, err := op.GetStorageAndActualPath(m.dstDirPath)
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
var files []File
if f := m.tool.GetFiles(m.tsk.ID); f != nil {
files = f
} else {
files, err = GetFiles(m.tempDir)
if err != nil {
return errors.Wrapf(err, "failed to get files")
}
}
// upload files
var wg sync.WaitGroup
wg.Add(len(files))
go func() {
wg.Wait()
err := os.RemoveAll(m.tempDir)
m.finish <- struct{}{}
if err != nil {
log.Errorf("failed to remove aria2 temp dir: %+v", err.Error())
}
}()
for i, _ := range files {
file := files[i]
TransferTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
Name: fmt.Sprintf("transfer %s to [%s](%s)", file.Path, storage.GetStorage().MountPath, dstDirActualPath),
Func: func(tsk *task.Task[uint64]) error {
defer wg.Done()
mimetype := utils.GetMimeType(file.Path)
rc, err := file.GetReadCloser()
if err != nil {
return errors.Wrapf(err, "failed to open file %s", file.Path)
}
s := &stream.FileStream{
Ctx: nil,
Obj: &model.Object{
Name: filepath.Base(file.Path),
Size: file.Size,
Modified: file.Modified,
IsFolder: false,
},
Reader: rc,
Mimetype: mimetype,
Closers: utils.NewClosers(rc),
}
relDir, err := filepath.Rel(m.tempDir, filepath.Dir(file.Path))
if err != nil {
log.Errorf("find relation directory error: %v", err)
}
newDistDir := filepath.Join(dstDirActualPath, relDir)
return op.Put(tsk.Ctx, storage, newDistDir, s, tsk.SetProgress)
},
}))
}
return nil
}

View File

@ -2,14 +2,11 @@ package tool
import (
"fmt"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/task"
)
var (
Tools = make(ToolsManager)
DownTaskManager = task.NewTaskManager[string](3)
Tools = make(ToolsManager)
)
type ToolsManager map[string]Tool
@ -21,8 +18,8 @@ func (t ToolsManager) Get(name string) (Tool, error) {
return nil, fmt.Errorf("tool %s not found", name)
}
func (t ToolsManager) Add(name string, tool Tool) {
t[name] = tool
func (t ToolsManager) Add(tool Tool) {
t[tool.Name()] = tool
}
func (t ToolsManager) Names() []string {

View File

@ -0,0 +1,66 @@
package tool
import (
"fmt"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/internal/stream"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/xhofe/tache"
"path/filepath"
"sync"
)
type TransferTask struct {
tache.Base
file File
dstDirPath string
wg *sync.WaitGroup
tempDir string
}
func (t *TransferTask) Run() error {
defer t.wg.Done()
// check dstDir again
storage, dstDirActualPath, err := op.GetStorageAndActualPath(t.dstDirPath)
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
mimetype := utils.GetMimeType(t.file.Path)
rc, err := t.file.GetReadCloser()
if err != nil {
return errors.Wrapf(err, "failed to open file %s", t.file.Path)
}
s := &stream.FileStream{
Ctx: nil,
Obj: &model.Object{
Name: filepath.Base(t.file.Path),
Size: t.file.Size,
Modified: t.file.Modified,
IsFolder: false,
},
Reader: rc,
Mimetype: mimetype,
Closers: utils.NewClosers(rc),
}
relDir, err := filepath.Rel(t.tempDir, filepath.Dir(t.file.Path))
if err != nil {
log.Errorf("find relation directory error: %v", err)
}
newDistDir := filepath.Join(dstDirActualPath, relDir)
return op.Put(t.Ctx(), storage, newDistDir, s, t.SetProgress)
}
func (t *TransferTask) GetName() string {
return fmt.Sprintf("transfer %s to [%s]", t.file.Path, t.dstDirPath)
}
func (t *TransferTask) GetStatus() string {
return "transferring"
}
var (
TransferTaskManager *tache.Manager[*TransferTask] = tache.NewManager[*TransferTask]()
)