package oci

import (
	"archive/tar"
	"bufio"
	"bytes"
	"compress/gzip"
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"

	"gitea.loveuer.com/loveuer/upkg/tool"
)

// OCIUploadOpt is a functional option that customizes how PushImage talks to
// the registry.
type OCIUploadOpt func(*ociUploadOpt)

// ociUploadOpt holds the resolved settings for a single push.
type ociUploadOpt struct {
	PlainHTTP     bool   // use http:// instead of https://
	SkipTLSVerify bool   // skip TLS certificate verification
	Username      string // basic-auth user name
	Password      string // basic-auth password
}

// scheme returns the URL scheme used for every registry request.
func (o *ociUploadOpt) scheme() string {
	if o != nil && o.PlainHTTP {
		return "http"
	}
	return "https"
}

// applyAuth adds HTTP basic authentication to req. Credentials are only sent
// when both the user name and the password are set.
func (o *ociUploadOpt) applyAuth(req *http.Request) {
	if o != nil && o.Username != "" && o.Password != "" {
		req.SetBasicAuth(o.Username, o.Password)
	}
}

// WithPushPlainHTTP makes the push use plain HTTP instead of HTTPS.
func WithPushPlainHTTP(plainHTTP bool) OCIUploadOpt {
	return func(o *ociUploadOpt) {
		o.PlainHTTP = plainHTTP
	}
}

// WithPushSkipTLSVerify disables TLS certificate verification for the push.
func WithPushSkipTLSVerify(skip bool) OCIUploadOpt {
	return func(o *ociUploadOpt) {
		o.SkipTLSVerify = skip
	}
}

// WithPushAuth sets the basic-auth credentials sent with every request.
func WithPushAuth(username, password string) OCIUploadOpt {
	return func(o *ociUploadOpt) {
		o.Username = username
		o.Password = password
	}
}

// PushImage uploads an image archive to an OCI registry using plain HTTP
// requests rather than shelling out to `docker push`.
//
// file is a tar archive containing a manifest.json (the `docker save`
// layout); a gzip-compressed archive is detected by its magic bytes and
// decompressed transparently. Reading starts at the reader's current
// position. The whole archive is buffered in memory.
//
// address is the full image reference in the form
// <registry>/<repository>[:<tag>], for example
// localhost:5000/myapp:latest or 192.168.1.1:5000/library/nginx:1.20.
// The registry may be an IP or a host name, optionally with a port. The tag
// defaults to "latest".
//
// Layers are uploaded first, then the image config, then the manifest.
func PushImage(ctx context.Context, file io.Reader, address string, opts ...OCIUploadOpt) error {
	opt := &ociUploadOpt{}
	for _, fn := range opts {
		if fn != nil {
			fn(opt)
		}
	}

	// Validate the address before consuming any input.
	registry, repository, tag, err := parseImageAddress(address)
	if err != nil {
		return fmt.Errorf("parse image address failed: %w", err)
	}

	// Peek does not consume bytes, so br still yields the magic bytes to
	// whichever reader is layered on top of it. This works for seekable and
	// non-seekable inputs alike.
	br := bufio.NewReader(file)
	var src io.Reader = br
	if magic, peekErr := br.Peek(2); peekErr == nil && hasGzipMagic(magic) {
		gz, gzErr := gzip.NewReader(br)
		if gzErr != nil {
			return fmt.Errorf("create gzip reader failed: %w", gzErr)
		}
		defer gz.Close()
		src = gz
	}

	client := tool.NewClient(opt.SkipTLSVerify, "")

	manifest, config, layers, err := extractImageFromTar(src)
	if err != nil {
		return fmt.Errorf("extract image from tar failed: %w", err)
	}

	// 1. Upload every layer.
	for _, layer := range layers {
		if err = uploadBlob(ctx, client, registry, repository, layer.data, layer.digest, opt); err != nil {
			return fmt.Errorf("upload layer %s failed: %w", layer.digest, err)
		}
	}

	// 2. Upload the image config.
	if err = uploadBlob(ctx, client, registry, repository, config.data, config.digest, opt); err != nil {
		return fmt.Errorf("upload config failed: %w", err)
	}

	// 3. Upload the manifest, which publishes the tag.
	if err = uploadManifest(ctx, client, registry, repository, tag, manifest, opt); err != nil {
		return fmt.Errorf("upload manifest failed: %w", err)
	}

	return nil
}

// hasGzipMagic reports whether b starts with the gzip magic bytes.
func hasGzipMagic(b []byte) bool {
	return len(b) >= 2 && b[0] == 0x1f && b[1] == 0x8b
}

// parseImageAddress splits address into registry, repository and tag.
//
// Everything before the first "/" is the registry; the remainder is the
// repository, optionally followed by ":<tag>". A missing tag defaults to
// "latest". An error is returned when the registry, repository or an
// explicitly given tag is empty.
func parseImageAddress(address string) (registry, repository, tag string, err error) {
	parts := strings.SplitN(address, "/", 2)
	if len(parts) < 2 || parts[0] == "" {
		return "", "", "", fmt.Errorf("invalid image address: %s", address)
	}
	registry = parts[0]

	repository, tag = parts[1], "latest"
	if idx := strings.Index(repository, ":"); idx >= 0 {
		repository, tag = repository[:idx], repository[idx+1:]
	}
	if repository == "" || tag == "" {
		return "", "", "", fmt.Errorf("invalid image address: %s", address)
	}

	return registry, repository, tag, nil
}

// blobData is a blob's content together with its "sha256:<hex>" digest.
type blobData struct {
	digest string
	data   []byte
}

// extractImageFromTar reads a `docker save` style archive and returns an OCI
// image manifest (JSON), the image config blob and the layer blobs in
// manifest order.
//
// Only the first entry of manifest.json is used. All regular files in the
// archive are held in memory.
func extractImageFromTar(file io.Reader) (manifest []byte, config blobData, layers []blobData, err error) {
	tr := tar.NewReader(file)

	// Contents of every regular file, keyed by archive path without a
	// leading "./".
	files := make(map[string][]byte)
	for {
		hdr, nextErr := tr.Next()
		if nextErr == io.EOF {
			break
		}
		if nextErr != nil {
			return nil, blobData{}, nil, nextErr
		}
		if hdr.Typeflag != tar.TypeReg {
			continue
		}

		data := make([]byte, hdr.Size)
		if _, readErr := io.ReadFull(tr, data); readErr != nil {
			return nil, blobData{}, nil, readErr
		}
		files[strings.TrimPrefix(hdr.Name, "./")] = data
	}

	manifestData, ok := files["manifest.json"]
	if !ok {
		return nil, blobData{}, nil, fmt.Errorf("manifest.json not found in tar")
	}

	var dockerManifests []struct {
		Config   string   `json:"Config"`
		RepoTags []string `json:"RepoTags"`
		Layers   []string `json:"Layers"`
	}
	if jsonErr := json.Unmarshal(manifestData, &dockerManifests); jsonErr != nil {
		return nil, blobData{}, nil, jsonErr
	}
	if len(dockerManifests) == 0 {
		return nil, blobData{}, nil, fmt.Errorf("no manifest found")
	}
	dockerManifest := dockerManifests[0]

	configData, ok := files[strings.TrimPrefix(dockerManifest.Config, "./")]
	if !ok {
		return nil, blobData{}, nil, fmt.Errorf("config file not found: %s", dockerManifest.Config)
	}
	config = blobData{digest: computeDigest(configData), data: configData}

	type descriptor struct {
		MediaType string `json:"mediaType"`
		Digest    string `json:"digest"`
		Size      int64  `json:"size"`
	}
	type imageManifest struct {
		SchemaVersion int          `json:"schemaVersion"`
		MediaType     string       `json:"mediaType"`
		Config        descriptor   `json:"config"`
		Layers        []descriptor `json:"layers"`
	}

	// Non-nil even when empty so the field encodes as [] rather than null.
	layerDescriptors := make([]descriptor, 0, len(dockerManifest.Layers))
	for _, layerPath := range dockerManifest.Layers {
		layerData, ok := files[strings.TrimPrefix(layerPath, "./")]
		if !ok {
			return nil, blobData{}, nil, fmt.Errorf("layer file not found: %s", layerPath)
		}

		layerDigest := computeDigest(layerData)
		layers = append(layers, blobData{digest: layerDigest, data: layerData})
		layerDescriptors = append(layerDescriptors, descriptor{
			// The media type must describe the bytes actually uploaded;
			// archives commonly carry uncompressed layer tarballs.
			MediaType: uploadLayerMediaType(layerData),
			Digest:    layerDigest,
			Size:      int64(len(layerData)),
		})
	}

	manifest, err = json.Marshal(imageManifest{
		SchemaVersion: 2,
		MediaType:     "application/vnd.oci.image.manifest.v1+json",
		Config: descriptor{
			MediaType: "application/vnd.oci.image.config.v1+json",
			Digest:    config.digest,
			Size:      int64(len(configData)),
		},
		Layers: layerDescriptors,
	})
	if err != nil {
		return nil, blobData{}, nil, err
	}

	return manifest, config, layers, nil
}

// uploadLayerMediaType picks the OCI layer media type matching the
// compression of data, detected from its leading magic bytes.
func uploadLayerMediaType(data []byte) string {
	switch {
	case hasGzipMagic(data):
		return "application/vnd.oci.image.layer.v1.tar+gzip"
	case bytes.HasPrefix(data, []byte{0x28, 0xb5, 0x2f, 0xfd}): // zstd frame
		return "application/vnd.oci.image.layer.v1.tar+zstd"
	default:
		return "application/vnd.oci.image.layer.v1.tar"
	}
}

// computeDigest returns the SHA-256 digest of data as "sha256:<hex>".
func computeDigest(data []byte) string {
	sum := sha256.Sum256(data)
	return fmt.Sprintf("sha256:%x", sum)
}

// uploadStatusError builds the error for an unexpected registry response. It
// appends up to 1 KiB of the response body, when present, after the status
// code.
func uploadStatusError(action string, resp *http.Response) error {
	var buf bytes.Buffer
	// Best effort: the excerpt is diagnostic only, so a read error is ignored.
	_, _ = buf.ReadFrom(io.LimitReader(resp.Body, 1024))
	if detail := strings.TrimSpace(buf.String()); detail != "" {
		return fmt.Errorf("%s: %d: %s", action, resp.StatusCode, detail)
	}
	return fmt.Errorf("%s: %d", action, resp.StatusCode)
}

// uploadCommitURL resolves the Location header of an upload session against
// sessionURL (the header may be absolute or relative) and adds the digest
// query parameter, keeping any query parameters already present.
func uploadCommitURL(sessionURL, location, dgst string) (string, error) {
	base, err := url.Parse(sessionURL)
	if err != nil {
		return "", fmt.Errorf("parse upload url %q: %w", sessionURL, err)
	}
	loc, err := url.Parse(location)
	if err != nil {
		return "", fmt.Errorf("parse upload location %q: %w", location, err)
	}

	target := base.ResolveReference(loc)
	query := target.Query()
	query.Set("digest", dgst)
	target.RawQuery = query.Encode()

	return target.String(), nil
}

// startBlobUpload opens an upload session with a POST to uploadURL and
// returns the Location header of the response. The response body is closed
// before returning.
func startBlobUpload(ctx context.Context, client *http.Client, uploadURL string, opt *ociUploadOpt) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadURL, nil)
	if err != nil {
		return "", err
	}
	opt.applyAuth(req)

	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusAccepted {
		return "", uploadStatusError("start upload failed", resp)
	}

	location := resp.Header.Get("Location")
	if location == "" {
		return "", fmt.Errorf("no location header in upload response")
	}
	return location, nil
}

// uploadBlob uploads one blob (a layer or the image config) to
// registry/repository.
//
// It first issues a HEAD request and returns early when the blob already
// exists. Otherwise it opens an upload session with POST and sends the whole
// blob in a single PUT to the returned location. dgst must be the digest of
// data.
func uploadBlob(ctx context.Context, client *http.Client, registry, repository string, data []byte, dgst string, opt *ociUploadOpt) error {
	scheme := opt.scheme()

	// 1. Skip the upload when the registry already has the blob. The check is
	// best effort: any failure other than cancellation falls through to a
	// regular upload.
	checkURL := fmt.Sprintf("%s://%s/v2/%s/blobs/%s", scheme, registry, repository, dgst)
	req, err := http.NewRequestWithContext(ctx, http.MethodHead, checkURL, nil)
	if err != nil {
		return err
	}
	opt.applyAuth(req)

	if resp, headErr := client.Do(req); headErr == nil {
		_ = resp.Body.Close()
		if resp.StatusCode == http.StatusOK {
			return nil
		}
	} else if ctx.Err() != nil {
		return headErr
	}

	// 2. Open an upload session.
	uploadURL := fmt.Sprintf("%s://%s/v2/%s/blobs/uploads/", scheme, registry, repository)
	location, err := startBlobUpload(ctx, client, uploadURL, opt)
	if err != nil {
		return err
	}

	// 3. Work out where to send the data.
	commitURL, err := uploadCommitURL(uploadURL, location, dgst)
	if err != nil {
		return err
	}

	// 4. Send the blob in one request. net/http derives Content-Length from
	// the bytes.Reader, so the header is not set by hand.
	req, err = http.NewRequestWithContext(ctx, http.MethodPut, commitURL, bytes.NewReader(data))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/octet-stream")
	opt.applyAuth(req)

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return uploadStatusError("upload blob failed", resp)
	}

	return nil
}

// uploadManifest publishes manifest under tag in registry/repository with a
// single PUT. Any status other than 201 Created is returned as an error.
func uploadManifest(ctx context.Context, client *http.Client, registry, repository, tag string, manifest []byte, opt *ociUploadOpt) error {
	manifestURL := fmt.Sprintf("%s://%s/v2/%s/manifests/%s", opt.scheme(), registry, repository, tag)

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, manifestURL, bytes.NewReader(manifest))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
	opt.applyAuth(req)

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return uploadStatusError("upload manifest failed", resp)
	}

	return nil
}