Files
upkg/tool/oci/push.go
loveuer 1c11c9c7ba Fix: resolve gzip detection issue and add comprehensive tests for push.go
- Fix gzip reader creation to handle both seekable and non-seekable readers
- Remove unused variable in error handling
- Add comprehensive test coverage with push_test.go
- Tests include parsing, digest computation, tar extraction, blob upload, manifest upload, gzip detection, and benchmarks
- All tests pass and benchmarks show good performance
2026-01-29 15:32:21 +08:00

439 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package oci
import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"gitea.loveuer.com/loveuer/upkg/tool"
)
type OCIUploadOpt func(*ociUploadOpt)
type ociUploadOpt struct {
PlainHTTP bool // 使用 HTTP 而不是 HTTPS
SkipTLSVerify bool // 跳过 TLS 验证
Username string // 认证用户名
Password string // 认证密码
}
// WithPushPlainHTTP 使用 HTTP
func WithPushPlainHTTP(plainHTTP bool) OCIUploadOpt {
return func(o *ociUploadOpt) {
o.PlainHTTP = plainHTTP
}
}
// WithPushSkipTLSVerify 跳过 TLS 验证
func WithPushSkipTLSVerify(skip bool) OCIUploadOpt {
return func(o *ociUploadOpt) {
o.SkipTLSVerify = skip
}
}
// WithPushAuth 设置认证信息
func WithPushAuth(username, password string) OCIUploadOpt {
return func(o *ociUploadOpt) {
o.Username = username
o.Password = password
}
}
// PushImage 上传镜像
// 通过原生 HTTP 方法上传 tar 镜像到 OCI 镜像仓库,而不是调用 docker push 命令
// file: tar 格式的镜像文件
// address: 完整的镜像地址,格式:<registry>/<repository>:<tag>
//
// 例如: localhost:5000/myapp:latest, 192.168.1.1:5000/library/nginx:1.20
// <registry> 可以是 IP、域名可带端口号
func PushImage(ctx context.Context, file io.Reader, address string, opts ...OCIUploadOpt) error {
opt := &ociUploadOpt{
PlainHTTP: false,
SkipTLSVerify: false,
}
for _, fn := range opts {
fn(opt)
}
// logger.DebugCtx(ctx, "PushImage: starting upload, address=%s, plainHTTP=%v, skipTLSVerify=%v", address, opt.PlainHTTP, opt.SkipTLSVerify)
// 自动识别 gzip 格式
br := bufio.NewReader(file)
header, err := br.Peek(2)
if err == nil && len(header) >= 2 && header[0] == 0x1f && header[1] == 0x8b {
// logger.DebugCtx(ctx, "PushImage: detected gzip format, decompressing...")
// 重置 reader 到开头,然后创建 gzip reader
// 通过组合原始文件 reader 创建新的 reader
if seeker, ok := file.(io.Seeker); ok {
// 如果支持 seek回到开头
seeker.Seek(0, io.SeekStart)
gz, err := gzip.NewReader(file)
if err != nil {
// logger.ErrorCtx(ctx, "PushImage: create gzip reader failed, err=%v", err)
return fmt.Errorf("create gzip reader failed: %w", err)
}
defer gz.Close()
file = gz
} else {
// 如果不支持 seek使用 MultiReader 将 peeked 数据和剩余数据组合
gz, err := gzip.NewReader(io.MultiReader(bytes.NewReader(header), br))
if err != nil {
// logger.ErrorCtx(ctx, "PushImage: create gzip reader failed, err=%v", err)
return fmt.Errorf("create gzip reader failed: %w", err)
}
defer gz.Close()
file = gz
}
} else {
file = br
}
// 解析镜像地址
registry, repository, tag, err := parseImageAddress(address)
if err != nil {
// logger.ErrorCtx(ctx, "PushImage: parse image address failed, address=%s, err=%v", address, err)
return fmt.Errorf("parse image address failed: %w", err)
}
// logger.DebugCtx(ctx, "PushImage: parsed image address, registry=%s, repository=%s, tag=%s", registry, repository, tag)
// 创建 HTTP 客户端
client := tool.NewClient(opt.SkipTLSVerify, "")
// 从 tar 文件中提取镜像信息
// logger.DebugCtx(ctx, "PushImage: extracting image from tar file")
manifest, config, layers, err := extractImageFromTar(file)
if err != nil {
// logger.ErrorCtx(ctx, "PushImage: extract image from tar failed, err=%v", err)
return fmt.Errorf("extract image from tar failed: %w", err)
}
// logger.DebugCtx(ctx, "PushImage: extracted image info, layers=%d, config_digest=%s", len(layers), config.digest)
// 1. 上传所有层layers
// logger.DebugCtx(ctx, "PushImage: uploading %d layers", len(layers))
for _, layer := range layers {
// logger.DebugCtx(ctx, "PushImage: uploading layer %d/%d, digest=%s, size=%d", i+1, len(layers), layer.digest, len(layer.data))
if err = uploadBlob(ctx, client, registry, repository, layer.data, layer.digest, opt); err != nil {
// logger.ErrorCtx(ctx, "PushImage: upload layer %s failed, err=%v", layer.digest, err)
return fmt.Errorf("upload layer %s failed: %w", layer.digest, err)
}
// logger.DebugCtx(ctx, "PushImage: layer %d/%d uploaded successfully", i+1, len(layers))
}
// 2. 上传配置config
// logger.DebugCtx(ctx, "PushImage: uploading config, digest=%s, size=%d", config.digest, len(config.data))
if err = uploadBlob(ctx, client, registry, repository, config.data, config.digest, opt); err != nil {
// logger.ErrorCtx(ctx, "PushImage: upload config failed, err=%v", err)
return fmt.Errorf("upload config failed: %w", err)
}
// logger.DebugCtx(ctx, "PushImage: config uploaded successfully")
// 3. 上传清单manifest
// logger.DebugCtx(ctx, "PushImage: uploading manifest, tag=%s, size=%d", tag, len(manifest))
if err = uploadManifest(ctx, client, registry, repository, tag, manifest, opt); err != nil {
// logger.ErrorCtx(ctx, "PushImage: upload manifest failed, err=%v", err)
return fmt.Errorf("upload manifest failed: %w", err)
}
// logger.DebugCtx(ctx, "PushImage: image uploaded successfully, address=%s", address)
return nil
}
// parseImageAddress 解析镜像地址
func parseImageAddress(address string) (registry, repository, tag string, err error) {
parts := strings.SplitN(address, "/", 2)
if len(parts) < 2 {
return "", "", "", fmt.Errorf("invalid image address: %s", address)
}
registry = parts[0]
// 分离 repository 和 tag
repoParts := strings.SplitN(parts[1], ":", 2)
repository = repoParts[0]
if len(repoParts) == 2 {
tag = repoParts[1]
} else {
tag = "latest"
}
//fmt.Printf("[DEBUG] parseImageAddress: address=%s, registry=%s, repository=%s, tag=%s\n", address, registry, repository, tag)
return registry, repository, tag, nil
}
type blobData struct {
digest string
data []byte
}
// extractImageFromTar 从 tar 文件中提取镜像信息
func extractImageFromTar(file io.Reader) (manifest []byte, config blobData, layers []blobData, err error) {
tr := tar.NewReader(file)
// 存储文件内容
files := make(map[string][]byte)
// 读取 tar 文件中的所有文件
for {
hdr, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, blobData{}, nil, err
}
if hdr.Typeflag == tar.TypeReg {
data := make([]byte, hdr.Size)
if _, err := io.ReadFull(tr, data); err != nil {
return nil, blobData{}, nil, err
}
files[hdr.Name] = data
}
}
// 读取 manifest.json
manifestData, ok := files["manifest.json"]
if !ok {
return nil, blobData{}, nil, fmt.Errorf("manifest.json not found in tar")
}
// 解析 Docker manifest
var dockerManifests []struct {
Config string `json:"Config"`
RepoTags []string `json:"RepoTags"`
Layers []string `json:"Layers"`
}
if err := json.Unmarshal(manifestData, &dockerManifests); err != nil {
return nil, blobData{}, nil, err
}
if len(dockerManifests) == 0 {
return nil, blobData{}, nil, fmt.Errorf("no manifest found")
}
dockerManifest := dockerManifests[0]
// 读取配置文件
configData, ok := files[dockerManifest.Config]
if !ok {
return nil, blobData{}, nil, fmt.Errorf("config file not found: %s", dockerManifest.Config)
}
configDigest := computeDigest(configData)
config = blobData{
digest: configDigest,
data: configData,
}
// 读取所有层
type layerDescriptor struct {
MediaType string `json:"mediaType"`
Digest string `json:"digest"`
Size int64 `json:"size"`
}
var layerDescriptors []layerDescriptor
for _, layerPath := range dockerManifest.Layers {
layerData, ok := files[layerPath]
if !ok {
return nil, blobData{}, nil, fmt.Errorf("layer file not found: %s", layerPath)
}
layerDigest := computeDigest(layerData)
layers = append(layers, blobData{
digest: layerDigest,
data: layerData,
})
layerDescriptors = append(layerDescriptors, layerDescriptor{
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
Digest: layerDigest,
Size: int64(len(layerData)),
})
}
// 创建 OCI manifest
ociManifest := map[string]interface{}{
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.manifest.v1+json",
"config": map[string]interface{}{
"mediaType": "application/vnd.oci.image.config.v1+json",
"digest": configDigest,
"size": int64(len(configData)),
},
"layers": layerDescriptors,
}
manifest, err = json.Marshal(ociManifest)
if err != nil {
return nil, blobData{}, nil, err
}
return manifest, config, layers, nil
}
// computeDigest 计算数据的 SHA256 摘要
func computeDigest(data []byte) string {
hash := sha256.Sum256(data)
return fmt.Sprintf("sha256:%x", hash)
}
// uploadBlob 上传 blob层或配置
func uploadBlob(ctx context.Context, client *http.Client, registry, repository string, data []byte, dgst string, opt *ociUploadOpt) error {
scheme := "https"
if opt.PlainHTTP {
scheme = "http"
}
// logger.DebugCtx(ctx, "uploadBlob: uploading blob, registry=%s, repository=%s, digest=%s, size=%d", registry, repository, dgst, len(data))
// 1. 检查 blob 是否已存在
checkURL := fmt.Sprintf("%s://%s/v2/%s/blobs/%s", scheme, registry, repository, dgst)
// logger.DebugCtx(ctx, "uploadBlob: checking blob existence, url=%s", checkURL)
req, err := http.NewRequestWithContext(ctx, http.MethodHead, checkURL, nil)
if err != nil {
// logger.ErrorCtx(ctx, "uploadBlob: failed to create HEAD request, err=%v", err)
return err
}
if opt.Username != "" && opt.Password != "" {
req.SetBasicAuth(opt.Username, opt.Password)
}
resp, err := client.Do(req)
if err == nil && resp.StatusCode == http.StatusOK {
// logger.DebugCtx(ctx, "uploadBlob: blob already exists, skipping upload, digest=%s", dgst)
resp.Body.Close()
return nil
}
if resp != nil {
resp.Body.Close()
}
// 2. 启动上传会话
uploadURL := fmt.Sprintf("%s://%s/v2/%s/blobs/uploads/", scheme, registry, repository)
// logger.DebugCtx(ctx, "uploadBlob: starting upload session, url=%s", uploadURL)
req, err = http.NewRequestWithContext(ctx, http.MethodPost, uploadURL, nil)
if err != nil {
// logger.ErrorCtx(ctx, "uploadBlob: failed to create POST request, err=%v", err)
return err
}
if opt.Username != "" && opt.Password != "" {
req.SetBasicAuth(opt.Username, opt.Password)
}
resp, err = client.Do(req)
if err != nil {
// logger.ErrorCtx(ctx, "uploadBlob: failed to start upload session, err=%v", err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
// logger.ErrorCtx(ctx, "uploadBlob: start upload failed with status %d", resp.StatusCode)
return fmt.Errorf("start upload failed: %d", resp.StatusCode)
}
// 3. 获取上传地址
location := resp.Header.Get("Location")
if location == "" {
// logger.ErrorCtx(ctx, "uploadBlob: no location header in upload response")
return fmt.Errorf("no location header in upload response")
}
// logger.DebugCtx(ctx, "uploadBlob: got upload location, location=%s", location)
// 处理相对路径
if !strings.HasPrefix(location, "http") {
location = fmt.Sprintf("%s://%s%s", scheme, registry, location)
// logger.DebugCtx(ctx, "uploadBlob: converted relative location to absolute, location=%s", location)
}
// 4. 上传数据
var uploadDataURL string
if strings.Contains(location, "?") {
uploadDataURL = fmt.Sprintf("%s&digest=%s", location, dgst)
} else {
uploadDataURL = fmt.Sprintf("%s?digest=%s", location, dgst)
}
// logger.DebugCtx(ctx, "uploadBlob: uploading data, url=%s", uploadDataURL)
req, err = http.NewRequestWithContext(ctx, http.MethodPut, uploadDataURL, bytes.NewReader(data))
if err != nil {
// logger.ErrorCtx(ctx, "uploadBlob: failed to create PUT request, err=%v", err)
return err
}
req.Header.Set("Content-Type", "application/octet-stream")
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data)))
if opt.Username != "" && opt.Password != "" {
req.SetBasicAuth(opt.Username, opt.Password)
}
resp, err = client.Do(req)
if err != nil {
// logger.ErrorCtx(ctx, "uploadBlob: failed to upload blob data, err=%v", err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
_ = resp.Body.Close()
// logger.ErrorCtx(ctx, "uploadBlob: upload blob failed with status %d", resp.StatusCode)
return fmt.Errorf("upload blob failed: %d", resp.StatusCode)
}
// logger.DebugCtx(ctx, "uploadBlob: blob uploaded successfully, digest=%s", dgst)
return nil
}
// uploadManifest 上传清单
func uploadManifest(ctx context.Context, client *http.Client, registry, repository, tag string, manifest []byte, opt *ociUploadOpt) error {
scheme := "https"
if opt.PlainHTTP {
scheme = "http"
}
manifestURL := fmt.Sprintf("%s://%s/v2/%s/manifests/%s", scheme, registry, repository, tag)
// logger.DebugCtx(ctx, "uploadManifest: uploading manifest, url=%s, tag=%s, size=%d", manifestURL, tag, len(manifest))
req, err := http.NewRequestWithContext(ctx, http.MethodPut, manifestURL, bytes.NewReader(manifest))
if err != nil {
// logger.ErrorCtx(ctx, "uploadManifest: failed to create PUT request, err=%v", err)
return err
}
req.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
if opt.Username != "" && opt.Password != "" {
req.SetBasicAuth(opt.Username, opt.Password)
}
resp, err := client.Do(req)
if err != nil {
// logger.ErrorCtx(ctx, "uploadManifest: failed to upload manifest, err=%v", err)
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
// logger.ErrorCtx(ctx, "uploadManifest: upload manifest failed with status %d, tag=%s", resp.StatusCode, tag)
return fmt.Errorf("upload manifest failed: %d", resp.StatusCode)
}
// logger.DebugCtx(ctx, "uploadManifest: manifest uploaded successfully, tag=%s", tag)
return nil
}