feat: complete OCI registry implementation with docker push/pull support
A lightweight OCI (Open Container Initiative) registry implementation written in Go.
This commit is contained in:
376
internal/module/registry/blob.go
Normal file
376
internal/module/registry/blob.go
Normal file
@@ -0,0 +1,376 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"gitea.loveuer.com/loveuer/cluster/internal/model"
|
||||
"gitea.loveuer.com/loveuer/cluster/pkg/resp"
|
||||
"gitea.loveuer.com/loveuer/cluster/pkg/store"
|
||||
"github.com/gofiber/fiber/v3"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// HandleBlobs ?? blob ????
|
||||
// POST /v2/{repo}/blobs/uploads/ - ????
|
||||
// PATCH /v2/{repo}/blobs/uploads/{uuid} - ?????
|
||||
// PUT /v2/{repo}/blobs/uploads/{uuid}?digest={digest} - ????
|
||||
// GET /v2/{repo}/blobs/{digest} - ?? blob
|
||||
// HEAD /v2/{repo}/blobs/{digest} - ?? blob ????
|
||||
func HandleBlobs(c fiber.Ctx, db *gorm.DB, store store.Store) error {
|
||||
path := c.Path()
|
||||
method := c.Method()
|
||||
|
||||
// ????: /v2/{repo}/blobs/...
|
||||
// ??????????? "test/redis"
|
||||
pathWithoutV2 := strings.TrimPrefix(path, "/v2/")
|
||||
parts := strings.Split(pathWithoutV2, "/")
|
||||
if len(parts) < 2 {
|
||||
return resp.R404(c, "INVALID_PATH", nil, "invalid path")
|
||||
}
|
||||
|
||||
// ?? "blobs" ????????????????
|
||||
blobsIndex := -1
|
||||
for i, part := range parts {
|
||||
if part == "blobs" {
|
||||
blobsIndex = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if blobsIndex < 1 {
|
||||
return resp.R404(c, "INVALID_PATH", nil, "invalid path: blobs not found")
|
||||
}
|
||||
|
||||
// ???? blobs ???????
|
||||
repo := strings.Join(parts[:blobsIndex], "/")
|
||||
// ???? parts??????????? parts[0] ? "blobs"
|
||||
parts = parts[blobsIndex:]
|
||||
|
||||
switch method {
|
||||
case "POST":
|
||||
// POST /v2/{repo}/blobs/uploads/ - ????
|
||||
// parts ??? ["blobs", "uploads", ""] ? ["blobs", "uploads"]
|
||||
if len(parts) >= 2 && parts[0] == "blobs" && parts[1] == "uploads" {
|
||||
return handleBlobUploadStart(c, db, store, repo)
|
||||
}
|
||||
|
||||
case "PATCH":
|
||||
// PATCH /v2/{repo}/blobs/uploads/{uuid} - ?????
|
||||
// parts ??? ["blobs", "uploads", "uuid"]
|
||||
if len(parts) >= 3 && parts[0] == "blobs" && parts[1] == "uploads" {
|
||||
uuid := parts[2]
|
||||
return handleBlobUploadChunk(c, db, store, repo, uuid)
|
||||
}
|
||||
|
||||
case "PUT":
|
||||
// PUT /v2/{repo}/blobs/uploads/{uuid}?digest={digest} - ????
|
||||
// parts ??? ["blobs", "uploads", "uuid"]
|
||||
if len(parts) >= 3 && parts[0] == "blobs" && parts[1] == "uploads" {
|
||||
uuid := parts[2]
|
||||
digest := c.Query("digest")
|
||||
if digest == "" {
|
||||
return resp.R400(c, "MISSING_DIGEST", nil, "digest parameter is required")
|
||||
}
|
||||
return handleBlobUploadComplete(c, db, store, repo, uuid, digest)
|
||||
}
|
||||
|
||||
case "GET":
|
||||
// GET /v2/{repo}/blobs/{digest} - ?? blob
|
||||
// parts ??? ["blobs", "digest"]
|
||||
if len(parts) >= 2 && parts[0] == "blobs" {
|
||||
digest := parts[1]
|
||||
return handleBlobDownload(c, db, store, repo, digest)
|
||||
}
|
||||
|
||||
case "HEAD":
|
||||
// HEAD /v2/{repo}/blobs/{digest} - ?? blob ????
|
||||
// parts ??? ["blobs", "digest"]
|
||||
if len(parts) >= 2 && parts[0] == "blobs" {
|
||||
digest := parts[1]
|
||||
return handleBlobHead(c, db, store, repo, digest)
|
||||
}
|
||||
}
|
||||
|
||||
return resp.R404(c, "NOT_FOUND", nil, "endpoint not found")
|
||||
}
|
||||
|
||||
// handleBlobUploadStart ?? blob ??
|
||||
func handleBlobUploadStart(c fiber.Ctx, db *gorm.DB, store store.Store, repo string) error {
|
||||
// ?? UUID
|
||||
uuidBytes := make([]byte, 16)
|
||||
if _, err := rand.Read(uuidBytes); err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
uuid := hex.EncodeToString(uuidBytes)
|
||||
|
||||
// ??????
|
||||
upload := &model.BlobUpload{
|
||||
UUID: uuid,
|
||||
Repository: repo,
|
||||
Path: uuid, // ?? UUID ??????
|
||||
Size: 0,
|
||||
}
|
||||
|
||||
if err := db.Create(upload).Error; err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// ??????
|
||||
w, err := store.CreateUpload(c.Context(), uuid)
|
||||
if err != nil {
|
||||
db.Delete(upload)
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
w.Close()
|
||||
|
||||
// ???? URL
|
||||
uploadURL := fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid)
|
||||
c.Set("Location", uploadURL)
|
||||
c.Set("Docker-Upload-UUID", uuid)
|
||||
c.Set("Range", "0-0")
|
||||
return c.SendStatus(202)
|
||||
}
|
||||
|
||||
// handleBlobUploadChunk ?? blob ???
|
||||
func handleBlobUploadChunk(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, uuid string) error {
|
||||
// ??????
|
||||
var upload model.BlobUpload
|
||||
if err := db.Where("uuid = ? AND repository = ?", uuid, repo).First(&upload).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return resp.R404(c, "UPLOAD_NOT_FOUND", nil, "upload session not found")
|
||||
}
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// ?????
|
||||
body := c.Body()
|
||||
if len(body) == 0 {
|
||||
return resp.R400(c, "EMPTY_BODY", nil, "request body is empty")
|
||||
}
|
||||
|
||||
// ??????? bytes.NewReader ????????
|
||||
n, err := store.AppendUpload(c.Context(), uuid, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// ??????
|
||||
upload.Size += n
|
||||
if err := db.Save(&upload).Error; err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// ???? URL ???
|
||||
uploadURL := fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid)
|
||||
c.Set("Location", uploadURL)
|
||||
c.Set("Docker-Upload-UUID", uuid)
|
||||
c.Set("Range", fmt.Sprintf("0-%d", upload.Size-1))
|
||||
return c.SendStatus(202)
|
||||
}
|
||||
|
||||
// handleBlobUploadComplete ?? blob ??
|
||||
func handleBlobUploadComplete(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, uuid string, digest string) error {
|
||||
// ??????
|
||||
var upload model.BlobUpload
|
||||
if err := db.Where("uuid = ? AND repository = ?", uuid, repo).First(&upload).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return resp.R404(c, "UPLOAD_NOT_FOUND", nil, "upload session not found")
|
||||
}
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// ??????????????PUT ???????????
|
||||
body := c.Body()
|
||||
if len(body) > 0 {
|
||||
if _, err := store.AppendUpload(c.Context(), uuid, bytes.NewReader(body)); err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
// ?????????????
|
||||
if err := store.FinalizeUpload(c.Context(), uuid, digest); err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// ????????
|
||||
size, err := store.GetBlobSize(c.Context(), digest)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// ????? blob ??
|
||||
var blob model.Blob
|
||||
if err := db.Where("digest = ?", digest).First(&blob).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
blob = model.Blob{
|
||||
Digest: digest,
|
||||
Size: size,
|
||||
Repository: repo,
|
||||
}
|
||||
if err := db.Create(&blob).Error; err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
} else {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
// ??????
|
||||
db.Delete(&upload)
|
||||
store.DeleteUpload(c.Context(), uuid)
|
||||
|
||||
// ?? blob URL
|
||||
blobURL := fmt.Sprintf("/v2/%s/blobs/%s", repo, digest)
|
||||
c.Set("Location", blobURL)
|
||||
c.Set("Content-Length", fmt.Sprintf("%d", size))
|
||||
c.Set("Docker-Content-Digest", digest)
|
||||
return c.SendStatus(201)
|
||||
}
|
||||
|
||||
// parseRangeHeader parses Range header and returns start and end positions
|
||||
func parseRangeHeader(rangeHeader string, size int64) (start, end int64, valid bool) {
|
||||
if rangeHeader == "" {
|
||||
return 0, size - 1, false
|
||||
}
|
||||
|
||||
// Range header format: "bytes=start-end" or "bytes=start-"
|
||||
if !strings.HasPrefix(rangeHeader, "bytes=") {
|
||||
return 0, size - 1, false
|
||||
}
|
||||
|
||||
rangeSpec := strings.TrimPrefix(rangeHeader, "bytes=")
|
||||
parts := strings.Split(rangeSpec, "-")
|
||||
if len(parts) != 2 {
|
||||
return 0, size - 1, false
|
||||
}
|
||||
|
||||
var err error
|
||||
if parts[0] == "" {
|
||||
// Suffix range: "bytes=-suffix"
|
||||
suffix, err := strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil || suffix <= 0 {
|
||||
return 0, size - 1, false
|
||||
}
|
||||
start = size - suffix
|
||||
if start < 0 {
|
||||
start = 0
|
||||
}
|
||||
end = size - 1
|
||||
} else if parts[1] == "" {
|
||||
// Start range: "bytes=start-"
|
||||
start, err = strconv.ParseInt(parts[0], 10, 64)
|
||||
if err != nil || start < 0 || start >= size {
|
||||
return 0, size - 1, false
|
||||
}
|
||||
end = size - 1
|
||||
} else {
|
||||
// Full range: "bytes=start-end"
|
||||
start, err = strconv.ParseInt(parts[0], 10, 64)
|
||||
if err != nil || start < 0 || start >= size {
|
||||
return 0, size - 1, false
|
||||
}
|
||||
end, err = strconv.ParseInt(parts[1], 10, 64)
|
||||
if err != nil || end < start || end >= size {
|
||||
return 0, size - 1, false
|
||||
}
|
||||
}
|
||||
|
||||
return start, end, true
|
||||
}
|
||||
|
||||
// handleBlobDownload ?? blob
|
||||
func handleBlobDownload(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, digest string) error {
|
||||
// Check if blob exists
|
||||
exists, err := store.BlobExists(c.Context(), digest)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
if !exists {
|
||||
return resp.R404(c, "BLOB_NOT_FOUND", nil, "blob not found")
|
||||
}
|
||||
|
||||
// Get blob size
|
||||
size, err := store.GetBlobSize(c.Context(), digest)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// Read blob
|
||||
reader, err := store.ReadBlob(c.Context(), digest)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
defer reader.Close()
|
||||
|
||||
// Check for Range request
|
||||
rangeHeader := c.Get("Range")
|
||||
start, end, hasRange := parseRangeHeader(rangeHeader, size)
|
||||
|
||||
if hasRange {
|
||||
// Handle Range request
|
||||
// Seek to start position
|
||||
if seeker, ok := reader.(io.Seeker); ok {
|
||||
if _, err := seeker.Seek(start, io.SeekStart); err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
} else {
|
||||
// If not seekable, read and discard bytes
|
||||
if _, err := io.CopyN(io.Discard, reader, start); err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create limited reader
|
||||
limitedReader := io.LimitReader(reader, end-start+1)
|
||||
|
||||
// Set partial content headers
|
||||
c.Set("Content-Type", "application/octet-stream")
|
||||
c.Set("Content-Length", fmt.Sprintf("%d", end-start+1))
|
||||
c.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, size))
|
||||
c.Set("Accept-Ranges", "bytes")
|
||||
c.Set("Docker-Content-Digest", digest)
|
||||
c.Status(206) // Partial Content
|
||||
|
||||
// Send partial content
|
||||
return c.SendStream(limitedReader)
|
||||
}
|
||||
|
||||
// Full blob download
|
||||
c.Set("Content-Type", "application/octet-stream")
|
||||
c.Set("Content-Length", fmt.Sprintf("%d", size))
|
||||
c.Set("Accept-Ranges", "bytes")
|
||||
c.Set("Docker-Content-Digest", digest)
|
||||
|
||||
// Send full blob stream
|
||||
return c.SendStream(reader)
|
||||
}
|
||||
|
||||
// handleBlobHead ?? blob ????
|
||||
func handleBlobHead(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, digest string) error {
|
||||
// Check if blob exists
|
||||
exists, err := store.BlobExists(c.Context(), digest)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
if !exists {
|
||||
return resp.R404(c, "BLOB_NOT_FOUND", nil, "blob not found")
|
||||
}
|
||||
|
||||
// Get blob size
|
||||
size, err := store.GetBlobSize(c.Context(), digest)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
// Set response headers
|
||||
c.Set("Content-Type", "application/octet-stream")
|
||||
c.Set("Content-Length", fmt.Sprintf("%d", size))
|
||||
c.Set("Accept-Ranges", "bytes")
|
||||
c.Set("Docker-Content-Digest", digest)
|
||||
return c.SendStatus(200)
|
||||
}
|
||||
Reference in New Issue
Block a user