Files
cluster/internal/module/registry/blob.go
loveuer 8b655d3496 refactor: reorganize models to pkg/model and add authentication module
- Move ORM models from internal/model to pkg/model organized by module (auth/k8s/registry)
- Add authentication module with login, user management handlers
- Update all import paths to use new model locations
- Add frontend auth pages (Login, UserManagement) and authStore
- Remove deprecated internal/model/model.go
2025-11-20 14:55:48 +08:00

413 lines
12 KiB
Go

package registry
import (
"bytes"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"log"
"strconv"
"strings"
"gitea.loveuer.com/loveuer/cluster/pkg/model/registry"
"gitea.loveuer.com/loveuer/cluster/pkg/resp"
"gitea.loveuer.com/loveuer/cluster/pkg/store"
"github.com/gofiber/fiber/v3"
"gorm.io/gorm"
)
// HandleBlobs routes Docker Registry v2 blob requests to the matching handler.
//
// Supported endpoints:
//   POST  /v2/{repo}/blobs/uploads/                        - start an upload session
//   PATCH /v2/{repo}/blobs/uploads/{uuid}                  - append an upload chunk
//   PUT   /v2/{repo}/blobs/uploads/{uuid}?digest={digest}  - finalize an upload
//   GET   /v2/{repo}/blobs/{digest}                        - download a blob
//   HEAD  /v2/{repo}/blobs/{digest}                        - check blob existence/size
func HandleBlobs(c fiber.Ctx, db *gorm.DB, store store.Store) error {
	// Expected path shape: /v2/{repo}/blobs/...; the repository name itself
	// may contain slashes (e.g. "test/redis"), so locate the "blobs" segment
	// instead of splitting at a fixed position.
	segments := strings.Split(strings.TrimPrefix(c.Path(), "/v2/"), "/")
	if len(segments) < 2 {
		return resp.R404(c, "INVALID_PATH", nil, "invalid path")
	}
	idx := -1
	for i, seg := range segments {
		if seg == "blobs" {
			idx = i
			break
		}
	}
	// idx must be >= 1 so that at least one repo segment precedes "blobs".
	if idx < 1 {
		return resp.R404(c, "INVALID_PATH", nil, "invalid path: blobs not found")
	}
	// Everything before "blobs" is the repository name.
	repo := strings.Join(segments[:idx], "/")

	// Best-effort: strip a configured registry-address prefix from the repo.
	// A config lookup failure simply leaves the repo name untouched.
	var cfg registry.RegistryConfig
	if err := db.Where("key = ?", "registry_address").First(&cfg).Error; err == nil {
		if addr := cfg.Value; addr != "" && strings.HasPrefix(repo, addr+"/") {
			repo = strings.TrimPrefix(repo, addr+"/")
		}
	}

	// Re-slice so rest[0] == "blobs" for all the checks below.
	rest := segments[idx:]
	switch c.Method() {
	case "POST":
		// Start a new upload: rest is ["blobs", "uploads"] or ["blobs", "uploads", ""].
		if len(rest) >= 2 && rest[0] == "blobs" && rest[1] == "uploads" {
			return handleBlobUploadStart(c, db, store, repo)
		}
	case "PATCH":
		// Append a chunk: rest is ["blobs", "uploads", "{uuid}"].
		if len(rest) >= 3 && rest[0] == "blobs" && rest[1] == "uploads" {
			return handleBlobUploadChunk(c, db, store, repo, rest[2])
		}
	case "PUT":
		// Finalize: rest is ["blobs", "uploads", "{uuid}"], digest in the query string.
		if len(rest) >= 3 && rest[0] == "blobs" && rest[1] == "uploads" {
			digest := c.Query("digest")
			if digest == "" {
				return resp.R400(c, "MISSING_DIGEST", nil, "digest parameter is required")
			}
			return handleBlobUploadComplete(c, db, store, repo, rest[2], digest)
		}
	case "GET":
		// Download: rest is ["blobs", "{digest}"].
		if len(rest) >= 2 && rest[0] == "blobs" {
			return handleBlobDownload(c, db, store, repo, rest[1])
		}
	case "HEAD":
		// Existence/size check: rest is ["blobs", "{digest}"].
		if len(rest) >= 2 && rest[0] == "blobs" {
			return handleBlobHead(c, db, store, repo, rest[1])
		}
	}
	return resp.R404(c, "NOT_FOUND", nil, "endpoint not found")
}
// handleBlobUploadStart begins a new blob upload session.
//
// It generates a random 128-bit session UUID, records the session in the
// database, creates the (initially empty) upload object in the store, and
// replies 202 Accepted with the Location the client should send subsequent
// PATCH/PUT requests to, per the Docker distribution API.
func handleBlobUploadStart(c fiber.Ctx, db *gorm.DB, store store.Store, repo string) error {
	// Random hex-encoded session ID.
	uuidBytes := make([]byte, 16)
	if _, err := rand.Read(uuidBytes); err != nil {
		return resp.R500(c, "", nil, err)
	}
	uuid := hex.EncodeToString(uuidBytes)
	// Persist the upload session record.
	upload := &registry.BlobUpload{
		UUID:       uuid,
		Repository: repo,
		Path:       uuid, // store path keyed by the session UUID
		Size:       0,
	}
	if err := db.Create(upload).Error; err != nil {
		return resp.R500(c, "", nil, err)
	}
	// Create the upload object in the backing store.
	w, err := store.CreateUpload(c.Context(), uuid)
	if err != nil {
		db.Delete(upload) // best-effort rollback of the session record
		return resp.R500(c, "", nil, err)
	}
	// Close may flush buffered data; a failure means the upload object is
	// unusable, so roll back the session record as well instead of silently
	// ignoring the error (which previously left a dangling session).
	if err := w.Close(); err != nil {
		db.Delete(upload) // best-effort rollback of the session record
		return resp.R500(c, "", nil, err)
	}
	// Tell the client where to send the data.
	uploadURL := fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid)
	c.Set("Location", uploadURL)
	c.Set("Docker-Upload-UUID", uuid)
	c.Set("Range", "0-0")
	return c.SendStatus(202)
}
// handleBlobUploadChunk appends one chunk of data to an in-progress blob
// upload session and reports the cumulative received range back to the client.
func handleBlobUploadChunk(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, uuid string) error {
	// Look up the upload session for this repo/uuid pair.
	var session registry.BlobUpload
	findErr := db.Where("uuid = ? AND repository = ?", uuid, repo).First(&session).Error
	if findErr == gorm.ErrRecordNotFound {
		return resp.R404(c, "UPLOAD_NOT_FOUND", nil, "upload session not found")
	}
	if findErr != nil {
		return resp.R500(c, "", nil, findErr)
	}
	// A chunk must carry data.
	chunk := c.Body()
	if len(chunk) == 0 {
		return resp.R400(c, "EMPTY_BODY", nil, "request body is empty")
	}
	// Append the chunk bytes to the store-side upload object.
	written, err := store.AppendUpload(c.Context(), uuid, bytes.NewReader(chunk))
	if err != nil {
		return resp.R500(c, "", nil, err)
	}
	// Track the cumulative byte count; it drives the Range response header.
	session.Size += written
	if err := db.Save(&session).Error; err != nil {
		return resp.R500(c, "", nil, err)
	}
	c.Set("Location", fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid))
	c.Set("Docker-Upload-UUID", uuid)
	c.Set("Range", fmt.Sprintf("0-%d", session.Size-1))
	return c.SendStatus(202)
}
// handleBlobUploadComplete finalizes a blob upload session.
//
// Flow: look up the session, append any trailing data carried in the PUT
// body, finalize the stored object under the client-supplied digest, ensure
// a Blob row exists for that digest, then clean up the session. Responds
// 201 Created with Location/Docker-Content-Digest headers on success.
//
// NOTE(review): the Blob lookup below is by digest only (not digest +
// repository), so a blob first pushed under another repo will not get a row
// for this repo — confirm whether Blob records are meant to be globally
// content-addressed or per-repository.
func handleBlobUploadComplete(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, uuid string, digest string) error {
	// Look up the upload session.
	var upload registry.BlobUpload
	if err := db.Where("uuid = ? AND repository = ?", uuid, repo).First(&upload).Error; err != nil {
		if err == gorm.ErrRecordNotFound {
			return resp.R404(c, "UPLOAD_NOT_FOUND", nil, "upload session not found")
		}
		return resp.R500(c, "", nil, err)
	}
	// A monolithic PUT may carry the final (or only) chunk in its body.
	body := c.Body()
	if len(body) > 0 {
		if _, err := store.AppendUpload(c.Context(), uuid, bytes.NewReader(body)); err != nil {
			return resp.R500(c, "", nil, err)
		}
	}
	// Promote the upload object to a content-addressed blob keyed by digest.
	if err := store.FinalizeUpload(c.Context(), uuid, digest); err != nil {
		return resp.R500(c, "", nil, err)
	}
	// Read back the final stored size for the response headers and Blob row.
	size, err := store.GetBlobSize(c.Context(), digest)
	if err != nil {
		return resp.R500(c, "", nil, err)
	}
	// Create the Blob record only if this digest has not been seen before.
	var blob registry.Blob
	if err := db.Where("digest = ?", digest).First(&blob).Error; err != nil {
		if err == gorm.ErrRecordNotFound {
			blob = registry.Blob{
				Digest:     digest,
				Size:       size,
				Repository: repo,
			}
			if err := db.Create(&blob).Error; err != nil {
				return resp.R500(c, "", nil, err)
			}
		} else {
			return resp.R500(c, "", nil, err)
		}
	}
	// Best-effort cleanup of the session; errors are intentionally ignored
	// since the blob itself is already durable at this point.
	db.Delete(&upload)
	store.DeleteUpload(c.Context(), uuid)
	// Point the client at the completed blob.
	blobURL := fmt.Sprintf("/v2/%s/blobs/%s", repo, digest)
	c.Set("Location", blobURL)
	c.Set("Content-Length", fmt.Sprintf("%d", size))
	c.Set("Docker-Content-Digest", digest)
	return c.SendStatus(201)
}
// parseRangeHeader parses an HTTP Range header against a resource of the
// given size. It returns the inclusive [start, end] byte positions and
// whether the header described a usable single range. On a missing or
// malformed header it returns the full range (0, size-1) with valid == false
// so callers can fall back to a complete response.
//
// Supported forms: "bytes=start-end", "bytes=start-", and "bytes=-suffix"
// (a suffix larger than the resource is clamped to the full range).
// Multi-range requests ("bytes=0-1,5-9") are rejected as invalid.
func parseRangeHeader(rangeHeader string, size int64) (start, end int64, valid bool) {
	spec := strings.TrimPrefix(rangeHeader, "bytes=")
	// Empty header, or a header that does not use the "bytes" unit.
	if rangeHeader == "" || spec == rangeHeader {
		return 0, size - 1, false
	}
	// Exactly one "-" separator is allowed; extra hyphens (negative numbers,
	// multi-range lists split oddly) make the spec invalid.
	first, last, found := strings.Cut(spec, "-")
	if !found || strings.Contains(last, "-") {
		return 0, size - 1, false
	}
	switch {
	case first == "":
		// Suffix form "bytes=-suffix": the last `suffix` bytes.
		suffix, err := strconv.ParseInt(last, 10, 64)
		if err != nil || suffix <= 0 {
			return 0, size - 1, false
		}
		start = size - suffix
		if start < 0 {
			start = 0 // clamp when the suffix exceeds the resource
		}
		end = size - 1
	case last == "":
		// Open-ended form "bytes=start-": from start to the end.
		var err error
		start, err = strconv.ParseInt(first, 10, 64)
		if err != nil || start < 0 || start >= size {
			return 0, size - 1, false
		}
		end = size - 1
	default:
		// Full form "bytes=start-end".
		var err error
		start, err = strconv.ParseInt(first, 10, 64)
		if err != nil || start < 0 || start >= size {
			return 0, size - 1, false
		}
		end, err = strconv.ParseInt(last, 10, 64)
		if err != nil || end < start || end >= size {
			return 0, size - 1, false
		}
	}
	return start, end, true
}
// handleBlobDownload serves a blob to the client, honoring single-part HTTP
// Range requests (206 Partial Content) when a valid Range header is present,
// otherwise sending the full blob with 200.
//
// NOTE(review): the blob is fully buffered in memory via io.ReadAll before
// sending; for large image layers this is expensive. Switching to streaming
// (e.g. SendStream) needs care: `defer reader.Close()` would fire before the
// response body is written — verify reader lifetime before changing this.
func handleBlobDownload(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, digest string) error {
	log.Printf("[BlobDownload] Start: repo=%s, digest=%s", repo, digest)
	// Check if blob exists
	exists, err := store.BlobExists(c.Context(), digest)
	if err != nil {
		log.Printf("[BlobDownload] BlobExists error: %v", err)
		return resp.R500(c, "", nil, err)
	}
	if !exists {
		log.Printf("[BlobDownload] Blob not found: %s", digest)
		return resp.R404(c, "BLOB_NOT_FOUND", nil, "blob not found")
	}
	// Get blob size (needed for Content-Length and Range validation)
	size, err := store.GetBlobSize(c.Context(), digest)
	if err != nil {
		log.Printf("[BlobDownload] GetBlobSize error: %v", err)
		return resp.R500(c, "", nil, err)
	}
	log.Printf("[BlobDownload] Blob size: %d bytes", size)
	// Open a reader over the stored blob content
	reader, err := store.ReadBlob(c.Context(), digest)
	if err != nil {
		log.Printf("[BlobDownload] ReadBlob error: %v", err)
		return resp.R500(c, "", nil, err)
	}
	defer reader.Close()
	log.Printf("[BlobDownload] Reader opened successfully")
	// Check for Range request
	rangeHeader := c.Get("Range")
	start, end, hasRange := parseRangeHeader(rangeHeader, size)
	if hasRange {
		log.Printf("[BlobDownload] Range request: %d-%d/%d", start, end, size)
		// Handle Range request
		// Seek to start position when the store reader supports it
		if seeker, ok := reader.(io.Seeker); ok {
			if _, err := seeker.Seek(start, io.SeekStart); err != nil {
				log.Printf("[BlobDownload] Seek error: %v", err)
				return resp.R500(c, "", nil, err)
			}
		} else {
			// If not seekable, read and discard bytes up to start
			if _, err := io.CopyN(io.Discard, reader, start); err != nil {
				log.Printf("[BlobDownload] CopyN discard error: %v", err)
				return resp.R500(c, "", nil, err)
			}
		}
		// Limit the reader to exactly the requested span (end is inclusive)
		limitedReader := io.LimitReader(reader, end-start+1)
		// Set partial content headers
		c.Set("Content-Type", "application/octet-stream")
		c.Set("Content-Length", fmt.Sprintf("%d", end-start+1))
		c.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, size))
		c.Set("Accept-Ranges", "bytes")
		c.Set("Docker-Content-Digest", digest)
		c.Status(206) // Partial Content
		log.Printf("[BlobDownload] Sending partial content")
		// Read all content and send (buffered in memory — see NOTE above)
		content, err := io.ReadAll(limitedReader)
		if err != nil {
			log.Printf("[BlobDownload] ReadAll error: %v", err)
			return resp.R500(c, "", nil, err)
		}
		return c.Send(content)
	}
	// Full blob download
	log.Printf("[BlobDownload] Full blob download, setting headers")
	c.Set("Content-Type", "application/octet-stream")
	c.Set("Content-Length", fmt.Sprintf("%d", size))
	c.Set("Accept-Ranges", "bytes")
	c.Set("Docker-Content-Digest", digest)
	log.Printf("[BlobDownload] About to read all content, size=%d", size)
	// Read all content and send (buffered in memory — see NOTE above)
	content, err := io.ReadAll(reader)
	if err != nil {
		log.Printf("[BlobDownload] ReadAll error: %v", err)
		return resp.R500(c, "", nil, err)
	}
	log.Printf("[BlobDownload] Read %d bytes, sending...", len(content))
	return c.Send(content)
}
// handleBlobHead reports whether a blob exists and, if so, its size and
// digest via response headers only (HEAD requests carry no body).
func handleBlobHead(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, digest string) error {
	// Probe the store for the digest.
	found, err := store.BlobExists(c.Context(), digest)
	if err != nil {
		return resp.R500(c, "", nil, err)
	}
	if !found {
		return resp.R404(c, "BLOB_NOT_FOUND", nil, "blob not found")
	}
	// Size is reported through Content-Length.
	blobSize, err := store.GetBlobSize(c.Context(), digest)
	if err != nil {
		return resp.R500(c, "", nil, err)
	}
	c.Set("Content-Type", "application/octet-stream")
	c.Set("Content-Length", strconv.FormatInt(blobSize, 10))
	c.Set("Accept-Ranges", "bytes")
	c.Set("Docker-Content-Digest", digest)
	return c.SendStatus(200)
}