package blobs import ( "bytes" "context" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/sirupsen/logrus" "io" "nf-repo/internal/interfaces" "nf-repo/internal/model" "nf-repo/internal/verify" "strings" ) type s3Handler struct { client *s3.Client bucket string } func (s *s3Handler) Get(ctx context.Context, repo string, hash model.Hash) (io.ReadCloser, error) { var ( err error output *s3.GetObjectOutput ) if output, err = s.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(hash.Hex), }); err != nil { return nil, err } return output.Body, nil } func (s *s3Handler) Stat(ctx context.Context, repo string, hash model.Hash) (int64, error) { var ( err error output *s3.GetObjectOutput ) if output, err = s.client.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(hash.Hex), }); err != nil { return 0, err } defer output.Body.Close() return *output.ContentLength, nil } func (s *s3Handler) Put(ctx context.Context, repo string, hash model.Hash, rc io.ReadCloser) error { var ( err error nrc io.ReadCloser ) if nrc, err = verify.ReadCloser(rc, verify.SizeUnknown, hash); err != nil { return err } defer nrc.Close() var bs []byte if bs, err = io.ReadAll(nrc); err != nil { return err } if _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(hash.Hex), ACL: types.ObjectCannedACLPublicRead, Body: bytes.NewReader(bs), }, s3.WithAPIOptions( //v4.AddUnsignedPayloadMiddleware, //v4.RemoveComputePayloadSHA256Middleware, )); err != nil { logrus. WithField("path", "s3Handler.Put"). WithField("err", err). Debug() } return err } func (s *s3Handler) Delete(ctx context.Context, repo string, hash model.Hash) error { var ( err error ) if _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: aws.String(s.bucket), Key: aws.String(hash.Hex), }); err != nil { return err } return nil } func NewS3BlobHandler( ctx context.Context, endpoint string, accessKey string, secretKey string, bucket string, ) interfaces.BlobHandler { var ( err error cfg aws.Config client *s3.Client ) customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { return aws.Endpoint{ URL: endpoint, }, nil }) if cfg, err = config.LoadDefaultConfig(ctx, config.WithEndpointResolverWithOptions(customResolver), config.WithCredentialsProvider(credentials.StaticCredentialsProvider{ Value: aws.Credentials{AccessKeyID: accessKey, SecretAccessKey: secretKey}, }), ); err != nil { logrus.Panicf("init s3 client err: %v", err) } client = s3.NewFromConfig(cfg, func(options *s3.Options) { options.UsePathStyle = true }) if _, err = client.HeadBucket(ctx, &s3.HeadBucketInput{ Bucket: aws.String(bucket), }); err != nil { if !strings.Contains(err.Error(), "404") { logrus.Panicf("init s3 bucket err: %v", err) } logrus.Info("s3 bucket not found, start create...") if _, err = client.CreateBucket(ctx, &s3.CreateBucketInput{ Bucket: aws.String(bucket), ACL: types.BucketCannedACLPublicRead, }); err != nil { logrus.Panicf("create s3 bucket err: %v", err) } } return &s3Handler{client: client, bucket: bucket} }