feat: add support for halalcloud driver (#6696)

This commit is contained in:
YangXu
2024-07-07 13:20:34 +08:00
committed by GitHub
parent 316f3569a5
commit ca30849e24
9 changed files with 1075 additions and 64 deletions

View File

@ -0,0 +1,406 @@
package halalcloud
import (
"context"
"crypto/sha1"
"fmt"
"github.com/alist-org/alist/v3/drivers/base"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/http_range"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/city404/v6-public-rpc-proto/go/v6/common"
pbPublicUser "github.com/city404/v6-public-rpc-proto/go/v6/user"
pubUserFile "github.com/city404/v6-public-rpc-proto/go/v6/userfile"
"github.com/rclone/rclone/lib/readers"
"github.com/zzzhr1990/go-common-entity/userfile"
"io"
"net/url"
"path"
"strconv"
"time"
)
type HalalCloud struct {
*HalalCommon
model.Storage
Addition
uploadThread int
}
func (d *HalalCloud) Config() driver.Config {
return config
}
func (d *HalalCloud) GetAddition() driver.Additional {
return &d.Addition
}
func (d *HalalCloud) Init(ctx context.Context) error {
d.uploadThread, _ = strconv.Atoi(d.UploadThread)
if d.uploadThread < 1 || d.uploadThread > 32 {
d.uploadThread, d.UploadThread = 3, "3"
}
if d.HalalCommon == nil {
d.HalalCommon = &HalalCommon{
Common: &Common{},
AuthService: &AuthService{
appID: func() string {
if d.Addition.AppID != "" {
return d.Addition.AppID
}
return AppID
}(),
appVersion: func() string {
if d.Addition.AppVersion != "" {
return d.Addition.AppVersion
}
return AppVersion
}(),
appSecret: func() string {
if d.Addition.AppSecret != "" {
return d.Addition.AppSecret
}
return AppSecret
}(),
tr: &TokenResp{
RefreshToken: d.Addition.RefreshToken,
},
},
UserInfo: &UserInfo{},
refreshTokenFunc: func(token string) error {
d.Addition.RefreshToken = token
op.MustSaveDriverStorage(d)
return nil
},
}
}
// 防止重复登录
if d.Addition.RefreshToken == "" || !d.IsLogin() {
as, err := d.NewAuthServiceWithOauth()
if err != nil {
d.GetStorage().SetStatus(fmt.Sprintf("%+v", err.Error()))
return err
}
d.HalalCommon.AuthService = as
d.SetTokenResp(as.tr)
op.MustSaveDriverStorage(d)
}
var err error
d.HalalCommon.serv, err = d.NewAuthService(d.Addition.RefreshToken)
if err != nil {
return err
}
return nil
}
func (d *HalalCloud) Drop(ctx context.Context) error {
return nil
}
func (d *HalalCloud) List(ctx context.Context, dir model.Obj, args model.ListArgs) ([]model.Obj, error) {
return d.getFiles(ctx, dir)
}
func (d *HalalCloud) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
return d.getLink(ctx, file, args)
}
func (d *HalalCloud) MakeDir(ctx context.Context, parentDir model.Obj, dirName string) (model.Obj, error) {
return d.makeDir(ctx, parentDir, dirName)
}
func (d *HalalCloud) Move(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
return d.move(ctx, srcObj, dstDir)
}
func (d *HalalCloud) Rename(ctx context.Context, srcObj model.Obj, newName string) (model.Obj, error) {
return d.rename(ctx, srcObj, newName)
}
func (d *HalalCloud) Copy(ctx context.Context, srcObj, dstDir model.Obj) (model.Obj, error) {
return d.copy(ctx, srcObj, dstDir)
}
func (d *HalalCloud) Remove(ctx context.Context, obj model.Obj) error {
return d.remove(ctx, obj)
}
func (d *HalalCloud) Put(ctx context.Context, dstDir model.Obj, stream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
return d.put(ctx, dstDir, stream, up)
}
func (d *HalalCloud) IsLogin() bool {
if d.AuthService.tr == nil {
return false
}
serv, err := d.NewAuthService(d.Addition.RefreshToken)
if err != nil {
return false
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
result, err := pbPublicUser.NewPubUserClient(serv.GetGrpcConnection()).Get(ctx, &pbPublicUser.User{
Identity: "",
})
if result == nil || err != nil {
return false
}
d.UserInfo.Identity = result.Identity
d.UserInfo.CreateTs = result.CreateTs
d.UserInfo.Name = result.Name
d.UserInfo.UpdateTs = result.UpdateTs
return true
}
type HalalCommon struct {
*Common
*AuthService // 登录信息
*UserInfo // 用户信息
refreshTokenFunc func(token string) error
serv *AuthService
}
func (d *HalalCloud) SetTokenResp(tr *TokenResp) {
d.Addition.RefreshToken = tr.RefreshToken
}
func (d *HalalCloud) getFiles(ctx context.Context, dir model.Obj) ([]model.Obj, error) {
files := make([]model.Obj, 0)
limit := int64(100)
token := ""
client := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection())
opDir := d.GetCurrentDir(dir)
for {
result, err := client.List(ctx, &pubUserFile.FileListRequest{
Parent: &pubUserFile.File{Path: opDir},
ListInfo: &common.ScanListRequest{
Limit: limit,
Token: token,
},
})
if err != nil {
return nil, err
}
for i := 0; len(result.Files) > i; i++ {
files = append(files, (*Files)(result.Files[i]))
}
if result.ListInfo == nil || result.ListInfo.Token == "" {
break
}
token = result.ListInfo.Token
}
return files, nil
}
func (d *HalalCloud) getLink(ctx context.Context, file model.Obj, args model.LinkArgs) (*model.Link, error) {
client := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection())
ctx1, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
result, err := client.ParseFileSlice(ctx1, (*pubUserFile.File)(file.(*Files)))
if err != nil {
return nil, err
}
fileAddrs := []*pubUserFile.SliceDownloadInfo{}
var addressDuration int64
nodesNumber := len(result.RawNodes)
nodesIndex := nodesNumber - 1
startIndex, endIndex := 0, nodesIndex
for nodesIndex >= 0 {
if nodesIndex >= 200 {
endIndex = 200
} else {
endIndex = nodesNumber
}
for ; endIndex <= nodesNumber; endIndex += 200 {
if endIndex == 0 {
endIndex = 1
}
sliceAddress, err := client.GetSliceDownloadAddress(ctx, &pubUserFile.SliceDownloadAddressRequest{
Identity: result.RawNodes[startIndex:endIndex],
Version: 1,
})
if err != nil {
return nil, err
}
addressDuration = sliceAddress.ExpireAt
fileAddrs = append(fileAddrs, sliceAddress.Addresses...)
startIndex = endIndex
nodesIndex -= 200
}
}
size := result.FileSize
chunks := getChunkSizes(result.Sizes)
var finalClosers utils.Closers
resultRangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
length := httpRange.Length
if httpRange.Length >= 0 && httpRange.Start+httpRange.Length >= size {
length = -1
}
if err != nil {
return nil, fmt.Errorf("open download file failed: %w", err)
}
oo := &openObject{
ctx: ctx,
d: fileAddrs,
chunk: &[]byte{},
chunks: &chunks,
skip: httpRange.Start,
sha: result.Sha1,
shaTemp: sha1.New(),
}
finalClosers.Add(oo)
return readers.NewLimitedReadCloser(oo, length), nil
}
var duration time.Duration
if addressDuration != 0 {
duration = time.Until(time.UnixMilli(addressDuration))
} else {
duration = time.Until(time.Now().Add(time.Hour))
}
resultRangeReadCloser := &model.RangeReadCloser{RangeReader: resultRangeReader, Closers: finalClosers}
return &model.Link{
RangeReadCloser: resultRangeReadCloser,
Expiration: &duration,
}, nil
}
func (d *HalalCloud) makeDir(ctx context.Context, dir model.Obj, name string) (model.Obj, error) {
newDir := userfile.NewFormattedPath(d.GetCurrentOpDir(dir, []string{name}, 0)).GetPath()
_, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).Create(ctx, &pubUserFile.File{
Path: newDir,
})
return nil, err
}
func (d *HalalCloud) move(ctx context.Context, obj model.Obj, dir model.Obj) (model.Obj, error) {
oldDir := userfile.NewFormattedPath(d.GetCurrentDir(obj)).GetPath()
newDir := userfile.NewFormattedPath(d.GetCurrentDir(dir)).GetPath()
_, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).Move(ctx, &pubUserFile.BatchOperationRequest{
Source: []*pubUserFile.File{
{
Identity: obj.GetID(),
Path: oldDir,
},
},
Dest: &pubUserFile.File{
Identity: dir.GetID(),
Path: newDir,
},
})
return nil, err
}
func (d *HalalCloud) rename(ctx context.Context, obj model.Obj, name string) (model.Obj, error) {
id := obj.GetID()
newPath := userfile.NewFormattedPath(d.GetCurrentOpDir(obj, []string{name}, 0)).GetPath()
_, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).Rename(ctx, &pubUserFile.File{
Path: newPath,
Identity: id,
Name: name,
})
return nil, err
}
func (d *HalalCloud) copy(ctx context.Context, obj model.Obj, dir model.Obj) (model.Obj, error) {
id := obj.GetID()
sourcePath := userfile.NewFormattedPath(d.GetCurrentDir(obj)).GetPath()
if len(id) > 0 {
sourcePath = ""
}
dest := &pubUserFile.File{
Identity: dir.GetID(),
Path: userfile.NewFormattedPath(d.GetCurrentDir(dir)).GetPath(),
}
_, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).Copy(ctx, &pubUserFile.BatchOperationRequest{
Source: []*pubUserFile.File{
{
Path: sourcePath,
Identity: id,
},
},
Dest: dest,
})
return nil, err
}
func (d *HalalCloud) remove(ctx context.Context, obj model.Obj) error {
id := obj.GetID()
newPath := userfile.NewFormattedPath(d.GetCurrentDir(obj)).GetPath()
//if len(id) > 0 {
// newPath = ""
//}
_, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).Delete(ctx, &pubUserFile.BatchOperationRequest{
Source: []*pubUserFile.File{
{
Path: newPath,
Identity: id,
},
},
})
return err
}
func (d *HalalCloud) put(ctx context.Context, dstDir model.Obj, fileStream model.FileStreamer, up driver.UpdateProgress) (model.Obj, error) {
newDir := path.Join(dstDir.GetPath(), fileStream.GetName())
result, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).CreateUploadToken(ctx, &pubUserFile.File{
Path: newDir,
})
if err != nil {
return nil, err
}
u, _ := url.Parse(result.Endpoint)
u.Host = "s3." + u.Host
result.Endpoint = u.String()
s, err := session.NewSession(&aws.Config{
HTTPClient: base.HttpClient,
Credentials: credentials.NewStaticCredentials(result.AccessKey, result.SecretKey, result.Token),
Region: aws.String(result.Region),
Endpoint: aws.String(result.Endpoint),
S3ForcePathStyle: aws.Bool(true),
})
if err != nil {
return nil, err
}
uploader := s3manager.NewUploader(s, func(u *s3manager.Uploader) {
u.Concurrency = d.uploadThread
})
if fileStream.GetSize() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize {
uploader.PartSize = fileStream.GetSize() / (s3manager.MaxUploadParts - 1)
}
_, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(result.Bucket),
Key: aws.String(result.Key),
Body: io.TeeReader(fileStream, driver.NewProgress(fileStream.GetSize(), up)),
})
return nil, err
}
var _ driver.Driver = (*HalalCloud)(nil)

View File

@ -0,0 +1,38 @@
package halalcloud
import (
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/op"
)
type Addition struct {
// Usually one of two
driver.RootPath
// define other
RefreshToken string `json:"refresh_token" required:"true" help:"login type is refresh_token,this is required"`
UploadThread string `json:"upload_thread" default:"3" help:"1 <= thread <= 32"`
AppID string `json:"app_id" required:"true" default:"devDebugger/1.0"`
AppVersion string `json:"app_version" required:"true" default:"1.0.0"`
AppSecret string `json:"app_secret" required:"true" default:"Nkx3Y2xvZ2luLmNu"`
}
var config = driver.Config{
Name: "HalalCloud",
LocalSort: false,
OnlyLocal: true,
OnlyProxy: true,
NoCache: false,
NoUpload: false,
NeedMs: false,
DefaultRoot: "/",
CheckStatus: false,
Alert: "",
NoOverwriteUpload: false,
}
func init() {
op.RegisterDriver(func() driver.Driver {
return &HalalCloud{}
})
}

View File

@ -0,0 +1,52 @@
package halalcloud
import "google.golang.org/grpc"
func defaultOptions() halalOptions {
return halalOptions{
// onRefreshTokenRefreshed: func(string) {},
grpcOptions: []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024 * 1024 * 32)),
// grpc.WithMaxMsgSize(1024 * 1024 * 1024),
},
}
}
type HalalOption interface {
apply(*halalOptions)
}
// halalOptions configure a RPC call. halalOptions are set by the HalalOption
// values passed to Dial.
type halalOptions struct {
onTokenRefreshed func(accessToken string, accessTokenExpiredAt int64, refreshToken string, refreshTokenExpiredAt int64)
grpcOptions []grpc.DialOption
}
// funcDialOption wraps a function that modifies halalOptions into an
// implementation of the DialOption interface.
type funcDialOption struct {
f func(*halalOptions)
}
func (fdo *funcDialOption) apply(do *halalOptions) {
fdo.f(do)
}
func newFuncDialOption(f func(*halalOptions)) *funcDialOption {
return &funcDialOption{
f: f,
}
}
func WithRefreshTokenRefreshedCallback(s func(accessToken string, accessTokenExpiredAt int64, refreshToken string, refreshTokenExpiredAt int64)) HalalOption {
return newFuncDialOption(func(o *halalOptions) {
o.onTokenRefreshed = s
})
}
func WithGrpcDialOptions(opts ...grpc.DialOption) HalalOption {
return newFuncDialOption(func(o *halalOptions) {
o.grpcOptions = opts
})
}

101
drivers/halalcloud/types.go Normal file
View File

@ -0,0 +1,101 @@
package halalcloud
import (
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/city404/v6-public-rpc-proto/go/v6/common"
pubUserFile "github.com/city404/v6-public-rpc-proto/go/v6/userfile"
"google.golang.org/grpc"
"time"
)
type AuthService struct {
appID string
appVersion string
appSecret string
grpcConnection *grpc.ClientConn
dopts halalOptions
tr *TokenResp
}
type TokenResp struct {
AccessToken string `json:"accessToken,omitempty"`
AccessTokenExpiredAt int64 `json:"accessTokenExpiredAt,omitempty"`
RefreshToken string `json:"refreshToken,omitempty"`
RefreshTokenExpiredAt int64 `json:"refreshTokenExpiredAt,omitempty"`
}
type UserInfo struct {
Identity string `json:"identity,omitempty"`
UpdateTs int64 `json:"updateTs,omitempty"`
Name string `json:"name,omitempty"`
CreateTs int64 `json:"createTs,omitempty"`
}
type OrderByInfo struct {
Field string `json:"field,omitempty"`
Asc bool `json:"asc,omitempty"`
}
type ListInfo struct {
Token string `json:"token,omitempty"`
Limit int64 `json:"limit,omitempty"`
OrderBy []*OrderByInfo `json:"order_by,omitempty"`
Version int32 `json:"version,omitempty"`
}
type FilesList struct {
Files []*Files `json:"files,omitempty"`
ListInfo *common.ScanListRequest `json:"list_info,omitempty"`
}
var _ model.Obj = (*Files)(nil)
type Files pubUserFile.File
func (f *Files) GetSize() int64 {
return f.Size
}
func (f *Files) GetName() string {
return f.Name
}
func (f *Files) ModTime() time.Time {
return time.UnixMilli(f.UpdateTs)
}
func (f *Files) CreateTime() time.Time {
return time.UnixMilli(f.UpdateTs)
}
func (f *Files) IsDir() bool {
return f.Dir
}
func (f *Files) GetHash() utils.HashInfo {
return utils.HashInfo{}
}
func (f *Files) GetID() string {
if len(f.Identity) == 0 {
f.Identity = "/"
}
return f.Identity
}
func (f *Files) GetPath() string {
return f.Path
}
type SteamFile struct {
file model.File
}
func (s *SteamFile) Read(p []byte) (n int, err error) {
return s.file.Read(p)
}
func (s *SteamFile) Close() error {
return s.file.Close()
}

385
drivers/halalcloud/util.go Normal file
View File

@ -0,0 +1,385 @@
package halalcloud
import (
"bytes"
"context"
"crypto/md5"
"crypto/tls"
"encoding/hex"
"errors"
"fmt"
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/pkg/utils"
pbPublicUser "github.com/city404/v6-public-rpc-proto/go/v6/user"
pubUserFile "github.com/city404/v6-public-rpc-proto/go/v6/userfile"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"hash"
"io"
"net/http"
"strconv"
"strings"
"sync"
"time"
)
const (
AppID = "devDebugger/1.0"
AppVersion = "1.0.0"
AppSecret = "Nkx3Y2xvZ2luLmNu"
)
const (
grpcServer = "grpcuserapi.2dland.cn:443"
grpcServerAuth = "grpcuserapi.2dland.cn"
)
func (d *HalalCloud) NewAuthServiceWithOauth(options ...HalalOption) (*AuthService, error) {
aService := &AuthService{}
err2 := errors.New("")
svc := d.HalalCommon.AuthService
for _, opt := range options {
opt.apply(&svc.dopts)
}
grpcOptions := svc.dopts.grpcOptions
grpcOptions = append(grpcOptions, grpc.WithAuthority(grpcServerAuth), grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})), grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctxx := svc.signContext(method, ctx)
err := invoker(ctxx, method, req, reply, cc, opts...) // invoking RPC method
return err
}))
grpcConnection, err := grpc.NewClient(grpcServer, grpcOptions...)
if err != nil {
return nil, err
}
defer grpcConnection.Close()
userClient := pbPublicUser.NewPubUserClient(grpcConnection)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stateString := uuid.New().String()
// queryValues.Add("callback", oauthToken.Callback)
oauthToken, err := userClient.CreateAuthToken(ctx, &pbPublicUser.LoginRequest{
ReturnType: 2,
State: stateString,
ReturnUrl: "",
})
if err != nil {
return nil, err
}
if len(oauthToken.State) < 1 {
oauthToken.State = stateString
}
if oauthToken.Url != "" {
return nil, fmt.Errorf(`need verify: <a target="_blank" href="%s">Click Here</a>`, oauthToken.Url)
}
return aService, err2
}
func (d *HalalCloud) NewAuthService(refreshToken string, options ...HalalOption) (*AuthService, error) {
svc := d.HalalCommon.AuthService
if len(refreshToken) < 1 {
refreshToken = d.Addition.RefreshToken
}
if len(d.tr.AccessToken) > 0 {
accessTokenExpiredAt := d.tr.AccessTokenExpiredAt
current := time.Now().UnixMilli()
if accessTokenExpiredAt < current {
// access token expired
d.tr.AccessToken = ""
d.tr.AccessTokenExpiredAt = 0
} else {
svc.tr.AccessTokenExpiredAt = accessTokenExpiredAt
svc.tr.AccessToken = d.tr.AccessToken
}
}
for _, opt := range options {
opt.apply(&svc.dopts)
}
grpcOptions := svc.dopts.grpcOptions
grpcOptions = append(grpcOptions, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(10*1024*1024), grpc.MaxCallRecvMsgSize(10*1024*1024)), grpc.WithAuthority(grpcServerAuth), grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})), grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctxx := svc.signContext(method, ctx)
err := invoker(ctxx, method, req, reply, cc, opts...) // invoking RPC method
if err != nil {
grpcStatus, ok := status.FromError(err)
if ok && grpcStatus.Code() == codes.Unauthenticated && strings.Contains(grpcStatus.Err().Error(), "invalid accesstoken") && len(refreshToken) > 0 {
// refresh token
refreshResponse, err := pbPublicUser.NewPubUserClient(cc).Refresh(ctx, &pbPublicUser.Token{
RefreshToken: refreshToken,
})
if err != nil {
return err
}
if len(refreshResponse.AccessToken) > 0 {
svc.tr.AccessToken = refreshResponse.AccessToken
svc.tr.AccessTokenExpiredAt = refreshResponse.AccessTokenExpireTs
svc.OnAccessTokenRefreshed(refreshResponse.AccessToken, refreshResponse.AccessTokenExpireTs, refreshResponse.RefreshToken, refreshResponse.RefreshTokenExpireTs)
}
// retry
ctxx := svc.signContext(method, ctx)
err = invoker(ctxx, method, req, reply, cc, opts...) // invoking RPC method
if err != nil {
return err
} else {
return nil
}
}
}
return err
}))
grpcConnection, err := grpc.NewClient(grpcServer, grpcOptions...)
if err != nil {
return nil, err
}
svc.grpcConnection = grpcConnection
return svc, err
}
func (s *AuthService) OnAccessTokenRefreshed(accessToken string, accessTokenExpiredAt int64, refreshToken string, refreshTokenExpiredAt int64) {
s.tr.AccessToken = accessToken
s.tr.AccessTokenExpiredAt = accessTokenExpiredAt
s.tr.RefreshToken = refreshToken
s.tr.RefreshTokenExpiredAt = refreshTokenExpiredAt
if s.dopts.onTokenRefreshed != nil {
s.dopts.onTokenRefreshed(accessToken, accessTokenExpiredAt, refreshToken, refreshTokenExpiredAt)
}
}
func (s *AuthService) GetGrpcConnection() *grpc.ClientConn {
return s.grpcConnection
}
func (s *AuthService) Close() {
_ = s.grpcConnection.Close()
}
func (s *AuthService) signContext(method string, ctx context.Context) context.Context {
var kvString []string
currentTimeStamp := strconv.FormatInt(time.Now().UnixMilli(), 10)
bufferedString := bytes.NewBufferString(method)
kvString = append(kvString, "timestamp", currentTimeStamp)
bufferedString.WriteString(currentTimeStamp)
kvString = append(kvString, "appid", AppID)
bufferedString.WriteString(AppID)
kvString = append(kvString, "appversion", AppVersion)
bufferedString.WriteString(AppVersion)
if s.tr != nil && len(s.tr.AccessToken) > 0 {
authorization := "Bearer " + s.tr.AccessToken
kvString = append(kvString, "authorization", authorization)
bufferedString.WriteString(authorization)
}
bufferedString.WriteString(AppSecret)
sign := GetMD5Hash(bufferedString.String())
kvString = append(kvString, "sign", sign)
return metadata.AppendToOutgoingContext(ctx, kvString...)
}
func (d *HalalCloud) GetCurrentOpDir(dir model.Obj, args []string, index int) string {
currentDir := dir.GetPath()
if len(currentDir) == 0 {
currentDir = "/"
}
opPath := currentDir + "/" + args[index]
if strings.HasPrefix(args[index], "/") {
opPath = args[index]
}
return opPath
}
func (d *HalalCloud) GetCurrentDir(dir model.Obj) string {
currentDir := dir.GetPath()
if len(currentDir) == 0 {
currentDir = "/"
}
return currentDir
}
type Common struct {
}
func getRawFiles(addr *pubUserFile.SliceDownloadInfo) ([]byte, error) {
if addr == nil {
return nil, errors.New("addr is nil")
}
client := http.Client{
Timeout: time.Duration(60 * time.Second), // Set timeout to 5 seconds
}
resp, err := client.Get(addr.DownloadAddress)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("bad status: %s, body: %s", resp.Status, body)
}
if addr.Encrypt > 0 {
cd := uint8(addr.Encrypt)
for idx := 0; idx < len(body); idx++ {
body[idx] = body[idx] ^ cd
}
}
if addr.StoreType != 10 {
sourceCid, err := cid.Decode(addr.Identity)
if err != nil {
return nil, err
}
checkCid, err := sourceCid.Prefix().Sum(body)
if err != nil {
return nil, err
}
if !checkCid.Equals(sourceCid) {
return nil, fmt.Errorf("bad cid: %s, body: %s", checkCid.String(), body)
}
}
return body, nil
}
type openObject struct {
ctx context.Context
mu sync.Mutex
d []*pubUserFile.SliceDownloadInfo
id int
skip int64
chunk *[]byte
chunks *[]chunkSize
closed bool
sha string
shaTemp hash.Hash
}
// get the next chunk
func (oo *openObject) getChunk(ctx context.Context) (err error) {
if oo.id >= len(*oo.chunks) {
return io.EOF
}
var chunk []byte
err = utils.Retry(3, time.Second, func() (err error) {
chunk, err = getRawFiles(oo.d[oo.id])
return err
})
if err != nil {
return err
}
oo.id++
oo.chunk = &chunk
return nil
}
// Read reads up to len(p) bytes into p.
func (oo *openObject) Read(p []byte) (n int, err error) {
oo.mu.Lock()
defer oo.mu.Unlock()
if oo.closed {
return 0, fmt.Errorf("read on closed file")
}
// Skip data at the start if requested
for oo.skip > 0 {
//size := 1024 * 1024
_, size, err := oo.ChunkLocation(oo.id)
if err != nil {
return 0, err
}
if oo.skip < int64(size) {
break
}
oo.id++
oo.skip -= int64(size)
}
if len(*oo.chunk) == 0 {
err = oo.getChunk(oo.ctx)
if err != nil {
return 0, err
}
if oo.skip > 0 {
*oo.chunk = (*oo.chunk)[oo.skip:]
oo.skip = 0
}
}
n = copy(p, *oo.chunk)
*oo.chunk = (*oo.chunk)[n:]
oo.shaTemp.Write(*oo.chunk)
return n, nil
}
// Close closed the file - MAC errors are reported here
func (oo *openObject) Close() (err error) {
oo.mu.Lock()
defer oo.mu.Unlock()
if oo.closed {
return nil
}
// 校验Sha1
if string(oo.shaTemp.Sum(nil)) != oo.sha {
return fmt.Errorf("failed to finish download: %w", err)
}
oo.closed = true
return nil
}
func GetMD5Hash(text string) string {
tHash := md5.Sum([]byte(text))
return hex.EncodeToString(tHash[:])
}
// chunkSize describes a size and position of chunk
type chunkSize struct {
position int64
size int
}
func getChunkSizes(sliceSize []*pubUserFile.SliceSize) (chunks []chunkSize) {
chunks = make([]chunkSize, 0)
for _, s := range sliceSize {
// 对最后一个做特殊处理
if s.EndIndex == 0 {
s.EndIndex = s.StartIndex
}
for j := s.StartIndex; j <= s.EndIndex; j++ {
chunks = append(chunks, chunkSize{position: j, size: int(s.Size)})
}
}
return chunks
}
func (oo *openObject) ChunkLocation(id int) (position int64, size int, err error) {
if id < 0 || id >= len(*oo.chunks) {
return 0, 0, errors.New("invalid arguments")
}
return (*oo.chunks)[id].position, (*oo.chunks)[id].size, nil
}