feat(cloudreve): s3 policy support (#8245)
* feat(cloudreve): s3 policy support * fix(cloudreve): correct potential off-by-one error in `etags` initialization
This commit is contained in:
@ -162,6 +162,8 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File
|
|||||||
switch r.Policy.Type {
|
switch r.Policy.Type {
|
||||||
case "onedrive":
|
case "onedrive":
|
||||||
err = d.upOneDrive(ctx, stream, u, up)
|
err = d.upOneDrive(ctx, stream, u, up)
|
||||||
|
case "s3":
|
||||||
|
err = d.upS3(ctx, stream, u, up)
|
||||||
case "remote": // 从机存储
|
case "remote": // 从机存储
|
||||||
err = d.upRemote(ctx, stream, u, up)
|
err = d.upRemote(ctx, stream, u, up)
|
||||||
case "local": // 本机存储
|
case "local": // 本机存储
|
||||||
|
@ -21,11 +21,12 @@ type Policy struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type UploadInfo struct {
|
type UploadInfo struct {
|
||||||
SessionID string `json:"sessionID"`
|
SessionID string `json:"sessionID"`
|
||||||
ChunkSize int `json:"chunkSize"`
|
ChunkSize int `json:"chunkSize"`
|
||||||
Expires int `json:"expires"`
|
Expires int `json:"expires"`
|
||||||
UploadURLs []string `json:"uploadURLs"`
|
UploadURLs []string `json:"uploadURLs"`
|
||||||
Credential string `json:"credential,omitempty"`
|
Credential string `json:"credential,omitempty"` // local
|
||||||
|
CompleteURL string `json:"completeURL,omitempty"` // s3
|
||||||
}
|
}
|
||||||
|
|
||||||
type DirectoryResp struct {
|
type DirectoryResp struct {
|
||||||
|
@ -312,3 +312,82 @@ func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error {
|
||||||
|
var finish int64 = 0
|
||||||
|
var chunk int = 0
|
||||||
|
var etags []string
|
||||||
|
DEFAULT := int64(u.ChunkSize)
|
||||||
|
for finish < stream.GetSize() {
|
||||||
|
if utils.IsCanceled(ctx) {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
utils.Log.Debugf("[Cloudreve-S3] upload: %d", finish)
|
||||||
|
var byteSize = DEFAULT
|
||||||
|
left := stream.GetSize() - finish
|
||||||
|
if left < DEFAULT {
|
||||||
|
byteSize = left
|
||||||
|
}
|
||||||
|
byteData := make([]byte, byteSize)
|
||||||
|
n, err := io.ReadFull(stream, byteData)
|
||||||
|
utils.Log.Debug(err, n)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req, err := http.NewRequest("PUT", u.UploadURLs[chunk],
|
||||||
|
driver.NewLimitedUploadStream(ctx, bytes.NewBuffer(byteData)))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
req.ContentLength = byteSize
|
||||||
|
finish += byteSize
|
||||||
|
res, err := base.HttpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_ = res.Body.Close()
|
||||||
|
etags = append(etags, res.Header.Get("ETag"))
|
||||||
|
up(float64(finish) * 100 / float64(stream.GetSize()))
|
||||||
|
chunk++
|
||||||
|
}
|
||||||
|
|
||||||
|
// s3LikeFinishUpload
|
||||||
|
// https://github.com/cloudreve/frontend/blob/b485bf297974cbe4834d2e8e744ae7b7e5b2ad39/src/component/Uploader/core/api/index.ts#L204-L252
|
||||||
|
bodyBuilder := &strings.Builder{}
|
||||||
|
bodyBuilder.WriteString("<CompleteMultipartUpload>")
|
||||||
|
for i, etag := range etags {
|
||||||
|
bodyBuilder.WriteString(fmt.Sprintf(
|
||||||
|
`<Part><PartNumber>%d</PartNumber><ETag>%s</ETag></Part>`,
|
||||||
|
i+1, // PartNumber 从 1 开始
|
||||||
|
etag,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
bodyBuilder.WriteString("</CompleteMultipartUpload>")
|
||||||
|
req, err := http.NewRequest(
|
||||||
|
"POST",
|
||||||
|
u.CompleteURL,
|
||||||
|
strings.NewReader(bodyBuilder.String()),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/xml")
|
||||||
|
req.Header.Set("User-Agent", d.getUA())
|
||||||
|
res, err := base.HttpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(res.Body)
|
||||||
|
return fmt.Errorf("up status: %d, error: %s", res.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
// 上传成功发送回调请求
|
||||||
|
err = d.request(http.MethodGet, "/callback/s3/"+u.SessionID, nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user