feat: add database.s3 support
This commit is contained in:
193
internal/database/s3/by_dir.go
Normal file
193
internal/database/s3/by_dir.go
Normal file
@ -0,0 +1,193 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"ultone/internal/log"
|
||||
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
// s3_dir implements the S3 interface on top of a local directory tree.
// Objects are stored at <dir>/<bucket>/<key>, with a JSON metadata
// sidecar at <dir>/<bucket>/.meta_<key> (see Put/Get/Delete/clean).
type s3_dir struct {
	ctx context.Context // lifetime context; used for logging in clean
	dir string          // root directory, slash-separated (filepath.ToSlash)
}
|
||||
|
||||
func (s *s3_dir) clean() {
|
||||
now := time.Now()
|
||||
|
||||
filepath.Walk(s.dir, func(p string, info fs.FileInfo, err error) error {
|
||||
if info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if strings.HasPrefix(info.Name(), ".meta_") {
|
||||
bs, err := os.ReadFile(p)
|
||||
if err != nil {
|
||||
log.Warn(s.ctx, "s3.by_dir: read meta err, err = %s", err.Error())
|
||||
return nil
|
||||
}
|
||||
|
||||
nm := new(meta)
|
||||
if err = json.Unmarshal(bs, nm); err != nil {
|
||||
log.Warn(s.ctx, "s3.by_dir: unmarshal meta err, err = %s", err.Error())
|
||||
return nil
|
||||
}
|
||||
|
||||
if nm.ExpireAt > 0 && time.UnixMilli(nm.ExpireAt).Before(now) {
|
||||
os.Remove(p)
|
||||
base := strings.TrimPrefix(info.Name(), ".meta_")
|
||||
dir := filepath.Dir(p)
|
||||
os.Remove(path.Join(dir, base))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Delete implements S3.
|
||||
func (s *s3_dir) Delete(ctx context.Context, bucket string, key string) error {
|
||||
location := path.Join(s.dir, bucket, key)
|
||||
info, err := os.Stat(location)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
return fmt.Errorf("target is dir")
|
||||
}
|
||||
|
||||
os.Remove(path.Join(s.dir, bucket, ".meta_"+key))
|
||||
|
||||
return os.Remove(location)
|
||||
}
|
||||
|
||||
// Get implements S3.
|
||||
func (s *s3_dir) Get(ctx context.Context, bucket string, key string) (*Object, error) {
|
||||
location := path.Join(s.dir, bucket, key)
|
||||
info, err := os.Stat(location)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
return nil, fmt.Errorf("target is dir")
|
||||
}
|
||||
|
||||
var (
|
||||
f io.ReadCloser
|
||||
bs []byte
|
||||
obj = &Object{}
|
||||
)
|
||||
|
||||
if f, err = os.Open(location); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
obj.Body = f
|
||||
|
||||
if bs, err = os.ReadFile(path.Join(s.dir, bucket, ".meta_"+key)); err != nil {
|
||||
log.Warn(ctx, "s3.dir: open file meta err, err = %v", err)
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
m := new(meta)
|
||||
if err = json.Unmarshal(bs, m); err != nil {
|
||||
log.Warn(ctx, "s3.dir: unmarshal meta err, err = %v", err)
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
obj.ContentType = m.ContentType
|
||||
obj.Size = m.Size
|
||||
obj.ExpireAt = m.ExpireAt
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// Put implements S3.
|
||||
func (s *s3_dir) Put(ctx context.Context, bucket string, key string, obj *Object) error {
|
||||
if bucket != "" {
|
||||
os.MkdirAll(path.Join(s.dir, bucket), 0o755)
|
||||
}
|
||||
|
||||
location := path.Join(s.dir, bucket, key)
|
||||
ml := path.Join(s.dir, bucket, ".meta_"+key)
|
||||
|
||||
bs, err := io.ReadAll(obj.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if obj.Size != 0 && obj.Size != int64(len(bs)) {
|
||||
return fmt.Errorf("object size mismatch")
|
||||
}
|
||||
|
||||
obj.Size = int64(len(bs))
|
||||
|
||||
if obj.ContentType == "" {
|
||||
obj.ContentType = http.DetectContentType(lo.If(len(bs) >= 128, bs[:128]).Else(bs))
|
||||
}
|
||||
|
||||
if err = os.WriteFile(location, bs, 0o644); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m := &meta{
|
||||
ContentType: obj.ContentType,
|
||||
Size: obj.Size,
|
||||
ExpireAt: obj.ExpireAt,
|
||||
}
|
||||
|
||||
ms, _ := json.Marshal(m)
|
||||
os.WriteFile(ml, ms, 0o644)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newDirClient(ctx context.Context, dir string) (S3, error) {
|
||||
dir = filepath.ToSlash(dir)
|
||||
info, err := os.Stat(dir)
|
||||
if err != nil {
|
||||
if errors.Is(err, fs.ErrNotExist) {
|
||||
if err = os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !info.IsDir() {
|
||||
return nil, fmt.Errorf("target dir exist but not dir")
|
||||
}
|
||||
|
||||
c := &s3_dir{ctx: ctx, dir: dir}
|
||||
|
||||
// todo: expire files
|
||||
go func() {
|
||||
ticker := time.NewTicker(10 * time.Minute)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case t := <-ticker.C:
|
||||
log.Debug(ctx, "s3.by_dir: start clean up s3 dir @%s", t.Format("2006-01-02T15:04:05"))
|
||||
c.clean()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return c, nil
|
||||
}
|
112
internal/database/s3/by_s3.go
Normal file
112
internal/database/s3/by_s3.go
Normal file
@ -0,0 +1,112 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"ultone/internal/log"
|
||||
"ultone/internal/tool"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
)
|
||||
|
||||
// s3_client is the AWS-SDK-v2-backed implementation of the S3 interface.
type s3_client struct {
	client *s3.Client // configured in newS3Client (custom endpoint, static creds)
}
|
||||
|
||||
// Delete implements S3.
|
||||
func (s *s3_client) Delete(ctx context.Context, bucket string, key string) error {
|
||||
var err error
|
||||
|
||||
if _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(bucket),
|
||||
Key: aws.String(key),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get implements S3.
|
||||
func (s *s3_client) Get(ctx context.Context, bucket string, key string) (*Object, error) {
|
||||
var (
|
||||
err error
|
||||
res *s3.GetObjectOutput
|
||||
)
|
||||
|
||||
if res, err = s.client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(bucket),
|
||||
Key: aws.String(key),
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Object{
|
||||
ContentType: *res.ContentType,
|
||||
Body: res.Body,
|
||||
ExpireAt: res.Expires.UnixMilli(),
|
||||
Size: *res.ContentLength,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Put implements S3.
|
||||
func (s *s3_client) Put(ctx context.Context, bucket string, key string, obj *Object) error {
|
||||
var err error
|
||||
|
||||
if _, err = s.client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(bucket),
|
||||
Key: aws.String(key),
|
||||
ACL: types.ObjectCannedACLPublicRead,
|
||||
Body: obj.Body,
|
||||
ContentType: aws.String(obj.ContentType),
|
||||
Expires: aws.Time(time.UnixMilli(obj.ExpireAt)),
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newS3Client(ctx context.Context, endpoint, access, key string) (S3, error) {
|
||||
var (
|
||||
err error
|
||||
sdkConfig aws.Config
|
||||
output *s3.ListBucketsOutput
|
||||
)
|
||||
|
||||
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
|
||||
return aws.Endpoint{
|
||||
URL: endpoint,
|
||||
}, nil
|
||||
})
|
||||
|
||||
if sdkConfig, err = config.LoadDefaultConfig(
|
||||
ctx,
|
||||
config.WithEndpointResolverWithOptions(customResolver),
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s3Client := s3.NewFromConfig(sdkConfig, func(o *s3.Options) {
|
||||
// o.BaseEndpoint = aws.String(endpoint)
|
||||
// o.EndpointResolverV2 = &resolverV2{}
|
||||
o.Credentials = aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(access, key, ""))
|
||||
o.UsePathStyle = true
|
||||
o.Region = "auto"
|
||||
})
|
||||
|
||||
if output, err = s3Client.ListBuckets(tool.Timeout(5), &s3.ListBucketsInput{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, item := range output.Buckets {
|
||||
log.Debug(ctx, "s3.New: list bucket name = %s", *item.Name)
|
||||
}
|
||||
|
||||
return &s3_client{client: s3Client}, nil
|
||||
}
|
35
internal/database/s3/new.go
Normal file
35
internal/database/s3/new.go
Normal file
@ -0,0 +1,35 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
var Default S3
|
||||
|
||||
func New(ctx context.Context, uri string) (S3, error) {
|
||||
ins, err := url.Parse(uri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch ins.Scheme {
|
||||
case "http", "https":
|
||||
if ins.User == nil {
|
||||
return nil, fmt.Errorf("missing access or key")
|
||||
}
|
||||
access := ins.User.Username()
|
||||
key, _ := ins.User.Password()
|
||||
return newS3Client(ctx, fmt.Sprintf("%s://%s", ins.Scheme, ins.Host), access, key)
|
||||
case "dir":
|
||||
return newDirClient(ctx, ins.Host)
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid new s3 uri scheme: %s", ins.Scheme)
|
||||
}
|
||||
}
|
||||
|
||||
func Init(ctx context.Context, uri string) (err error) {
|
||||
Default, err = New(ctx, uri)
|
||||
return err
|
||||
}
|
25
internal/database/s3/s3.go
Normal file
25
internal/database/s3/s3.go
Normal file
@ -0,0 +1,25 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
)
|
||||
|
||||
// meta is the JSON sidecar persisted next to each object by the
// directory backend (.meta_<key> files); json tags define the on-disk
// format and must not change.
type meta struct {
	ContentType string `json:"content_type"` // MIME type recorded at Put time
	Size        int64  `json:"size"`         // body length in bytes
	ExpireAt    int64  `json:"expire_at"`    // Unix milliseconds; 0 = never expires
}
|
||||
|
||||
// Object is a stored blob plus its descriptive attributes, as exchanged
// through the S3 interface.
type Object struct {
	ContentType string        // MIME type; auto-detected on Put when empty (dir backend)
	Body        io.ReadCloser // object payload; the receiver of a Get must close it
	Size        int64         // body length in bytes; validated/filled on Put (dir backend)
	ExpireAt    int64         // Unix milliseconds; 0 = never expires
}
|
||||
|
||||
// S3 is the object-storage abstraction implemented by the real
// AWS-backed client (by_s3.go) and the local-directory backend
// (by_dir.go).
type S3 interface {
	// Get returns the object at bucket/key; the caller must close Body.
	Get(ctx context.Context, bucket, key string) (*Object, error)
	// Put stores obj at bucket/key, consuming obj.Body.
	Put(ctx context.Context, bucket, key string, obj *Object) error
	// Delete removes the object at bucket/key.
	Delete(ctx context.Context, bucket, key string) error
}
|
Reference in New Issue
Block a user