wip: oci image management
This commit is contained in:
288
handler/registry_blob.go
Normal file
288
handler/registry_blob.go
Normal file
@@ -0,0 +1,288 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"gitea.loveuer.com/loveuer/cluster/internal/database"
|
||||
"gitea.loveuer.com/loveuer/cluster/internal/model"
|
||||
"gitea.loveuer.com/loveuer/cluster/internal/rerr"
|
||||
|
||||
"github.com/gofiber/fiber/v3"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
var ErrNotFound = errors.New("not found")
|
||||
|
||||
func handleBlobs(c fiber.Ctx) error {
|
||||
elem := strings.Split(c.Path(), "/")
|
||||
elem = elem[1:]
|
||||
if elem[len(elem)-1] == "" {
|
||||
elem = elem[:len(elem)-1]
|
||||
}
|
||||
|
||||
// Must have a path of form /v2/{name}/blobs/{upload,sha256:}
|
||||
if len(elem) < 4 {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "NAME_INVALID",
|
||||
Message: "blobs must be attached to a repo",
|
||||
})
|
||||
}
|
||||
|
||||
target := elem[len(elem)-1]
|
||||
service := elem[len(elem)-2]
|
||||
digest := c.Query("digest")
|
||||
contentRange := c.Get("Content-Range")
|
||||
rangeHeader := c.Get("Range")
|
||||
repo := strings.Join(elem[1:len(elem)-2], "/")
|
||||
|
||||
switch c.Method() {
|
||||
case http.MethodHead:
|
||||
h, err := model.NewHash(target)
|
||||
if err != nil {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "NAME_INVALID",
|
||||
Message: "invalid digest",
|
||||
})
|
||||
}
|
||||
|
||||
size, err := blobHandler.Stat(c.Context(), repo, h)
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
return rerr.Error(c, rerr.ErrBlobUnknown)
|
||||
} else if err != nil {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
|
||||
c.Set("Content-Length", fmt.Sprint(size))
|
||||
c.Set("Docker-Content-Digest", h.String())
|
||||
return c.Send(nil)
|
||||
|
||||
case http.MethodGet:
|
||||
h, err := model.NewHash(target)
|
||||
if err != nil {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "NAME_INVALID",
|
||||
Message: "invalid digest",
|
||||
})
|
||||
}
|
||||
|
||||
size, err := blobHandler.Stat(c.Context(), repo, h)
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
return rerr.Error(c, rerr.ErrBlobUnknown)
|
||||
} else if err != nil {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
|
||||
rc, err := blobHandler.Get(c.Context(), repo, h)
|
||||
if errors.Is(err, ErrNotFound) {
|
||||
return rerr.Error(c, rerr.ErrBlobUnknown)
|
||||
} else if err != nil {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
r := rc
|
||||
if rangeHeader != "" {
|
||||
start, end := int64(0), int64(0)
|
||||
if _, err := fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end); err != nil {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusRequestedRangeNotSatisfiable,
|
||||
Code: "BLOB_UNKNOWN",
|
||||
Message: "We don't understand your Range",
|
||||
})
|
||||
}
|
||||
|
||||
n := (end + 1) - start
|
||||
if ra, ok := r.(io.ReaderAt); ok {
|
||||
if end+1 > size {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusRequestedRangeNotSatisfiable,
|
||||
Code: "BLOB_UNKNOWN",
|
||||
Message: fmt.Sprintf("range end %d > %d size", end+1, size),
|
||||
})
|
||||
}
|
||||
sr := io.NewSectionReader(ra, start, n)
|
||||
r = io.NopCloser(sr)
|
||||
} else {
|
||||
if _, err := io.CopyN(io.Discard, r, start); err != nil {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusRequestedRangeNotSatisfiable,
|
||||
Code: "BLOB_UNKNOWN",
|
||||
Message: fmt.Sprintf("Failed to discard %d bytes", start),
|
||||
})
|
||||
}
|
||||
lr := io.LimitReader(r, n)
|
||||
r = io.NopCloser(lr)
|
||||
}
|
||||
|
||||
c.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, size))
|
||||
c.Set("Content-Length", fmt.Sprint(n))
|
||||
c.Set("Docker-Content-Digest", h.String())
|
||||
c.Status(http.StatusPartialContent)
|
||||
} else {
|
||||
c.Set("Content-Length", fmt.Sprint(size))
|
||||
c.Set("Docker-Content-Digest", h.String())
|
||||
c.Status(http.StatusOK)
|
||||
}
|
||||
|
||||
_, err = io.Copy(c, r)
|
||||
return err
|
||||
|
||||
case http.MethodPost:
|
||||
if target != "uploads" {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "METHOD_UNKNOWN",
|
||||
Message: fmt.Sprintf("POST to /blobs must be followed by /uploads, got %s", target),
|
||||
})
|
||||
}
|
||||
|
||||
if digest != "" {
|
||||
h, err := model.NewHash(digest)
|
||||
if err != nil {
|
||||
return rerr.Error(c, rerr.ErrDigestInvalid)
|
||||
}
|
||||
|
||||
vrc := io.NopCloser(bytes.NewReader(c.Body()))
|
||||
defer vrc.Close()
|
||||
|
||||
if err = blobHandler.Put(c.Context(), repo, h, vrc); err != nil {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
c.Set("Docker-Content-Digest", h.String())
|
||||
return c.SendStatus(http.StatusCreated)
|
||||
}
|
||||
|
||||
id := uploadHandler.UploadId()
|
||||
|
||||
// Get or create repository
|
||||
var repository model.RegistryRepository
|
||||
if err := database.DB.Where("name = ?", repo).First(&repository).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
repository = model.RegistryRepository{Name: repo}
|
||||
if err := database.DB.Create(&repository).Error; err != nil {
|
||||
if strings.Contains(err.Error(), "UNIQUE constraint failed") {
|
||||
if err := database.DB.Where("name = ?", repo).First(&repository).Error; err != nil {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
} else {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
}
|
||||
|
||||
// Create upload session
|
||||
uploadPath := model.GetUploadPath("./x-storage", id)
|
||||
if err := os.MkdirAll(path.Dir(uploadPath), 0755); err != nil {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
|
||||
session := model.RegistryUploadSession{
|
||||
RepositoryID: repository.ID,
|
||||
SessionID: id,
|
||||
Path: uploadPath,
|
||||
Size: 0,
|
||||
}
|
||||
if err := database.DB.Create(&session).Error; err != nil {
|
||||
return rerr.Error(c, rerr.ErrInternal(err))
|
||||
}
|
||||
|
||||
c.Set("Location", "/"+path.Join("v2", path.Join(elem[1:len(elem)-2]...), "blobs/uploads", id))
|
||||
c.Set("Range", "0-0")
|
||||
|
||||
return c.SendStatus(http.StatusAccepted)
|
||||
|
||||
case http.MethodPatch:
|
||||
if service != "uploads" {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "METHOD_UNKNOWN",
|
||||
Message: fmt.Sprintf("PATCH to /blobs must be followed by /uploads, got %s", service),
|
||||
})
|
||||
}
|
||||
|
||||
start, end := 0, 0
|
||||
if contentRange != "" {
|
||||
if _, err := fmt.Sscanf(contentRange, "%d-%d", &start, &end); err != nil {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusRequestedRangeNotSatisfiable,
|
||||
Code: "BLOB_UPLOAD_UNKNOWN",
|
||||
Message: "We don't understand your Content-Range",
|
||||
})
|
||||
}
|
||||
|
||||
expectedEnd := start + len(c.Body()) - 1
|
||||
if end != expectedEnd {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusRequestedRangeNotSatisfiable,
|
||||
Code: "BLOB_UPLOAD_INVALID",
|
||||
Message: fmt.Sprintf("blob upload content range mismatch: expected end %d, got %d", expectedEnd, end),
|
||||
})
|
||||
}
|
||||
} else {
|
||||
end = start + len(c.Body()) - 1
|
||||
}
|
||||
|
||||
length, re := uploadHandler.Write(c.Context(), target, bytes.NewReader(c.Body()), start, end)
|
||||
if re != nil {
|
||||
return rerr.Error(c, re)
|
||||
}
|
||||
|
||||
c.Set("Location", "/"+path.Join("v2", path.Join(elem[1:len(elem)-3]...), "blobs/uploads", target))
|
||||
c.Set("Range", fmt.Sprintf("0-%d", length-1))
|
||||
return c.SendStatus(http.StatusNoContent)
|
||||
|
||||
case http.MethodPut:
|
||||
if service != "uploads" {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "METHOD_UNKNOWN",
|
||||
Message: fmt.Sprintf("PUT to /blobs must be followed by /uploads, got %s", service),
|
||||
})
|
||||
}
|
||||
|
||||
if digest == "" {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "DIGEST_INVALID",
|
||||
Message: "digest not specified",
|
||||
})
|
||||
}
|
||||
|
||||
hash, err := model.NewHash(digest)
|
||||
if err != nil {
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "NAME_INVALID",
|
||||
Message: "invalid digest",
|
||||
})
|
||||
}
|
||||
|
||||
re := uploadHandler.Done(c.Context(), blobHandler, target, bytes.NewReader(c.Body()), len(c.Body()), repo, hash)
|
||||
if re != nil {
|
||||
return rerr.Error(c, re)
|
||||
}
|
||||
|
||||
c.Set("Docker-Content-Digest", hash.String())
|
||||
return c.SendStatus(http.StatusCreated)
|
||||
|
||||
default:
|
||||
return rerr.Error(c, &rerr.RepositoryError{
|
||||
Status: http.StatusBadRequest,
|
||||
Code: "METHOD_UNKNOWN",
|
||||
Message: "We don't understand your method + url",
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user