* wip: refactor offline download (#5331) * base tool * working: aria2 * refactor: change type of percentage to float64 * wip: adapt aria2 * wip: use items in offline_download * wip: use tool manager * wip: adapt qBittorrent * chore: fix typo * Squashed commit of the following: commit4fc0a77565
Author: Andy Hsu <i@nn.ci> Date: Fri Oct 20 21:06:25 2023 +0800 fix(baidu_netdisk): upload file > 4GB (close #5392) commitaaffaee2b5
Author: gmugu <94156510@qq.com> Date: Thu Oct 19 19:17:53 2023 +0800 perf(webdav): support request with cookies (#5391) commit8ef8023c20
Author: NewbieOrange <NewbieOrange@users.noreply.github.com> Date: Thu Oct 19 19:17:09 2023 +0800 fix(aliyundrive_open): upload progress for normal upload (#5398) commitcdfbe6dcf2
Author: foxxorcat <95907542+foxxorcat@users.noreply.github.com> Date: Wed Oct 18 16:27:07 2023 +0800 fix: hash gcid empty file (#5394) commit94d028743a
Author: Andy Hsu <i@nn.ci> Date: Sat Oct 14 13:17:51 2023 +0800 ci: remove `pr-welcome` label when close issue [skip ci] commit7f7335435c
Author: itsHenry <2671230065@qq.com> Date: Sat Oct 14 13:12:46 2023 +0800 feat(cloudreve): support thumbnail (#5373 close #5348) * feat(cloudreve): support thumbnail * chore: remove unnecessary code commitb9e192b29c
Author: foxxorcat <95907542+foxxorcat@users.noreply.github.com> Date: Thu Oct 12 20:57:12 2023 +0800 fix(115): limit request rate (#5367 close #5275) * fix(115):limit request rate * chore(115): fix unit of `limit_rate` --------- Co-authored-by: Andy Hsu <i@nn.ci> commit69a98eaef6
Author: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Date: Wed Oct 11 22:01:55 2023 +0800 fix(deps): update module github.com/aliyun/aliyun-oss-go-sdk to v2.2.9+incompatible (#5141) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> commit1ebc96a4e5
Author: Andy Hsu <i@nn.ci> Date: Tue Oct 10 18:32:00 2023 +0800 fix(wopan): fatal error concurrent map writes (close #5352) commit66e2324cac
Author: Andy Hsu <i@nn.ci> Date: Tue Oct 10 18:23:11 2023 +0800 chore(deps): upgrade dependencies commit7600dc28df
Author: Andy Hsu <i@nn.ci> Date: Tue Oct 10 18:13:58 2023 +0800 fix(aliyundrive_open): change default api to raw server (close #5358) commit8ef89ad0a4
Author: foxxorcat <95907542+foxxorcat@users.noreply.github.com> Date: Tue Oct 10 18:08:27 2023 +0800 fix(baidu_netdisk): hash and `error 2` (#5356) * fix(baidu):hash and error:2 * fix:invalid memory address commit35d672217d
Author: jeffmingup <1960588251@qq.com> Date: Sun Oct 8 19:29:45 2023 +0800 fix(onedrive_app): incorrect api on `_accessToken` (#5346) commit1a283bb272
Author: foxxorcat <95907542+foxxorcat@users.noreply.github.com> Date: Fri Oct 6 16:04:39 2023 +0800 feat(google_drive): add `hash_info`, `ctime`, `thumbnail` (#5334) commita008f54f4d
Author: nkh0472 <67589323+nkh0472@users.noreply.github.com> Date: Thu Oct 5 13:10:51 2023 +0800 docs: minor language improvements (#5329) [skip ci] * fix: adapt update progress type * Squashed commit of the following: commit65c5ec0c34
Author: itsHenry <2671230065@qq.com> Date: Sat Nov 4 13:35:09 2023 +0800 feat(cloudreve): folder size count and switch (#5457 close #5395) commita6325967d0
Author: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Date: Mon Oct 30 15:11:20 2023 +0800 fix(deps): update module github.com/charmbracelet/lipgloss to v0.9.1 (#5234) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> commit4dff49470a
Author: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Date: Mon Oct 30 15:10:36 2023 +0800 fix(deps): update golang.org/x/exp digest to 7918f67 (#5366) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> commitcc86d6f3d1
Author: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> Date: Sun Oct 29 14:45:55 2023 +0800 fix(deps): update module golang.org/x/net to v0.17.0 [security] (#5370) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> commitc0f9c8ebaf
Author: Andy Hsu <i@nn.ci> Date: Thu Oct 26 19:21:09 2023 +0800 feat: add ignore direct link params (close #5434)
This commit is contained in:
6
internal/offline_download/all.go
Normal file
6
internal/offline_download/all.go
Normal file
@ -0,0 +1,6 @@
|
||||
package offline_download
|
||||
|
||||
import (
|
||||
_ "github.com/alist-org/alist/v3/internal/offline_download/aria2"
|
||||
_ "github.com/alist-org/alist/v3/internal/offline_download/qbit"
|
||||
)
|
133
internal/offline_download/aria2/aria2.go
Normal file
133
internal/offline_download/aria2/aria2.go
Normal file
@ -0,0 +1,133 @@
|
||||
package aria2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/alist-org/alist/v3/internal/conf"
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/internal/offline_download/tool"
|
||||
"github.com/alist-org/alist/v3/internal/setting"
|
||||
"github.com/alist-org/alist/v3/pkg/aria2/rpc"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var notify = NewNotify()
|
||||
|
||||
type Aria2 struct {
|
||||
client rpc.Client
|
||||
}
|
||||
|
||||
func (a *Aria2) Items() []model.SettingItem {
|
||||
// aria2 settings
|
||||
return []model.SettingItem{
|
||||
{Key: conf.Aria2Uri, Value: "http://localhost:6800/jsonrpc", Type: conf.TypeString, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE},
|
||||
{Key: conf.Aria2Secret, Value: "", Type: conf.TypeString, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE},
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Aria2) Init() (string, error) {
|
||||
a.client = nil
|
||||
uri := setting.GetStr(conf.Aria2Uri)
|
||||
secret := setting.GetStr(conf.Aria2Secret)
|
||||
c, err := rpc.New(context.Background(), uri, secret, 4*time.Second, notify)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "failed to init aria2 client")
|
||||
}
|
||||
version, err := c.GetVersion()
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "failed get aria2 version")
|
||||
}
|
||||
a.client = c
|
||||
log.Infof("using aria2 version: %s", version.Version)
|
||||
return fmt.Sprintf("aria2 version: %s", version.Version), nil
|
||||
}
|
||||
|
||||
func (a *Aria2) IsReady() bool {
|
||||
return a.client != nil
|
||||
}
|
||||
|
||||
func (a *Aria2) AddURL(args *tool.AddUrlArgs) (string, error) {
|
||||
options := map[string]interface{}{
|
||||
"dir": args.TempDir,
|
||||
}
|
||||
gid, err := a.client.AddURI([]string{args.Url}, options)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return gid, nil
|
||||
}
|
||||
|
||||
func (a *Aria2) Remove(tid string) error {
|
||||
_, err := a.client.Remove(tid)
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *Aria2) Status(tid string) (*tool.Status, error) {
|
||||
info, err := a.client.TellStatus(tid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
total, err := strconv.ParseUint(info.TotalLength, 10, 64)
|
||||
if err != nil {
|
||||
total = 0
|
||||
}
|
||||
downloaded, err := strconv.ParseUint(info.CompletedLength, 10, 64)
|
||||
if err != nil {
|
||||
downloaded = 0
|
||||
}
|
||||
s := &tool.Status{
|
||||
Completed: info.Status == "complete",
|
||||
Err: err,
|
||||
}
|
||||
s.Progress = float64(downloaded) / float64(total) * 100
|
||||
if len(info.FollowedBy) != 0 {
|
||||
s.NewTID = info.FollowedBy[0]
|
||||
notify.Signals.Delete(tid)
|
||||
//notify.Signals.Store(gid, m.c)
|
||||
}
|
||||
switch info.Status {
|
||||
case "complete":
|
||||
s.Completed = true
|
||||
case "error":
|
||||
s.Err = errors.Errorf("failed to download %s, error: %s", tid, info.ErrorMessage)
|
||||
case "active":
|
||||
s.Status = "aria2: " + info.Status
|
||||
if info.Seeder == "true" {
|
||||
s.Completed = true
|
||||
}
|
||||
case "waiting", "paused":
|
||||
s.Status = "aria2: " + info.Status
|
||||
case "removed":
|
||||
s.Err = errors.Errorf("failed to download %s, removed", tid)
|
||||
default:
|
||||
return nil, errors.Errorf("[aria2] unknown status %s", info.Status)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (a *Aria2) GetFiles(tid string) []tool.File {
|
||||
//files, err := a.client.GetFiles(tid)
|
||||
//if err != nil {
|
||||
// return nil
|
||||
//}
|
||||
//return utils.MustSliceConvert(files, func(f rpc.FileInfo) tool.File {
|
||||
// return tool.File{
|
||||
// //ReadCloser: nil,
|
||||
// Name: path.Base(f.Path),
|
||||
// Size: f.Length,
|
||||
// Path: "",
|
||||
// Modified: time.Time{},
|
||||
// }
|
||||
//})
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ tool.Tool = (*Aria2)(nil)
|
||||
|
||||
func init() {
|
||||
tool.Tools.Add("aria2", &Aria2{})
|
||||
}
|
70
internal/offline_download/aria2/notify.go
Normal file
70
internal/offline_download/aria2/notify.go
Normal file
@ -0,0 +1,70 @@
|
||||
package aria2
|
||||
|
||||
import (
|
||||
"github.com/alist-org/alist/v3/pkg/aria2/rpc"
|
||||
"github.com/alist-org/alist/v3/pkg/generic_sync"
|
||||
)
|
||||
|
||||
const (
|
||||
Downloading = iota
|
||||
Paused
|
||||
Stopped
|
||||
Completed
|
||||
Errored
|
||||
)
|
||||
|
||||
type Notify struct {
|
||||
Signals generic_sync.MapOf[string, chan int]
|
||||
}
|
||||
|
||||
func NewNotify() *Notify {
|
||||
return &Notify{Signals: generic_sync.MapOf[string, chan int]{}}
|
||||
}
|
||||
|
||||
func (n *Notify) OnDownloadStart(events []rpc.Event) {
|
||||
for _, e := range events {
|
||||
if signal, ok := n.Signals.Load(e.Gid); ok {
|
||||
signal <- Downloading
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notify) OnDownloadPause(events []rpc.Event) {
|
||||
for _, e := range events {
|
||||
if signal, ok := n.Signals.Load(e.Gid); ok {
|
||||
signal <- Paused
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notify) OnDownloadStop(events []rpc.Event) {
|
||||
for _, e := range events {
|
||||
if signal, ok := n.Signals.Load(e.Gid); ok {
|
||||
signal <- Stopped
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notify) OnDownloadComplete(events []rpc.Event) {
|
||||
for _, e := range events {
|
||||
if signal, ok := n.Signals.Load(e.Gid); ok {
|
||||
signal <- Completed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notify) OnDownloadError(events []rpc.Event) {
|
||||
for _, e := range events {
|
||||
if signal, ok := n.Signals.Load(e.Gid); ok {
|
||||
signal <- Errored
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notify) OnBtDownloadComplete(events []rpc.Event) {
|
||||
for _, e := range events {
|
||||
if signal, ok := n.Signals.Load(e.Gid); ok {
|
||||
signal <- Completed
|
||||
}
|
||||
}
|
||||
}
|
80
internal/offline_download/qbit/qbit.go
Normal file
80
internal/offline_download/qbit/qbit.go
Normal file
@ -0,0 +1,80 @@
|
||||
package qbit
|
||||
|
||||
import (
|
||||
"github.com/alist-org/alist/v3/internal/conf"
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/internal/offline_download/tool"
|
||||
"github.com/alist-org/alist/v3/internal/qbittorrent"
|
||||
"github.com/alist-org/alist/v3/internal/setting"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type QBittorrent struct {
|
||||
client qbittorrent.Client
|
||||
}
|
||||
|
||||
func (a *QBittorrent) Items() []model.SettingItem {
|
||||
// qBittorrent settings
|
||||
return []model.SettingItem{
|
||||
{Key: conf.QbittorrentUrl, Value: "http://admin:adminadmin@localhost:8080/", Type: conf.TypeString, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE},
|
||||
{Key: conf.QbittorrentSeedtime, Value: "0", Type: conf.TypeNumber, Group: model.OFFLINE_DOWNLOAD, Flag: model.PRIVATE},
|
||||
}
|
||||
}
|
||||
|
||||
func (a *QBittorrent) Init() (string, error) {
|
||||
a.client = nil
|
||||
url := setting.GetStr(conf.QbittorrentUrl)
|
||||
qbClient, err := qbittorrent.New(url)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
a.client = qbClient
|
||||
return "ok", nil
|
||||
}
|
||||
|
||||
func (a *QBittorrent) IsReady() bool {
|
||||
return a.client != nil
|
||||
}
|
||||
|
||||
func (a *QBittorrent) AddURL(args *tool.AddUrlArgs) (string, error) {
|
||||
err := a.client.AddFromLink(args.Url, args.TempDir, args.UID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return args.UID, nil
|
||||
}
|
||||
|
||||
func (a *QBittorrent) Remove(tid string) error {
|
||||
err := a.client.Delete(tid, true)
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *QBittorrent) Status(tid string) (*tool.Status, error) {
|
||||
info, err := a.client.GetInfo(tid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &tool.Status{}
|
||||
s.Progress = float64(info.Completed) / float64(info.Size) * 100
|
||||
switch info.State {
|
||||
case qbittorrent.UPLOADING, qbittorrent.PAUSEDUP, qbittorrent.QUEUEDUP, qbittorrent.STALLEDUP, qbittorrent.FORCEDUP, qbittorrent.CHECKINGUP:
|
||||
s.Completed = true
|
||||
case qbittorrent.ALLOCATING, qbittorrent.DOWNLOADING, qbittorrent.METADL, qbittorrent.PAUSEDDL, qbittorrent.QUEUEDDL, qbittorrent.STALLEDDL, qbittorrent.CHECKINGDL, qbittorrent.FORCEDDL, qbittorrent.CHECKINGRESUMEDATA, qbittorrent.MOVING:
|
||||
s.Status = "[qBittorrent] downloading"
|
||||
case qbittorrent.ERROR, qbittorrent.MISSINGFILES, qbittorrent.UNKNOWN:
|
||||
s.Err = errors.Errorf("[qBittorrent] failed to download %s, error: %s", tid, info.State)
|
||||
default:
|
||||
s.Err = errors.Errorf("[qBittorrent] unknown error occurred downloading %s", tid)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (a *QBittorrent) GetFiles(tid string) []tool.File {
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ tool.Tool = (*QBittorrent)(nil)
|
||||
|
||||
func init() {
|
||||
tool.Tools.Add("qBittorrent", &QBittorrent{})
|
||||
}
|
84
internal/offline_download/tool/add.go
Normal file
84
internal/offline_download/tool/add.go
Normal file
@ -0,0 +1,84 @@
|
||||
package tool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/alist-org/alist/v3/internal/conf"
|
||||
"github.com/alist-org/alist/v3/internal/errs"
|
||||
"github.com/alist-org/alist/v3/internal/op"
|
||||
"github.com/alist-org/alist/v3/pkg/task"
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type AddURLArgs struct {
|
||||
URL string
|
||||
DstDirPath string
|
||||
Tool string
|
||||
}
|
||||
|
||||
func AddURL(ctx context.Context, args *AddURLArgs) error {
|
||||
// get tool
|
||||
tool, err := Tools.Get(args.Tool)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed get tool")
|
||||
}
|
||||
// check tool is ready
|
||||
if !tool.IsReady() {
|
||||
// try to init tool
|
||||
if _, err := tool.Init(); err != nil {
|
||||
return errors.Wrapf(err, "failed init tool %s", args.Tool)
|
||||
}
|
||||
}
|
||||
// check storage
|
||||
storage, dstDirActualPath, err := op.GetStorageAndActualPath(args.DstDirPath)
|
||||
if err != nil {
|
||||
return errors.WithMessage(err, "failed get storage")
|
||||
}
|
||||
// check is it could upload
|
||||
if storage.Config().NoUpload {
|
||||
return errors.WithStack(errs.UploadNotSupported)
|
||||
}
|
||||
// check path is valid
|
||||
obj, err := op.Get(ctx, storage, dstDirActualPath)
|
||||
if err != nil {
|
||||
if !errs.IsObjectNotFound(err) {
|
||||
return errors.WithMessage(err, "failed get object")
|
||||
}
|
||||
} else {
|
||||
if !obj.IsDir() {
|
||||
// can't add to a file
|
||||
return errors.WithStack(errs.NotFolder)
|
||||
}
|
||||
}
|
||||
|
||||
uid := uuid.NewString()
|
||||
tempDir := filepath.Join(conf.Conf.TempDir, args.Tool, uid)
|
||||
signal := make(chan int)
|
||||
gid, err := tool.AddURL(&AddUrlArgs{
|
||||
Url: args.URL,
|
||||
UID: uid,
|
||||
TempDir: tempDir,
|
||||
Signal: signal,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "[%s] failed to add uri %s", args.Tool, args.URL)
|
||||
}
|
||||
DownTaskManager.Submit(task.WithCancelCtx(&task.Task[string]{
|
||||
ID: gid,
|
||||
Name: fmt.Sprintf("download %s to [%s](%s)", args.URL, storage.GetStorage().MountPath, dstDirActualPath),
|
||||
Func: func(tsk *task.Task[string]) error {
|
||||
m := &Monitor{
|
||||
tool: tool,
|
||||
tsk: tsk,
|
||||
tempDir: tempDir,
|
||||
dstDirPath: args.DstDirPath,
|
||||
signal: signal,
|
||||
}
|
||||
return m.Loop()
|
||||
},
|
||||
}))
|
||||
return nil
|
||||
}
|
17
internal/offline_download/tool/all_test.go
Normal file
17
internal/offline_download/tool/all_test.go
Normal file
@ -0,0 +1,17 @@
|
||||
package tool_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/alist-org/alist/v3/internal/offline_download/tool"
|
||||
)
|
||||
|
||||
func TestGetFiles(t *testing.T) {
|
||||
files, err := tool.GetFiles("..")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, file := range files {
|
||||
t.Log(file.Name, file.Size, file.Path, file.Modified)
|
||||
}
|
||||
}
|
59
internal/offline_download/tool/base.go
Normal file
59
internal/offline_download/tool/base.go
Normal file
@ -0,0 +1,59 @@
|
||||
package tool
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
)
|
||||
|
||||
type AddUrlArgs struct {
|
||||
Url string
|
||||
UID string
|
||||
TempDir string
|
||||
Signal chan int
|
||||
}
|
||||
|
||||
type Status struct {
|
||||
Progress float64
|
||||
NewTID string
|
||||
Completed bool
|
||||
Status string
|
||||
Err error
|
||||
}
|
||||
|
||||
type Tool interface {
|
||||
// Items return the setting items the tool need
|
||||
Items() []model.SettingItem
|
||||
Init() (string, error)
|
||||
IsReady() bool
|
||||
// AddURL add an uri to download, return the task id
|
||||
AddURL(args *AddUrlArgs) (string, error)
|
||||
// Remove the download if task been canceled
|
||||
Remove(tid string) error
|
||||
// Status return the status of the download task, if an error occurred, return the error in Status.Err
|
||||
Status(tid string) (*Status, error)
|
||||
// GetFiles return the files of the download task, if nil, means walk the temp dir to get the files
|
||||
GetFiles(tid string) []File
|
||||
}
|
||||
|
||||
type File struct {
|
||||
// ReadCloser for http client
|
||||
io.ReadCloser
|
||||
Name string
|
||||
Size int64
|
||||
Path string
|
||||
Modified time.Time
|
||||
}
|
||||
|
||||
func (f *File) GetReadCloser() (io.ReadCloser, error) {
|
||||
if f.ReadCloser != nil {
|
||||
return f.ReadCloser, nil
|
||||
}
|
||||
file, err := os.Open(f.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return file, nil
|
||||
}
|
159
internal/offline_download/tool/monitor.go
Normal file
159
internal/offline_download/tool/monitor.go
Normal file
@ -0,0 +1,159 @@
|
||||
package tool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/internal/op"
|
||||
"github.com/alist-org/alist/v3/internal/stream"
|
||||
"github.com/alist-org/alist/v3/pkg/task"
|
||||
"github.com/alist-org/alist/v3/pkg/utils"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type Monitor struct {
|
||||
tool Tool
|
||||
tsk *task.Task[string]
|
||||
tempDir string
|
||||
retried int
|
||||
dstDirPath string
|
||||
finish chan struct{}
|
||||
signal chan int
|
||||
}
|
||||
|
||||
func (m *Monitor) Loop() error {
|
||||
m.finish = make(chan struct{})
|
||||
var (
|
||||
err error
|
||||
ok bool
|
||||
)
|
||||
outer:
|
||||
for {
|
||||
select {
|
||||
case <-m.tsk.Ctx.Done():
|
||||
err := m.tool.Remove(m.tsk.ID)
|
||||
return err
|
||||
case <-m.signal:
|
||||
ok, err = m.Update()
|
||||
if ok {
|
||||
break outer
|
||||
}
|
||||
case <-time.After(time.Second * 2):
|
||||
ok, err = m.Update()
|
||||
if ok {
|
||||
break outer
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.tsk.SetStatus("aria2 download completed, transferring")
|
||||
<-m.finish
|
||||
m.tsk.SetStatus("completed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update download status, return true if download completed
|
||||
func (m *Monitor) Update() (bool, error) {
|
||||
info, err := m.tool.Status(m.tsk.ID)
|
||||
if err != nil {
|
||||
m.retried++
|
||||
log.Errorf("failed to get status of %s, retried %d times", m.tsk.ID, m.retried)
|
||||
return false, nil
|
||||
}
|
||||
if m.retried > 5 {
|
||||
return true, errors.Errorf("failed to get status of %s, retried %d times", m.tsk.ID, m.retried)
|
||||
}
|
||||
m.retried = 0
|
||||
m.tsk.SetProgress(info.Progress)
|
||||
m.tsk.SetStatus("tool: " + info.Status)
|
||||
if info.NewTID != "" {
|
||||
log.Debugf("followen by: %+v", info.NewTID)
|
||||
DownTaskManager.RawTasks().Delete(m.tsk.ID)
|
||||
m.tsk.ID = info.NewTID
|
||||
DownTaskManager.RawTasks().Store(m.tsk.ID, m.tsk)
|
||||
return false, nil
|
||||
}
|
||||
// if download completed
|
||||
if info.Completed {
|
||||
err := m.Complete()
|
||||
return true, errors.WithMessage(err, "failed to transfer file")
|
||||
}
|
||||
// if download failed
|
||||
if info.Err != nil {
|
||||
return true, errors.Errorf("failed to download %s, error: %s", m.tsk.ID, info.Err.Error())
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
var TransferTaskManager = task.NewTaskManager(3, func(k *uint64) {
|
||||
atomic.AddUint64(k, 1)
|
||||
})
|
||||
|
||||
func (m *Monitor) Complete() error {
|
||||
// check dstDir again
|
||||
storage, dstDirActualPath, err := op.GetStorageAndActualPath(m.dstDirPath)
|
||||
if err != nil {
|
||||
return errors.WithMessage(err, "failed get storage")
|
||||
}
|
||||
var files []File
|
||||
if f := m.tool.GetFiles(m.tsk.ID); f != nil {
|
||||
files = f
|
||||
} else {
|
||||
files, err = GetFiles(m.tempDir)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to get files")
|
||||
}
|
||||
}
|
||||
// upload files
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(files))
|
||||
go func() {
|
||||
wg.Wait()
|
||||
err := os.RemoveAll(m.tempDir)
|
||||
m.finish <- struct{}{}
|
||||
if err != nil {
|
||||
log.Errorf("failed to remove aria2 temp dir: %+v", err.Error())
|
||||
}
|
||||
}()
|
||||
for i, _ := range files {
|
||||
file := files[i]
|
||||
TransferTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
|
||||
Name: fmt.Sprintf("transfer %s to [%s](%s)", file.Path, storage.GetStorage().MountPath, dstDirActualPath),
|
||||
Func: func(tsk *task.Task[uint64]) error {
|
||||
defer wg.Done()
|
||||
mimetype := utils.GetMimeType(file.Path)
|
||||
rc, err := file.GetReadCloser()
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to open file %s", file.Path)
|
||||
}
|
||||
s := &stream.FileStream{
|
||||
Ctx: nil,
|
||||
Obj: &model.Object{
|
||||
Name: filepath.Base(file.Path),
|
||||
Size: file.Size,
|
||||
Modified: file.Modified,
|
||||
IsFolder: false,
|
||||
},
|
||||
Reader: rc,
|
||||
Mimetype: mimetype,
|
||||
Closers: utils.NewClosers(rc),
|
||||
}
|
||||
relDir, err := filepath.Rel(m.tempDir, filepath.Dir(file.Path))
|
||||
if err != nil {
|
||||
log.Errorf("find relation directory error: %v", err)
|
||||
}
|
||||
newDistDir := filepath.Join(dstDirActualPath, relDir)
|
||||
return op.Put(tsk.Ctx, storage, newDistDir, s, tsk.SetProgress)
|
||||
},
|
||||
}))
|
||||
}
|
||||
return nil
|
||||
}
|
42
internal/offline_download/tool/tools.go
Normal file
42
internal/offline_download/tool/tools.go
Normal file
@ -0,0 +1,42 @@
|
||||
package tool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/alist-org/alist/v3/internal/model"
|
||||
"github.com/alist-org/alist/v3/pkg/task"
|
||||
)
|
||||
|
||||
var (
|
||||
Tools = make(ToolsManager)
|
||||
DownTaskManager = task.NewTaskManager[string](3)
|
||||
)
|
||||
|
||||
type ToolsManager map[string]Tool
|
||||
|
||||
func (t ToolsManager) Get(name string) (Tool, error) {
|
||||
if tool, ok := t[name]; ok {
|
||||
return tool, nil
|
||||
}
|
||||
return nil, fmt.Errorf("tool %s not found", name)
|
||||
}
|
||||
|
||||
func (t ToolsManager) Add(name string, tool Tool) {
|
||||
t[name] = tool
|
||||
}
|
||||
|
||||
func (t ToolsManager) Names() []string {
|
||||
names := make([]string, 0, len(t))
|
||||
for name := range t {
|
||||
names = append(names, name)
|
||||
}
|
||||
return names
|
||||
}
|
||||
|
||||
func (t ToolsManager) Items() []model.SettingItem {
|
||||
var items []model.SettingItem
|
||||
for _, tool := range t {
|
||||
items = append(items, tool.Items()...)
|
||||
}
|
||||
return items
|
||||
}
|
28
internal/offline_download/tool/util.go
Normal file
28
internal/offline_download/tool/util.go
Normal file
@ -0,0 +1,28 @@
|
||||
package tool
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
func GetFiles(dir string) ([]File, error) {
|
||||
var files []File
|
||||
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.IsDir() {
|
||||
files = append(files, File{
|
||||
Name: info.Name(),
|
||||
Size: info.Size(),
|
||||
Path: path,
|
||||
Modified: info.ModTime(),
|
||||
})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return files, nil
|
||||
}
|
Reference in New Issue
Block a user