package registry

import (
	"bytes"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"log"
	"strconv"
	"strings"

	"gitea.loveuer.com/loveuer/cluster/pkg/model/registry"
	"gitea.loveuer.com/loveuer/cluster/pkg/resp"
	"gitea.loveuer.com/loveuer/cluster/pkg/store"
	"github.com/gofiber/fiber/v3"
	"gorm.io/gorm"
)

// HandleBlobs routes a blob request to the matching handler based on the
// HTTP method and the path segments that follow "blobs":
//
//	POST  /v2/{repo}/blobs/uploads/                        start an upload session
//	PATCH /v2/{repo}/blobs/uploads/{uuid}                  append a chunk
//	PUT   /v2/{repo}/blobs/uploads/{uuid}?digest={digest}  complete the upload
//	GET   /v2/{repo}/blobs/{digest}                        download a blob
//	HEAD  /v2/{repo}/blobs/{digest}                        blob metadata only
//
// {repo} may itself contain slashes (for example "test/redis"): it is
// everything between "/v2/" and the first "blobs" path segment. When the
// configuration table holds a "registry_address" entry and the repository
// starts with that address followed by "/", the prefix is stripped.
//
// Requests that match none of the routes get a 404 response.
func HandleBlobs(c fiber.Ctx, db *gorm.DB, store store.Store) error {
	path := c.Path()
	method := c.Method()

	segments := strings.Split(strings.TrimPrefix(path, "/v2/"), "/")
	if len(segments) < 2 {
		return resp.R404(c, "INVALID_PATH", nil, "invalid path")
	}

	// Locate the first "blobs" segment; everything before it is the
	// repository name, so it must not be the very first segment.
	blobsIndex := -1
	for i, seg := range segments {
		if seg == "blobs" {
			blobsIndex = i
			break
		}
	}
	if blobsIndex < 1 {
		return resp.R404(c, "INVALID_PATH", nil, "invalid path: blobs not found")
	}

	repo := strings.Join(segments[:blobsIndex], "/")

	// Strip the registry_address prefix from repo if present. A failed
	// lookup is treated the same as "no address configured".
	var registryConfig registry.RegistryConfig
	registryAddress := ""
	if err := db.Where("key = ?", "registry_address").First(&registryConfig).Error; err == nil {
		registryAddress = registryConfig.Value
	}
	if registryAddress != "" {
		repo = strings.TrimPrefix(repo, registryAddress+"/")
	}

	// rest holds the segments after "blobs", e.g. ["uploads", "{uuid}"]
	// or ["{digest}"]. A trailing slash yields a trailing empty segment.
	rest := segments[blobsIndex+1:]
	isUpload := len(rest) >= 1 && rest[0] == "uploads"

	switch method {
	case "POST":
		if isUpload {
			return handleBlobUploadStart(c, db, store, repo)
		}
	case "PATCH":
		if isUpload && len(rest) >= 2 {
			return handleBlobUploadChunk(c, db, store, repo, rest[1])
		}
	case "PUT":
		if isUpload && len(rest) >= 2 {
			digest := c.Query("digest")
			if digest == "" {
				return resp.R400(c, "MISSING_DIGEST", nil, "digest parameter is required")
			}
			return handleBlobUploadComplete(c, db, store, repo, rest[1], digest)
		}
	case "GET":
		if len(rest) >= 1 {
			return handleBlobDownload(c, db, store, repo, rest[0])
		}
	case "HEAD":
		if len(rest) >= 1 {
			return handleBlobHead(c, db, store, repo, rest[0])
		}
	}

	return resp.R404(c, "NOT_FOUND", nil, "endpoint not found")
}

// handleBlobUploadStart opens a new blob upload session for repo.
//
// It generates a random 16-byte identifier (hex encoded), records the
// session in the database, creates the matching upload in the store and
// answers 202 with the Location, Docker-Upload-UUID and Range headers the
// client needs to continue the upload.
func handleBlobUploadStart(c fiber.Ctx, db *gorm.DB, store store.Store, repo string) error {
	idBytes := make([]byte, 16)
	if _, err := rand.Read(idBytes); err != nil {
		return resp.R500(c, "", nil, err)
	}
	uuid := hex.EncodeToString(idBytes)

	// Persist the session first; the upload path is simply the identifier.
	upload := &registry.BlobUpload{
		UUID:       uuid,
		Repository: repo,
		Path:       uuid,
		Size:       0,
	}
	if err := db.Create(upload).Error; err != nil {
		return resp.R500(c, "", nil, err)
	}

	w, err := store.CreateUpload(c.Context(), uuid)
	if err != nil {
		// Best-effort rollback of the session row; the store error is the
		// one reported to the client.
		if delErr := db.Delete(upload).Error; delErr != nil {
			log.Printf("[BlobUploadStart] rollback of upload %s failed: %v", uuid, delErr)
		}
		return resp.R500(c, "", nil, err)
	}
	// Nothing is written here; the upload only needs to exist in the store.
	w.Close()

	c.Set("Location", fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid))
	c.Set("Docker-Upload-UUID", uuid)
	c.Set("Range", "0-0")
	return c.SendStatus(202)
}
func handleBlobUploadChunk(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, uuid string) error { // ?????? var upload registry.BlobUpload if err := db.Where("uuid = ? AND repository = ?", uuid, repo).First(&upload).Error; err != nil { if err == gorm.ErrRecordNotFound { return resp.R404(c, "UPLOAD_NOT_FOUND", nil, "upload session not found") } return resp.R500(c, "", nil, err) } // ????? body := c.Body() if len(body) == 0 { return resp.R400(c, "EMPTY_BODY", nil, "request body is empty") } // ??????? bytes.NewReader ???????? n, err := store.AppendUpload(c.Context(), uuid, bytes.NewReader(body)) if err != nil { return resp.R500(c, "", nil, err) } // ?????? upload.Size += n if err := db.Save(&upload).Error; err != nil { return resp.R500(c, "", nil, err) } // ???? URL ??? uploadURL := fmt.Sprintf("/v2/%s/blobs/uploads/%s", repo, uuid) c.Set("Location", uploadURL) c.Set("Docker-Upload-UUID", uuid) c.Set("Range", fmt.Sprintf("0-%d", upload.Size-1)) return c.SendStatus(202) } // handleBlobUploadComplete ?? blob ?? func handleBlobUploadComplete(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, uuid string, digest string) error { // ?????? var upload registry.BlobUpload if err := db.Where("uuid = ? AND repository = ?", uuid, repo).First(&upload).Error; err != nil { if err == gorm.ErrRecordNotFound { return resp.R404(c, "UPLOAD_NOT_FOUND", nil, "upload session not found") } return resp.R500(c, "", nil, err) } // ??????????????PUT ??????????? body := c.Body() if len(body) > 0 { if _, err := store.AppendUpload(c.Context(), uuid, bytes.NewReader(body)); err != nil { return resp.R500(c, "", nil, err) } } // ????????????? if err := store.FinalizeUpload(c.Context(), uuid, digest); err != nil { return resp.R500(c, "", nil, err) } // ???????? size, err := store.GetBlobSize(c.Context(), digest) if err != nil { return resp.R500(c, "", nil, err) } // ????? blob ?? 
var blob registry.Blob if err := db.Where("digest = ?", digest).First(&blob).Error; err != nil { if err == gorm.ErrRecordNotFound { blob = registry.Blob{ Digest: digest, Size: size, Repository: repo, } if err := db.Create(&blob).Error; err != nil { return resp.R500(c, "", nil, err) } } else { return resp.R500(c, "", nil, err) } } // ?????? db.Delete(&upload) store.DeleteUpload(c.Context(), uuid) // ?? blob URL blobURL := fmt.Sprintf("/v2/%s/blobs/%s", repo, digest) c.Set("Location", blobURL) c.Set("Content-Length", fmt.Sprintf("%d", size)) c.Set("Docker-Content-Digest", digest) return c.SendStatus(201) } // parseRangeHeader parses Range header and returns start and end positions func parseRangeHeader(rangeHeader string, size int64) (start, end int64, valid bool) { if rangeHeader == "" { return 0, size - 1, false } // Range header format: "bytes=start-end" or "bytes=start-" if !strings.HasPrefix(rangeHeader, "bytes=") { return 0, size - 1, false } rangeSpec := strings.TrimPrefix(rangeHeader, "bytes=") parts := strings.Split(rangeSpec, "-") if len(parts) != 2 { return 0, size - 1, false } var err error if parts[0] == "" { // Suffix range: "bytes=-suffix" suffix, err := strconv.ParseInt(parts[1], 10, 64) if err != nil || suffix <= 0 { return 0, size - 1, false } start = size - suffix if start < 0 { start = 0 } end = size - 1 } else if parts[1] == "" { // Start range: "bytes=start-" start, err = strconv.ParseInt(parts[0], 10, 64) if err != nil || start < 0 || start >= size { return 0, size - 1, false } end = size - 1 } else { // Full range: "bytes=start-end" start, err = strconv.ParseInt(parts[0], 10, 64) if err != nil || start < 0 || start >= size { return 0, size - 1, false } end, err = strconv.ParseInt(parts[1], 10, 64) if err != nil || end < start || end >= size { return 0, size - 1, false } } return start, end, true } // handleBlobDownload ?? 
blob func handleBlobDownload(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, digest string) error { log.Printf("[BlobDownload] Start: repo=%s, digest=%s", repo, digest) // Check if blob exists exists, err := store.BlobExists(c.Context(), digest) if err != nil { log.Printf("[BlobDownload] BlobExists error: %v", err) return resp.R500(c, "", nil, err) } if !exists { log.Printf("[BlobDownload] Blob not found: %s", digest) return resp.R404(c, "BLOB_NOT_FOUND", nil, "blob not found") } // Get blob size size, err := store.GetBlobSize(c.Context(), digest) if err != nil { log.Printf("[BlobDownload] GetBlobSize error: %v", err) return resp.R500(c, "", nil, err) } log.Printf("[BlobDownload] Blob size: %d bytes", size) // Read blob reader, err := store.ReadBlob(c.Context(), digest) if err != nil { log.Printf("[BlobDownload] ReadBlob error: %v", err) return resp.R500(c, "", nil, err) } defer reader.Close() log.Printf("[BlobDownload] Reader opened successfully") // Check for Range request rangeHeader := c.Get("Range") start, end, hasRange := parseRangeHeader(rangeHeader, size) if hasRange { log.Printf("[BlobDownload] Range request: %d-%d/%d", start, end, size) // Handle Range request // Seek to start position if seeker, ok := reader.(io.Seeker); ok { if _, err := seeker.Seek(start, io.SeekStart); err != nil { log.Printf("[BlobDownload] Seek error: %v", err) return resp.R500(c, "", nil, err) } } else { // If not seekable, read and discard bytes if _, err := io.CopyN(io.Discard, reader, start); err != nil { log.Printf("[BlobDownload] CopyN discard error: %v", err) return resp.R500(c, "", nil, err) } } // Create limited reader limitedReader := io.LimitReader(reader, end-start+1) // Set partial content headers c.Set("Content-Type", "application/octet-stream") c.Set("Content-Length", fmt.Sprintf("%d", end-start+1)) c.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, size)) c.Set("Accept-Ranges", "bytes") c.Set("Docker-Content-Digest", digest) c.Status(206) // 
Partial Content log.Printf("[BlobDownload] Sending partial content") // Read all content and send content, err := io.ReadAll(limitedReader) if err != nil { log.Printf("[BlobDownload] ReadAll error: %v", err) return resp.R500(c, "", nil, err) } return c.Send(content) } // Full blob download log.Printf("[BlobDownload] Full blob download, setting headers") c.Set("Content-Type", "application/octet-stream") c.Set("Content-Length", fmt.Sprintf("%d", size)) c.Set("Accept-Ranges", "bytes") c.Set("Docker-Content-Digest", digest) log.Printf("[BlobDownload] About to read all content, size=%d", size) // Read all content and send content, err := io.ReadAll(reader) if err != nil { log.Printf("[BlobDownload] ReadAll error: %v", err) return resp.R500(c, "", nil, err) } log.Printf("[BlobDownload] Read %d bytes, sending...", len(content)) return c.Send(content) } // handleBlobHead ?? blob ???? func handleBlobHead(c fiber.Ctx, db *gorm.DB, store store.Store, repo string, digest string) error { // Check if blob exists exists, err := store.BlobExists(c.Context(), digest) if err != nil { return resp.R500(c, "", nil, err) } if !exists { return resp.R404(c, "BLOB_NOT_FOUND", nil, "blob not found") } // Get blob size size, err := store.GetBlobSize(c.Context(), digest) if err != nil { return resp.R500(c, "", nil, err) } // Set response headers c.Set("Content-Type", "application/octet-stream") c.Set("Content-Length", fmt.Sprintf("%d", size)) c.Set("Accept-Ranges", "bytes") c.Set("Docker-Content-Digest", digest) return c.SendStatus(200) }