194 lines
3.6 KiB
Go
Raw Permalink Normal View History

2025-01-06 23:38:49 -08:00
package s3
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"time"
"ultone/internal/log"
"github.com/samber/lo"
)
// s3_dir is the directory-backed client returned by newDirClient as an S3.
// Objects are stored as regular files below dir, each accompanied by a
// ".meta_"-prefixed file holding its JSON-encoded metadata.
type s3_dir struct {
// ctx is the context handed to newDirClient; clean passes it to the logger.
ctx context.Context
// dir is the root directory; bucket and key are joined onto it to locate an object.
dir string
}
// clean walks s.dir and removes every object whose ".meta_<name>" sidecar
// records an ExpireAt (Unix milliseconds) that lies in the past. An ExpireAt
// of zero or less means the object never expires.
//
// The sweep is best-effort: entries that cannot be visited, read or decoded
// are logged and skipped, and a failed removal is logged without aborting the
// walk.
func (s *s3_dir) clean() {
	now := time.Now()

	// The callback never returns an error, so Walk's own result is always nil.
	_ = filepath.Walk(s.dir, func(p string, info fs.FileInfo, err error) error {
		// Walk reports stat/readdir failures through err, and info may be nil
		// in that case; bail out before touching info.
		if err != nil {
			log.Warn(s.ctx, "s3.by_dir: walk err, path = %s, err = %s", p, err.Error())
			return nil
		}
		if info.IsDir() || !strings.HasPrefix(info.Name(), ".meta_") {
			return nil
		}

		bs, err := os.ReadFile(p)
		if err != nil {
			log.Warn(s.ctx, "s3.by_dir: read meta err, err = %s", err.Error())
			return nil
		}

		nm := new(meta)
		if err = json.Unmarshal(bs, nm); err != nil {
			log.Warn(s.ctx, "s3.by_dir: unmarshal meta err, err = %s", err.Error())
			return nil
		}

		if nm.ExpireAt <= 0 || !time.UnixMilli(nm.ExpireAt).Before(now) {
			return nil
		}

		// Remove the object before its sidecar: if the object cannot be
		// removed the sidecar survives, so the next sweep retries instead of
		// leaving behind a file that no longer has an expiry.
		object := filepath.Join(filepath.Dir(p), strings.TrimPrefix(info.Name(), ".meta_"))
		if err = os.Remove(object); err != nil && !errors.Is(err, fs.ErrNotExist) {
			log.Warn(s.ctx, "s3.by_dir: remove expired object err, err = %s", err.Error())
			return nil
		}
		if err = os.Remove(p); err != nil && !errors.Is(err, fs.ErrNotExist) {
			log.Warn(s.ctx, "s3.by_dir: remove expired meta err, err = %s", err.Error())
		}
		return nil
	})
}
// Delete implements S3.
//
// It removes the file stored at <dir>/<bucket>/<key> together with its
// ".meta_<key>" sidecar. A failed stat is returned unchanged, and a key that
// resolves to a directory is rejected. Removal of the sidecar is best-effort;
// only the result of removing the object itself is reported.
func (s *s3_dir) Delete(ctx context.Context, bucket string, key string) error {
	target := path.Join(s.dir, bucket, key)

	stat, statErr := os.Stat(target)
	switch {
	case statErr != nil:
		return statErr
	case stat.IsDir():
		return fmt.Errorf("target is dir")
	}

	// The sidecar may legitimately be absent, so its removal result is ignored.
	sidecar := path.Join(s.dir, bucket, ".meta_"+key)
	os.Remove(sidecar)

	return os.Remove(target)
}
// Get implements S3.
//
// It opens the file at <dir>/<bucket>/<key> and returns it as the object's
// Body; Get does not close the file. A failed stat or open is returned
// unchanged, and a key that resolves to a directory is rejected.
//
// ContentType, Size and ExpireAt come from the ".meta_<key>" sidecar. When the
// sidecar cannot be read or decoded a warning is logged and the object is
// still returned, carrying the size reported by the file system and no other
// metadata. Get itself does not compare ExpireAt against the current time.
func (s *s3_dir) Get(ctx context.Context, bucket string, key string) (*Object, error) {
	location := path.Join(s.dir, bucket, key)

	info, err := os.Stat(location)
	if err != nil {
		return nil, err
	}
	if info.IsDir() {
		return nil, fmt.Errorf("target is dir")
	}

	f, err := os.Open(location)
	if err != nil {
		return nil, err
	}

	// Start from what the file system knows so that Size is meaningful even
	// when the sidecar is missing or corrupt.
	obj := &Object{Body: f, Size: info.Size()}

	bs, err := os.ReadFile(path.Join(s.dir, bucket, ".meta_"+key))
	if err != nil {
		log.Warn(ctx, "s3.dir: open file meta err, err = %v", err)
		return obj, nil
	}

	m := new(meta)
	if err = json.Unmarshal(bs, m); err != nil {
		log.Warn(ctx, "s3.dir: unmarshal meta err, err = %v", err)
		return obj, nil
	}

	obj.ContentType = m.ContentType
	obj.Size = m.Size
	obj.ExpireAt = m.ExpireAt
	return obj, nil
}
// Put implements S3.
//
// It reads obj.Body completely and writes it to <dir>/<bucket>/<key>, creating
// the bucket directory when bucket is non-empty. A non-zero obj.Size must
// match the number of bytes read. obj.Size is then set to that number, and an
// empty obj.ContentType is filled in by sniffing the first 128 bytes.
//
// ContentType, Size and ExpireAt are stored as JSON in the ".meta_<key>"
// sidecar. Any failure - creating the bucket directory, reading the body,
// writing the object or writing the sidecar - is returned to the caller. The
// object file is written before the sidecar and is not rolled back if the
// sidecar write fails. Put does not close obj.Body.
func (s *s3_dir) Put(ctx context.Context, bucket string, key string, obj *Object) error {
	if obj == nil || obj.Body == nil {
		return fmt.Errorf("object body is nil")
	}

	if bucket != "" {
		if err := os.MkdirAll(path.Join(s.dir, bucket), 0o755); err != nil {
			return err
		}
	}

	location := path.Join(s.dir, bucket, key)
	ml := path.Join(s.dir, bucket, ".meta_"+key)

	bs, err := io.ReadAll(obj.Body)
	if err != nil {
		return err
	}

	if obj.Size != 0 && obj.Size != int64(len(bs)) {
		return fmt.Errorf("object size mismatch")
	}
	obj.Size = int64(len(bs))

	if obj.ContentType == "" {
		// IfF defers the slicing until the length check has passed; with If the
		// bs[:128] argument is evaluated even for bodies shorter than 128 bytes.
		head := lo.IfF(len(bs) >= 128, func() []byte { return bs[:128] }).Else(bs)
		obj.ContentType = http.DetectContentType(head)
	}

	if err = os.WriteFile(location, bs, 0o644); err != nil {
		return err
	}

	m := &meta{
		ContentType: obj.ContentType,
		Size:        obj.Size,
		ExpireAt:    obj.ExpireAt,
	}

	// The sidecar carries the expiry and content type, so losing it silently
	// would leave an object with no metadata; report the failure instead.
	ms, err := json.Marshal(m)
	if err != nil {
		return fmt.Errorf("marshal object meta: %w", err)
	}
	if err = os.WriteFile(ml, ms, 0o644); err != nil {
		return fmt.Errorf("write object meta: %w", err)
	}

	return nil
}
// newDirClient returns an S3 implementation that stores objects below dir.
//
// dir is normalised to forward slashes. It is created (with parents) when it
// does not exist; an existing path that is not a directory is rejected, and
// any other stat failure is returned unchanged.
//
// A background goroutine calls clean every 10 minutes to remove expired
// objects. It exits, and releases its ticker, once ctx is done.
func newDirClient(ctx context.Context, dir string) (S3, error) {
	dir = filepath.ToSlash(dir)

	info, err := os.Stat(dir)
	switch {
	case err == nil:
		if !info.IsDir() {
			return nil, fmt.Errorf("target dir exist but not dir")
		}
	case errors.Is(err, fs.ErrNotExist):
		// A missing directory is not an error: create it and carry on rather
		// than falling through to an error return with a nil error.
		if err = os.MkdirAll(dir, 0o755); err != nil {
			return nil, err
		}
	default:
		return nil, err
	}

	c := &s3_dir{ctx: ctx, dir: dir}

	go func() {
		ticker := time.NewTicker(10 * time.Minute)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case t := <-ticker.C:
				log.Debug(ctx, "s3.by_dir: start clean up s3 dir @%s", t.Format("2006-01-02T15:04:05"))
				c.clean()
			}
		}
	}()

	return c, nil
}