fix(qbittorrent): fix two file transferring related bugs [skip ci] (#3501)

* fix(qbittorrent): delete qbittorrent task before transferring

* fix(qbittorrent): parse the path correctly when the torrent contains folders
This commit is contained in:
kdxcxs 2023-02-18 18:54:51 +08:00 committed by GitHub
parent 84219d3d70
commit 3c7512f64a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -56,8 +56,7 @@ outer:
for { for {
select { select {
case <-m.tsk.Ctx.Done(): case <-m.tsk.Ctx.Done():
err = qbclient.Delete(m.tsk.ID) return qbclient.Delete(m.tsk.ID)
return err
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
completed, err = m.update() completed, err = m.update()
if completed { if completed {
@ -102,7 +101,7 @@ var TransferTaskManager = task.NewTaskManager(3, func(k *uint64) {
func (m *Monitor) complete() error { func (m *Monitor) complete() error {
// check dstDir again // check dstDir again
storage, dstDirActualPath, err := op.GetStorageAndActualPath(m.dstDirPath) storage, dstBaseDir, err := op.GetStorageAndActualPath(m.dstDirPath)
if err != nil { if err != nil {
return errors.WithMessage(err, "failed get storage") return errors.WithMessage(err, "failed get storage")
} }
@ -112,6 +111,12 @@ func (m *Monitor) complete() error {
return errors.Wrapf(err, "failed to get files of %s", m.tsk.ID) return errors.Wrapf(err, "failed to get files of %s", m.tsk.ID)
} }
log.Debugf("files len: %d", len(files)) log.Debugf("files len: %d", len(files))
// delete qbittorrent task but do not delete the files before transferring to avoid qbittorrent
// accessing downloaded files and throw `cannot access the file because it is being used by another process` error
err = qbclient.Delete(m.tsk.ID)
if err != nil {
return err
}
// upload files // upload files
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(files)) wg.Add(len(files))
@ -124,20 +129,23 @@ func (m *Monitor) complete() error {
} }
}() }()
for _, file := range files { for _, file := range files {
filePath := filepath.Join(m.tempDir, file.Name) tempPath := filepath.Join(m.tempDir, file.Name)
dstPath := filepath.Join(dstBaseDir, file.Name)
dstDir := filepath.Dir(dstPath)
fileName := filepath.Base(dstPath)
TransferTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{ TransferTaskManager.Submit(task.WithCancelCtx(&task.Task[uint64]{
Name: fmt.Sprintf("transfer %s to [%s](%s)", filePath, storage.GetStorage().MountPath, dstDirActualPath), Name: fmt.Sprintf("transfer %s to [%s](%s)", tempPath, storage.GetStorage().MountPath, dstPath),
Func: func(tsk *task.Task[uint64]) error { Func: func(tsk *task.Task[uint64]) error {
defer wg.Done() defer wg.Done()
size := file.Size size := file.Size
mimetype := utils.GetMimeType(filePath) mimetype := utils.GetMimeType(tempPath)
f, err := os.Open(filePath) f, err := os.Open(tempPath)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to open file %s", filePath) return errors.Wrapf(err, "failed to open file %s", tempPath)
} }
stream := &model.FileStream{ stream := &model.FileStream{
Obj: &model.Object{ Obj: &model.Object{
Name: file.Name, Name: fileName,
Size: size, Size: size,
Modified: time.Now(), Modified: time.Now(),
IsFolder: false, IsFolder: false,
@ -145,7 +153,7 @@ func (m *Monitor) complete() error {
ReadCloser: f, ReadCloser: f,
Mimetype: mimetype, Mimetype: mimetype,
} }
return op.Put(tsk.Ctx, storage, dstDirActualPath, stream, tsk.SetProgress) return op.Put(tsk.Ctx, storage, dstDir, stream, tsk.SetProgress)
}, },
})) }))
} }