Files
cluster/internal/module/k8s/handler.resource.go

802 lines
22 KiB
Go

package k8s
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"gitea.loveuer.com/loveuer/cluster/internal/model"
"gitea.loveuer.com/loveuer/cluster/pkg/resp"
"gitea.loveuer.com/loveuer/cluster/pkg/store"
"github.com/gofiber/fiber/v3"
"gorm.io/gorm"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/yaml"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// getK8sClient builds a typed Kubernetes clientset from the kubeconfig kept in
// the model.ClusterConfig record whose key is "kubeconfig".
//
// An error is returned when the record cannot be loaded, when its value is
// empty, when the kubeconfig cannot be parsed, or when the clientset cannot be
// constructed.
func getK8sClient(db *gorm.DB) (*kubernetes.Clientset, error) {
	var stored model.ClusterConfig

	lookup := db.Where("key = ?", "kubeconfig").First(&stored)
	if lookup.Error != nil {
		return nil, fmt.Errorf("kubeconfig not found: %w", lookup.Error)
	}
	if stored.Value == "" {
		return nil, fmt.Errorf("kubeconfig is empty")
	}

	restCfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(stored.Value))
	if err != nil {
		return nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
	}
	// Explicitly reset the request timeout to its zero value.
	restCfg.Timeout = 0

	cs, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create k8s client: %w", err)
	}
	return cs, nil
}
// getK8sConfig loads the kubeconfig kept in the model.ClusterConfig record
// whose key is "kubeconfig" and turns it into a client-go REST config.
//
// The returned config advertises only HTTP/1.1 during TLS negotiation and has
// its request timeout reset to the zero value. An error is returned when the
// record cannot be loaded, when its value is empty, or when the kubeconfig
// cannot be parsed.
func getK8sConfig(db *gorm.DB) (*rest.Config, error) {
	var stored model.ClusterConfig

	lookup := db.Where("key = ?", "kubeconfig").First(&stored)
	if lookup.Error != nil {
		return nil, fmt.Errorf("kubeconfig not found: %w", lookup.Error)
	}
	if stored.Value == "" {
		return nil, fmt.Errorf("kubeconfig is empty")
	}

	restCfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(stored.Value))
	if err != nil {
		return nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
	}

	// Offer only HTTP/1.1 so HTTP/2 is not negotiated; this was introduced to
	// avoid stream closing issues.
	restCfg.TLSClientConfig.NextProtos = []string{"http/1.1"}
	restCfg.Timeout = 0

	return restCfg, nil
}
// K8sNamespaceList returns a handler that lists the cluster's namespaces and
// responds with them under the "items" key.
//
// The ctx and store parameters are not used by the handler.
func K8sNamespaceList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		client, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		list, err := client.CoreV1().Namespaces().List(c.Context(), metav1.ListOptions{})
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		payload := map[string]any{"items": list.Items}
		return resp.R200(c, payload)
	}
}
// K8sDeploymentList returns a handler that lists Deployments.
//
// Query parameters:
//   - namespace: passed unchanged to the client; client-go treats an empty
//     namespace as "all namespaces".
//   - name: optional substring; when set, only Deployments whose name contains
//     it are returned.
//
// The result is sent under the "items" key. The slice is always non-nil so
// that an empty result is encoded as an empty array rather than null
// (assuming resp.R200 JSON-encodes the payload).
//
// The ctx and store parameters are not used by the handler.
func K8sDeploymentList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		namespace := c.Query("namespace", "")
		name := c.Query("name", "")

		clientset, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		deployments, err := clientset.AppsV1().Deployments(namespace).List(c.Context(), metav1.ListOptions{})
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		items := deployments.Items
		if name != "" {
			// Index instead of ranging by value to avoid copying every
			// Deployment just to inspect its name.
			matched := make([]appsv1.Deployment, 0, len(items))
			for i := range items {
				if strings.Contains(items[i].Name, name) {
					matched = append(matched, items[i])
				}
			}
			items = matched
		}
		if items == nil {
			items = []appsv1.Deployment{}
		}

		return resp.R200(c, map[string]any{
			"items": items,
		})
	}
}
// K8sStatefulSetList returns a handler that lists StatefulSets.
//
// Query parameters:
//   - namespace: passed unchanged to the client; client-go treats an empty
//     namespace as "all namespaces".
//   - name: optional substring; when set, only StatefulSets whose name
//     contains it are returned.
//
// The result is sent under the "items" key. The slice is always non-nil so
// that an empty result is encoded as an empty array rather than null
// (assuming resp.R200 JSON-encodes the payload).
//
// The ctx and store parameters are not used by the handler.
func K8sStatefulSetList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		namespace := c.Query("namespace", "")
		name := c.Query("name", "")

		clientset, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		statefulsets, err := clientset.AppsV1().StatefulSets(namespace).List(c.Context(), metav1.ListOptions{})
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		items := statefulsets.Items
		if name != "" {
			// Index instead of ranging by value to avoid copying every
			// StatefulSet just to inspect its name.
			matched := make([]appsv1.StatefulSet, 0, len(items))
			for i := range items {
				if strings.Contains(items[i].Name, name) {
					matched = append(matched, items[i])
				}
			}
			items = matched
		}
		if items == nil {
			items = []appsv1.StatefulSet{}
		}

		return resp.R200(c, map[string]any{
			"items": items,
		})
	}
}
// K8sConfigMapList returns a handler that lists ConfigMaps.
//
// Query parameters:
//   - namespace: passed unchanged to the client; client-go treats an empty
//     namespace as "all namespaces".
//   - name: optional substring; when set, only ConfigMaps whose name contains
//     it are returned.
//
// The result is sent under the "items" key. The slice is always non-nil so
// that an empty result is encoded as an empty array rather than null
// (assuming resp.R200 JSON-encodes the payload).
//
// The ctx and store parameters are not used by the handler.
func K8sConfigMapList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		namespace := c.Query("namespace", "")
		name := c.Query("name", "")

		clientset, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		configmaps, err := clientset.CoreV1().ConfigMaps(namespace).List(c.Context(), metav1.ListOptions{})
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		items := configmaps.Items
		if name != "" {
			// Index instead of ranging by value to avoid copying every
			// ConfigMap just to inspect its name.
			matched := make([]corev1.ConfigMap, 0, len(items))
			for i := range items {
				if strings.Contains(items[i].Name, name) {
					matched = append(matched, items[i])
				}
			}
			items = matched
		}
		if items == nil {
			items = []corev1.ConfigMap{}
		}

		return resp.R200(c, map[string]any{
			"items": items,
		})
	}
}
// K8sPodList returns a handler that lists Pods.
//
// Query parameters:
//   - namespace: passed unchanged to the client; client-go treats an empty
//     namespace as "all namespaces".
//   - name: optional substring; when set, only Pods whose name contains it
//     are returned.
//
// The result is sent under the "items" key. The slice is always non-nil so
// that an empty result is encoded as an empty array rather than null
// (assuming resp.R200 JSON-encodes the payload).
//
// The ctx and store parameters are not used by the handler.
func K8sPodList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		namespace := c.Query("namespace", "")
		name := c.Query("name", "")

		clientset, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		pods, err := clientset.CoreV1().Pods(namespace).List(c.Context(), metav1.ListOptions{})
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		items := pods.Items
		if name != "" {
			// Index instead of ranging by value to avoid copying every Pod
			// just to inspect its name.
			matched := make([]corev1.Pod, 0, len(items))
			for i := range items {
				if strings.Contains(items[i].Name, name) {
					matched = append(matched, items[i])
				}
			}
			items = matched
		}
		if items == nil {
			items = []corev1.Pod{}
		}

		return resp.R200(c, map[string]any{
			"items": items,
		})
	}
}
// K8sPVList returns a handler that lists the cluster's PersistentVolumes and
// responds with them under the "items" key.
//
// The ctx and store parameters are not used by the handler.
func K8sPVList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		client, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		list, err := client.CoreV1().PersistentVolumes().List(c.Context(), metav1.ListOptions{})
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		payload := map[string]any{"items": list.Items}
		return resp.R200(c, payload)
	}
}
// K8sPVCList returns a handler that lists PersistentVolumeClaims.
//
// Query parameters:
//   - namespace: passed unchanged to the client; client-go treats an empty
//     namespace as "all namespaces".
//   - name: optional substring; when set, only claims whose name contains it
//     are returned.
//
// The result is sent under the "items" key. The slice is always non-nil so
// that an empty result is encoded as an empty array rather than null
// (assuming resp.R200 JSON-encodes the payload).
//
// The ctx and store parameters are not used by the handler.
func K8sPVCList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		namespace := c.Query("namespace", "")
		name := c.Query("name", "")

		clientset, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		pvcs, err := clientset.CoreV1().PersistentVolumeClaims(namespace).List(c.Context(), metav1.ListOptions{})
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		items := pvcs.Items
		if name != "" {
			// Index instead of ranging by value to avoid copying every claim
			// just to inspect its name.
			matched := make([]corev1.PersistentVolumeClaim, 0, len(items))
			for i := range items {
				if strings.Contains(items[i].Name, name) {
					matched = append(matched, items[i])
				}
			}
			items = matched
		}
		if items == nil {
			items = []corev1.PersistentVolumeClaim{}
		}

		return resp.R200(c, map[string]any{
			"items": items,
		})
	}
}
// K8sServiceList returns a handler that lists Services.
//
// Query parameters:
//   - namespace: passed unchanged to the client; client-go treats an empty
//     namespace as "all namespaces".
//   - name: optional substring; when set, only Services whose name contains
//     it are returned.
//
// The result is sent under the "items" key. The slice is always non-nil so
// that an empty result is encoded as an empty array rather than null
// (assuming resp.R200 JSON-encodes the payload).
//
// The ctx and store parameters are not used by the handler.
func K8sServiceList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		namespace := c.Query("namespace", "")
		name := c.Query("name", "")

		clientset, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		services, err := clientset.CoreV1().Services(namespace).List(c.Context(), metav1.ListOptions{})
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		items := services.Items
		if name != "" {
			// Index instead of ranging by value to avoid copying every
			// Service just to inspect its name.
			matched := make([]corev1.Service, 0, len(items))
			for i := range items {
				if strings.Contains(items[i].Name, name) {
					matched = append(matched, items[i])
				}
			}
			items = matched
		}
		if items == nil {
			items = []corev1.Service{}
		}

		return resp.R200(c, map[string]any{
			"items": items,
		})
	}
}
// K8sResourceGet returns a handler that fetches one resource and responds with
// its YAML rendering under the "yaml" key.
//
// Query parameters:
//   - name: resource name (required).
//   - kind: one of "Deployment", "StatefulSet", "Service" or "ConfigMap"
//     (required); any other value is rejected with a 400 response.
//   - namespace: required for every supported kind.
//
// The object is marshalled exactly as returned by the typed client; no
// metadata is stripped here.
//
// The ctx and store parameters are not used by the handler.
func K8sResourceGet(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		name := c.Query("name", "")
		namespace := c.Query("namespace", "")
		kind := c.Query("kind", "")

		if name == "" || kind == "" {
			return resp.R400(c, "", nil, fmt.Errorf("name and kind are required"))
		}

		client, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		var (
			rendered   []byte
			marshalErr error
		)

		switch kind {
		case "Deployment":
			if namespace == "" {
				return resp.R400(c, "", nil, fmt.Errorf("namespace is required for Deployment"))
			}
			obj, getErr := client.AppsV1().Deployments(namespace).Get(c.Context(), name, metav1.GetOptions{})
			if getErr != nil {
				return resp.R500(c, "", nil, fmt.Errorf("failed to get deployment: %w", getErr))
			}
			rendered, marshalErr = yaml.Marshal(obj)

		case "StatefulSet":
			if namespace == "" {
				return resp.R400(c, "", nil, fmt.Errorf("namespace is required for StatefulSet"))
			}
			obj, getErr := client.AppsV1().StatefulSets(namespace).Get(c.Context(), name, metav1.GetOptions{})
			if getErr != nil {
				return resp.R500(c, "", nil, fmt.Errorf("failed to get statefulset: %w", getErr))
			}
			rendered, marshalErr = yaml.Marshal(obj)

		case "Service":
			if namespace == "" {
				return resp.R400(c, "", nil, fmt.Errorf("namespace is required for Service"))
			}
			obj, getErr := client.CoreV1().Services(namespace).Get(c.Context(), name, metav1.GetOptions{})
			if getErr != nil {
				return resp.R500(c, "", nil, fmt.Errorf("failed to get service: %w", getErr))
			}
			rendered, marshalErr = yaml.Marshal(obj)

		case "ConfigMap":
			if namespace == "" {
				return resp.R400(c, "", nil, fmt.Errorf("namespace is required for ConfigMap"))
			}
			obj, getErr := client.CoreV1().ConfigMaps(namespace).Get(c.Context(), name, metav1.GetOptions{})
			if getErr != nil {
				return resp.R500(c, "", nil, fmt.Errorf("failed to get configmap: %w", getErr))
			}
			rendered, marshalErr = yaml.Marshal(obj)

		default:
			return resp.R400(c, "", nil, fmt.Errorf("unsupported resource kind: %s", kind))
		}

		if marshalErr != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to marshal resource to yaml: %w", marshalErr))
		}

		return resp.R200(c, map[string]any{"yaml": string(rendered)})
	}
}
// getResourceName guesses the REST resource name (lowercase plural, e.g.
// "deployments") for a Kubernetes Kind (e.g. "Deployment").
//
// The kind is lower-cased and pluralized with a few simple rules:
//   - names ending in "endpoints" are already plural and returned as is;
//   - names ending in "s", "x", "ch" or "sh" gain "es" (Ingress -> ingresses);
//   - names ending in a consonant followed by "y" swap the "y" for "ies"
//     (NetworkPolicy -> networkpolicies);
//   - everything else gains "s".
//
// These rules produce the same names the previous hard-coded table did for
// all of its entries, and additionally yield valid lowercase names for kinds
// that were not in the table. The result is still a guess: kinds whose
// resource name is irregular are not resolved correctly. An empty kind yields
// an empty string.
func getResourceName(kind string) string {
	if kind == "" {
		return ""
	}

	singular := strings.ToLower(kind)

	switch {
	case strings.HasSuffix(singular, "endpoints"):
		return singular
	case strings.HasSuffix(singular, "s"),
		strings.HasSuffix(singular, "x"),
		strings.HasSuffix(singular, "ch"),
		strings.HasSuffix(singular, "sh"):
		return singular + "es"
	case strings.HasSuffix(singular, "y") && len(singular) > 1 &&
		!strings.ContainsRune("aeiou", rune(singular[len(singular)-2])):
		return singular[:len(singular)-1] + "ies"
	}

	return singular + "s"
}
// K8sResourceApply returns a handler that creates one Kubernetes object from a
// YAML manifest.
//
// The JSON request body must carry the manifest in its "yaml" field. The
// manifest is decoded into an unstructured object, its kind is mapped to a
// resource name with getResourceName, and the object is created through a
// dynamic client: namespaced when the manifest sets a namespace, otherwise
// without one. Only a Create call is issued; no update or patch is attempted.
//
// On success the created object's name, namespace and kind are returned.
//
// The ctx and store parameters are not used by the handler.
func K8sResourceApply(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		var payload struct {
			Yaml string `json:"yaml"`
		}
		if err := json.Unmarshal(c.Body(), &payload); err != nil {
			return resp.R400(c, "", nil, err)
		}
		if payload.Yaml == "" {
			return resp.R400(c, "", nil, fmt.Errorf("yaml content is empty"))
		}

		// Load the stored kubeconfig.
		var stored model.ClusterConfig
		if err := db.Where("key = ?", "kubeconfig").First(&stored).Error; err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("kubeconfig not found: %w", err))
		}
		if stored.Value == "" {
			return resp.R500(c, "", nil, fmt.Errorf("kubeconfig is empty"))
		}

		restCfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(stored.Value))
		if err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to parse kubeconfig: %w", err))
		}
		// Offer only HTTP/1.1; this was introduced to avoid stream closing
		// issues.
		restCfg.TLSClientConfig.NextProtos = []string{"http/1.1"}
		restCfg.Timeout = 0

		dyn, err := dynamic.NewForConfig(restCfg)
		if err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to create dynamic client: %w", err))
		}

		manifest := &unstructured.Unstructured{}
		if err := yaml.Unmarshal([]byte(payload.Yaml), manifest); err != nil {
			return resp.R400(c, "", nil, fmt.Errorf("failed to parse yaml: %w", err))
		}
		if manifest.GetName() == "" {
			return resp.R400(c, "", nil, fmt.Errorf("resource name is required"))
		}

		objKind := manifest.GroupVersionKind()
		gvr := schema.GroupVersionResource{
			Group:    objKind.Group,
			Version:  objKind.Version,
			Resource: getResourceName(objKind.Kind),
		}

		// Pick the namespaced client only when the manifest names a namespace.
		var target dynamic.ResourceInterface = dyn.Resource(gvr)
		if ns := manifest.GetNamespace(); ns != "" {
			target = dyn.Resource(gvr).Namespace(ns)
		}

		created, err := target.Create(c.Context(), manifest, metav1.CreateOptions{})
		if err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to apply resource: %w", err))
		}

		return resp.R200(c, map[string]any{
			"name":      created.GetName(),
			"namespace": created.GetNamespace(),
			"kind":      created.GetKind(),
		})
	}
}
// K8sResourceFetch returns a handler that fetches one resource and responds
// with a cleaned-up YAML rendering under the "yaml" key.
//
// Query parameters:
//   - name: resource name (required).
//   - kind: one of "Deployment", "StatefulSet", "Service" or "ConfigMap"
//     (required); any other value is rejected with a 400 response.
//   - namespace: required for every supported kind.
//
// Before marshalling, Kind and APIVersion are filled in when blank, and the
// managed fields, resource version, UID, creation timestamp, self link and
// generation are cleared because they cause conflicts when the YAML is sent
// back. Spec and status are left untouched.
//
// The ctx and store parameters are not used by the handler.
func K8sResourceFetch(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	// ensureType fills in Kind and APIVersion only where they are empty.
	ensureType := func(tm *metav1.TypeMeta, kind, apiVersion string) {
		if tm.Kind == "" {
			tm.Kind = kind
		}
		if tm.APIVersion == "" {
			tm.APIVersion = apiVersion
		}
	}

	// scrubMeta clears managed fields and other metadata that cause conflicts.
	scrubMeta := func(om *metav1.ObjectMeta) {
		om.ManagedFields = nil
		om.ResourceVersion = ""
		om.UID = ""
		om.CreationTimestamp = metav1.Time{}
		om.SelfLink = ""
		om.Generation = 0
	}

	return func(c fiber.Ctx) error {
		name := c.Query("name", "")
		namespace := c.Query("namespace", "")
		kind := c.Query("kind", "")

		if name == "" || kind == "" {
			return resp.R400(c, "", nil, fmt.Errorf("name and kind are required"))
		}

		client, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		var (
			rendered   []byte
			marshalErr error
		)

		switch kind {
		case "Deployment":
			if namespace == "" {
				return resp.R400(c, "", nil, fmt.Errorf("namespace is required for Deployment"))
			}
			obj, getErr := client.AppsV1().Deployments(namespace).Get(c.Context(), name, metav1.GetOptions{})
			if getErr != nil {
				return resp.R500(c, "", nil, fmt.Errorf("failed to get deployment: %w", getErr))
			}
			ensureType(&obj.TypeMeta, "Deployment", "apps/v1")
			scrubMeta(&obj.ObjectMeta)
			rendered, marshalErr = yaml.Marshal(obj)

		case "StatefulSet":
			if namespace == "" {
				return resp.R400(c, "", nil, fmt.Errorf("namespace is required for StatefulSet"))
			}
			obj, getErr := client.AppsV1().StatefulSets(namespace).Get(c.Context(), name, metav1.GetOptions{})
			if getErr != nil {
				return resp.R500(c, "", nil, fmt.Errorf("failed to get statefulset: %w", getErr))
			}
			ensureType(&obj.TypeMeta, "StatefulSet", "apps/v1")
			scrubMeta(&obj.ObjectMeta)
			rendered, marshalErr = yaml.Marshal(obj)

		case "Service":
			if namespace == "" {
				return resp.R400(c, "", nil, fmt.Errorf("namespace is required for Service"))
			}
			obj, getErr := client.CoreV1().Services(namespace).Get(c.Context(), name, metav1.GetOptions{})
			if getErr != nil {
				return resp.R500(c, "", nil, fmt.Errorf("failed to get service: %w", getErr))
			}
			ensureType(&obj.TypeMeta, "Service", "v1")
			// Spec fields are deliberately kept: they hold important
			// information such as NodePort.
			scrubMeta(&obj.ObjectMeta)
			rendered, marshalErr = yaml.Marshal(obj)

		case "ConfigMap":
			if namespace == "" {
				return resp.R400(c, "", nil, fmt.Errorf("namespace is required for ConfigMap"))
			}
			obj, getErr := client.CoreV1().ConfigMaps(namespace).Get(c.Context(), name, metav1.GetOptions{})
			if getErr != nil {
				return resp.R500(c, "", nil, fmt.Errorf("failed to get configmap: %w", getErr))
			}
			ensureType(&obj.TypeMeta, "ConfigMap", "v1")
			scrubMeta(&obj.ObjectMeta)
			rendered, marshalErr = yaml.Marshal(obj)

		default:
			return resp.R400(c, "", nil, fmt.Errorf("unsupported resource kind: %s", kind))
		}

		if marshalErr != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to marshal resource to yaml: %w", marshalErr))
		}

		return resp.R200(c, map[string]any{"yaml": string(rendered)})
	}
}
// K8sResourceUpdate returns a handler that updates one existing Kubernetes
// object from a YAML manifest.
//
// The JSON request body must carry the manifest in its "yaml" field. The
// manifest is decoded into an unstructured object, its kind is mapped to a
// resource name with getResourceName, and an Update call is issued through a
// dynamic client: namespaced when the manifest sets a namespace, otherwise
// without one.
//
// On success the updated object's name, namespace and kind are returned.
//
// The ctx and store parameters are not used by the handler.
func K8sResourceUpdate(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		var payload struct {
			Yaml string `json:"yaml"`
		}
		if err := json.Unmarshal(c.Body(), &payload); err != nil {
			return resp.R400(c, "", nil, err)
		}
		if payload.Yaml == "" {
			return resp.R400(c, "", nil, fmt.Errorf("yaml content is empty"))
		}

		// Load the stored kubeconfig.
		var stored model.ClusterConfig
		if err := db.Where("key = ?", "kubeconfig").First(&stored).Error; err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("kubeconfig not found: %w", err))
		}
		if stored.Value == "" {
			return resp.R500(c, "", nil, fmt.Errorf("kubeconfig is empty"))
		}

		restCfg, err := clientcmd.RESTConfigFromKubeConfig([]byte(stored.Value))
		if err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to parse kubeconfig: %w", err))
		}
		// Offer only HTTP/1.1; this was introduced to avoid stream closing
		// issues.
		restCfg.TLSClientConfig.NextProtos = []string{"http/1.1"}
		restCfg.Timeout = 0

		dyn, err := dynamic.NewForConfig(restCfg)
		if err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to create dynamic client: %w", err))
		}

		manifest := &unstructured.Unstructured{}
		if err := yaml.Unmarshal([]byte(payload.Yaml), manifest); err != nil {
			return resp.R400(c, "", nil, fmt.Errorf("failed to parse yaml: %w", err))
		}
		if manifest.GetName() == "" {
			return resp.R400(c, "", nil, fmt.Errorf("resource name is required"))
		}

		objKind := manifest.GroupVersionKind()
		gvr := schema.GroupVersionResource{
			Group:    objKind.Group,
			Version:  objKind.Version,
			Resource: getResourceName(objKind.Kind),
		}

		// Pick the namespaced client only when the manifest names a namespace.
		var target dynamic.ResourceInterface = dyn.Resource(gvr)
		if ns := manifest.GetNamespace(); ns != "" {
			target = dyn.Resource(gvr).Namespace(ns)
		}

		updated, err := target.Update(c.Context(), manifest, metav1.UpdateOptions{})
		if err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to update resource: %w", err))
		}

		return resp.R200(c, map[string]any{
			"name":      updated.GetName(),
			"namespace": updated.GetNamespace(),
			"kind":      updated.GetKind(),
		})
	}
}
// K8sPodLogs returns a handler that streams a pod's logs to the client through
// the resp package's SSE manager (event name "pod-logs").
//
// Query parameters:
//   - name: pod name (required).
//   - namespace: pod namespace (required).
//   - follow: when exactly "true", the log stream is followed.
//
// The last 1000 lines are requested. Each line read from the log stream is
// sent as one message; "[EOF]" is sent when the stream ends, and
// "error: <err>" when reading fails.
//
// The log stream and its context are owned by the reader goroutine and are
// released when that goroutine finishes. They must not be released when the
// handler returns: the handler returns right after starting the goroutine, so
// doing so would tear the stream down while it is still being read.
//
// The ctx and store parameters are not used by the handler.
func K8sPodLogs(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	return func(c fiber.Ctx) error {
		podName := c.Query("name", "")
		namespace := c.Query("namespace", "")
		tailLines := int64(1000)
		follow := c.Query("follow", "") == "true"

		if podName == "" || namespace == "" {
			return resp.R400(c, "", nil, fmt.Errorf("name and namespace are required"))
		}

		restConfig, err := getK8sConfig(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		clientset, err := kubernetes.NewForConfig(restConfig)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		podLogOpts := &corev1.PodLogOptions{
			TailLines: &tailLines,
			Follow:    follow,
		}
		req := clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOpts)

		logCtx, cancel := context.WithCancel(context.Background())
		stream, err := req.Stream(logCtx)
		if err != nil {
			cancel()
			return resp.R500(c, "", nil, fmt.Errorf("failed to get pod logs: %w", err))
		}

		// Use the existing SSE manager from resp package.
		manager := resp.SSE(c, "pod-logs")

		// Pump log lines into the SSE manager. The goroutine outlives the
		// handler call, so it is responsible for all cleanup.
		//
		// NOTE(review): nothing here observes a client disconnect; with
		// follow=true the goroutine runs until the log stream ends or a read
		// fails. Confirm whether the SSE manager exposes a disconnect signal
		// that could be wired to cancel().
		go func() {
			defer manager.Close()
			defer stream.Close()
			defer cancel()

			reader := bufio.NewReader(stream)
			for {
				line, readErr := reader.ReadString('\n')
				// ReadString may return data together with an error (for
				// example a final line without a trailing newline); forward
				// it before handling the error.
				if line != "" {
					manager.Send(line)
				}
				if readErr != nil {
					if readErr == io.EOF {
						manager.Send("[EOF]")
						return
					}
					manager.Send(fmt.Sprintf("error: %v", readErr))
					return
				}
			}
		}()

		// Return nil since the response body is produced by the stream writer.
		c.Context().SetBodyStreamWriter(manager.Writer())
		return nil
	}
}
// K8sPodDelete returns a handler that deletes a single pod.
//
// The JSON request body must provide non-empty "name" and "namespace" fields.
// The pod is deleted with default delete options, and on success the same name
// and namespace are returned in the response.
//
// The ctx and store parameters are not used by the handler.
func K8sPodDelete(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
	type deleteRequest struct {
		Name      string `json:"name"`
		Namespace string `json:"namespace"`
	}

	return func(c fiber.Ctx) error {
		var target deleteRequest
		if err := json.Unmarshal(c.Body(), &target); err != nil {
			return resp.R400(c, "", nil, err)
		}
		if target.Name == "" || target.Namespace == "" {
			return resp.R400(c, "", nil, fmt.Errorf("name and namespace are required"))
		}

		client, err := getK8sClient(db)
		if err != nil {
			return resp.R500(c, "", nil, err)
		}

		pods := client.CoreV1().Pods(target.Namespace)
		if err := pods.Delete(c.Context(), target.Name, metav1.DeleteOptions{}); err != nil {
			return resp.R500(c, "", nil, fmt.Errorf("failed to delete pod: %w", err))
		}

		return resp.R200(c, map[string]any{
			"name":      target.Name,
			"namespace": target.Namespace,
		})
	}
}