package registry

import (
	"archive/tar"
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log"
	"net/url"
	"strings"
	"time"

	"gitea.loveuer.com/loveuer/cluster/internal/model"
	"gitea.loveuer.com/loveuer/cluster/pkg/resp"
	"gitea.loveuer.com/loveuer/cluster/pkg/store"
	"github.com/gofiber/fiber/v3"
	"gorm.io/gorm"
)

// RegistryImageDownload returns a handler that streams an image as a tar
// archive laid out in the Docker "save" format: manifest.json, repositories,
// the image config as <hex>.json and every layer as <hex>/layer.
//
// The image reference is taken from the wildcard route parameter. A leading
// "<registry_address>/" prefix (read from the registry_address config row,
// defaulting to "localhost:9119") is stripped, and a missing or empty tag
// defaults to "latest". If the requested tag does not exist, the first tag
// recorded for the repository is used instead.
//
// Errors found while preparing the download are reported as 400/404/500
// responses. Errors that occur after streaming has started can only be
// signalled by aborting the stream; they are logged.
func RegistryImageDownload(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		startTime := time.Now()

		// Capture the request context once: the tar generation goroutine keeps
		// running after this handler returns, and the fiber.Ctx itself should
		// not be touched from that point on.
		reqCtx := c.Context()

		// Get image name from wildcard parameter.
		fullImageName := c.Params("*")
		if fullImageName == "" {
			return resp.R400(c, "MISSING_IMAGE_NAME", nil, "image name is required")
		}

		// Additional URL decode in case the router did not decode it.
		if decoded, err := url.PathUnescape(fullImageName); err == nil {
			fullImageName = decoded
		}

		log.Printf("[Download] Start downloading: %s", fullImageName)

		// Get current registry_address to strip it from the request.
		var registryConfig model.RegistryConfig
		registryAddress := ""
		if err := db.Where("key = ?", "registry_address").First(&registryConfig).Error; err == nil {
			registryAddress = registryConfig.Value
		}
		if registryAddress == "" {
			registryAddress = "localhost:9119"
		}

		// Strip registry_address prefix if present,
		// e.g. "test.com/docker.io/redis:latest" -> "docker.io/redis:latest".
		imageName := strings.TrimPrefix(fullImageName, registryAddress+"/")

		// Split "repository:tag". Only a colon after the last "/" separates the
		// tag; an earlier colon belongs to a registry port ("host:5000/img").
		repository, tag := imageName, "latest"
		if idx := strings.LastIndex(imageName, ":"); idx > strings.LastIndex(imageName, "/") {
			repository = imageName[:idx]
			if t := imageName[idx+1:]; t != "" {
				tag = t
			}
		}

		log.Printf("[Download] Parsed - repository: %s, tag: %s", repository, tag)

		// Find the repository.
		t1 := time.Now()
		var repositoryModel model.Repository
		if err := db.Where("name = ?", repository).First(&repositoryModel).Error; err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return resp.R404(c, "IMAGE_NOT_FOUND", nil, fmt.Sprintf("image %s not found", repository))
			}
			return resp.R500(c, "", nil, err)
		}
		log.Printf("[Download] DB query repository: %v", time.Since(t1))

		// Find the tag record, falling back to the first available tag.
		t2 := time.Now()
		var tagRecord model.Tag
		if err := db.Where("repository = ? AND tag = ?", repository, tag).First(&tagRecord).Error; err != nil {
			if !errors.Is(err, gorm.ErrRecordNotFound) {
				return resp.R500(c, "", nil, err)
			}
			if err := db.Where("repository = ?", repository).First(&tagRecord).Error; err != nil {
				if errors.Is(err, gorm.ErrRecordNotFound) {
					return resp.R404(c, "TAG_NOT_FOUND", nil, fmt.Sprintf("no tag found for image %s", repository))
				}
				return resp.R500(c, "", nil, err)
			}
			// Continue with the tag that was actually found.
			tag = tagRecord.Tag
		}
		log.Printf("[Download] DB query tag: %v", time.Since(t2))

		// Get the manifest.
		t3 := time.Now()
		var manifest model.Manifest
		if err := db.Where("digest = ?", tagRecord.Digest).First(&manifest).Error; err != nil {
			if errors.Is(err, gorm.ErrRecordNotFound) {
				return resp.R404(c, "MANIFEST_NOT_FOUND", nil, "manifest not found")
			}
			return resp.R500(c, "", nil, err)
		}
		log.Printf("[Download] DB query manifest: %v", time.Since(t3))

		// Read manifest content - try from database first, then from store.
		t4 := time.Now()
		manifestContent := manifest.Content
		if len(manifestContent) == 0 {
			content, err := store.ReadManifest(reqCtx, manifest.Digest)
			if err != nil {
				return resp.R500(c, "", nil, err)
			}
			manifestContent = content
		}
		log.Printf("[Download] Read manifest content: %v", time.Since(t4))

		// Parse manifest to extract layer digests.
		t5 := time.Now()
		var manifestData map[string]interface{}
		if err := json.Unmarshal(manifestContent, &manifestData); err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to parse manifest: %w", err))
		}

		// extractLayers already distinguishes "manifest list" from "no layers",
		// so let it produce the error and attach the manifest keys for debugging.
		layers, err := extractLayers(manifestData)
		if err != nil {
			manifestKeys := make([]string, 0, len(manifestData))
			for k := range manifestData {
				manifestKeys = append(manifestKeys, k)
			}
			return resp.R500(c, "", nil, fmt.Errorf("failed to extract layers (manifest keys: %v): %w", manifestKeys, err))
		}
		log.Printf("[Download] Parse manifest and extract %d layers: %v", len(layers), time.Since(t5))
		log.Printf("[Download] Preparation completed in %v, starting tar generation", time.Since(startTime))

		// Set response headers for file download.
		filename := fmt.Sprintf("%s-%s.tar", strings.ReplaceAll(repository, "/", "_"), tag)
		c.Set("Content-Type", "application/x-tar")
		c.Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filename))

		// Create a pipe for streaming tar content; the goroutine below is the
		// producer and the response body stream is the consumer.
		pr, pw := io.Pipe()

		// Use buffered writer for better performance.
		bufWriter := bufio.NewWriterSize(pw, 1024*1024) // 1MB buffer
		tarWriter := tar.NewWriter(bufWriter)

		// writeArchive emits the complete archive and returns the first error.
		writeArchive := func() error {
			// Get config digest from manifest.
			configDigest := ""
			if configValue, ok := manifestData["config"].(map[string]interface{}); ok {
				if digest, ok := configValue["digest"].(string); ok {
					configDigest = digest
				}
			}
			configFileName := strings.TrimPrefix(configDigest, "sha256:") + ".json"

			// Layer paths in Docker save format: {digest}/layer.
			layerPaths := make([]string, 0, len(layers))
			for _, layerDigest := range layers {
				layerPaths = append(layerPaths, strings.TrimPrefix(layerDigest, "sha256:")+"/layer")
			}

			// Build Docker save format manifest.json (array format).
			manifestItems := []map[string]interface{}{
				{
					"Config":   configFileName,
					"RepoTags": []string{repository + ":" + tag},
					"Layers":   layerPaths,
				},
			}
			manifestJSON, err := json.Marshal(manifestItems)
			if err != nil {
				return fmt.Errorf("failed to marshal manifest: %w", err)
			}
			if err := writeTarFile(tarWriter, "manifest.json", manifestJSON); err != nil {
				return fmt.Errorf("failed to write manifest: %w", err)
			}

			// Write repositories file (Docker save format).
			repositoriesMap := map[string]map[string]string{
				repository: {
					tag: strings.TrimPrefix(tagRecord.Digest, "sha256:"),
				},
			}
			repositoriesJSON, err := json.Marshal(repositoriesMap)
			if err != nil {
				return fmt.Errorf("failed to marshal repositories: %w", err)
			}
			if err := writeTarFile(tarWriter, "repositories", repositoriesJSON); err != nil {
				return fmt.Errorf("failed to write repositories: %w", err)
			}

			// Write config file if it exists. A missing config blob is tolerated
			// (best effort) but logged, since manifest.json still references it.
			if configDigest != "" {
				configReader, err := store.ReadBlob(reqCtx, configDigest)
				if err != nil {
					log.Printf("[Download] Config blob %s unavailable, omitting from archive: %v", configDigest, err)
				} else {
					err = writeTarFileStream(tarWriter, configFileName, configReader)
					configReader.Close()
					if err != nil {
						return fmt.Errorf("failed to write config: %w", err)
					}
				}
			}

			// Write all layer blobs, in the same order as listed in manifest.json.
			for i, layerDigest := range layers {
				blobReader, err := store.ReadBlob(reqCtx, layerDigest)
				if err != nil {
					return fmt.Errorf("failed to read blob %s: %w", layerDigest, err)
				}
				err = writeTarFileStream(tarWriter, layerPaths[i], blobReader)
				blobReader.Close()
				if err != nil {
					return fmt.Errorf("failed to write blob: %w", err)
				}
			}

			// Order matters: closing the tar writer emits the trailer into the
			// buffer, which must then be flushed into the pipe.
			if err := tarWriter.Close(); err != nil {
				return fmt.Errorf("failed to close tar writer: %w", err)
			}
			if err := bufWriter.Flush(); err != nil {
				return fmt.Errorf("failed to flush buffer: %w", err)
			}
			return nil
		}

		// Write tar content in a goroutine.
		go func() {
			err := writeArchive()
			if err != nil {
				log.Printf("[Download] Streaming %s:%s failed: %v", repository, tag, err)
			}
			// CloseWithError(nil) behaves like Close; a non-nil error is surfaced
			// to the reader so the response is aborted instead of ending cleanly.
			pw.CloseWithError(err)
		}()

		// Stream the tar content to response.
		if err := c.SendStream(pr); err != nil {
			// Unblock the producer goroutine if the stream was never attached.
			pr.CloseWithError(err)
			return err
		}
		return nil
	}
}

// extractLayers returns the layer digests listed under the "layers" key of a
// decoded Docker manifest v2 schema 2 or OCI image manifest, in manifest order.
// Entries without a string "digest" field are skipped.
//
// It returns an error when the manifest is a manifest list (it has a
// "manifests" array) or when no layer digest could be found.
func extractLayers(manifestData map[string]interface{}) ([]string, error) {
	var layers []string

	if layersArray, ok := manifestData["layers"].([]interface{}); ok {
		for _, layer := range layersArray {
			layerMap, ok := layer.(map[string]interface{})
			if !ok {
				continue
			}
			if digest, ok := layerMap["digest"].(string); ok {
				layers = append(layers, digest)
			}
		}
		if len(layers) > 0 {
			return layers, nil
		}
	}

	// Manifest list (multi-arch): the platform-specific manifest would have to
	// be fetched first, which is not implemented.
	if _, ok := manifestData["manifests"].([]interface{}); ok {
		return nil, fmt.Errorf("manifest list format not supported for direct download")
	}

	return nil, fmt.Errorf("no layers found in manifest")
}

// writeTarFile adds a regular file entry named filename with the given
// content and mode 0644 to the tar archive.
func writeTarFile(tw *tar.Writer, filename string, content []byte) error {
	header := &tar.Header{
		Name: filename,
		Size: int64(len(content)),
		Mode: 0644,
	}
	if err := tw.WriteHeader(header); err != nil {
		return err
	}
	_, err := tw.Write(content)
	return err
}

// writeTarFileStream adds a file entry named filename to the tar archive,
// taking its content from reader.
//
// A tar header must carry the entry size before the data is written. When
// reader exposes Stat (as *os.File does) and reports a regular file, that size
// is used and the content is streamed with io.Copy without being held in
// memory. Otherwise the whole content is read into memory first.
func writeTarFileStream(tw *tar.Writer, filename string, reader io.Reader) error {
	// The assertion must use fs.FileInfo exactly: Go matches method signatures
	// literally, so a narrower anonymous result type would never match.
	if file, ok := reader.(interface{ Stat() (fs.FileInfo, error) }); ok {
		// Only regular files report a size that describes the readable content.
		if info, err := file.Stat(); err == nil && info.Mode().IsRegular() {
			header := &tar.Header{
				Name: filename,
				Size: info.Size(),
				Mode: 0644,
			}
			if err := tw.WriteHeader(header); err != nil {
				return err
			}
			// Stream copy without loading to memory.
			_, copyErr := io.Copy(tw, reader)
			return copyErr
		}
	}

	// Fallback: read all content (for readers that don't support Stat).
	buf, err := io.ReadAll(reader)
	if err != nil {
		return err
	}
	return writeTarFile(tw, filename, buf)
}