diff --git a/internal/bootstrap/data/setting.go b/internal/bootstrap/data/setting.go index ca17b6b1..70e237c0 100644 --- a/internal/bootstrap/data/setting.go +++ b/internal/bootstrap/data/setting.go @@ -141,10 +141,6 @@ func InitialSettings() []model.SettingItem { {Key: conf.ForwardDirectLinkParams, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL}, {Key: conf.WebauthnLoginEnabled, Value: "false", Type: conf.TypeBool, Group: model.GLOBAL, Flag: model.PUBLIC}, - // aria2 settings - {Key: conf.Aria2Uri, Value: "http://localhost:6800/jsonrpc", Type: conf.TypeString, Group: model.ARIA2, Flag: model.PRIVATE}, - {Key: conf.Aria2Secret, Value: "", Type: conf.TypeString, Group: model.ARIA2, Flag: model.PRIVATE}, - // single settings {Key: conf.Token, Value: token, Type: conf.TypeString, Group: model.SINGLE, Flag: model.PRIVATE}, {Key: conf.SearchIndex, Value: "none", Type: conf.TypeSelect, Options: "database,database_non_full_text,bleve,none", Group: model.INDEX}, diff --git a/internal/model/setting.go b/internal/model/setting.go index f4202ee0..3b2c30f1 100644 --- a/internal/model/setting.go +++ b/internal/model/setting.go @@ -6,7 +6,7 @@ const ( STYLE PREVIEW GLOBAL - ARIA2 + OFFLINE_DOWNLOAD INDEX SSO ) diff --git a/internal/offline_download/add.go b/internal/offline_download/add.go new file mode 100644 index 00000000..67147f35 --- /dev/null +++ b/internal/offline_download/add.go @@ -0,0 +1,76 @@ +package offline_download + +import ( + "context" + "fmt" + "path/filepath" + + "github.com/alist-org/alist/v3/internal/conf" + "github.com/alist-org/alist/v3/internal/errs" + "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/pkg/task" + "github.com/google/uuid" + "github.com/pkg/errors" +) + +type AddURIArgs struct { + URI string + DstDirPath string + Tool string +} + +func AddURI(ctx context.Context, args *AddURIArgs) error { + // get tool + tool, err := Tools.Get(args.Tool) + if err != nil { + return errors.Wrapf(err, "failed get tool") + } + // check storage + storage, dstDirActualPath, err := op.GetStorageAndActualPath(args.DstDirPath) + if err != nil { + return errors.WithMessage(err, "failed get storage") + } + // check is it could upload + if storage.Config().NoUpload { + return errors.WithStack(errs.UploadNotSupported) + } + // check path is valid + obj, err := op.Get(ctx, storage, dstDirActualPath) + if err != nil { + if !errs.IsObjectNotFound(err) { + return errors.WithMessage(err, "failed get object") + } + } else { + if !obj.IsDir() { + // can't add to a file + return errors.WithStack(errs.NotFolder) + } + } + + uid := uuid.NewString() + tempDir := filepath.Join(conf.Conf.TempDir, args.Tool, uid) + signal := make(chan int) + gid, err := tool.AddURI(&AddUriArgs{ + Uri: args.URI, + UID: uid, + TempDir: tempDir, + Signal: signal, + }) + if err != nil { + return errors.Wrapf(err, "failed to add uri %s", args.URI) + } + DownTaskManager.Submit(task.WithCancelCtx(&task.Task[string]{ + ID: gid, + Name: fmt.Sprintf("download %s to [%s](%s)", args.URI, storage.GetStorage().MountPath, dstDirActualPath), + Func: func(tsk *task.Task[string]) error { + m := &Monitor{ + tsk: tsk, + tempDir: tempDir, + dstDirPath: args.DstDirPath, + signal: signal, + } + return m.Loop() + }, + })) + return nil +} diff --git a/internal/offline_download/all_test.go b/internal/offline_download/all_test.go new file mode 100644 index 00000000..e8b32e09 --- /dev/null +++ b/internal/offline_download/all_test.go @@ -0,0 +1,13 @@ +package offline_download + +import "testing" + +func TestGetFiles(t *testing.T) { + files, err := GetFiles("..") + if err != nil { + t.Fatal(err) + } + for _, file := range files { + t.Log(file.Name, file.Size, file.Path) + } +} diff --git a/internal/offline_download/aria2/aria2.go b/internal/offline_download/aria2/aria2.go new file mode 100644 index 00000000..947b890c --- /dev/null +++ b/internal/offline_download/aria2/aria2.go @@ -0,0 +1,73 @@ +package aria2 + +import ( + "context" + "fmt" + "time" + + "github.com/alist-org/alist/v3/internal/conf" + "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/internal/offline_download" + "github.com/alist-org/alist/v3/internal/setting" + "github.com/alist-org/alist/v3/pkg/aria2/rpc" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +var notify = NewNotify() + +type Aria2 struct { + client rpc.Client +} + +func (a *Aria2) Items() []model.SettingItem { + // aria2 settings + return []model.SettingItem{ + {Key: conf.Aria2Uri, Value: "http://localhost:6800/jsonrpc", Type: conf.TypeString, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE}, + {Key: conf.Aria2Secret, Value: "", Type: conf.TypeString, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE}, + } +} + +func (a *Aria2) Init() (string, error) { + a.client = nil + uri := setting.GetStr(conf.Aria2Uri) + secret := setting.GetStr(conf.Aria2Secret) + c, err := rpc.New(context.Background(), uri, secret, 4*time.Second, notify) + if err != nil { + return "", errors.Wrap(err, "failed to init aria2 client") + } + version, err := c.GetVersion() + if err != nil { + return "", errors.Wrapf(err, "failed get aria2 version") + } + a.client = c + log.Infof("using aria2 version: %s", version.Version) + return fmt.Sprintf("aria2 version: %s", version.Version), nil +} + +func (a *Aria2) IsReady() bool { + //TODO implement me + panic("implement me") +} + +func (a *Aria2) AddURI(args *offline_download.AddUriArgs) (string, error) { + //TODO implement me + panic("implement me") +} + +func (a *Aria2) Remove(tid string) error { + //TODO implement me + panic("implement me") +} + +func (a *Aria2) Status(tid string) (*offline_download.Status, error) { + //TODO implement me + panic("implement me") +} + +func (a *Aria2) GetFile(tid string) *offline_download.File { + //TODO implement me + panic("implement me") +} + +var _ offline_download.Tool = (*Aria2)(nil) diff --git a/internal/offline_download/aria2/notify.go b/internal/offline_download/aria2/notify.go new file mode 100644 index 00000000..056fe514 --- /dev/null +++ b/internal/offline_download/aria2/notify.go @@ -0,0 +1,70 @@ +package aria2 + +import ( + "github.com/alist-org/alist/v3/pkg/aria2/rpc" + "github.com/alist-org/alist/v3/pkg/generic_sync" +) + +const ( + Downloading = iota + Paused + Stopped + Completed + Errored +) + +type Notify struct { + Signals generic_sync.MapOf[string, chan int] +} + +func NewNotify() *Notify { + return &Notify{Signals: generic_sync.MapOf[string, chan int]{}} +} + +func (n *Notify) OnDownloadStart(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Downloading + } + } +} + +func (n *Notify) OnDownloadPause(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Paused + } + } +} + +func (n *Notify) OnDownloadStop(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Stopped + } + } +} + +func (n *Notify) OnDownloadComplete(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Completed + } + } +} + +func (n *Notify) OnDownloadError(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Errored + } + } +} + +func (n *Notify) OnBtDownloadComplete(events []rpc.Event) { + for _, e := range events { + if signal, ok := n.Signals.Load(e.Gid); ok { + signal <- Completed + } + } +} diff --git a/internal/offline_download/base.go b/internal/offline_download/base.go new file mode 100644 index 00000000..2276a6a5 --- /dev/null +++ b/internal/offline_download/base.go @@ -0,0 +1,56 @@ +package offline_download + +import ( + "io" + "os" + + "github.com/alist-org/alist/v3/internal/model" +) + +type AddUriArgs struct { + Uri string + UID string + TempDir string + Signal chan int +} + +type Status struct { + Progress int + NewTID string + Completed bool + Status string + Err error +} + +type Tool interface { + // Items return the setting items the tool need + Items() []model.SettingItem + Init() (string, error) + IsReady() bool + // AddURI add an uri to download, return the task id + AddURI(args *AddUriArgs) (string, error) + // Remove the task if an error occurred + Remove(tid string) error + // Status return the status of the download task, if an error occurred, return the error in Status.Err + Status(tid string) (*Status, error) + // GetFile return an io.ReadCloser as the download file, if nil, means walk the temp dir to get the files + GetFile(tid string) *File +} + +type File struct { + io.ReadCloser + Name string + Size int64 + Path string +} + +func (f *File) GetReadCloser() (io.ReadCloser, error) { + if f.ReadCloser != nil { + return f.ReadCloser, nil + } + file, err := os.Open(f.Path) + if err != nil { + return nil, err + } + return file, nil +} diff --git a/internal/offline_download/monitor.go b/internal/offline_download/monitor.go new file mode 100644 index 00000000..f446f268 --- /dev/null +++ b/internal/offline_download/monitor.go @@ -0,0 +1,157 @@ +package offline_download + +import ( + "fmt" + "os" + "path" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/alist-org/alist/v3/internal/model" + "github.com/alist-org/alist/v3/internal/op" + "github.com/alist-org/alist/v3/pkg/task" + "github.com/alist-org/alist/v3/pkg/utils" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +type Monitor struct { + tool Tool + tsk *task.Task[string] + tempDir string + retried int + dstDirPath string + finish chan struct{} + signal chan int +} + +func (m *Monitor) Loop() error { + m.finish = make(chan struct{}) + var ( + err error + ok bool + ) +outer: + for { + select { + case <-m.tsk.Ctx.Done(): + err := m.tool.Remove(m.tsk.ID) + return err + case <-m.signal: + ok, err = m.Update() + if ok { + break outer + } + case <-time.After(time.Second * 2): + ok, err = m.Update() + if ok { + break outer + } + } + } + if err != nil { + return err + } + m.tsk.SetStatus("aria2 download completed, transferring") + <-m.finish + m.tsk.SetStatus("completed") + return nil +} + +// Update download status, return true if download completed +func (m *Monitor) Update() (bool, error) { + info, err := m.tool.Status(m.tsk.ID) + if err != nil { + m.retried++ + log.Errorf("failed to get status of %s, retried %d times", m.tsk.ID, m.retried) + return false, nil + } + if m.retried > 5 { + return true, errors.Errorf("failed to get status of %s, retried %d times", m.tsk.ID, m.retried) + } + m.retried = 0 + m.tsk.SetProgress(info.Progress) + m.tsk.SetStatus("tool: " + info.Status) + if info.NewTID != "" { + log.Debugf("followen by: %+v", info.NewTID) + DownTaskManager.RawTasks().Delete(m.tsk.ID) + m.tsk.ID = info.NewTID + DownTaskManager.RawTasks().Store(m.tsk.ID, m.tsk) + return false, nil + } + // if download completed + if info.Completed { + err := m.Complete() + return true, errors.WithMessage(err, "failed to transfer file") + } + // if download failed + if info.Err != nil { + return true, errors.Errorf("failed to download %s, error: %s", m.tsk.ID, info.Err.Error()) + } + return false, nil +} + +var TransferTaskManager = task.NewTaskManager(3, func(k *uint64) { + atomic.AddUint64(k, 1) +}) + +func (m *Monitor) Complete() error { + // check dstDir again + storage, dstDirActualPath, err := op.GetStorageAndActualPath(m.dstDirPath) + if err != nil { + return errors.WithMessage(err, "failed get storage") + } + var files []*File + if f := m.tool.GetFile(m.tsk.ID); f != nil { + files = append(files, f) + } else { + files, err = GetFiles(m.tempDir) + if err != nil { + return errors.Wrapf(err, "failed to get files") + } + } + // upload files + var wg sync.WaitGroup + wg.Add(len(files)) + go func() { + wg.Wait() + err := os.RemoveAll(m.tempDir) + m.finish <- struct{}{} + if err != nil { + log.Errorf("failed to remove aria2 temp dir: %+v", err.Error()) + } + }() + for i, _ := range files { + file := files[i] + TransferTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{ + Name: fmt.Sprintf("transfer %s to [%s](%s)", file.Path, storage.GetStorage().MountPath, dstDirActualPath), + Func: func(tsk *task.Task[uint64]) error { + defer wg.Done() + mimetype := utils.GetMimeType(file.Path) + rc, err := file.GetReadCloser() + if err != nil { + return errors.Wrapf(err, "failed to open file %s", file.Path) + } + stream := &model.FileStream{ + Obj: &model.Object{ + Name: path.Base(file.Path), + Size: file.Size, + Modified: time.Now(), + IsFolder: false, + }, + ReadCloser: rc, + Mimetype: mimetype, + } + relDir, err := filepath.Rel(m.tempDir, filepath.Dir(file.Path)) + if err != nil { + log.Errorf("find relation directory error: %v", err) + } + newDistDir := filepath.Join(dstDirActualPath, relDir) + return op.Put(tsk.Ctx, storage, newDistDir, stream, tsk.SetProgress) + }, + })) + } + return nil +} diff --git a/internal/offline_download/tools.go b/internal/offline_download/tools.go new file mode 100644 index 00000000..ca328f24 --- /dev/null +++ b/internal/offline_download/tools.go @@ -0,0 +1,33 @@ +package offline_download + +import ( + "fmt" + + "github.com/alist-org/alist/v3/pkg/task" +) + +var ( + Tools = make(ToolsManager) + DownTaskManager = task.NewTaskManager[string](3) +) + +type ToolsManager map[string]Tool + +func (t ToolsManager) Get(name string) (Tool, error) { + if tool, ok := t[name]; ok { + return tool, nil + } + return nil, fmt.Errorf("tool %s not found", name) +} + +func (t ToolsManager) Add(name string, tool Tool) { + t[name] = tool +} + +func (t ToolsManager) Names() []string { + names := make([]string, 0, len(t)) + for name := range t { + names = append(names, name) + } + return names +} diff --git a/internal/offline_download/util.go b/internal/offline_download/util.go new file mode 100644 index 00000000..8f1c1484 --- /dev/null +++ b/internal/offline_download/util.go @@ -0,0 +1,27 @@ +package offline_download + +import ( + "os" + "path/filepath" +) + +func GetFiles(dir string) ([]*File, error) { + var files []*File + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + files = append(files, &File{ + Name: info.Name(), + Size: info.Size(), + Path: path, + }) + } + return nil + }) + if err != nil { + return nil, err + } + return files, nil +} diff --git a/server/handles/aria2.go b/server/handles/aria2.go index 325367a7..a78eb659 100644 --- a/server/handles/aria2.go +++ b/server/handles/aria2.go @@ -21,8 +21,8 @@ func SetAria2(c *gin.Context) { return } items := []model.SettingItem{ - {Key: conf.Aria2Uri, Value: req.Uri, Type: conf.TypeString, Group: model.ARIA2, Flag: model.PRIVATE}, - {Key: conf.Aria2Secret, Value: req.Secret, Type: conf.TypeString, Group: model.ARIA2, Flag: model.PRIVATE}, + {Key: conf.Aria2Uri, Value: req.Uri, Type: conf.TypeString, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE}, + {Key: conf.Aria2Secret, Value: req.Secret, Type: conf.TypeString, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE}, } if err := op.SaveSettingItems(items); err != nil { common.ErrorResp(c, err, 500)