feat: add registry config, image upload/download, and OCI format support
Backend: - Add registry_address configuration API (GET/POST) - Add tar image upload with OCI and Docker format support - Add image download with streaming optimization - Fix blob download using c.Send (Fiber v3 SendStream bug) - Add registry_address prefix stripping for all OCI v2 endpoints - Add AGENTS.md for project documentation Frontend: - Add settings store with Snackbar notifications - Add image upload dialog with progress bar - Add download state tracking with multi-stage feedback - Replace alert() with MUI Snackbar messages - Display image names without registry_address prefix 🤖 Generated with [Qoder](https://qoder.com)
This commit is contained in:
370
internal/module/registry/handler.download.go
Normal file
370
internal/module/registry/handler.download.go
Normal file
@@ -0,0 +1,370 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gitea.loveuer.com/loveuer/cluster/internal/model"
|
||||
"gitea.loveuer.com/loveuer/cluster/pkg/resp"
|
||||
"gitea.loveuer.com/loveuer/cluster/pkg/store"
|
||||
"github.com/gofiber/fiber/v3"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// RegistryImageDownload downloads an image as a tar file
|
||||
func RegistryImageDownload(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
|
||||
return func(c fiber.Ctx) error {
|
||||
startTime := time.Now()
|
||||
|
||||
// Get image name from wildcard parameter (Fiber automatically decodes URL)
|
||||
fullImageName := c.Params("*")
|
||||
if fullImageName == "" {
|
||||
return resp.R400(c, "MISSING_IMAGE_NAME", nil, "image name is required")
|
||||
}
|
||||
|
||||
// Additional URL decode in case Fiber didn't decode it
|
||||
decodedImageName, err := url.PathUnescape(fullImageName)
|
||||
if err == nil {
|
||||
fullImageName = decodedImageName
|
||||
}
|
||||
|
||||
log.Printf("[Download] Start downloading: %s", fullImageName)
|
||||
|
||||
// Get current registry_address to strip it from the request
|
||||
var registryConfig model.RegistryConfig
|
||||
registryAddress := ""
|
||||
if err := db.Where("key = ?", "registry_address").First(®istryConfig).Error; err == nil {
|
||||
registryAddress = registryConfig.Value
|
||||
}
|
||||
if registryAddress == "" {
|
||||
registryAddress = "localhost:9119"
|
||||
}
|
||||
|
||||
// Strip registry_address prefix if present
|
||||
// e.g., "test.com/docker.io/redis:latest" -> "docker.io/redis:latest"
|
||||
imageName := fullImageName
|
||||
if strings.HasPrefix(imageName, registryAddress+"/") {
|
||||
imageName = strings.TrimPrefix(imageName, registryAddress+"/")
|
||||
}
|
||||
|
||||
// Parse image name (repository:tag) to extract repository and tag
|
||||
var repository, tag string
|
||||
if strings.Contains(imageName, ":") {
|
||||
parts := strings.SplitN(imageName, ":", 2)
|
||||
repository = parts[0]
|
||||
tag = parts[1]
|
||||
} else {
|
||||
// If no tag specified, default to "latest"
|
||||
repository = imageName
|
||||
tag = "latest"
|
||||
}
|
||||
|
||||
log.Printf("[Download] Parsed - repository: %s, tag: %s", repository, tag)
|
||||
|
||||
// Find the repository
|
||||
t1 := time.Now()
|
||||
var repositoryModel model.Repository
|
||||
if err := db.Where("name = ?", repository).First(&repositoryModel).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return resp.R404(c, "IMAGE_NOT_FOUND", nil, fmt.Sprintf("image %s not found", repository))
|
||||
}
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
log.Printf("[Download] DB query repository: %v", time.Since(t1))
|
||||
|
||||
// Find the tag record
|
||||
t2 := time.Now()
|
||||
var tagRecord model.Tag
|
||||
if err := db.Where("repository = ? AND tag = ?", repository, tag).First(&tagRecord).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
// Try to get the first available tag
|
||||
if err := db.Where("repository = ?", repository).First(&tagRecord).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return resp.R404(c, "TAG_NOT_FOUND", nil, fmt.Sprintf("no tag found for image %s", repository))
|
||||
}
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
// Update tag to the found tag
|
||||
tag = tagRecord.Tag
|
||||
} else {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
}
|
||||
log.Printf("[Download] DB query tag: %v", time.Since(t2))
|
||||
|
||||
// Get the manifest
|
||||
t3 := time.Now()
|
||||
var manifest model.Manifest
|
||||
if err := db.Where("digest = ?", tagRecord.Digest).First(&manifest).Error; err != nil {
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
return resp.R404(c, "MANIFEST_NOT_FOUND", nil, "manifest not found")
|
||||
}
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
log.Printf("[Download] DB query manifest: %v", time.Since(t3))
|
||||
|
||||
// Read manifest content - try from database first, then from store
|
||||
t4 := time.Now()
|
||||
var manifestContent []byte
|
||||
if len(manifest.Content) > 0 {
|
||||
manifestContent = manifest.Content
|
||||
} else {
|
||||
var err error
|
||||
manifestContent, err = store.ReadManifest(c.Context(), manifest.Digest)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
}
|
||||
log.Printf("[Download] Read manifest content: %v", time.Since(t4))
|
||||
|
||||
// Parse manifest to extract layer digests
|
||||
t5 := time.Now()
|
||||
var manifestData map[string]interface{}
|
||||
if err := json.Unmarshal(manifestContent, &manifestData); err != nil {
|
||||
return resp.R500(c, "", nil, fmt.Errorf("failed to parse manifest: %w", err))
|
||||
}
|
||||
|
||||
// Debug: check manifest keys
|
||||
manifestKeys := make([]string, 0, len(manifestData))
|
||||
for k := range manifestData {
|
||||
manifestKeys = append(manifestKeys, k)
|
||||
}
|
||||
if _, ok := manifestData["layers"]; !ok {
|
||||
return resp.R500(c, "", nil, fmt.Errorf("manifest keys: %v, no layers key found", manifestKeys))
|
||||
}
|
||||
|
||||
// Extract layers from manifest
|
||||
layers, err := extractLayers(manifestData)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, fmt.Errorf("failed to extract layers: %w", err))
|
||||
}
|
||||
if len(layers) == 0 {
|
||||
// Debug: check if layers key exists
|
||||
if layersValue, ok := manifestData["layers"]; ok {
|
||||
return resp.R500(c, "", nil, fmt.Errorf("no layers found in manifest, but layers key exists: %T", layersValue))
|
||||
}
|
||||
return resp.R500(c, "", nil, fmt.Errorf("no layers found in manifest"))
|
||||
}
|
||||
log.Printf("[Download] Parse manifest and extract %d layers: %v", len(layers), time.Since(t5))
|
||||
log.Printf("[Download] Preparation completed in %v, starting tar generation", time.Since(startTime))
|
||||
|
||||
// Set response headers for file download
|
||||
filename := fmt.Sprintf("%s-%s.tar", strings.ReplaceAll(repository, "/", "_"), tag)
|
||||
c.Set("Content-Type", "application/x-tar")
|
||||
c.Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filename))
|
||||
|
||||
// Create a pipe for streaming tar content
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
// Use buffered writer for better performance
|
||||
bufWriter := bufio.NewWriterSize(pw, 1024*1024) // 1MB buffer
|
||||
tarWriter := tar.NewWriter(bufWriter)
|
||||
|
||||
// Write tar content in a goroutine
|
||||
go func() {
|
||||
defer pw.Close()
|
||||
defer tarWriter.Close()
|
||||
defer bufWriter.Flush()
|
||||
|
||||
// Get config digest from manifest
|
||||
configDigest := ""
|
||||
if configValue, ok := manifestData["config"].(map[string]interface{}); ok {
|
||||
if digest, ok := configValue["digest"].(string); ok {
|
||||
configDigest = digest
|
||||
}
|
||||
}
|
||||
|
||||
// Build Docker save format manifest.json (array format)
|
||||
manifestItems := []map[string]interface{}{
|
||||
{
|
||||
"Config": strings.TrimPrefix(configDigest, "sha256:") + ".json",
|
||||
"RepoTags": []string{repository + ":" + tag},
|
||||
"Layers": make([]string, 0, len(layers)),
|
||||
},
|
||||
}
|
||||
|
||||
// Add layer paths to manifest
|
||||
for _, layerDigest := range layers {
|
||||
layerPath := strings.TrimPrefix(layerDigest, "sha256:") + "/layer"
|
||||
manifestItems[0]["Layers"] = append(manifestItems[0]["Layers"].([]string), layerPath)
|
||||
}
|
||||
|
||||
// Convert manifest to JSON
|
||||
manifestJSON, err := json.Marshal(manifestItems)
|
||||
if err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("failed to marshal manifest: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Write manifest.json (Docker save format)
|
||||
if err := writeTarFile(tarWriter, "manifest.json", manifestJSON); err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("failed to write manifest: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Write repositories file (Docker save format)
|
||||
repositoriesMap := map[string]map[string]string{
|
||||
repository: {
|
||||
tag: strings.TrimPrefix(tagRecord.Digest, "sha256:"),
|
||||
},
|
||||
}
|
||||
repositoriesJSON, err := json.Marshal(repositoriesMap)
|
||||
if err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("failed to marshal repositories: %w", err))
|
||||
return
|
||||
}
|
||||
if err := writeTarFile(tarWriter, "repositories", repositoriesJSON); err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("failed to write repositories: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
// Write config file if exists
|
||||
if configDigest != "" {
|
||||
configReader, err := store.ReadBlob(c.Context(), configDigest)
|
||||
if err == nil {
|
||||
configFileName := strings.TrimPrefix(configDigest, "sha256:") + ".json"
|
||||
if err := writeTarFileStream(tarWriter, configFileName, configReader); err != nil {
|
||||
configReader.Close()
|
||||
pw.CloseWithError(fmt.Errorf("failed to write config: %w", err))
|
||||
return
|
||||
}
|
||||
configReader.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Write all layer blobs in Docker save format (digest/layer)
|
||||
for _, layerDigest := range layers {
|
||||
// Read blob
|
||||
blobReader, err := store.ReadBlob(c.Context(), layerDigest)
|
||||
if err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("failed to read blob %s: %w", layerDigest, err))
|
||||
return
|
||||
}
|
||||
|
||||
// Write blob to tar in Docker save format: {digest}/layer
|
||||
digestOnly := strings.TrimPrefix(layerDigest, "sha256:")
|
||||
layerPath := digestOnly + "/layer"
|
||||
|
||||
if err := writeTarFileStream(tarWriter, layerPath, blobReader); err != nil {
|
||||
blobReader.Close()
|
||||
pw.CloseWithError(fmt.Errorf("failed to write blob: %w", err))
|
||||
return
|
||||
}
|
||||
blobReader.Close()
|
||||
}
|
||||
|
||||
// Close tar writer and pipe
|
||||
if err := tarWriter.Close(); err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("failed to close tar writer: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
if err := bufWriter.Flush(); err != nil {
|
||||
pw.CloseWithError(fmt.Errorf("failed to flush buffer: %w", err))
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Stream the tar content to response
|
||||
return c.SendStream(pr)
|
||||
}
|
||||
}
|
||||
|
||||
// extractLayers extracts layer digests from manifest
|
||||
func extractLayers(manifestData map[string]interface{}) ([]string, error) {
|
||||
var layers []string
|
||||
|
||||
// Try Docker manifest v2 schema 2 format or OCI manifest format
|
||||
if layersValue, ok := manifestData["layers"]; ok {
|
||||
if layersArray, ok := layersValue.([]interface{}); ok {
|
||||
for _, layer := range layersArray {
|
||||
if layerMap, ok := layer.(map[string]interface{}); ok {
|
||||
if digest, ok := layerMap["digest"].(string); ok {
|
||||
layers = append(layers, digest)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(layers) > 0 {
|
||||
return layers, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try manifest list format (multi-arch)
|
||||
if _, ok := manifestData["manifests"].([]interface{}); ok {
|
||||
// For manifest list, we would need to fetch the actual manifest
|
||||
// For now, return error
|
||||
return nil, fmt.Errorf("manifest list format not supported for direct download")
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no layers found in manifest")
|
||||
}
|
||||
|
||||
// writeTarFile writes a file to tar archive
|
||||
func writeTarFile(tw *tar.Writer, filename string, content []byte) error {
|
||||
header := &tar.Header{
|
||||
Name: filename,
|
||||
Size: int64(len(content)),
|
||||
Mode: 0644,
|
||||
}
|
||||
|
||||
if err := tw.WriteHeader(header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := tw.Write(content); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeTarFileStream writes a file to tar archive from an io.Reader (streaming)
|
||||
func writeTarFileStream(tw *tar.Writer, filename string, reader io.Reader) error {
|
||||
// First, we need to get the size by reading into a buffer
|
||||
// For true streaming without reading all content, we'd need to know size beforehand
|
||||
// Since we're reading from files, we can use io.Copy with a CountingReader
|
||||
|
||||
// Create a temporary buffer to count size
|
||||
var buf []byte
|
||||
var err error
|
||||
|
||||
// If reader is a *os.File, we can get size directly
|
||||
if file, ok := reader.(interface{ Stat() (interface{ Size() int64 }, error) }); ok {
|
||||
if stat, err := file.Stat(); err == nil {
|
||||
size := stat.Size()
|
||||
header := &tar.Header{
|
||||
Name: filename,
|
||||
Size: size,
|
||||
Mode: 0644,
|
||||
}
|
||||
|
||||
if err := tw.WriteHeader(header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Stream copy without loading to memory
|
||||
if _, err := io.Copy(tw, reader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: read all content (for readers that don't support Stat)
|
||||
buf, err = io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return writeTarFile(tw, filename, buf)
|
||||
}
|
||||
Reference in New Issue
Block a user