feat: proxy download image

This commit is contained in:
loveuer
2024-04-15 18:02:54 +08:00
parent c5d0b8e45b
commit 410a4c0d8d
57 changed files with 10913 additions and 316 deletions

View File

@ -7,6 +7,7 @@ import (
"nf-repo/internal/interfaces"
"nf-repo/internal/model"
"nf-repo/internal/opt"
"nf-repo/internal/verify"
"os"
"path"
"sync"
@ -48,12 +49,13 @@ func (l *localHandler) Stat(ctx context.Context, repo string, hash model.Hash) (
var (
err error
info os.FileInfo
sp = l.path(hash)
)
l.Lock()
defer l.Unlock()
if info, err = os.Stat(l.path(hash)); err != nil {
if info, err = os.Stat(sp); err != nil {
if errors.Is(err, os.ErrNotExist) {
return 0, opt.ErrNotFound
}
@ -68,11 +70,17 @@ func (l *localHandler) Put(ctx context.Context, repo string, hash model.Hash, rc
var (
err error
f *os.File
nrc io.ReadCloser
)
l.Lock()
defer l.Unlock()
if nrc, err = verify.ReadCloser(rc, verify.SizeUnknown, hash); err != nil {
return err
}
defer nrc.Close()
if f, err = os.OpenFile(l.path(hash), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644); err != nil {
return err
}

View File

@ -0,0 +1,11 @@
package interfaces
type Enum interface {
Value() int64
Code() string
Label() string
MarshalJSON() ([]byte, error)
All() []Enum
}

View File

@ -9,10 +9,10 @@ import (
type ManifestHandler interface {
Get(ctx context.Context, repo string, target string) (io.ReadCloser, string, *rerr.RepositoryError)
Put(ctx context.Context, repo string, target string, digest string, mf *model.Manifest) *rerr.RepositoryError
Put(ctx context.Context, repo string, target string, digest string, mf *model.RepoSimpleManifest) *rerr.RepositoryError
Delete(ctx context.Context, repo string, target string) *rerr.RepositoryError
Catelog(ctx context.Context, limit int, last int) (*model.Catalog, *rerr.RepositoryError)
Tags(ctx context.Context, repo string, limit, last int) (*model.Tag, *rerr.RepositoryError)
Catalog(ctx context.Context, limit int, last int, keyword string) (*model.Catalog, *rerr.RepositoryError)
Tags(ctx context.Context, repo string, limit, last int, keyword string) (*model.Tag, *rerr.RepositoryError)
Referrers(ctx context.Context, repo string, target string) (*model.IndexManifest, *rerr.RepositoryError)
}

View File

@ -9,6 +9,7 @@ import (
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"io"
"net/http"
"nf-repo/internal/interfaces"
@ -17,16 +18,10 @@ import (
"nf-repo/internal/util/tools"
)
type PackageManifest struct {
Id uint64 `json:"id" gorm:"primaryKey;column:id"`
CreatedAt int64 `json:"created_at" gorm:"column:created_at;autoCreateTime:milli"`
UpdatedAt int64 `json:"updated_at" gorm:"column:updated_at;autoUpdateTime:milli"`
Repo string `json:"repo" gorm:"uniqueIndex:repo_tag_idx;column:repo"`
Target string `json:"target" gorm:"uniqueIndex:repo_tag_idx;column:target"`
Digest string `json:"digest" gorm:"unique;column:digest"`
ContentType string `json:"content_type" gorm:"column:content_type"`
Content []byte `json:"content" gorm:"column:content;type:bytes"`
type pm struct {
model.PackageManifest
Digest string `json:"digest" gorm:"column:digest"`
Size int `json:"size" gorm:"column:size"`
}
type dbManifests struct {
@ -35,20 +30,38 @@ type dbManifests struct {
func (m *dbManifests) Get(ctx context.Context, repo string, target string) (io.ReadCloser, string, *rerr.RepositoryError) {
var (
err error
pm = new(PackageManifest)
h model.Hash
tx = m.db.TX(tools.Timeout(5)).Model(pm)
err error
hash model.Hash
pd = new(model.PackageDigest)
)
if h, err = model.NewHash(target); err == nil {
tx = tx.Where("digest", h.String())
} else {
tx = tx.Where("repo", repo).
Where("target", target)
if hash, err = model.NewHash(target); err == nil {
if err = m.db.TX(tools.Timeout(5)).
Model(&model.PackageDigest{}).
Where("digest", hash.String()).
Take(pd).
Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, "", &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: fmt.Sprintf("Unknown name: %s@%s", repo, target),
}
}
return nil, "", rerr.ErrInternal(err)
}
return io.NopCloser(bytes.NewReader(pd.Content)), pd.ContentType, nil
}
if err = tx.
var pm = new(model.PackageManifest)
if err = m.db.TX(tools.Timeout(5)).
Model(&model.PackageManifest{}).
Where("repo", repo).
Where("tag", target).
Take(pm).
Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
@ -62,39 +75,85 @@ func (m *dbManifests) Get(ctx context.Context, repo string, target string) (io.R
return nil, "", rerr.ErrInternal(err)
}
return io.NopCloser(bytes.NewReader(pm.Content)), pm.ContentType, nil
if err = m.db.TX(tools.Timeout(5)).
Model(&model.PackageDigest{}).
Where("id", pm.DigestId).
Take(pd).
Error; err != nil {
logrus.
WithField("path", "dbManifests.Get").
WithField("digest", pm.DigestId).
WithField("err", err.Error()).
Error()
return nil, "", rerr.ErrInternal(err)
}
return io.NopCloser(bytes.NewReader(pd.Content)), pd.ContentType, nil
}
func (m *dbManifests) Put(ctx context.Context, repo string, target string, digest string, mf *model.Manifest) *rerr.RepositoryError {
func (m *dbManifests) Put(ctx context.Context, repo string, tag string, digest string, mf *model.RepoSimpleManifest) *rerr.RepositoryError {
var (
err error
pm = &PackageManifest{
Repo: repo,
Target: target,
pm = &model.PackageManifest{
Repo: repo,
Tag: tag,
}
pd = &model.PackageDigest{
Digest: digest,
ContentType: mf.ContentType,
Content: mf.Blob,
}
blob = new(model.RepoSimpleManifestBlob)
)
// todo on conflict
if err = json.Unmarshal(mf.Blob, blob); err != nil {
return rerr.ErrInternal(err)
}
pd.Size = blob.CountSize()
if err = m.db.TX(tools.Timeout(5)).
Clauses(clause.Returning{}).
Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "digest"}},
DoUpdates: clause.Set{
{
Column: clause.Column{Name: "content_type"},
Value: mf.ContentType,
},
{
Column: clause.Column{Name: "content"},
Value: mf.Blob,
},
{
Column: clause.Column{Name: "size"},
Value: pd.Size,
},
}}).
Create(pd).
Error; err != nil {
logrus.
WithField("path", "dbManifests.Put.Create").
WithField("err", err.Error()).
Error()
return rerr.ErrInternal(err)
}
pm.DigestId = pd.Id
if err = m.db.TX(tools.Timeout(5)).Create(pm).Error; err == nil {
return nil
}
logrus.
WithField("path", "dbManifests.Put.Create").
WithField("err", err.Error()).
Trace()
if err = m.db.TX(tools.Timeout(5)).Model(&PackageManifest{}).
Where("(repo = ? AND target = ?) OR (digest = ?)", repo, target, digest).
if err = m.db.TX(tools.Timeout(5)).Model(&model.PackageManifest{}).
Where("(repo = ? AND tag = ?)", repo, tag).
Updates(map[string]any{
"repo": repo,
"target": target,
"digest": digest,
"content_type": mf.ContentType,
"content": mf.Blob,
"repo": repo,
"tag": tag,
"digest_id": pm.DigestId,
}).
Error; err != nil {
logrus.
@ -115,7 +174,7 @@ func (m *dbManifests) Delete(ctx context.Context, repo string, target string) *r
if err = m.db.TX(tools.Timeout(5)).
Where("repo", repo).
Where("target", target).
Delete(&PackageManifest{}).
Delete(&model.PackageManifest{}).
Error; err != nil {
return rerr.ErrInternal(err)
}
@ -123,14 +182,23 @@ func (m *dbManifests) Delete(ctx context.Context, repo string, target string) *r
return nil
}
func (m *dbManifests) Catelog(ctx context.Context, limit int, last int) (*model.Catalog, *rerr.RepositoryError) {
func (m *dbManifests) Catalog(ctx context.Context, limit int, last int, keyword string) (*model.Catalog, *rerr.RepositoryError) {
var (
err error
list = make([]*PackageManifest, 0)
list = make([]*pm, 0)
tx = m.db.TX(tools.Timeout(5))
)
if err = m.db.TX(tools.Timeout(5)).Model(&PackageManifest{}).
Order("updated_at").
tx = tx.Model(&model.PackageManifest{}).
Select("\"package_manifests\".*", "\"pd\".\"digest\" as digest", "\"pd\".\"size\" as size")
if keyword != "" {
k := fmt.Sprintf("%%%s%%", keyword)
tx = tx.Where("package_manifests.repo like ?", k)
}
if err = tx.Group("\"package_manifests\".\"repo\"").
Joins("LEFT JOIN package_digests pd on \"pd\".\"id\" = \"package_manifests\".\"digest_id\"").
Order("updated_at DESC, tag = 'latest' DESC").
Offset(last).
Limit(limit).
Find(&list).
@ -139,20 +207,39 @@ func (m *dbManifests) Catelog(ctx context.Context, limit int, last int) (*model.
}
return &model.Catalog{
Repos: lo.Map(list, func(item *PackageManifest, index int) string {
Repositories: lo.Map(list, func(item *pm, index int) string {
return item.Repo
}),
Repos: lo.Map(list, func(item *pm, index int) *model.PackageManifest {
item.PackageManifest.Digest = item.Digest
item.PackageManifest.Size = item.Size
return &item.PackageManifest
}),
}, nil
}
func (m *dbManifests) Tags(ctx context.Context, repo string, limit, last int) (*model.Tag, *rerr.RepositoryError) {
func (m *dbManifests) Tags(ctx context.Context, repo string, limit, last int, keyword string) (*model.Tag, *rerr.RepositoryError) {
var (
err error
list = make([]*PackageManifest, 0)
err error
list = make([]*pm, 0)
tx = m.db.TX(tools.Timeout(5)).Model(&model.PackageManifest{}).Select("\"package_manifests\".*", "\"pd\".\"digest\" as digest")
txc = m.db.TX(tools.Timeout(5)).Model(&model.PackageManifest{}).Select("COUNT(id)")
total int
)
if err = m.db.TX(tools.Timeout(5)).Model(&PackageManifest{}).
Where("repo", repo).
if keyword != "" {
k := fmt.Sprintf("%%%s%%", keyword)
tx = tx.Where("\"package_manifests\".\"tag\" like ?", k)
txc = txc.Where("tag like ?", k)
}
if err = txc.Find(&total).Error; err != nil {
return nil, rerr.ErrInternal(err)
}
if err = tx.
Where("package_manifests.repo", repo).
Joins("LEFT JOIN package_digests pd on \"pd\".\"id\" = \"package_manifests\".\"digest_id\"").
Order("updated_at").
Offset(last).
Limit(limit).
@ -163,49 +250,90 @@ func (m *dbManifests) Tags(ctx context.Context, repo string, limit, last int) (*
return &model.Tag{
Name: repo,
Tags: lo.Map(list, func(item *PackageManifest, index int) string {
return item.Target
Tags: lo.Map(list, func(item *pm, index int) string {
return item.Tag
}),
RepoTags: lo.Map(list, func(item *pm, index int) *model.PackageManifest {
item.PackageManifest.Digest = item.Digest
return &item.PackageManifest
}),
Total: total,
}, nil
}
func (m *dbManifests) Referrers(ctx context.Context, repo string, target string) (*model.IndexManifest, *rerr.RepositoryError) {
var (
err error
pm = new(PackageManifest)
pm = new(model.PackageManifest)
pd = new(model.PackageDigest)
manifest = &model.IndexManifest{}
tx = m.db.TX(tools.Timeout(5)).Model(pm)
tx = m.db.TX(tools.Timeout(5))
hash model.Hash
)
h, err := model.NewHash(target)
if err != nil {
tx = tx.Where("repo", repo).Where("digest", h.String())
} else {
tx = tx.Where("repo", repo).Where("target", target)
}
if hash, err = model.NewHash(target); err == nil {
if err = tx.Model(&model.PackageDigest{}).
Where("repo", repo).
Where("digest", hash.String()).
Take(pd).
Error; err != nil {
if err = m.db.TX(tools.Timeout(5)).Model(pm).
Take(pm).
Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: fmt.Sprintf("Unknown name: %s@%s", repo, target),
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: fmt.Sprintf("Unknown name: %s@%s", repo, target),
}
}
logrus.
WithField("path", "dbManifests.Referrers.Take").
WithField("repo", repo).
WithField("target", target).
WithField("err", err.Error()).
Debug()
return nil, rerr.ErrInternal(err)
}
} else {
if err = tx.Model(&model.PackageManifest{}).
Where("repo", repo).
Where("tag", target).
Take(pd).
Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, &rerr.RepositoryError{
Status: http.StatusNotFound,
Code: "NAME_UNKNOWN",
Message: fmt.Sprintf("Unknown name: %s@%s", repo, target),
}
}
logrus.
WithField("path", "dbManifests.Referrers.Take").
WithField("repo", repo).
WithField("target", target).
Error()
return nil, rerr.ErrInternal(err)
}
logrus.
WithField("path", "dbManifests.Referrers.Take").
WithField("repo", repo).
WithField("target", target).
WithField("err", err.Error()).
Debug()
if err = tx.Model(&model.PackageDigest{}).
Where("digest_id", pm.DigestId).
Take(pd).
Error; err != nil {
return nil, rerr.ErrInternal(err)
logrus.
WithField("path", "dbManifests.Referrers.Take").
WithField("repo", repo).
WithField("target", target).
Error()
return nil, rerr.ErrInternal(err)
}
}
if err = json.Unmarshal(pm.Content, manifest); err != nil {
if err = json.Unmarshal(pd.Content, manifest); err != nil {
logrus.
WithField("path", "dbManifests.Referrers.Unmarshal").
WithField("repo", repo).
@ -223,7 +351,10 @@ func NewManifestDBHandler(tx interfaces.Database) interfaces.ManifestHandler {
err error
)
if err = tx.TX(tools.Timeout(5)).AutoMigrate(&PackageManifest{}); err != nil {
if err = tx.TX(tools.Timeout(5)).AutoMigrate(
&model.PackageManifest{},
&model.PackageDigest{},
); err != nil {
logrus.
WithField("path", "NewManifestDBHandler").
WithField("method", "AutoMigrate").

View File

@ -19,7 +19,7 @@ import (
type memManifest struct {
sync.RWMutex
m map[string]map[string]*model.Manifest
m map[string]map[string]*model.RepoSimpleManifest
}
func (m *memManifest) Referrers(ctx context.Context, repo string, target string) (*model.IndexManifest, *rerr.RepositoryError) {
@ -81,7 +81,7 @@ func (m *memManifest) Referrers(ctx context.Context, repo string, target string)
return im, nil
}
func (m *memManifest) Tags(ctx context.Context, repo string, limit int, last int) (*model.Tag, *rerr.RepositoryError) {
func (m *memManifest) Tags(ctx context.Context, repo string, limit int, last int, keyword string) (*model.Tag, *rerr.RepositoryError) {
m.RLock()
defer m.RUnlock()
@ -96,10 +96,20 @@ func (m *memManifest) Tags(ctx context.Context, repo string, limit int, last int
var tags []string
for tag := range c {
if !strings.Contains(tag, "sha256:") {
if strings.Contains(tag, "sha256:") {
continue
}
if keyword == "" {
tags = append(tags, tag)
continue
}
if strings.Contains(tag, keyword) {
tags = append(tags, tag)
}
}
sort.Strings(tags)
// https://github.com/opencontainers/distribution-spec/blob/b505e9cc53ec499edbd9c1be32298388921bb705/detail.md#tags-paginated
@ -134,7 +144,7 @@ func (m *memManifest) Tags(ctx context.Context, repo string, limit int, last int
return tagsToList, nil
}
func (m *memManifest) Catelog(ctx context.Context, limit, last int) (*model.Catalog, *rerr.RepositoryError) {
func (m *memManifest) Catalog(ctx context.Context, limit, last int, keyword string) (*model.Catalog, *rerr.RepositoryError) {
m.RLock()
defer m.RUnlock()
@ -146,23 +156,24 @@ func (m *memManifest) Catelog(ctx context.Context, limit, last int) (*model.Cata
if countRepos >= limit {
break
}
if keyword != "" && !strings.Contains(key, keyword) {
continue
}
countRepos++
repos = append(repos, key)
}
repositoriesToList := &model.Catalog{
Repos: repos,
Repositories: repos,
}
return repositoriesToList, nil
}
func (m *memManifest) Put(ctx context.Context, repo string, target string, digest string, mf *model.Manifest) *rerr.RepositoryError {
// If the manifest
// list's constituent manifests are already uploaded.
// This isn't strictly required by the registry API, but some
// registries require this.
func (m *memManifest) Put(ctx context.Context, repo string, target string, digest string, mf *model.RepoSimpleManifest) *rerr.RepositoryError {
if types.MediaType(mf.ContentType).IsIndex() {
if err := func() *rerr.RepositoryError {
m.RLock()
@ -205,7 +216,7 @@ func (m *memManifest) Put(ctx context.Context, repo string, target string, diges
defer m.Unlock()
if _, ok := m.m[repo]; !ok {
m.m[repo] = make(map[string]*model.Manifest, 2)
m.m[repo] = make(map[string]*model.RepoSimpleManifest, 2)
}
// Allow future references by target (tag) and immutable digest.
@ -274,5 +285,5 @@ func (m *memManifest) Get(ctx context.Context, repo string, target string) (io.R
}
func NewManifestMemHandler() interfaces.ManifestHandler {
return &memManifest{m: make(map[string]map[string]*model.Manifest)}
return &memManifest{m: make(map[string]map[string]*model.RepoSimpleManifest)}
}

View File

@ -6,6 +6,9 @@ import (
"github.com/sirupsen/logrus"
"gorm.io/gorm"
"nf-repo/internal/interfaces"
"nf-repo/internal/opt"
"os"
"path"
)
type tx struct {
@ -13,7 +16,13 @@ type tx struct {
}
func (t *tx) TX(ctx context.Context) *gorm.DB {
return t.db.Session(&gorm.Session{}).WithContext(ctx)
db := t.db.Session(&gorm.Session{}).WithContext(ctx)
if opt.Debug {
db = db.Debug()
}
return db
}
func newTX(db *gorm.DB) interfaces.Database {
@ -39,6 +48,10 @@ func Must(database interfaces.Database, err error) interfaces.Database {
}
func NewSqliteTX(filepath string) (interfaces.Database, error) {
if err := os.MkdirAll(path.Dir(filepath), 0755); err != nil {
return nil, err
}
db, err := gorm.Open(sqlite.Open(filepath), &gorm.Config{})
return newTX(db), err
}

View File

@ -96,6 +96,12 @@ func (l *localUploader) Done(ctx context.Context, bh interfaces.BlobHandler, id
}
defer vrc.Close()
logrus.
WithField("path", "localUploader.Done").
WithField("id", id).
WithField("size", size).
Error()
if err := bh.Put(ctx, repo, hash, vrc); err != nil {
if errors.As(err, &verify.Error{}) {
logrus.