From adf0178bb76cf1b366706fb75c2d67a53dbaf7d5 Mon Sep 17 00:00:00 2001 From: Noah Hsu Date: Sat, 18 Jun 2022 20:06:45 +0800 Subject: [PATCH] feat: add progress for task --- internal/driver/driver.go | 2 +- internal/fs/copy.go | 32 +++++++++++++++----------------- pkg/task/manager.go | 1 + pkg/task/task.go | 19 ++++++++++++------- 4 files changed, 29 insertions(+), 25 deletions(-) diff --git a/internal/driver/driver.go b/internal/driver/driver.go index dc69bb09..f794c4ea 100644 --- a/internal/driver/driver.go +++ b/internal/driver/driver.go @@ -53,4 +53,4 @@ type Writer interface { Put(ctx context.Context, parentDir model.Obj, stream model.FileStreamer, up UpdateProgress) error } -type UpdateProgress func(percentage float64) +type UpdateProgress func(percentage int) diff --git a/internal/fs/copy.go b/internal/fs/copy.go index 39d31198..504eaf59 100644 --- a/internal/fs/copy.go +++ b/internal/fs/copy.go @@ -35,53 +35,51 @@ func Copy(ctx context.Context, account driver.Driver, srcPath, dstPath string) ( CopyTaskManager.Add( fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcAccount.GetAccount().VirtualPath, srcActualPath, dstAccount.GetAccount().VirtualPath, dstActualPath), func(task *task.Task) error { - return CopyBetween2Accounts(task.Ctx, srcAccount, dstAccount, srcActualPath, dstActualPath, task.SetStatus) + return CopyBetween2Accounts(task, srcAccount, dstAccount, srcActualPath, dstActualPath) }) return true, nil } -func CopyBetween2Accounts(ctx context.Context, srcAccount, dstAccount driver.Driver, srcPath, dstPath string, setStatus func(status string)) error { - setStatus("getting src object") - srcObj, err := operations.Get(ctx, srcAccount, srcPath) +func CopyBetween2Accounts(t *task.Task, srcAccount, dstAccount driver.Driver, srcPath, dstPath string) error { + t.SetStatus("getting src object") + srcObj, err := operations.Get(t.Ctx, srcAccount, srcPath) if err != nil { return errors.WithMessagef(err, "failed get src [%s] file", srcPath) } if srcObj.IsDir() { - setStatus("src object is dir, listing objs") - objs, err := operations.List(ctx, srcAccount, srcPath) + t.SetStatus("src object is dir, listing objs") + objs, err := operations.List(t.Ctx, srcAccount, srcPath) if err != nil { return errors.WithMessagef(err, "failed list src [%s] objs", srcPath) } for _, obj := range objs { - if utils.IsCanceled(ctx) { + if utils.IsCanceled(t.Ctx) { return nil } srcObjPath := stdpath.Join(srcPath, obj.GetName()) dstObjPath := stdpath.Join(dstPath, obj.GetName()) CopyTaskManager.Add( fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcAccount.GetAccount().VirtualPath, srcObjPath, dstAccount.GetAccount().VirtualPath, dstObjPath), - func(task *task.Task) error { - return CopyBetween2Accounts(ctx, srcAccount, dstAccount, srcObjPath, dstObjPath, task.SetStatus) + func(t *task.Task) error { + return CopyBetween2Accounts(t, srcAccount, dstAccount, srcObjPath, dstObjPath) }) } } else { CopyTaskManager.Add( fmt.Sprintf("copy [%s](%s) to [%s](%s)", srcAccount.GetAccount().VirtualPath, srcPath, dstAccount.GetAccount().VirtualPath, dstPath), - func(task *task.Task) error { - return CopyFileBetween2Accounts(task.Ctx, srcAccount, dstAccount, srcPath, dstPath, func(percentage float64) { - task.SetStatus(fmt.Sprintf("uploading: %2.f%%", percentage)) - }) + func(t *task.Task) error { + return CopyFileBetween2Accounts(t, srcAccount, dstAccount, srcPath, dstPath) }) } return nil } -func CopyFileBetween2Accounts(ctx context.Context, srcAccount, dstAccount driver.Driver, srcPath, dstPath string, up driver.UpdateProgress) error { - srcFile, err := operations.Get(ctx, srcAccount, srcPath) +func CopyFileBetween2Accounts(t *task.Task, srcAccount, dstAccount driver.Driver, srcPath, dstPath string) error { + srcFile, err := operations.Get(t.Ctx, srcAccount, srcPath) if err != nil { return errors.WithMessagef(err, "failed get src [%s] file", srcPath) } - link, err := operations.Link(ctx, srcAccount, srcPath, model.LinkArgs{}) + link, err := operations.Link(t.Ctx, srcAccount, srcPath, model.LinkArgs{}) if err != nil { return errors.WithMessagef(err, "failed get [%s] link", srcPath) } @@ -89,5 +87,5 @@ func CopyFileBetween2Accounts(ctx context.Context, srcAccount, dstAccount driver if err != nil { return errors.WithMessagef(err, "failed get [%s] stream", srcPath) } - return operations.Put(ctx, dstAccount, dstPath, stream, up) + return operations.Put(t.Ctx, dstAccount, dstPath, stream, t.SetProgress) } diff --git a/pkg/task/manager.go b/pkg/task/manager.go index 4dfd65ca..de702737 100644 --- a/pkg/task/manager.go +++ b/pkg/task/manager.go @@ -8,6 +8,7 @@ import ( ) type Manager struct { + works uint curID uint64 tasks generic_sync.MapOf[uint64, *Task] } diff --git a/pkg/task/task.go b/pkg/task/task.go index ec9ea88e..76d9f009 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -17,13 +17,14 @@ var ( type Func func(task *Task) error type Task struct { - ID uint64 - Name string - Status string - Error error - Func Func - Ctx context.Context - cancel context.CancelFunc + ID uint64 + Name string + Status string + Error error + Func Func + Progress int + Ctx context.Context + cancel context.CancelFunc } func newTask(name string, func_ Func) *Task { @@ -41,6 +42,10 @@ func (t *Task) SetStatus(status string) { t.Status = status } +func (t *Task) SetProgress(percentage int) { + t.Progress = percentage +} + func (t *Task) Run() { t.Status = RUNNING t.Error = t.Func(t)