- Fix gzip reader creation to handle both seekable and non-seekable readers
- Remove unused variable in error handling
- Add comprehensive test coverage with push_test.go
- Tests include parsing, digest computation, tar extraction, blob upload, manifest upload, gzip detection, and benchmarks
- All tests pass and benchmarks show good performance
439 lines
14 KiB
Go
439 lines
14 KiB
Go
package oci
|
||
|
||
import (
|
||
"archive/tar"
|
||
"bufio"
|
||
"bytes"
|
||
"compress/gzip"
|
||
"context"
|
||
"crypto/sha256"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"strings"
|
||
|
||
"gitea.loveuer.com/loveuer/upkg/tool"
|
||
)
|
||
|
||
// OCIUploadOpt is a functional option that adjusts the settings of a single
// PushImage call.
type OCIUploadOpt func(*ociUploadOpt)

// ociUploadOpt holds the settings that OCIUploadOpt functions modify.
type ociUploadOpt struct {
	// PlainHTTP makes registry requests use http instead of https.
	PlainHTTP bool
	// SkipTLSVerify skips TLS verification; it is passed to the HTTP client
	// constructor.
	SkipTLSVerify bool
	// Username is the user name for authentication.
	Username string
	// Password is the password for authentication.
	Password string
}
|
||
|
||
// WithPushPlainHTTP 使用 HTTP
|
||
func WithPushPlainHTTP(plainHTTP bool) OCIUploadOpt {
|
||
return func(o *ociUploadOpt) {
|
||
o.PlainHTTP = plainHTTP
|
||
}
|
||
}
|
||
|
||
// WithPushSkipTLSVerify 跳过 TLS 验证
|
||
func WithPushSkipTLSVerify(skip bool) OCIUploadOpt {
|
||
return func(o *ociUploadOpt) {
|
||
o.SkipTLSVerify = skip
|
||
}
|
||
}
|
||
|
||
// WithPushAuth 设置认证信息
|
||
func WithPushAuth(username, password string) OCIUploadOpt {
|
||
return func(o *ociUploadOpt) {
|
||
o.Username = username
|
||
o.Password = password
|
||
}
|
||
}
|
||
|
||
// PushImage 上传镜像
|
||
// 通过原生 HTTP 方法上传 tar 镜像到 OCI 镜像仓库,而不是调用 docker push 命令
|
||
// file: tar 格式的镜像文件
|
||
// address: 完整的镜像地址,格式:<registry>/<repository>:<tag>
|
||
//
|
||
// 例如: localhost:5000/myapp:latest, 192.168.1.1:5000/library/nginx:1.20
|
||
// <registry> 可以是 IP、域名,可带端口号
|
||
func PushImage(ctx context.Context, file io.Reader, address string, opts ...OCIUploadOpt) error {
|
||
opt := &ociUploadOpt{
|
||
PlainHTTP: false,
|
||
SkipTLSVerify: false,
|
||
}
|
||
|
||
for _, fn := range opts {
|
||
fn(opt)
|
||
}
|
||
|
||
// logger.DebugCtx(ctx, "PushImage: starting upload, address=%s, plainHTTP=%v, skipTLSVerify=%v", address, opt.PlainHTTP, opt.SkipTLSVerify)
|
||
|
||
// 自动识别 gzip 格式
|
||
br := bufio.NewReader(file)
|
||
header, err := br.Peek(2)
|
||
if err == nil && len(header) >= 2 && header[0] == 0x1f && header[1] == 0x8b {
|
||
// logger.DebugCtx(ctx, "PushImage: detected gzip format, decompressing...")
|
||
// 重置 reader 到开头,然后创建 gzip reader
|
||
// 通过组合原始文件 reader 创建新的 reader
|
||
if seeker, ok := file.(io.Seeker); ok {
|
||
// 如果支持 seek,回到开头
|
||
seeker.Seek(0, io.SeekStart)
|
||
gz, err := gzip.NewReader(file)
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "PushImage: create gzip reader failed, err=%v", err)
|
||
return fmt.Errorf("create gzip reader failed: %w", err)
|
||
}
|
||
defer gz.Close()
|
||
file = gz
|
||
} else {
|
||
// 如果不支持 seek,使用 MultiReader 将 peeked 数据和剩余数据组合
|
||
gz, err := gzip.NewReader(io.MultiReader(bytes.NewReader(header), br))
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "PushImage: create gzip reader failed, err=%v", err)
|
||
return fmt.Errorf("create gzip reader failed: %w", err)
|
||
}
|
||
defer gz.Close()
|
||
file = gz
|
||
}
|
||
} else {
|
||
file = br
|
||
}
|
||
|
||
// 解析镜像地址
|
||
registry, repository, tag, err := parseImageAddress(address)
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "PushImage: parse image address failed, address=%s, err=%v", address, err)
|
||
return fmt.Errorf("parse image address failed: %w", err)
|
||
}
|
||
|
||
// logger.DebugCtx(ctx, "PushImage: parsed image address, registry=%s, repository=%s, tag=%s", registry, repository, tag)
|
||
|
||
// 创建 HTTP 客户端
|
||
client := tool.NewClient(opt.SkipTLSVerify, "")
|
||
|
||
// 从 tar 文件中提取镜像信息
|
||
// logger.DebugCtx(ctx, "PushImage: extracting image from tar file")
|
||
manifest, config, layers, err := extractImageFromTar(file)
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "PushImage: extract image from tar failed, err=%v", err)
|
||
return fmt.Errorf("extract image from tar failed: %w", err)
|
||
}
|
||
|
||
// logger.DebugCtx(ctx, "PushImage: extracted image info, layers=%d, config_digest=%s", len(layers), config.digest)
|
||
|
||
// 1. 上传所有层(layers)
|
||
// logger.DebugCtx(ctx, "PushImage: uploading %d layers", len(layers))
|
||
for _, layer := range layers {
|
||
// logger.DebugCtx(ctx, "PushImage: uploading layer %d/%d, digest=%s, size=%d", i+1, len(layers), layer.digest, len(layer.data))
|
||
if err = uploadBlob(ctx, client, registry, repository, layer.data, layer.digest, opt); err != nil {
|
||
// logger.ErrorCtx(ctx, "PushImage: upload layer %s failed, err=%v", layer.digest, err)
|
||
return fmt.Errorf("upload layer %s failed: %w", layer.digest, err)
|
||
}
|
||
// logger.DebugCtx(ctx, "PushImage: layer %d/%d uploaded successfully", i+1, len(layers))
|
||
}
|
||
|
||
// 2. 上传配置(config)
|
||
// logger.DebugCtx(ctx, "PushImage: uploading config, digest=%s, size=%d", config.digest, len(config.data))
|
||
if err = uploadBlob(ctx, client, registry, repository, config.data, config.digest, opt); err != nil {
|
||
// logger.ErrorCtx(ctx, "PushImage: upload config failed, err=%v", err)
|
||
return fmt.Errorf("upload config failed: %w", err)
|
||
}
|
||
// logger.DebugCtx(ctx, "PushImage: config uploaded successfully")
|
||
|
||
// 3. 上传清单(manifest)
|
||
// logger.DebugCtx(ctx, "PushImage: uploading manifest, tag=%s, size=%d", tag, len(manifest))
|
||
if err = uploadManifest(ctx, client, registry, repository, tag, manifest, opt); err != nil {
|
||
// logger.ErrorCtx(ctx, "PushImage: upload manifest failed, err=%v", err)
|
||
return fmt.Errorf("upload manifest failed: %w", err)
|
||
}
|
||
|
||
// logger.DebugCtx(ctx, "PushImage: image uploaded successfully, address=%s", address)
|
||
return nil
|
||
}
|
||
|
||
// parseImageAddress splits an image reference of the form
// <registry>/<repository>[:<tag>] into its three parts.
//
// Everything before the first "/" is the registry (host, optionally with a
// port). The remainder is split at its first ":" into repository and tag;
// when no ":" is present the tag defaults to "latest". Digest references
// (repository@sha256:...) are not recognised as such.
//
// An error is returned when the address contains no "/", or when the
// registry, repository or tag part is empty, because any of those would
// otherwise produce a malformed registry URL later on.
func parseImageAddress(address string) (registry, repository, tag string, err error) {
	slash := strings.Index(address, "/")
	if slash < 0 {
		return "", "", "", fmt.Errorf("invalid image address: %s", address)
	}
	registry = address[:slash]
	remainder := address[slash+1:]

	// Split repository and tag; a missing tag means "latest".
	repository, tag = remainder, "latest"
	if colon := strings.Index(remainder, ":"); colon >= 0 {
		repository, tag = remainder[:colon], remainder[colon+1:]
	}

	if registry == "" || repository == "" || tag == "" {
		return "", "", "", fmt.Errorf("invalid image address: %s", address)
	}

	return registry, repository, tag, nil
}
|
||
|
||
// blobData pairs the raw bytes of a blob (an image layer or the image config)
// with its content digest in "sha256:<hex>" form.
type blobData struct {
	digest string
	data   []byte
}

// layerMediaType returns the OCI layer media type matching the blob content:
// the gzip variant when data starts with the gzip magic number (0x1f 0x8b),
// otherwise the plain tar variant.
func layerMediaType(data []byte) string {
	if len(data) >= 2 && data[0] == 0x1f && data[1] == 0x8b {
		return "application/vnd.oci.image.layer.v1.tar+gzip"
	}
	return "application/vnd.oci.image.layer.v1.tar"
}

// extractImageFromTar reads an image archive from file and returns an OCI
// image manifest (JSON) together with the config blob and the layer blobs the
// manifest refers to.
//
// The archive must contain a manifest.json entry holding a JSON array whose
// elements have Config, RepoTags and Layers fields; only the first element is
// used. Config and every path in Layers must name regular files inside the
// archive. A leading "./" on entry names and on manifest paths is ignored.
//
// Every regular file of the archive is held in memory. The returned layers
// follow the order of the Layers list, and layers is nil when that list is
// empty (the manifest then carries an empty "layers" array, not null).
//
// An error is returned when the archive cannot be read, when manifest.json is
// missing, unparsable or empty, or when a referenced file is absent.
func extractImageFromTar(file io.Reader) (manifest []byte, config blobData, layers []blobData, err error) {
	// Archives repacked with tools such as `tar -C dir .` prefix every entry
	// with "./"; normalise both stored names and lookups.
	normalize := func(name string) string { return strings.TrimPrefix(name, "./") }

	tr := tar.NewReader(file)

	// Contents of every regular file, keyed by normalised entry name.
	files := make(map[string][]byte)
	for {
		hdr, nextErr := tr.Next()
		if nextErr == io.EOF {
			break
		}
		if nextErr != nil {
			return nil, blobData{}, nil, fmt.Errorf("read tar entry: %w", nextErr)
		}
		if hdr.Typeflag != tar.TypeReg {
			continue
		}

		data := make([]byte, hdr.Size)
		if _, readErr := io.ReadFull(tr, data); readErr != nil {
			return nil, blobData{}, nil, fmt.Errorf("read %s from tar: %w", hdr.Name, readErr)
		}
		files[normalize(hdr.Name)] = data
	}

	manifestData, ok := files["manifest.json"]
	if !ok {
		return nil, blobData{}, nil, fmt.Errorf("manifest.json not found in tar")
	}

	// Parse the Docker-style manifest list.
	var dockerManifests []struct {
		Config   string   `json:"Config"`
		RepoTags []string `json:"RepoTags"`
		Layers   []string `json:"Layers"`
	}
	if jsonErr := json.Unmarshal(manifestData, &dockerManifests); jsonErr != nil {
		return nil, blobData{}, nil, fmt.Errorf("parse manifest.json: %w", jsonErr)
	}
	if len(dockerManifests) == 0 {
		return nil, blobData{}, nil, fmt.Errorf("no manifest found")
	}
	dockerManifest := dockerManifests[0]

	// Image config blob.
	configData, ok := files[normalize(dockerManifest.Config)]
	if !ok {
		return nil, blobData{}, nil, fmt.Errorf("config file not found: %s", dockerManifest.Config)
	}
	configDigest := computeDigest(configData)
	config = blobData{digest: configDigest, data: configData}

	// Layer blobs and their manifest descriptors.
	type layerDescriptor struct {
		MediaType string `json:"mediaType"`
		Digest    string `json:"digest"`
		Size      int64  `json:"size"`
	}

	// Non-nil even when empty so that it marshals as [] rather than null.
	layerDescriptors := make([]layerDescriptor, 0, len(dockerManifest.Layers))
	for _, layerPath := range dockerManifest.Layers {
		layerData, ok := files[normalize(layerPath)]
		if !ok {
			return nil, blobData{}, nil, fmt.Errorf("layer file not found: %s", layerPath)
		}

		layerDigest := computeDigest(layerData)
		layers = append(layers, blobData{digest: layerDigest, data: layerData})
		layerDescriptors = append(layerDescriptors, layerDescriptor{
			// The blob is uploaded byte-for-byte, so the declared media
			// type has to describe its actual encoding.
			MediaType: layerMediaType(layerData),
			Digest:    layerDigest,
			Size:      int64(len(layerData)),
		})
	}

	// Assemble the OCI image manifest.
	ociManifest := map[string]interface{}{
		"schemaVersion": 2,
		"mediaType":     "application/vnd.oci.image.manifest.v1+json",
		"config": map[string]interface{}{
			"mediaType": "application/vnd.oci.image.config.v1+json",
			"digest":    configDigest,
			"size":      int64(len(configData)),
		},
		"layers": layerDescriptors,
	}

	manifest, err = json.Marshal(ociManifest)
	if err != nil {
		return nil, blobData{}, nil, fmt.Errorf("encode OCI manifest: %w", err)
	}

	return manifest, config, layers, nil
}

// computeDigest returns the SHA-256 digest of data formatted as
// "sha256:<lowercase hex>".
func computeDigest(data []byte) string {
	hash := sha256.Sum256(data)
	return fmt.Sprintf("sha256:%x", hash)
}
|
||
|
||
// uploadBlob 上传 blob(层或配置)
|
||
func uploadBlob(ctx context.Context, client *http.Client, registry, repository string, data []byte, dgst string, opt *ociUploadOpt) error {
|
||
scheme := "https"
|
||
if opt.PlainHTTP {
|
||
scheme = "http"
|
||
}
|
||
|
||
// logger.DebugCtx(ctx, "uploadBlob: uploading blob, registry=%s, repository=%s, digest=%s, size=%d", registry, repository, dgst, len(data))
|
||
|
||
// 1. 检查 blob 是否已存在
|
||
checkURL := fmt.Sprintf("%s://%s/v2/%s/blobs/%s", scheme, registry, repository, dgst)
|
||
// logger.DebugCtx(ctx, "uploadBlob: checking blob existence, url=%s", checkURL)
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodHead, checkURL, nil)
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "uploadBlob: failed to create HEAD request, err=%v", err)
|
||
return err
|
||
}
|
||
|
||
if opt.Username != "" && opt.Password != "" {
|
||
req.SetBasicAuth(opt.Username, opt.Password)
|
||
}
|
||
|
||
resp, err := client.Do(req)
|
||
if err == nil && resp.StatusCode == http.StatusOK {
|
||
// logger.DebugCtx(ctx, "uploadBlob: blob already exists, skipping upload, digest=%s", dgst)
|
||
resp.Body.Close()
|
||
return nil
|
||
}
|
||
if resp != nil {
|
||
resp.Body.Close()
|
||
}
|
||
|
||
// 2. 启动上传会话
|
||
uploadURL := fmt.Sprintf("%s://%s/v2/%s/blobs/uploads/", scheme, registry, repository)
|
||
// logger.DebugCtx(ctx, "uploadBlob: starting upload session, url=%s", uploadURL)
|
||
req, err = http.NewRequestWithContext(ctx, http.MethodPost, uploadURL, nil)
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "uploadBlob: failed to create POST request, err=%v", err)
|
||
return err
|
||
}
|
||
|
||
if opt.Username != "" && opt.Password != "" {
|
||
req.SetBasicAuth(opt.Username, opt.Password)
|
||
}
|
||
|
||
resp, err = client.Do(req)
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "uploadBlob: failed to start upload session, err=%v", err)
|
||
return err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusAccepted {
|
||
// logger.ErrorCtx(ctx, "uploadBlob: start upload failed with status %d", resp.StatusCode)
|
||
return fmt.Errorf("start upload failed: %d", resp.StatusCode)
|
||
}
|
||
|
||
// 3. 获取上传地址
|
||
location := resp.Header.Get("Location")
|
||
if location == "" {
|
||
// logger.ErrorCtx(ctx, "uploadBlob: no location header in upload response")
|
||
return fmt.Errorf("no location header in upload response")
|
||
}
|
||
|
||
// logger.DebugCtx(ctx, "uploadBlob: got upload location, location=%s", location)
|
||
|
||
// 处理相对路径
|
||
if !strings.HasPrefix(location, "http") {
|
||
location = fmt.Sprintf("%s://%s%s", scheme, registry, location)
|
||
// logger.DebugCtx(ctx, "uploadBlob: converted relative location to absolute, location=%s", location)
|
||
}
|
||
|
||
// 4. 上传数据
|
||
var uploadDataURL string
|
||
if strings.Contains(location, "?") {
|
||
uploadDataURL = fmt.Sprintf("%s&digest=%s", location, dgst)
|
||
} else {
|
||
uploadDataURL = fmt.Sprintf("%s?digest=%s", location, dgst)
|
||
}
|
||
// logger.DebugCtx(ctx, "uploadBlob: uploading data, url=%s", uploadDataURL)
|
||
req, err = http.NewRequestWithContext(ctx, http.MethodPut, uploadDataURL, bytes.NewReader(data))
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "uploadBlob: failed to create PUT request, err=%v", err)
|
||
return err
|
||
}
|
||
|
||
req.Header.Set("Content-Type", "application/octet-stream")
|
||
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(data)))
|
||
|
||
if opt.Username != "" && opt.Password != "" {
|
||
req.SetBasicAuth(opt.Username, opt.Password)
|
||
}
|
||
|
||
resp, err = client.Do(req)
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "uploadBlob: failed to upload blob data, err=%v", err)
|
||
return err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusCreated {
|
||
_ = resp.Body.Close()
|
||
// logger.ErrorCtx(ctx, "uploadBlob: upload blob failed with status %d", resp.StatusCode)
|
||
return fmt.Errorf("upload blob failed: %d", resp.StatusCode)
|
||
}
|
||
|
||
// logger.DebugCtx(ctx, "uploadBlob: blob uploaded successfully, digest=%s", dgst)
|
||
return nil
|
||
}
|
||
|
||
// uploadManifest 上传清单
|
||
func uploadManifest(ctx context.Context, client *http.Client, registry, repository, tag string, manifest []byte, opt *ociUploadOpt) error {
|
||
scheme := "https"
|
||
if opt.PlainHTTP {
|
||
scheme = "http"
|
||
}
|
||
|
||
manifestURL := fmt.Sprintf("%s://%s/v2/%s/manifests/%s", scheme, registry, repository, tag)
|
||
// logger.DebugCtx(ctx, "uploadManifest: uploading manifest, url=%s, tag=%s, size=%d", manifestURL, tag, len(manifest))
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, manifestURL, bytes.NewReader(manifest))
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "uploadManifest: failed to create PUT request, err=%v", err)
|
||
return err
|
||
}
|
||
|
||
req.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
|
||
|
||
if opt.Username != "" && opt.Password != "" {
|
||
req.SetBasicAuth(opt.Username, opt.Password)
|
||
}
|
||
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
// logger.ErrorCtx(ctx, "uploadManifest: failed to upload manifest, err=%v", err)
|
||
return err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusCreated {
|
||
// logger.ErrorCtx(ctx, "uploadManifest: upload manifest failed with status %d, tag=%s", resp.StatusCode, tag)
|
||
return fmt.Errorf("upload manifest failed: %d", resp.StatusCode)
|
||
}
|
||
|
||
// logger.DebugCtx(ctx, "uploadManifest: manifest uploaded successfully, tag=%s", tag)
|
||
return nil
|
||
}
|