feat: mem, local uploader

This commit is contained in:
loveuer
2024-04-10 22:10:09 +08:00
commit c5d0b8e45b
40 changed files with 8261 additions and 0 deletions

View File

@ -0,0 +1,14 @@
package interfaces
import (
"context"
"io"
"nf-repo/internal/model"
)
type BlobHandler interface {
Get(ctx context.Context, repo string, hash model.Hash) (io.ReadCloser, error)
Stat(ctx context.Context, repo string, hash model.Hash) (int64, error)
Put(ctx context.Context, repo string, hash model.Hash, rc io.ReadCloser) error
Delete(ctx context.Context, repo string, hash model.Hash) error
}

View File

@ -0,0 +1,113 @@
package blobs
import (
"context"
"errors"
"io"
"nf-repo/internal/interfaces"
"nf-repo/internal/model"
"nf-repo/internal/opt"
"os"
"path"
"sync"
)
type localHandler struct {
base string
sync.Mutex
}
func (l *localHandler) path(hash model.Hash) string {
//return path.Join(l.base, hash.Hex)
dir := path.Join(l.base, hash.Hex[:2], hash.Hex[2:4])
_ = os.MkdirAll(dir, 0755)
return path.Join(dir, hash.Hex)
}
func (l *localHandler) Get(ctx context.Context, repo string, hash model.Hash) (io.ReadCloser, error) {
var (
err error
f *os.File
)
l.Lock()
defer l.Unlock()
if f, err = os.Open(l.path(hash)); err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, opt.ErrNotFound
}
return nil, err
}
return f, nil
}
func (l *localHandler) Stat(ctx context.Context, repo string, hash model.Hash) (int64, error) {
var (
err error
info os.FileInfo
)
l.Lock()
defer l.Unlock()
if info, err = os.Stat(l.path(hash)); err != nil {
if errors.Is(err, os.ErrNotExist) {
return 0, opt.ErrNotFound
}
return 0, err
}
return info.Size(), nil
}
func (l *localHandler) Put(ctx context.Context, repo string, hash model.Hash, rc io.ReadCloser) error {
var (
err error
f *os.File
)
l.Lock()
defer l.Unlock()
if f, err = os.OpenFile(l.path(hash), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644); err != nil {
return err
}
if _, err = io.Copy(f, rc); err != nil {
return err
}
return nil
}
func (l *localHandler) Delete(ctx context.Context, repo string, hash model.Hash) error {
var (
err error
info os.FileInfo
filename = l.path(hash)
)
l.Lock()
defer l.Unlock()
if info, err = os.Stat(filename); err != nil {
if errors.Is(err, os.ErrNotExist) {
return opt.ErrNotFound
}
return err
}
_ = info
return os.Remove(filename)
}
func NewLocalBlobHandler(baseDir string) interfaces.BlobHandler {
_ = os.MkdirAll(baseDir, 0755)
return &localHandler{base: baseDir}
}

View File

@ -0,0 +1,77 @@
package blobs
import (
"bytes"
"context"
"io"
"nf-repo/internal/interfaces"
"nf-repo/internal/model"
"nf-repo/internal/opt"
"sync"
)
type bytesCloser struct {
*bytes.Reader
}
func (r *bytesCloser) Close() error {
return nil
}
type memHandler struct {
m map[string][]byte
lock sync.Mutex
}
func NewMemBlobHandler() interfaces.BlobHandler {
return &memHandler{
m: map[string][]byte{},
}
}
func (m *memHandler) Stat(_ context.Context, _ string, h model.Hash) (int64, error) {
m.lock.Lock()
defer m.lock.Unlock()
bs, found := m.m[h.String()]
if !found {
return 0, opt.ErrNotFound
}
return int64(len(bs)), nil
}
func (m *memHandler) Get(_ context.Context, _ string, h model.Hash) (io.ReadCloser, error) {
m.lock.Lock()
defer m.lock.Unlock()
bs, found := m.m[h.String()]
if !found {
return nil, opt.ErrNotFound
}
return &bytesCloser{bytes.NewReader(bs)}, nil
}
func (m *memHandler) Put(_ context.Context, _ string, h model.Hash, rc io.ReadCloser) error {
m.lock.Lock()
defer m.lock.Unlock()
defer rc.Close()
all, err := io.ReadAll(rc)
if err != nil {
return err
}
m.m[h.String()] = all
return nil
}
func (m *memHandler) Delete(_ context.Context, _ string, h model.Hash) error {
m.lock.Lock()
defer m.lock.Unlock()
if _, found := m.m[h.String()]; !found {
return opt.ErrNotFound
}
delete(m.m, h.String())
return nil
}

10
internal/interfaces/db.go Normal file
View File

@ -0,0 +1,10 @@
package interfaces
import (
"context"
"gorm.io/gorm"
)
type Database interface {
TX(ctx context.Context) *gorm.DB
}

View File

@ -0,0 +1,18 @@
package interfaces
import (
"context"
"io"
"nf-repo/internal/model"
"nf-repo/internal/util/rerr"
)
type ManifestHandler interface {
Get(ctx context.Context, repo string, target string) (io.ReadCloser, string, *rerr.RepositoryError)
Put(ctx context.Context, repo string, target string, digest string, mf *model.Manifest) *rerr.RepositoryError
Delete(ctx context.Context, repo string, target string) *rerr.RepositoryError
Catelog(ctx context.Context, limit int, last int) (*model.Catalog, *rerr.RepositoryError)
Tags(ctx context.Context, repo string, limit, last int) (*model.Tag, *rerr.RepositoryError)
Referrers(ctx context.Context, repo string, target string) (*model.IndexManifest, *rerr.RepositoryError)
}

View File

@ -0,0 +1,234 @@
package manifests
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
"io"
"net/http"
"nf-repo/internal/interfaces"
"nf-repo/internal/model"
"nf-repo/internal/util/rerr"
"nf-repo/internal/util/tools"
)
type PackageManifest struct {
Id uint64 `json:"id" gorm:"primaryKey;column:id"`
CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"`
UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"`
Repo string `json:"repo" gorm:"uniqueIndex:repo_tag_idx;column:repo"`
Target string `json:"target" gorm:"uniqueIndex:repo_tag_idx;column:target"`
Digest string `json:"digest" gorm:"unique;column:digest"`
ContentType string `json:"content_type" gorm:"column:content_type"`
Content []byte `json:"content" gorm:"column:content;type:bytes"`
}
type dbManifests struct {
db interfaces.Database
}
func (m *dbManifests) Get(ctx context.Context, repo string, target string) (io.ReadCloser, string, *rerr.RepositoryError) {
var (
err error
pm = new(PackageManifest)
h model.Hash
tx = m.db.TX(tools.Timeout(5)).Model(pm)
)
if h, err = model.NewHash(target); err == nil {
tx = tx.Where("digest", h.String())
} else {
tx = tx.Where("repo", repo).
Where("target", target)
}
if err = tx.
Take(pm).
Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, "", &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: fmt.Sprintf("Unknown name: %s@%s", repo, target),
}
}
return nil, "", rerr.ErrInternal(err)
}
return io.NopCloser(bytes.NewReader(pm.Content)), pm.ContentType, nil
}
func (m *dbManifests) Put(ctx context.Context, repo string, target string, digest string, mf *model.Manifest) *rerr.RepositoryError {
var (
err error
pm = &PackageManifest{
Repo: repo,
Target: target,
Digest: digest,
ContentType: mf.ContentType,
Content: mf.Blob,
}
)
// todo on conflict
if err = m.db.TX(tools.Timeout(5)).Create(pm).Error; err == nil {
return nil
}
logrus.
WithField("path", "dbManifests.Put.Create").
WithField("err", err.Error()).
Trace()
if err = m.db.TX(tools.Timeout(5)).Model(&PackageManifest{}).
Where("(repo = ? AND target = ?) OR (digest = ?)", repo, target, digest).
Updates(map[string]any{
"repo": repo,
"target": target,
"digest": digest,
"content_type": mf.ContentType,
"content": mf.Blob,
}).
Error; err != nil {
logrus.
WithField("path", "dbManifests.Put.Updates").
WithField("err", err.Error()).
Debug()
return rerr.ErrInternal(err)
}
return nil
}
func (m *dbManifests) Delete(ctx context.Context, repo string, target string) *rerr.RepositoryError {
var (
err error
)
if err = m.db.TX(tools.Timeout(5)).
Where("repo", repo).
Where("target", target).
Delete(&PackageManifest{}).
Error; err != nil {
return rerr.ErrInternal(err)
}
return nil
}
func (m *dbManifests) Catelog(ctx context.Context, limit int, last int) (*model.Catalog, *rerr.RepositoryError) {
var (
err error
list = make([]*PackageManifest, 0)
)
if err = m.db.TX(tools.Timeout(5)).Model(&PackageManifest{}).
Order("updated_at").
Offset(last).
Limit(limit).
Find(&list).
Error; err != nil {
return nil, rerr.ErrInternal(err)
}
return &model.Catalog{
Repos: lo.Map(list, func(item *PackageManifest, index int) string {
return item.Repo
}),
}, nil
}
func (m *dbManifests) Tags(ctx context.Context, repo string, limit, last int) (*model.Tag, *rerr.RepositoryError) {
var (
err error
list = make([]*PackageManifest, 0)
)
if err = m.db.TX(tools.Timeout(5)).Model(&PackageManifest{}).
Where("repo", repo).
Order("updated_at").
Offset(last).
Limit(limit).
Find(&list).
Error; err != nil {
return nil, rerr.ErrInternal(err)
}
return &model.Tag{
Name: repo,
Tags: lo.Map(list, func(item *PackageManifest, index int) string {
return item.Target
}),
}, nil
}
func (m *dbManifests) Referrers(ctx context.Context, repo string, target string) (*model.IndexManifest, *rerr.RepositoryError) {
var (
err error
pm = new(PackageManifest)
manifest = &model.IndexManifest{}
tx = m.db.TX(tools.Timeout(5)).Model(pm)
)
h, err := model.NewHash(target)
if err != nil {
tx = tx.Where("repo", repo).Where("digest", h.String())
} else {
tx = tx.Where("repo", repo).Where("target", target)
}
if err = m.db.TX(tools.Timeout(5)).Model(pm).
Take(pm).
Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: fmt.Sprintf("Unknown name: %s@%s", repo, target),
}
}
logrus.
WithField("path", "dbManifests.Referrers.Take").
WithField("repo", repo).
WithField("target", target).
WithField("err", err.Error()).
Debug()
return nil, rerr.ErrInternal(err)
}
if err = json.Unmarshal(pm.Content, manifest); err != nil {
logrus.
WithField("path", "dbManifests.Referrers.Unmarshal").
WithField("repo", repo).
WithField("target", target).
WithField("err", err.Error()).
Debug()
return nil, rerr.ErrInternal(err)
}
return manifest, nil
}
func NewManifestDBHandler(tx interfaces.Database) interfaces.ManifestHandler {
var (
err error
)
if err = tx.TX(tools.Timeout(5)).AutoMigrate(&PackageManifest{}); err != nil {
logrus.
WithField("path", "NewManifestDBHandler").
WithField("method", "AutoMigrate").
Panic(err)
}
return &dbManifests{db: tx}
}

View File

@ -0,0 +1,278 @@
package manifests
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/sirupsen/logrus"
"io"
"net/http"
"nf-repo/internal/interfaces"
"nf-repo/internal/model"
"nf-repo/internal/model/types"
"nf-repo/internal/util/rerr"
"sort"
"strings"
"sync"
)
type memManifest struct {
sync.RWMutex
m map[string]map[string]*model.Manifest
}
func (m *memManifest) Referrers(ctx context.Context, repo string, target string) (*model.IndexManifest, *rerr.RepositoryError) {
m.RLock()
defer m.RUnlock()
digestToManifestMap, repoExists := m.m[repo]
if !repoExists {
return nil, &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: "Unknown name",
}
}
im := &model.IndexManifest{
SchemaVersion: 2,
MediaType: types.OCIImageIndex,
Manifests: []model.Descriptor{},
}
for digest, manifest := range digestToManifestMap {
h, err := model.NewHash(digest)
if err != nil {
continue
}
var refPointer struct {
Subject *model.Descriptor `json:"subject"`
}
json.Unmarshal(manifest.Blob, &refPointer)
if refPointer.Subject == nil {
continue
}
referenceDigest := refPointer.Subject.Digest
if referenceDigest.String() != target {
continue
}
// At this point, we know the current digest references the target
var imageAsArtifact struct {
Config struct {
MediaType string `json:"mediaType"`
} `json:"config"`
}
json.Unmarshal(manifest.Blob, &imageAsArtifact)
im.Manifests = append(im.Manifests, model.Descriptor{
MediaType: types.MediaType(manifest.ContentType),
Size: int64(len(manifest.Blob)),
Digest: h,
ArtifactType: imageAsArtifact.Config.MediaType,
})
}
return im, nil
}
func (m *memManifest) Tags(ctx context.Context, repo string, limit int, last int) (*model.Tag, *rerr.RepositoryError) {
m.RLock()
defer m.RUnlock()
c, ok := m.m[repo]
if !ok {
return nil, &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: "Unknown name",
}
}
var tags []string
for tag := range c {
if !strings.Contains(tag, "sha256:") {
tags = append(tags, tag)
}
}
sort.Strings(tags)
// https://github.com/opencontainers/distribution-spec/blob/b505e9cc53ec499edbd9c1be32298388921bb705/detail.md#tags-paginated
// Offset using last query parameter.
//if last := ctx.Query("last"); last != "" {
// for i, t := range tags {
// if t > last {
// tags = tags[i:]
// break
// }
// }
//}
//
//// Limit using n query parameter.
//if ns := ctx.Query("n"); ns != "" {
// if n, err := strconv.Atoi(ns); err != nil {
// return rerr.Error(ctx, &rerr.RepositoryError{
// Status: http.StatusBadRequest,
// Code: "BAD_REQUEST",
// Message: fmt.Sprintf("parsing n: %v", err),
// })
// } else if n < len(tags) {
// tags = tags[:n]
// }
//}
tagsToList := &model.Tag{
Name: repo,
Tags: tags,
}
return tagsToList, nil
}
func (m *memManifest) Catelog(ctx context.Context, limit, last int) (*model.Catalog, *rerr.RepositoryError) {
m.RLock()
defer m.RUnlock()
var repos []string
countRepos := 0
// TODO: implement pagination
for key := range m.m {
if countRepos >= limit {
break
}
countRepos++
repos = append(repos, key)
}
repositoriesToList := &model.Catalog{
Repos: repos,
}
return repositoriesToList, nil
}
func (m *memManifest) Put(ctx context.Context, repo string, target string, digest string, mf *model.Manifest) *rerr.RepositoryError {
// If the manifest
// list's constituent manifests are already uploaded.
// This isn't strictly required by the registry API, but some
// registries require this.
if types.MediaType(mf.ContentType).IsIndex() {
if err := func() *rerr.RepositoryError {
m.RLock()
defer m.RUnlock()
im, err := model.ParseIndexManifest(bytes.NewReader(mf.Blob))
if err != nil {
return &rerr.RepositoryError{
Status: http.StatusBadRequest,
Code: "MANIFEST_INVALID",
Message: err.Error(),
}
}
for _, desc := range im.Manifests {
if !desc.MediaType.IsDistributable() {
continue
}
if desc.MediaType.IsIndex() || desc.MediaType.IsImage() {
if _, found := m.m[repo][desc.Digest.String()]; !found {
return &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "MANIFEST_UNKNOWN",
Message: fmt.Sprintf("Sub-manifest %q not found", desc.Digest),
}
}
} else {
// TODO: Probably want to do an existence check for blobs.
logrus.Warnf("TODO: Check blobs for %q", desc.Digest)
}
}
return nil
}(); err != nil {
return err
}
}
m.Lock()
defer m.Unlock()
if _, ok := m.m[repo]; !ok {
m.m[repo] = make(map[string]*model.Manifest, 2)
}
// Allow future references by target (tag) and immutable digest.
// See https://docs.docker.com/engine/reference/commandline/pull/#pull-an-image-by-digest-immutable-identifier.
m.m[repo][digest] = mf
m.m[repo][target] = mf
return nil
}
func (m *memManifest) Delete(ctx context.Context, repo string, target string) *rerr.RepositoryError {
m.Lock()
defer m.Unlock()
if _, ok := m.m[repo]; !ok {
return &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: "Unknown name",
}
}
_, ok := m.m[repo][target]
if !ok {
return &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "MANIFEST_UNKNOWN",
Message: "Unknown manifest",
}
}
delete(m.m[repo], target)
if len(m.m[repo]) == 0 {
delete(m.m, repo)
}
return nil
}
func (m *memManifest) Get(ctx context.Context, repo string, target string) (io.ReadCloser, string, *rerr.RepositoryError) {
m.RLock()
defer m.RUnlock()
c, ok := m.m[repo]
if !ok {
return nil, "", &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: "Unknown name",
}
}
f, ok := c[target]
if !ok {
return nil, "", &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "MANIFEST_UNKNOWN",
Message: "Unknown manifest",
}
}
reader := io.NopCloser(bytes.NewReader(f.Blob))
return reader, f.ContentType, nil
}
func NewManifestMemHandler() interfaces.ManifestHandler {
return &memManifest{m: make(map[string]map[string]*model.Manifest)}
}

View File

@ -0,0 +1,44 @@
package tx
import (
"context"
"github.com/glebarez/sqlite"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
"nf-repo/internal/interfaces"
)
type tx struct {
db *gorm.DB
}
func (t *tx) TX(ctx context.Context) *gorm.DB {
return t.db.Session(&gorm.Session{}).WithContext(ctx)
}
func newTX(db *gorm.DB) interfaces.Database {
return &tx{db: db}
}
func Must(database interfaces.Database, err error) interfaces.Database {
if err != nil {
logrus.
WithField("path", "tx.Must").
WithField("err", err.Error()).
Panic()
}
if database == nil {
logrus.
WithField("path", "tx.Must").
WithField("err", "database is nil").
Panic()
}
return database
}
func NewSqliteTX(filepath string) (interfaces.Database, error) {
db, err := gorm.Open(sqlite.Open(filepath), &gorm.Config{})
return newTX(db), err
}

View File

@ -0,0 +1,14 @@
package interfaces
import (
"context"
"io"
"nf-repo/internal/model"
"nf-repo/internal/util/rerr"
)
type UploadHandler interface {
UploadId() string
Write(ctx context.Context, id string, reader io.ReadCloser, start, end int) (int, *rerr.RepositoryError)
Done(ctx context.Context, bh BlobHandler, id string, closer io.ReadCloser, contentLength int, repo string, hash model.Hash) *rerr.RepositoryError
}

View File

@ -0,0 +1,139 @@
package uploads
import (
"context"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"io"
"net/http"
"nf-repo/internal/interfaces"
"nf-repo/internal/model"
"nf-repo/internal/util/rerr"
"nf-repo/internal/verify"
"os"
"path"
"sync"
"time"
)
type localUploader struct {
lock sync.Mutex
basedir string
sizeMap map[string]int
}
func (l *localUploader) UploadId() string {
id := fmt.Sprintf("%d", time.Now().UnixNano())
l.lock.Lock()
l.sizeMap[id] = 0
l.lock.Unlock()
return id
}
func (l *localUploader) Write(ctx context.Context, id string, reader io.ReadCloser, start, end int) (int, *rerr.RepositoryError) {
var (
err error
filename = path.Join(l.basedir, id)
f *os.File
ok bool
flag = os.O_CREATE | os.O_RDWR | os.O_TRUNC
copied int64
)
if _, ok = l.sizeMap[id]; !ok {
return 0, &rerr.RepositoryError{
Status: http.StatusRequestedRangeNotSatisfiable,
Code: "BLOB_UPLOAD_UNKNOWN",
Message: "Your content range doesn't match what we have",
}
}
if start > 0 {
flag = os.O_APPEND | os.O_RDWR
}
if f, err = os.OpenFile(filename, flag, 0644); err != nil {
return 0, rerr.ErrInternal(err)
}
if copied, err = io.Copy(f, reader); err != nil {
return 0, rerr.ErrInternal(err)
}
reader.Close()
l.lock.Lock()
l.sizeMap[id] += int(copied)
l.lock.Unlock()
return l.sizeMap[id], nil
}
func (l *localUploader) Done(ctx context.Context, bh interfaces.BlobHandler, id string, reader io.ReadCloser, contentLength int, repo string, hash model.Hash) *rerr.RepositoryError {
size := verify.SizeUnknown
if contentLength > 0 {
size = l.sizeMap[id] + contentLength
}
var (
err error
f *os.File
filename = path.Join(l.basedir, id)
)
if f, err = os.OpenFile(filename, os.O_RDONLY, 0644); err != nil {
return rerr.ErrInternal(err)
}
in := io.NopCloser(io.MultiReader(f, reader))
vrc, err := verify.ReadCloser(in, int64(size), hash)
if err != nil {
return rerr.ErrInternal(err)
}
defer vrc.Close()
if err := bh.Put(ctx, repo, hash, vrc); err != nil {
if errors.As(err, &verify.Error{}) {
logrus.
WithField("path", "handleBlobs.Put").
WithField("repo", repo).
WithField("hash", hash.String()).
WithField("err", fmt.Sprintf("Digest mismatch: %v", err)).Debug()
return rerr.ErrDigestMismatch
}
return rerr.ErrInternal(err)
}
delete(l.sizeMap, id)
f.Close()
if err = os.Remove(filename); err != nil {
logrus.
WithField("path", "localUploader.Done.Remove").
WithField("filename", filename).
WithField("err", err.Error()).
Warn()
}
return nil
}
func NewLocalUploader(basedir string) interfaces.UploadHandler {
var (
err error
)
if err = os.MkdirAll(basedir, 0755); err != nil {
logrus.
WithField("path", "uploads.localUploader.NewLocalUploader").
WithField("basedir", basedir).
WithField("err", err.Error()).
Panic()
}
return &localUploader{lock: sync.Mutex{}, basedir: basedir, sizeMap: make(map[string]int)}
}

View File

@ -0,0 +1,101 @@
package uploads
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"io"
"net/http"
"nf-repo/internal/interfaces"
"nf-repo/internal/model"
"nf-repo/internal/util/rerr"
"nf-repo/internal/verify"
"strconv"
"sync"
"time"
)
type memUploader struct {
lock sync.Mutex
uploads map[string][]byte
}
func (m *memUploader) UploadId() string {
m.lock.Lock()
m.lock.Unlock()
id := strconv.Itoa(int(time.Now().UnixNano()))
m.uploads[id] = []byte{}
return id
}
func (m *memUploader) Write(ctx context.Context, id string, reader io.ReadCloser, start, end int) (int, *rerr.RepositoryError) {
m.lock.Lock()
defer m.lock.Unlock()
if start != len(m.uploads[id]) {
return 0, &rerr.RepositoryError{
Status: http.StatusRequestedRangeNotSatisfiable,
Code: "BLOB_UPLOAD_UNKNOWN",
Message: "Your content range doesn't match what we have",
}
}
l := bytes.NewBuffer(m.uploads[id])
size, err := io.Copy(l, reader)
if err != nil {
return 0, rerr.ErrInternal(err)
}
_ = size
m.uploads[id] = l.Bytes()
return len(m.uploads[id]), nil
}
func (m *memUploader) Done(ctx context.Context, bh interfaces.BlobHandler, id string, reader io.ReadCloser, contentLength int, repo string, hash model.Hash) *rerr.RepositoryError {
size := verify.SizeUnknown
if contentLength > 0 {
size = len(m.uploads[id]) + contentLength
}
m.lock.Lock()
defer m.lock.Unlock()
in := io.NopCloser(io.MultiReader(bytes.NewBuffer(m.uploads[id]), reader))
vrc, err := verify.ReadCloser(in, int64(size), hash)
if err != nil {
return rerr.ErrInternal(err)
}
defer vrc.Close()
if err := bh.Put(ctx, repo, hash, vrc); err != nil {
if errors.As(err, &verify.Error{}) {
logrus.
WithField("path", "handleBlobs.Put").
WithField("repo", repo).
WithField("hash", hash.String()).
WithField("err", fmt.Sprintf("Digest mismatch: %v", err)).Debug()
return rerr.ErrDigestMismatch
}
return rerr.ErrInternal(err)
}
m.uploads[id] = nil
delete(m.uploads, id)
return nil
}
func NewMemUploader() interfaces.UploadHandler {
return &memUploader{
lock: sync.Mutex{},
uploads: make(map[string][]byte),
}
}