diff --git a/drivers/cloudreve/driver.go b/drivers/cloudreve/driver.go index d0ab30b6..8c2321b8 100644 --- a/drivers/cloudreve/driver.go +++ b/drivers/cloudreve/driver.go @@ -162,6 +162,8 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File switch r.Policy.Type { case "onedrive": err = d.upOneDrive(ctx, stream, u, up) + case "s3": + err = d.upS3(ctx, stream, u, up) case "remote": // 从机存储 err = d.upRemote(ctx, stream, u, up) case "local": // 本机存储 diff --git a/drivers/cloudreve/types.go b/drivers/cloudreve/types.go index a7c3919e..8a465f01 100644 --- a/drivers/cloudreve/types.go +++ b/drivers/cloudreve/types.go @@ -21,11 +21,12 @@ type Policy struct { } type UploadInfo struct { - SessionID string `json:"sessionID"` - ChunkSize int `json:"chunkSize"` - Expires int `json:"expires"` - UploadURLs []string `json:"uploadURLs"` - Credential string `json:"credential,omitempty"` + SessionID string `json:"sessionID"` + ChunkSize int `json:"chunkSize"` + Expires int `json:"expires"` + UploadURLs []string `json:"uploadURLs"` + Credential string `json:"credential,omitempty"` // local + CompleteURL string `json:"completeURL,omitempty"` // s3 } type DirectoryResp struct { diff --git a/drivers/cloudreve/util.go b/drivers/cloudreve/util.go index cffa7988..1fd5ed8a 100644 --- a/drivers/cloudreve/util.go +++ b/drivers/cloudreve/util.go @@ -312,3 +312,82 @@ func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u } return nil } + +func (d *Cloudreve) upS3(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error { + var finish int64 = 0 + var chunk int = 0 + var etags []string + DEFAULT := int64(u.ChunkSize) + for finish < stream.GetSize() { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + utils.Log.Debugf("[Cloudreve-S3] upload: %d", finish) + var byteSize = DEFAULT + left := stream.GetSize() - finish + if left < DEFAULT { + byteSize = left + } + byteData := make([]byte, byteSize) + n, err := io.ReadFull(stream, byteData) + utils.Log.Debug(err, n) + if err != nil { + return err + } + req, err := http.NewRequest("PUT", u.UploadURLs[chunk], + driver.NewLimitedUploadStream(ctx, bytes.NewBuffer(byteData))) + if err != nil { + return err + } + req = req.WithContext(ctx) + req.ContentLength = byteSize + finish += byteSize + res, err := base.HttpClient.Do(req) + if err != nil { + return err + } + _ = res.Body.Close() + etags = append(etags, res.Header.Get("ETag")) + up(float64(finish) * 100 / float64(stream.GetSize())) + chunk++ + } + + // s3LikeFinishUpload + // https://github.com/cloudreve/frontend/blob/b485bf297974cbe4834d2e8e744ae7b7e5b2ad39/src/component/Uploader/core/api/index.ts#L204-L252 + bodyBuilder := &strings.Builder{} + bodyBuilder.WriteString("") + for i, etag := range etags { + bodyBuilder.WriteString(fmt.Sprintf( + `%d%s`, + i+1, // PartNumber 从 1 开始 + etag, + )) + } + bodyBuilder.WriteString("") + req, err := http.NewRequest( + "POST", + u.CompleteURL, + strings.NewReader(bodyBuilder.String()), + ) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/xml") + req.Header.Set("User-Agent", d.getUA()) + res, err := base.HttpClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + body, _ := io.ReadAll(res.Body) + return fmt.Errorf("up status: %d, error: %s", res.StatusCode, string(body)) + } + + // 上传成功发送回调请求 + err = d.request(http.MethodGet, "/callback/s3/"+u.SessionID, nil, nil) + if err != nil { + return err + } + return nil +}