package uploads import ( "context" "errors" "fmt" "io" "net/http" "os" "path" "sync" "time" "nf-repo/internal/interfaces" "nf-repo/internal/model" "nf-repo/internal/tool/rerr" "nf-repo/internal/verify" "github.com/loveuer/nf/nft/log" ) type localUploader struct { lock sync.Mutex basedir string sizeMap map[string]int } func (l *localUploader) UploadId() string { id := fmt.Sprintf("%d", time.Now().UnixNano()) l.lock.Lock() l.sizeMap[id] = 0 l.lock.Unlock() return id } func (l *localUploader) Write(ctx context.Context, id string, reader io.ReadCloser, start, end int) (int, *rerr.RepositoryError) { var ( err error filename = path.Join(l.basedir, id) f *os.File ok bool flag = os.O_CREATE | os.O_RDWR | os.O_TRUNC copied int64 ) if _, ok = l.sizeMap[id]; !ok { return 0, &rerr.RepositoryError{ Status: http.StatusRequestedRangeNotSatisfiable, Code: "BLOB_UPLOAD_UNKNOWN", Message: "Your content range doesn't match what we have", } } if start > 0 { flag = os.O_APPEND | os.O_RDWR } if f, err = os.OpenFile(filename, flag, 0o644); err != nil { return 0, rerr.ErrInternal(err) } if copied, err = io.Copy(f, reader); err != nil { return 0, rerr.ErrInternal(err) } reader.Close() l.lock.Lock() l.sizeMap[id] += int(copied) l.lock.Unlock() return l.sizeMap[id], nil } func (l *localUploader) Done(ctx context.Context, bh interfaces.BlobHandler, id string, reader io.ReadCloser, contentLength int, repo string, hash model.Hash) *rerr.RepositoryError { size := verify.SizeUnknown if contentLength > 0 { size = l.sizeMap[id] + contentLength } var ( err error f *os.File filename = path.Join(l.basedir, id) ) if f, err = os.OpenFile(filename, os.O_RDONLY, 0o644); err != nil { return rerr.ErrInternal(err) } in := io.NopCloser(io.MultiReader(f, reader)) vrc, err := verify.ReadCloser(in, int64(size), hash) if err != nil { return rerr.ErrInternal(err) } defer vrc.Close() log.Error("localUploader: upload done, id = %s, size = %d", id, size) if err := bh.Put(ctx, repo, hash, vrc); err != nil { if errors.As(err, &verify.Error{}) { log.Debug("localUploader: blob handler put err, repo = %s, hash = %s, err = %s", repo, hash.String(), fmt.Sprintf("Digest mismatch: %v", err)) return rerr.ErrDigestMismatch } return rerr.ErrInternal(err) } delete(l.sizeMap, id) f.Close() if err = os.Remove(filename); err != nil { log.Warn("localUploader: os remove err, filename = %s, err = %s", filename, err.Error()) } return nil } func NewLocalUploader(basedir string) interfaces.UploadHandler { var err error if err = os.MkdirAll(basedir, 0o755); err != nil { log.Panic("NewLocalUploader: os MkdirAll err, basedir = %s, err = %s", basedir, err.Error()) } return &localUploader{lock: sync.Mutex{}, basedir: basedir, sizeMap: make(map[string]int)} }