From f7160ce416b23cdf8b671e6f0764929ce57746c6 Mon Sep 17 00:00:00 2001 From: loveuer Date: Sat, 17 Jan 2026 15:19:50 +0800 Subject: [PATCH] Initial commit: file_manager package with local and S3 support --- .gitignore | 1 + README.md | 101 ++++++ controller/file_manager/interface.go | 34 ++ controller/file_manager/local.go | 465 ++++++++++++++++++++++++++ controller/file_manager/local_test.go | 214 ++++++++++++ controller/file_manager/new.go | 84 +++++ controller/file_manager/s3.go | 423 +++++++++++++++++++++++ controller/file_manager/s3_test.go | 141 ++++++++ go.mod | 27 ++ go.sum | 36 ++ 10 files changed, 1526 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 controller/file_manager/interface.go create mode 100644 controller/file_manager/local.go create mode 100644 controller/file_manager/local_test.go create mode 100644 controller/file_manager/new.go create mode 100644 controller/file_manager/s3.go create mode 100644 controller/file_manager/s3_test.go create mode 100644 go.mod create mode 100644 go.sum diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1469f8f --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +x-* diff --git a/README.md b/README.md new file mode 100644 index 0000000..beedcf8 --- /dev/null +++ b/README.md @@ -0,0 +1,101 @@ +# upkg + +Go utility packages collection. + +## file_manager + +File management abstraction layer supporting multiple storage backends. + +### Features + +- **Local Storage**: Store files on local filesystem with manifest-based metadata +- **S3 Compatible Storage**: Support AWS S3, MinIO, and other S3-compatible services +- **SHA256 Verification**: Optional file integrity verification +- **Automatic Cleanup**: Configurable timeout for incomplete uploads and expiration for completed files +- **Persistent State**: State managed via manifest files (local) or object metadata (S3) + +### Usage + +```go +import "gitea.loveuer.com/loveuer/upkg/controller/file_manager" +``` + +#### Local Storage + +```go +fm := file_manager.New( + file_manager.WithDir("/path/to/uploads"), + file_manager.WithTimeout(5*time.Minute), + file_manager.WithExpire(24*time.Hour), +) +defer fm.CloseManager() +``` + +#### S3 Storage + +```go +fm := file_manager.New( + file_manager.WithS3( + "http://minio:9000", + "access-key", + "secret-key", + "bucket-name", + "us-east-1", + ), + file_manager.WithS3PathStyle(true), + file_manager.WithTimeout(10*time.Minute), + file_manager.WithExpire(48*time.Hour), +) +defer fm.CloseManager() +``` + +#### API + +```go +// Create creates a file record, returns code for upload +result, err := fm.Create(ctx, "filename.txt", 1024, "") + +// Upload uploads file content +total, written, err := fm.Upload(ctx, code, 0, 1024, reader) + +// Get retrieves file content +data, err := fm.Get(ctx, code) + +// GetInfo returns file metadata +info, err := fm.GetInfo(ctx, code) + +// Delete removes file +err := fm.Delete(ctx, code) + +// Close closes file handle +err := fm.Close(code) + +// CloseManager shuts down the manager +fm.CloseManager() +``` + +### Manifest Format (Local) + +```json +{ + "filename": "file.txt", + "size": 1024, + "sha256": "abc123...", + "path": "/uploads/abc123", + "create_time": "2024-01-01T00:00:00Z", + "complete": true +} +``` + +### S3 Object Metadata + +- `x-amz-meta-filename`: Original filename +- `x-amz-meta-size`: File size +- `x-amz-meta-sha256`: SHA256 hash (optional) +- `x-amz-meta-create-time`: Creation timestamp +- `x-amz-meta-complete`: Upload completion status +- `x-amz-meta-code`: Unique file code + +## License + +MIT diff --git a/controller/file_manager/interface.go b/controller/file_manager/interface.go new file mode 100644 index 0000000..1789e52 --- /dev/null +++ b/controller/file_manager/interface.go @@ -0,0 +1,34 @@ +package file_manager + +import ( + "context" + "io" + "time" +) + +// FileInfo 文件信息 +type FileInfo struct { + Filename string `json:"filename"` + Size int64 `json:"size"` + SHA256 string `json:"sha256,omitempty"` + Path string `json:"path"` + CreateTime time.Time `json:"create_time"` + Complete bool `json:"complete"` +} + +// CreateResult 创建文件结果 +type CreateResult struct { + Code string `json:"code"` + SHA256 string `json:"sha256,omitempty"` +} + +// FileManager 文件管理接口 +type FileManager interface { + Create(ctx context.Context, filename string, size int64, sha256 string) (*CreateResult, error) + Upload(ctx context.Context, code string, start int64, end int64, reader io.Reader) (total int64, size int64, err error) + Get(ctx context.Context, code string) ([]byte, error) + Close(code string) error + GetInfo(ctx context.Context, code string) (*FileInfo, error) + Delete(ctx context.Context, code string) error + CloseManager() +} diff --git a/controller/file_manager/local.go b/controller/file_manager/local.go new file mode 100644 index 0000000..df838cf --- /dev/null +++ b/controller/file_manager/local.go @@ -0,0 +1,465 @@ +package file_manager + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "time" +) + +type HashMismatchError struct { + Expected string + Actual string +} + +func (e *HashMismatchError) Error() string { + return fmt.Sprintf("SHA256 hash mismatch: expected %s, got %s", e.Expected, e.Actual) +} + +func NewHashMismatchError(expected, actual string) error { + return &HashMismatchError{ + Expected: expected, + Actual: actual, + } +} + +type FileNotReadyError struct { + Code string + CurrentSize int64 + ExpectedSize int64 +} + +func (e *FileNotReadyError) Error() string { + return fmt.Sprintf("file %s not ready: current size %d, expected %d", e.Code, e.CurrentSize, e.ExpectedSize) +} + +func NewFileNotReadyError(code string, currentSize, expectedSize int64) error { + return &FileNotReadyError{ + Code: code, + CurrentSize: currentSize, + ExpectedSize: expectedSize, + } +} + +type DeleteError struct { + Code string + Message string + Errors []string +} + +func (e *DeleteError) Error() string { + if len(e.Errors) == 0 { + return e.Message + } + return fmt.Sprintf("%s: %v", e.Message, e.Errors) +} + +type LocalFileManager struct { + dir string + fileHandles sync.Map + ctx context.Context + cancel context.CancelFunc + timeout time.Duration + expire time.Duration + verifyHash bool + mu sync.RWMutex +} + +func NewLocalFileManager(opts *option) *LocalFileManager { + defaultDir := "./uploads" + lfm := &LocalFileManager{ + dir: defaultDir, + timeout: time.Minute, + expire: 24 * time.Hour, + verifyHash: true, + } + + if opts != nil { + if opts.dir != nil { + lfm.dir = *opts.dir + } + if opts.timeout > 0 { + lfm.timeout = opts.timeout + } + if opts.expire > 0 { + lfm.expire = opts.expire + } + } + + lfm.ctx, lfm.cancel = context.WithCancel(context.Background()) + + go lfm.startCleaner() + + return lfm +} + +func (lfm *LocalFileManager) manifestPath(code string) string { + return filepath.Join(lfm.dir, "."+code+".manifest") +} + +func (lfm *LocalFileManager) readManifest(code string) (*FileInfo, error) { + manifestPath := lfm.manifestPath(code) + data, err := os.ReadFile(manifestPath) + if err != nil { + return nil, err + } + var fileInfo FileInfo + if err := json.Unmarshal(data, &fileInfo); err != nil { + return nil, err + } + return &fileInfo, nil +} + +func (lfm *LocalFileManager) writeManifest(code string, fileInfo *FileInfo) error { + manifestPath := lfm.manifestPath(code) + data, err := json.Marshal(fileInfo) + if err != nil { + return err + } + return os.WriteFile(manifestPath, data, 0644) +} + +func (lfm *LocalFileManager) deleteManifest(code string) error { + return os.Remove(lfm.manifestPath(code)) +} + +func (lfm *LocalFileManager) scanManifests() ([]string, error) { + var codes []string + entries, err := os.ReadDir(lfm.dir) + if err != nil { + return nil, err + } + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if len(name) > 10 && name[:1] == "." && name[len(name)-9:] == ".manifest" { + code := name[1 : len(name)-9] + codes = append(codes, code) + } + } + return codes, nil +} + +func (lfm *LocalFileManager) startCleaner() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-lfm.ctx.Done(): + return + case <-ticker.C: + lfm.cleanup() + } + } +} + +func (lfm *LocalFileManager) cleanup() { + codes, err := lfm.scanManifests() + if err != nil { + return + } + now := time.Now() + for _, code := range codes { + fileInfo, err := lfm.readManifest(code) + if err != nil { + continue + } + if fileInfo.Complete { + if now.Sub(fileInfo.CreateTime) > lfm.expire { + lfm.cleanupFile(code, fileInfo) + } + } else { + if now.Sub(fileInfo.CreateTime) > lfm.timeout { + lfm.cleanupFile(code, fileInfo) + } + } + } +} + +func (lfm *LocalFileManager) cleanupFile(code string, fileInfo *FileInfo) { + if fileHandleValue, exists := lfm.fileHandles.LoadAndDelete(code); exists { + file := fileHandleValue.(*os.File) + file.Close() + } + if fileInfo.Path != "" { + os.Remove(fileInfo.Path) + } + lfm.deleteManifest(code) +} + +func generateRandomCode(length int) (string, error) { + bytes := make([]byte, length/2) + if _, err := rand.Read(bytes); err != nil { + return "", err + } + return hex.EncodeToString(bytes), nil +} + +func (lfm *LocalFileManager) CloseManager() { + lfm.cancel() + lfm.fileHandles.Range(func(key, value any) bool { + code := key.(string) + file := value.(*os.File) + file.Close() + lfm.fileHandles.Delete(code) + return true + }) +} + +func (lfm *LocalFileManager) Create(ctx context.Context, filename string, size int64, sha256 string) (*CreateResult, error) { + code, err := generateRandomCode(16) + if err != nil { + return nil, err + } + + if err := os.MkdirAll(lfm.dir, 0755); err != nil { + return nil, err + } + + filePath := filepath.Join(lfm.dir, code) + + fileInfo := &FileInfo{ + Filename: filename, + Size: size, + SHA256: sha256, + Path: filePath, + CreateTime: time.Now(), + Complete: false, + } + + if err := lfm.writeManifest(code, fileInfo); err != nil { + return nil, err + } + + file, err := os.Create(filePath) + if err != nil { + lfm.deleteManifest(code) + return nil, err + } + + lfm.fileHandles.Store(code, file) + + return &CreateResult{ + Code: code, + SHA256: sha256, + }, nil +} + +func (lfm *LocalFileManager) Upload(ctx context.Context, code string, start int64, end int64, reader io.Reader) (int64, int64, error) { + fileHandleValue, exists := lfm.fileHandles.Load(code) + if !exists { + return 0, 0, os.ErrNotExist + } + + file := fileHandleValue.(*os.File) + + currentInfo, err := file.Stat() + if err != nil { + return 0, 0, err + } + + currentSize := currentInfo.Size() + + if start != currentSize { + return currentSize, 0, nil + } + + written, err := io.Copy(file, reader) + if err != nil { + return currentSize, 0, err + } + + newInfo, err := file.Stat() + if err != nil { + return currentSize, written, err + } + + totalSize := newInfo.Size() + + fileInfo, err := lfm.readManifest(code) + if err != nil { + return totalSize, written, os.ErrNotExist + } + + if end > 0 && totalSize == end && totalSize == fileInfo.Size { + if fileInfo.SHA256 != "" { + if _, err := file.Seek(0, 0); err != nil { + return totalSize, written, err + } + + hasher := sha256.New() + if _, err := io.Copy(hasher, file); err != nil { + return totalSize, written, err + } + + calculatedHash := hex.EncodeToString(hasher.Sum(nil)) + + if calculatedHash != fileInfo.SHA256 { + return totalSize, written, NewHashMismatchError(fileInfo.SHA256, calculatedHash) + } + } + + if fileInfo.SHA256 == "" { + if _, err := file.Seek(0, 0); err != nil { + return totalSize, written, err + } + + hasher := sha256.New() + if _, err := io.Copy(hasher, file); err != nil { + return totalSize, written, err + } + + calculatedHash := hex.EncodeToString(hasher.Sum(nil)) + fileInfo.SHA256 = calculatedHash + } + + fileInfo.Complete = true + if err := lfm.writeManifest(code, fileInfo); err != nil { + return totalSize, written, err + } + + file.Close() + lfm.fileHandles.Delete(code) + } + + return totalSize, written, nil +} + +func (lfm *LocalFileManager) verifyFileHash(code string, fileInfo *FileInfo) error { + if fileInfo.SHA256 == "" { + return nil + } + + file, err := os.Open(fileInfo.Path) + if err != nil { + return err + } + defer file.Close() + + hasher := sha256.New() + if _, err := io.Copy(hasher, file); err != nil { + return err + } + + calculatedHash := hex.EncodeToString(hasher.Sum(nil)) + + if calculatedHash != fileInfo.SHA256 { + lfm.cleanupFile(code, fileInfo) + return NewHashMismatchError(fileInfo.SHA256, calculatedHash) + } + + return nil +} + +func (lfm *LocalFileManager) Get(ctx context.Context, code string) ([]byte, error) { + fileInfo, err := lfm.readManifest(code) + if err != nil { + return nil, os.ErrNotExist + } + + fileHandleValue, handleExists := lfm.fileHandles.Load(code) + if handleExists { + file := fileHandleValue.(*os.File) + fileInfoStat, err := file.Stat() + if err != nil { + return nil, err + } + + if fileInfoStat.Size() != fileInfo.Size { + return nil, NewFileNotReadyError(code, fileInfoStat.Size(), fileInfo.Size) + } + + if file, exists := lfm.fileHandles.LoadAndDelete(code); exists { + f := file.(*os.File) + f.Close() + } + } + + if err := lfm.verifyFileHash(code, fileInfo); err != nil { + return nil, err + } + + return os.ReadFile(fileInfo.Path) +} + +func (lfm *LocalFileManager) GetInfo(ctx context.Context, code string) (*FileInfo, error) { + fileInfo, err := lfm.readManifest(code) + if err != nil { + return nil, os.ErrNotExist + } + + fileHandleValue, handleExists := lfm.fileHandles.Load(code) + if handleExists { + file := fileHandleValue.(*os.File) + fileInfoStat, err := file.Stat() + if err != nil { + return nil, err + } + + if fileInfoStat.Size() != fileInfo.Size { + return nil, NewFileNotReadyError(code, fileInfoStat.Size(), fileInfo.Size) + } + + if file, exists := lfm.fileHandles.LoadAndDelete(code); exists { + f := file.(*os.File) + f.Close() + } + } + + if err := lfm.verifyFileHash(code, fileInfo); err != nil { + return nil, err + } + + return fileInfo, nil +} + +func (lfm *LocalFileManager) Delete(ctx context.Context, code string) error { + fileInfo, err := lfm.readManifest(code) + if err != nil { + return os.ErrNotExist + } + + var errors []string + + if fileHandleValue, handleExists := lfm.fileHandles.LoadAndDelete(code); handleExists { + file := fileHandleValue.(*os.File) + if err := file.Close(); err != nil { + errors = append(errors, fmt.Sprintf("failed to close file handle: %v", err)) + } + } + + if err := os.Remove(fileInfo.Path); err != nil && !os.IsNotExist(err) { + errors = append(errors, fmt.Sprintf("failed to delete file: %v", err)) + } + + if err := lfm.deleteManifest(code); err != nil && !os.IsNotExist(err) { + errors = append(errors, fmt.Sprintf("failed to delete manifest: %v", err)) + } + + if len(errors) > 0 { + return &DeleteError{ + Code: code, + Message: "partial file deletion completed with errors", + Errors: errors, + } + } + + return nil +} + +func (lfm *LocalFileManager) Close(code string) error { + if fileHandleValue, exists := lfm.fileHandles.LoadAndDelete(code); exists { + file := fileHandleValue.(*os.File) + return file.Close() + } + return nil +} diff --git a/controller/file_manager/local_test.go b/controller/file_manager/local_test.go new file mode 100644 index 0000000..c1735c5 --- /dev/null +++ b/controller/file_manager/local_test.go @@ -0,0 +1,214 @@ +package file_manager + +import ( + "bytes" + "context" + "os" + "testing" + "time" +) + +func TestLocalUploadDownload(t *testing.T) { + fm := New( + WithDir("/tmp/upkg-test"), + WithTimeout(10*time.Minute), + WithExpire(24*time.Hour), + ) + defer fm.CloseManager() + + ctx := context.Background() + + content := []byte("hello world from local") + size := int64(len(content)) + reader := bytes.NewReader(content) + + result, err := fm.Create(ctx, "test.txt", size, "") + if err != nil { + t.Fatalf("Create failed: %v", err) + } + + total, written, err := fm.Upload(ctx, result.Code, 0, size, reader) + if err != nil { + t.Fatalf("Upload failed: %v", err) + } + if total != size { + t.Errorf("expected total %d, got %d", size, total) + } + if written != size { + t.Errorf("expected written %d, got %d", size, written) + } + + data, err := fm.Get(ctx, result.Code) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if !bytes.Equal(data, content) { + t.Errorf("data mismatch, expected %s, got %s", content, data) + } + + info, err := fm.GetInfo(ctx, result.Code) + if err != nil { + t.Fatalf("GetInfo failed: %v", err) + } + if info.Filename != "test.txt" { + t.Errorf("expected filename test.txt, got %s", info.Filename) + } + if info.Size != size { + t.Errorf("expected size %d, got %d", size, info.Size) + } + if !info.Complete { + t.Error("expected complete to be true") + } + if info.SHA256 == "" { + t.Error("expected sha256 to be set") + } + + err = fm.Delete(ctx, result.Code) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } + + _, err = fm.Get(ctx, result.Code) + if err == nil { + t.Error("expected error after delete") + } +} + +func TestLocalUploadLargeFile(t *testing.T) { + fm := New( + WithDir("/tmp/upkg-test-large"), + WithTimeout(30*time.Minute), + WithExpire(24*time.Hour), + ) + defer fm.CloseManager() + + ctx := context.Background() + + filePath := "../../x-file/VID_20240215_195046.mp4" + file, err := os.Open(filePath) + if err != nil { + t.Fatalf("open file failed: %v", err) + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + t.Fatalf("stat file failed: %v", err) + } + + result, err := fm.Create(ctx, "video.mp4", stat.Size(), "") + if err != nil { + t.Fatalf("Create failed: %v", err) + } + + total, written, err := fm.Upload(ctx, result.Code, 0, stat.Size(), file) + if err != nil { + t.Fatalf("Upload failed: %v", err) + } + if total != stat.Size() { + t.Errorf("expected total %d, got %d", stat.Size(), total) + } + if written != stat.Size() { + t.Errorf("expected written %d, got %d", stat.Size(), written) + } + + data, err := fm.Get(ctx, result.Code) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if int64(len(data)) != stat.Size() { + t.Errorf("expected size %d, got %d", stat.Size(), len(data)) + } + + err = fm.Delete(ctx, result.Code) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } +} + +func TestLocalUploadWithSHA256(t *testing.T) { + fm := New( + WithDir("/tmp/upkg-test-sha256"), + WithTimeout(10*time.Minute), + WithExpire(24*time.Hour), + ) + defer fm.CloseManager() + + ctx := context.Background() + + content := []byte("hello world with sha256") + size := int64(len(content)) + reader := bytes.NewReader(content) + + sha256 := "54a1932f09c34bd2c29f4fb3c3abbf6b5f65afcadeabbf13a1c5165782f6b08b" + + result, err := fm.Create(ctx, "test-sha256.txt", size, sha256) + if err != nil { + t.Fatalf("Create failed: %v", err) + } + + _, _, err = fm.Upload(ctx, result.Code, 0, size, reader) + if err != nil { + t.Fatalf("Upload failed: %v", err) + } + + data, err := fm.Get(ctx, result.Code) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if !bytes.Equal(data, content) { + t.Errorf("data mismatch") + } + + err = fm.Delete(ctx, result.Code) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } +} + +func TestLocalCloseAndReopen(t *testing.T) { + dir := "/tmp/upkg-test-reopen" + fm := New( + WithDir(dir), + WithTimeout(1*time.Minute), + WithExpire(24*time.Hour), + ) + + ctx := context.Background() + + content := []byte("test close and reopen") + size := int64(len(content)) + + result, err := fm.Create(ctx, "reopen.txt", size, "") + if err != nil { + t.Fatalf("Create failed: %v", err) + } + + reader := bytes.NewReader(content) + _, _, err = fm.Upload(ctx, result.Code, 0, size, reader) + if err != nil { + t.Fatalf("Upload failed: %v", err) + } + + fm.CloseManager() + + fm2 := New( + WithDir(dir), + WithTimeout(1*time.Minute), + WithExpire(24*time.Hour), + ) + defer fm2.CloseManager() + + data, err := fm2.Get(ctx, result.Code) + if err != nil { + t.Fatalf("Get after reopen failed: %v", err) + } + if !bytes.Equal(data, content) { + t.Errorf("data mismatch after reopen") + } + + err = fm2.Delete(ctx, result.Code) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } +} diff --git a/controller/file_manager/new.go b/controller/file_manager/new.go new file mode 100644 index 0000000..a5d1121 --- /dev/null +++ b/controller/file_manager/new.go @@ -0,0 +1,84 @@ +package file_manager + +import ( + "time" +) + +type Option func(*option) +type option struct { + dir *string + timeout time.Duration + expire time.Duration + verifyHash bool + s3 *s3Config +} + +type s3Config struct { + endpoint string + accessKey string + secretKey string + bucket string + region string + usePathStyle bool +} + +func WithDir(dir string) Option { + return func(o *option) { + if dir != "" { + o.dir = &dir + } + } +} + +func WithTimeout(timeout time.Duration) Option { + return func(o *option) { + if timeout > 0 { + o.timeout = timeout + } + } +} + +func WithExpire(expire time.Duration) Option { + return func(o *option) { + if expire > 0 { + o.expire = expire + } + } +} + +func WithS3(endpoint, accessKey, secretKey, bucket, region string) Option { + return func(o *option) { + o.s3 = &s3Config{ + endpoint: endpoint, + accessKey: accessKey, + secretKey: secretKey, + bucket: bucket, + region: region, + } + } +} + +func WithS3PathStyle(usePathStyle bool) Option { + return func(o *option) { + if o.s3 != nil { + o.s3.usePathStyle = usePathStyle + } + } +} + +// New 创建文件管理器 +func New(opts ...Option) FileManager { + o := &option{ + timeout: 1 * time.Minute, + expire: 8 * time.Hour, + verifyHash: true, + } + for _, opt := range opts { + opt(o) + } + + if o.s3 != nil { + return NewS3FileManager(o) + } + return NewLocalFileManager(o) +} diff --git a/controller/file_manager/s3.go b/controller/file_manager/s3.go new file mode 100644 index 0000000..872ba48 --- /dev/null +++ b/controller/file_manager/s3.go @@ -0,0 +1,423 @@ +package file_manager + +import ( + "bytes" + "context" + "crypto/md5" + "crypto/rand" + "encoding/hex" + "fmt" + "io" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +const ( + s3MetadataKeyFilename = "x-amz-meta-filename" + s3MetadataKeySize = "x-amz-meta-size" + s3MetadataKeySha256 = "x-amz-meta-sha256" + s3MetadataKeyCreateTime = "x-amz-meta-create-time" + s3MetadataKeyComplete = "x-amz-meta-complete" + s3MetadataKeyCode = "x-amz-meta-code" +) + +type S3FileManager struct { + client *s3.Client + bucket string + timeout time.Duration + expire time.Duration + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex +} + +func NewS3FileManager(opts *option) *S3FileManager { + cfg := opts.s3 + + awsConfig := []func(*config.LoadOptions) error{} + + if cfg.region != "" { + awsConfig = append(awsConfig, config.WithRegion(cfg.region)) + } + + if cfg.endpoint != "" { + awsConfig = append(awsConfig, config.WithEndpointResolverWithOptions( + aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { + return aws.Endpoint{ + PartitionID: "aws", + URL: cfg.endpoint, + SigningRegion: region, + }, nil + }), + )) + } + + if cfg.accessKey != "" && cfg.secretKey != "" { + awsConfig = append(awsConfig, config.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider(cfg.accessKey, cfg.secretKey, ""), + )) + } + + loadedConfig, err := config.LoadDefaultConfig(context.Background(), awsConfig...) + if err != nil { + panic(fmt.Sprintf("failed to load aws config: %v", err)) + } + + client := s3.NewFromConfig(loadedConfig, func(o *s3.Options) { + o.UsePathStyle = cfg.usePathStyle + }) + + ctx, cancel := context.WithCancel(context.Background()) + + return &S3FileManager{ + client: client, + bucket: cfg.bucket, + timeout: opts.timeout, + expire: opts.expire, + ctx: ctx, + cancel: cancel, + } +} + +func (s *S3FileManager) startCleaner() { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + s.cleanup() + } + } +} + +func (s *S3FileManager) cleanup() { + now := time.Now() + + paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{ + Bucket: aws.String(s.bucket), + }) + + for paginator.HasMorePages() { + output, err := paginator.NextPage(context.Background()) + if err != nil { + return + } + + for _, obj := range output.Contents { + code := aws.ToString(obj.Key) + + headOutput, err := s.client.HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(code), + }) + if err != nil { + continue + } + + createTimeStr := "" + if headOutput.Metadata != nil { + if val, ok := headOutput.Metadata[s3MetadataKeyCreateTime]; ok { + createTimeStr = val + } + } + if createTimeStr == "" { + continue + } + + createTime, err := time.Parse(time.RFC3339, createTimeStr) + if err != nil { + continue + } + + completeStr := "" + if headOutput.Metadata != nil { + if val, ok := headOutput.Metadata[s3MetadataKeyComplete]; ok { + completeStr = val + } + } + complete := completeStr == "true" + + var expireTime time.Duration + if complete { + expireTime = s.expire + } else { + expireTime = s.timeout + } + + if now.Sub(createTime) > expireTime { + s.deleteObject(code) + } + } + } +} + +func (s *S3FileManager) deleteObject(code string) { + s.client.DeleteObject(context.Background(), &s3.DeleteObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(code), + }) +} + +func (s *S3FileManager) generateRandomCode(length int) (string, error) { + b := make([]byte, length/2) + if _, err := rand.Read(b); err != nil { + return "", err + } + return hex.EncodeToString(b), nil +} + +func (s *S3FileManager) CloseManager() { + s.cancel() +} + +func (s *S3FileManager) Create(ctx context.Context, filename string, size int64, sha256 string) (*CreateResult, error) { + code, err := s.generateRandomCode(16) + if err != nil { + return nil, err + } + + _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(code), + Metadata: map[string]string{ + s3MetadataKeyFilename: filename, + s3MetadataKeySize: fmt.Sprintf("%d", size), + s3MetadataKeySha256: sha256, + s3MetadataKeyCreateTime: time.Now().Format(time.RFC3339), + s3MetadataKeyComplete: "false", + s3MetadataKeyCode: code, + }, + }) + if err != nil { + return nil, err + } + + return &CreateResult{ + Code: code, + SHA256: sha256, + }, nil +} + +func (s *S3FileManager) Upload(ctx context.Context, code string, start int64, end int64, reader io.Reader) (int64, int64, error) { + headOutput, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(code), + }) + if err != nil { + return 0, 0, fmt.Errorf("file not found: %w", err) + } + + sizeStr := "" + if headOutput.Metadata != nil { + if val, ok := headOutput.Metadata[s3MetadataKeySize]; ok { + sizeStr = val + } + } + var expectedSize int64 + fmt.Sscanf(sizeStr, "%d", &expectedSize) + + data, err := io.ReadAll(reader) + if err != nil { + return 0, 0, err + } + + sha256Str := "" + if headOutput.Metadata != nil { + if val, ok := headOutput.Metadata[s3MetadataKeySha256]; ok { + sha256Str = val + } + } + + if sha256Str != "" && int64(len(data)) == expectedSize { + calculatedHash := md5Hash(data) + if calculatedHash != sha256Str { + s.deleteObject(code) + return 0, 0, NewHashMismatchError(sha256Str, calculatedHash) + } + } + + metadata := map[string]string{ + s3MetadataKeyFilename: "", + s3MetadataKeySize: "", + s3MetadataKeySha256: "", + s3MetadataKeyCreateTime: "", + s3MetadataKeyComplete: "true", + s3MetadataKeyCode: "", + } + if headOutput.Metadata != nil { + if val, ok := headOutput.Metadata[s3MetadataKeyFilename]; ok { + metadata[s3MetadataKeyFilename] = val + } + if val, ok := headOutput.Metadata[s3MetadataKeySize]; ok { + metadata[s3MetadataKeySize] = val + } + if val, ok := headOutput.Metadata[s3MetadataKeySha256]; ok { + metadata[s3MetadataKeySha256] = val + } + if val, ok := headOutput.Metadata[s3MetadataKeyCreateTime]; ok { + metadata[s3MetadataKeyCreateTime] = val + } + if val, ok := headOutput.Metadata[s3MetadataKeyCode]; ok { + metadata[s3MetadataKeyCode] = val + } + } + + if sha256Str == "" && int64(len(data)) == expectedSize { + metadata[s3MetadataKeySha256] = md5Hash(data) + } + + _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(code), + Body: bytes.NewReader(data), + Metadata: metadata, + }) + if err != nil { + return 0, 0, err + } + + return int64(len(data)), int64(len(data)), nil +} + +func md5Hash(data []byte) string { + h := md5.New() + h.Write(data) + return hex.EncodeToString(h.Sum(nil)) +} + +func (s *S3FileManager) Get(ctx context.Context, code string) ([]byte, error) { + output, err := s.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(code), + }) + if err != nil { + return nil, fmt.Errorf("file not found: %w", err) + } + defer output.Body.Close() + + sha256Str := "" + if output.Metadata != nil { + if val, ok := output.Metadata[s3MetadataKeySha256]; ok { + sha256Str = val + } + } + + completeStr := "" + if output.Metadata != nil { + if val, ok := output.Metadata[s3MetadataKeyComplete]; ok { + completeStr = val + } + } + + if completeStr != "true" { + sizeStr := "" + if output.Metadata != nil { + if val, ok := output.Metadata[s3MetadataKeySize]; ok { + sizeStr = val + } + } + var expectedSize int64 + fmt.Sscanf(sizeStr, "%d", &expectedSize) + + currentSize := aws.ToInt64(output.ContentLength) + if currentSize != expectedSize { + return nil, NewFileNotReadyError(code, currentSize, expectedSize) + } + } + + if sha256Str != "" { + data, err := io.ReadAll(output.Body) + if err != nil { + return nil, err + } + + calculatedHash := md5Hash(data) + if calculatedHash != sha256Str { + s.deleteObject(code) + return nil, NewHashMismatchError(sha256Str, calculatedHash) + } + + return data, nil + } + + return io.ReadAll(output.Body) +} + +func (s *S3FileManager) GetInfo(ctx context.Context, code string) (*FileInfo, error) { + output, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(code), + }) + if err != nil { + return nil, fmt.Errorf("file not found: %w", err) + } + + sizeStr := "" + if output.Metadata != nil { + if val, ok := output.Metadata[s3MetadataKeySize]; ok { + sizeStr = val + } + } + var size int64 + fmt.Sscanf(sizeStr, "%d", &size) + + createTimeStr := "" + if output.Metadata != nil { + if val, ok := output.Metadata[s3MetadataKeyCreateTime]; ok { + createTimeStr = val + } + } + var createTime time.Time + if createTimeStr != "" { + createTime, _ = time.Parse(time.RFC3339, createTimeStr) + } + + completeStr := "" + if output.Metadata != nil { + if val, ok := output.Metadata[s3MetadataKeyComplete]; ok { + completeStr = val + } + } + + filename := "" + if output.Metadata != nil { + if val, ok := output.Metadata[s3MetadataKeyFilename]; ok { + filename = val + } + } + + sha256Val := "" + if output.Metadata != nil { + if val, ok := output.Metadata[s3MetadataKeySha256]; ok { + sha256Val = val + } + } + + return &FileInfo{ + Filename: filename, + Size: size, + SHA256: sha256Val, + Path: code, + CreateTime: createTime, + Complete: completeStr == "true", + }, nil +} + +func (s *S3FileManager) Delete(ctx context.Context, code string) error { + _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(code), + }) + return err +} + +func (s *S3FileManager) Close(code string) error { + return nil +} diff --git a/controller/file_manager/s3_test.go b/controller/file_manager/s3_test.go new file mode 100644 index 0000000..53d5278 --- /dev/null +++ b/controller/file_manager/s3_test.go @@ -0,0 +1,141 @@ +package file_manager + +import ( + "bytes" + "context" + "os" + "testing" + "time" +) + +func TestS3UploadDownload(t *testing.T) { + fm := New( + WithS3( + "http://localhost:9000", + "INsEaUOyLx5PXiqLHv0v", + "xsie9sil2jFtyi0UlnXv7zFafHYk0rWAeROSEYg7", + "test", + "us-east-1", + ), + WithS3PathStyle(true), + WithTimeout(10*time.Minute), + WithExpire(24*time.Hour), + ) + defer fm.CloseManager() + + ctx := context.Background() + + content := []byte("hello world from s3") + size := int64(len(content)) + reader := bytes.NewReader(content) + + result, err := fm.Create(ctx, "test.txt", size, "") + if err != nil { + t.Fatalf("Create failed: %v", err) + } + + total, written, err := fm.Upload(ctx, result.Code, 0, size, reader) + if err != nil { + t.Fatalf("Upload failed: %v", err) + } + if total != size { + t.Errorf("expected total %d, got %d", size, total) + } + if written != size { + t.Errorf("expected written %d, got %d", size, written) + } + + data, err := fm.Get(ctx, result.Code) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if !bytes.Equal(data, content) { + t.Errorf("data mismatch, expected %s, got %s", content, data) + } + + info, err := fm.GetInfo(ctx, result.Code) + if err != nil { + t.Fatalf("GetInfo failed: %v", err) + } + if info.Filename != "test.txt" { + t.Errorf("expected filename test.txt, got %s", info.Filename) + } + if info.Size != size { + t.Errorf("expected size %d, got %d", size, info.Size) + } + if !info.Complete { + t.Error("expected complete to be true") + } + if info.SHA256 == "" { + t.Error("expected sha256 to be set") + } + + err = fm.Delete(ctx, result.Code) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } + + _, err = fm.Get(ctx, result.Code) + if err == nil { + t.Error("expected error after delete") + } +} + +func TestS3UploadLargeFile(t *testing.T) { + fm := New( + WithS3( + "http://localhost:9000", + "INsEaUOyLx5PXiqLHv0v", + "xsie9sil2jFtyi0UlnXv7zFafHYk0rWAeROSEYg7", + "test", + "us-east-1", + ), + WithS3PathStyle(true), + WithTimeout(30*time.Minute), + WithExpire(24*time.Hour), + ) + defer fm.CloseManager() + + ctx := context.Background() + + filePath := "../../x-file/VID_20240215_195046.mp4" + file, err := os.Open(filePath) + if err != nil { + t.Fatalf("open file failed: %v", err) + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + t.Fatalf("stat file failed: %v", err) + } + + result, err := fm.Create(ctx, "video.mp4", stat.Size(), "") + if err != nil { + t.Fatalf("Create failed: %v", err) + } + + total, written, err := fm.Upload(ctx, result.Code, 0, stat.Size(), file) + if err != nil { + t.Fatalf("Upload failed: %v", err) + } + if total != stat.Size() { + t.Errorf("expected total %d, got %d", stat.Size(), total) + } + if written != stat.Size() { + t.Errorf("expected written %d, got %d", stat.Size(), written) + } + + data, err := fm.Get(ctx, result.Code) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + if int64(len(data)) != stat.Size() { + t.Errorf("expected size %d, got %d", stat.Size(), len(data)) + } + + err = fm.Delete(ctx, result.Code) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e25a7d8 --- /dev/null +++ b/go.mod @@ -0,0 +1,27 @@ +module gitea.loveuer.com/loveuer/upkg + +go 1.25.2 + +require ( + github.com/aws/aws-sdk-go-v2 v1.32.2 + github.com/aws/aws-sdk-go-v2/config v1.28.0 + github.com/aws/aws-sdk-go-v2/credentials v1.17.41 + github.com/aws/aws-sdk-go-v2/service/s3 v1.60.0 +) + +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect + github.com/aws/smithy-go v1.22.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3ee5491 --- /dev/null +++ b/go.sum @@ -0,0 +1,36 @@ +github.com/aws/aws-sdk-go-v2 v1.32.2 h1:AkNLZEyYMLnx/Q/mSKkcMqwNFXMAvFto9bNsHqcTduI= +github.com/aws/aws-sdk-go-v2 v1.32.2/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw= +github.com/aws/aws-sdk-go-v2/config v1.28.0 h1:FosVYWcqEtWNxHn8gB/Vs6jOlNwSoyOCA/g/sxyySOQ= +github.com/aws/aws-sdk-go-v2/config v1.28.0/go.mod h1:pYhbtvg1siOOg8h5an77rXle9tVG8T+BWLWAo7cOukc= +github.com/aws/aws-sdk-go-v2/credentials v1.17.41 h1:7gXo+Axmp+R4Z+AK8YFQO0ZV3L0gizGINCOWxSLY9W8= +github.com/aws/aws-sdk-go-v2/credentials v1.17.41/go.mod h1:u4Eb8d3394YLubphT4jLEwN1rLNq2wFOlT6OuxFwPzU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 h1:TMH3f/SCAWdNtXXVPPu5D6wrr4G5hI1rAxbcocKfC7Q= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17/go.mod h1:1ZRXLdTpzdJb9fwTMXiLipENRxkGMTn1sfKexGllQCw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 h1:UAsR3xA31QGf79WzpG/ixT9FZvQlh5HY1NRqSHBNOCk= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21/go.mod h1:JNr43NFf5L9YaG3eKTm7HQzls9J+A9YYcGI5Quh1r2Y= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 h1:6jZVETqmYCadGFvrYEQfC5fAQmlo80CeL5psbno6r0s= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21/go.mod h1:1SR0GbLlnN3QUmYaflZNiH1ql+1qrSiB2vwcJ+4UM60= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 h1:mimdLQkIX1zr8GIPY1ZtALdBQGxcASiBd2MOp8m/dMc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16/go.mod h1:YHk6owoSwrIsok+cAH9PENCOGoH5PU2EllX4vLtSrsY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 h1:TToQNkvGguu209puTojY/ozlqy2d/SFNcoLIqTFi42g= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0/go.mod h1:0jp+ltwkf+SwG2fm/PKo8t4y8pJSgOCO4D8Lz3k0aHQ= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 h1:GckUnpm4EJOAio1c8o25a+b3lVfwVzC9gnSBqiiNmZM= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18/go.mod h1:Br6+bxfG33Dk3ynmkhsW2Z/t9D4+lRqdLDNCKi85w0U= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 h1:s7NA1SOw8q/5c0wr8477yOPp0z+uBaXBnLE0XYb0POA= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2/go.mod h1:fnjjWyAW/Pj5HYOxl9LJqWtEwS7W2qgcRLWP+uWbss0= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 h1:jg16PhLPUiHIj8zYIW6bqzeQSuHVEiWnGA0Brz5Xv2I= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16/go.mod h1:Uyk1zE1VVdsHSU7096h/rwnXDzOzYQVl+FNPhPw7ShY= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.0 h1:2QXGJvG19QwqXUvgcdoCOZPyLuvZf8LiXPCN4P53TdI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.0/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 h1:bSYXVyUzoTHoKalBmwaZxs97HU9DWWI3ehHSAMa7xOk= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.2/go.mod h1:skMqY7JElusiOUjMJMOv1jJsP7YUg7DrhgqZZWuzu1U= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 h1:AhmO1fHINP9vFYUE0LHzCWg/LfUWUF+zFPEcY9QXb7o= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2/go.mod h1:o8aQygT2+MVP0NaV6kbdE1YnnIM8RRVQzoeUH45GOdI= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 h1:CiS7i0+FUe+/YY1GvIBLLrR/XNGZ4CtM1Ll0XavNuVo= +github.com/aws/aws-sdk-go-v2/service/sts v1.32.2/go.mod h1:HtaiBI8CjYoNVde8arShXb94UbQQi9L4EMr6D+xGBwo= +github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= +github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=