feat(baidu_netdisk): add retry to most operations (close #4863 in #4939)

This commit is contained in:
Sean
2023-08-07 13:44:28 +08:00
committed by GitHub
parent e9cb37122e
commit 7877184bee
17 changed files with 122 additions and 89 deletions

View File

@ -5,18 +5,19 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/errs"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/avast/retry-go"
log "github.com/sirupsen/logrus"
"io"
"math"
"os"
stdpath "path"
"strconv"
"strings"
"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/utils"
log "github.com/sirupsen/logrus"
)
type BaiduNetdisk struct {
@ -24,6 +25,9 @@ type BaiduNetdisk struct {
Addition
}
const BaiduFileAPI = "https://d.pcs.baidu.com/rest/2.0/pcs/superfile2"
const DefaultSliceSize int64 = 4 * 1024 * 1024
func (d *BaiduNetdisk) Config() driver.Config {
return config
}
@ -108,7 +112,9 @@ func (d *BaiduNetdisk) Remove(ctx context.Context, obj model.Obj) error {
}
func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) error {
tempFile, err := utils.CreateTempFile(stream.GetReadCloser())
streamSize := stream.GetSize()
tempFile, err := utils.CreateTempFile(stream.GetReadCloser(), stream.GetSize())
if err != nil {
return err
}
@ -116,19 +122,20 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
_ = tempFile.Close()
_ = os.Remove(tempFile.Name())
}()
var Default int64 = 4 * 1024 * 1024
count := int(math.Ceil(float64(stream.GetSize()) / float64(Default)))
var SliceSize int64 = 256 * 1024
count := int(math.Ceil(float64(streamSize) / float64(DefaultSliceSize)))
//cal md5 for first 256k data
const SliceSize int64 = 256 * 1024
// cal md5
h1 := md5.New()
h2 := md5.New()
block_list := make([]string, 0)
content_md5 := ""
slice_md5 := ""
left := stream.GetSize()
blockList := make([]string, 0)
contentMd5 := ""
sliceMd5 := ""
left := streamSize
for i := 0; i < count; i++ {
byteSize := Default
if left < Default {
byteSize := DefaultSliceSize
if left < DefaultSliceSize {
byteSize = left
}
left -= byteSize
@ -136,16 +143,16 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
if err != nil {
return err
}
block_list = append(block_list, fmt.Sprintf("\"%s\"", hex.EncodeToString(h2.Sum(nil))))
blockList = append(blockList, fmt.Sprintf("\"%s\"", hex.EncodeToString(h2.Sum(nil))))
h2.Reset()
}
content_md5 = hex.EncodeToString(h1.Sum(nil))
contentMd5 = hex.EncodeToString(h1.Sum(nil))
_, err = tempFile.Seek(0, io.SeekStart)
if err != nil {
return err
}
if stream.GetSize() <= SliceSize {
slice_md5 = content_md5
if streamSize <= SliceSize {
sliceMd5 = contentMd5
} else {
sliceData := make([]byte, SliceSize)
_, err = io.ReadFull(tempFile, sliceData)
@ -153,19 +160,15 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
return err
}
h2.Write(sliceData)
slice_md5 = hex.EncodeToString(h2.Sum(nil))
_, err = tempFile.Seek(0, io.SeekStart)
if err != nil {
return err
}
sliceMd5 = hex.EncodeToString(h2.Sum(nil))
}
rawPath := stdpath.Join(dstDir.GetPath(), stream.GetName())
path := encodeURIComponent(rawPath)
block_list_str := fmt.Sprintf("[%s]", strings.Join(block_list, ","))
block_list_str := fmt.Sprintf("[%s]", strings.Join(blockList, ","))
data := fmt.Sprintf("path=%s&size=%d&isdir=0&autoinit=1&block_list=%s&content-md5=%s&slice-md5=%s",
path, stream.GetSize(),
path, streamSize,
block_list_str,
content_md5, slice_md5)
contentMd5, sliceMd5)
params := map[string]string{
"method": "precreate",
}
@ -177,6 +180,7 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
}
log.Debugf("%+v", precreateResp)
if precreateResp.ReturnType == 2 {
//rapid upload, since got md5 match from baidu server
return nil
}
params = map[string]string{
@ -186,33 +190,49 @@ func (d *BaiduNetdisk) Put(ctx context.Context, dstDir model.Obj, stream model.F
"path": path,
"uploadid": precreateResp.Uploadid,
}
left = stream.GetSize()
var offset int64 = 0
for i, partseq := range precreateResp.BlockList {
if utils.IsCanceled(ctx) {
return ctx.Err()
}
byteSize := Default
if left < Default {
byteSize = left
}
left -= byteSize
u := "https://d.pcs.baidu.com/rest/2.0/pcs/superfile2"
params["partseq"] = strconv.Itoa(partseq)
res, err := base.RestyClient.R().
SetContext(ctx).
SetQueryParams(params).
SetFileReader("file", stream.GetName(), io.LimitReader(tempFile, byteSize)).
Post(u)
byteSize := int64(math.Min(float64(streamSize-offset), float64(DefaultSliceSize)))
err := retry.Do(func() error {
return d.uploadSlice(ctx, &params, stream.GetName(), tempFile, offset, byteSize)
},
retry.Context(ctx),
retry.Attempts(3))
if err != nil {
return err
}
log.Debugln(res.String())
offset += byteSize
if len(precreateResp.BlockList) > 0 {
up(i * 100 / len(precreateResp.BlockList))
}
}
_, err = d.create(rawPath, stream.GetSize(), 0, precreateResp.Uploadid, block_list_str)
_, err = d.create(rawPath, streamSize, 0, precreateResp.Uploadid, block_list_str)
return err
}
func (d *BaiduNetdisk) uploadSlice(ctx context.Context, params *map[string]string, fileName string, file *os.File, offset int64, byteSize int64) error {
_, err := file.Seek(offset, io.SeekStart)
if err != nil {
return err
}
res, err := base.RestyClient.R().
SetContext(ctx).
SetQueryParams(*params).
SetFileReader("file", fileName, io.LimitReader(file, byteSize)).
Post(BaiduFileAPI)
if err != nil {
return err
}
log.Debugln(res.RawResponse.Status + res.String())
errCode := utils.Json.Get(res.Body(), "error_code").ToInt()
errNo := utils.Json.Get(res.Body(), "errno").ToInt()
if errCode != 0 || errNo != 0 {
return errs.NewErr(errs.StreamIncomplete, "error in uploading to baidu, will retry. response=%s", res.String())
}
return nil
}
var _ driver.Driver = (*BaiduNetdisk)(nil)

View File

@ -2,6 +2,7 @@ package baidu_netdisk
import (
"fmt"
"github.com/avast/retry-go"
"net/http"
"net/url"
"strconv"
@ -51,31 +52,37 @@ func (d *BaiduNetdisk) _refreshToken() error {
}
func (d *BaiduNetdisk) request(furl string, method string, callback base.ReqCallback, resp interface{}) ([]byte, error) {
req := base.RestyClient.R()
req.SetQueryParam("access_token", d.AccessToken)
if callback != nil {
callback(req)
}
if resp != nil {
req.SetResult(resp)
}
res, err := req.Execute(method, furl)
if err != nil {
return nil, err
}
log.Debugf("[baidu_netdisk] req: %s, resp: %s", furl, res.String())
errno := utils.Json.Get(res.Body(), "errno").ToInt()
if errno != 0 {
if utils.SliceContains([]int{111, -6}, errno) {
err = d.refreshToken()
if err != nil {
return nil, err
}
return d.request(furl, method, callback, resp)
var result []byte
err := retry.Do(func() error {
req := base.RestyClient.R()
req.SetQueryParam("access_token", d.AccessToken)
if callback != nil {
callback(req)
}
return nil, fmt.Errorf("req: [%s] ,errno: %d, refer to https://pan.baidu.com/union/doc/", furl, errno)
}
return res.Body(), nil
if resp != nil {
req.SetResult(resp)
}
res, err := req.Execute(method, furl)
if err != nil {
return err
}
log.Debugf("[baidu_netdisk] req: %s, resp: %s", furl, res.String())
errno := utils.Json.Get(res.Body(), "errno").ToInt()
if errno != 0 {
if utils.SliceContains([]int{111, -6}, errno) {
log.Info("refreshing baidu_netdisk token.")
err2 := d.refreshToken()
if err2 != nil {
return err2
}
}
return fmt.Errorf("req: [%s] ,errno: %d, refer to https://pan.baidu.com/union/doc/", furl, errno)
}
result = res.Body()
return nil
},
retry.Attempts(3))
return result, err
}
func (d *BaiduNetdisk) get(pathname string, params map[string]string, resp interface{}) ([]byte, error) {