package k8s import ( "bufio" "context" "encoding/json" "fmt" "io" "strings" "gitea.loveuer.com/loveuer/cluster/internal/model" "gitea.loveuer.com/loveuer/cluster/pkg/resp" "gitea.loveuer.com/loveuer/cluster/pkg/store" "github.com/gofiber/fiber/v3" "gorm.io/gorm" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "sigs.k8s.io/yaml" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func getK8sClient(db *gorm.DB) (*kubernetes.Clientset, error) { var config model.ClusterConfig if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil { return nil, fmt.Errorf("kubeconfig not found: %w", err) } if config.Value == "" { return nil, fmt.Errorf("kubeconfig is empty") } clientConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(config.Value)) if err != nil { return nil, fmt.Errorf("failed to parse kubeconfig: %w", err) } clientConfig.Timeout = 0 clientset, err := kubernetes.NewForConfig(clientConfig) if err != nil { return nil, fmt.Errorf("failed to create k8s client: %w", err) } return clientset, nil } func getK8sConfig(db *gorm.DB) (*rest.Config, error) { var config model.ClusterConfig if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil { return nil, fmt.Errorf("kubeconfig not found: %w", err) } if config.Value == "" { return nil, fmt.Errorf("kubeconfig is empty") } clientConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(config.Value)) if err != nil { return nil, fmt.Errorf("failed to parse kubeconfig: %w", err) } // Disable HTTP/2 to avoid stream closing issues clientConfig.TLSClientConfig.NextProtos = []string{"http/1.1"} clientConfig.Timeout = 0 return clientConfig, nil } func K8sNamespaceList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } namespaces, err := clientset.CoreV1().Namespaces().List(c.Context(), metav1.ListOptions{}) if err != nil { return resp.R500(c, "", nil, err) } return resp.R200(c, map[string]any{ "items": namespaces.Items, }) } } func K8sDeploymentList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } deployments, err := clientset.AppsV1().Deployments(namespace).List(c.Context(), metav1.ListOptions{}) if err != nil { return resp.R500(c, "", nil, err) } // Filter by name if provided var filtered []appsv1.Deployment if name != "" { for _, deployment := range deployments.Items { if strings.Contains(deployment.Name, name) { filtered = append(filtered, deployment) } } } else { filtered = deployments.Items } return resp.R200(c, map[string]any{ "items": filtered, }) } } func K8sStatefulSetList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } statefulsets, err := clientset.AppsV1().StatefulSets(namespace).List(c.Context(), metav1.ListOptions{}) if err != nil { return resp.R500(c, "", nil, err) } // Filter by name if provided var filtered []appsv1.StatefulSet if name != "" { for _, statefulset := range statefulsets.Items { if strings.Contains(statefulset.Name, name) { filtered = append(filtered, statefulset) } } } else { filtered = statefulsets.Items } return resp.R200(c, map[string]any{ "items": filtered, }) } } func K8sConfigMapList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } configmaps, err := clientset.CoreV1().ConfigMaps(namespace).List(c.Context(), metav1.ListOptions{}) if err != nil { return resp.R500(c, "", nil, err) } // Filter by name if provided var filtered []corev1.ConfigMap if name != "" { for _, configmap := range configmaps.Items { if strings.Contains(configmap.Name, name) { filtered = append(filtered, configmap) } } } else { filtered = configmaps.Items } return resp.R200(c, map[string]any{ "items": filtered, }) } } func K8sPodList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } pods, err := clientset.CoreV1().Pods(namespace).List(c.Context(), metav1.ListOptions{}) if err != nil { return resp.R500(c, "", nil, err) } // Filter by name if provided var filtered []corev1.Pod if name != "" { for _, pod := range pods.Items { if strings.Contains(pod.Name, name) { filtered = append(filtered, pod) } } } else { filtered = pods.Items } return resp.R200(c, map[string]any{ "items": filtered, }) } } func K8sPVList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } pvs, err := clientset.CoreV1().PersistentVolumes().List(c.Context(), metav1.ListOptions{}) if err != nil { return resp.R500(c, "", nil, err) } return resp.R200(c, map[string]any{ "items": pvs.Items, }) } } func K8sPVCList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } pvcs, err := clientset.CoreV1().PersistentVolumeClaims(namespace).List(c.Context(), metav1.ListOptions{}) if err != nil { return resp.R500(c, "", nil, err) } // Filter by name if provided var filtered []corev1.PersistentVolumeClaim if name != "" { for _, pvc := range pvcs.Items { if strings.Contains(pvc.Name, name) { filtered = append(filtered, pvc) } } } else { filtered = pvcs.Items } return resp.R200(c, map[string]any{ "items": filtered, }) } } func K8sServiceList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } services, err := clientset.CoreV1().Services(namespace).List(c.Context(), metav1.ListOptions{}) if err != nil { return resp.R500(c, "", nil, err) } // Filter by name if provided var filtered []corev1.Service if name != "" { for _, service := range services.Items { if strings.Contains(service.Name, name) { filtered = append(filtered, service) } } } else { filtered = services.Items } return resp.R200(c, map[string]any{ "items": filtered, }) } } func K8sResourceGet(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { name := c.Query("name", "") namespace := c.Query("namespace", "") kind := c.Query("kind", "") if name == "" || kind == "" { return resp.R400(c, "", nil, fmt.Errorf("name and kind are required")) } clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } var yamlData []byte switch kind { case "Deployment": var deployment *appsv1.Deployment if namespace != "" { deployment, err = clientset.AppsV1().Deployments(namespace).Get(c.Context(), name, metav1.GetOptions{}) } else { return resp.R400(c, "", nil, fmt.Errorf("namespace is required for Deployment")) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get deployment: %w", err)) } yamlData, err = yaml.Marshal(deployment) case "StatefulSet": var statefulset *appsv1.StatefulSet if namespace != "" { statefulset, err = clientset.AppsV1().StatefulSets(namespace).Get(c.Context(), name, metav1.GetOptions{}) } else { return resp.R400(c, "", nil, fmt.Errorf("namespace is required for StatefulSet")) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get statefulset: %w", err)) } yamlData, err = yaml.Marshal(statefulset) case "Service": var service *corev1.Service if namespace != "" { service, err = clientset.CoreV1().Services(namespace).Get(c.Context(), name, metav1.GetOptions{}) } else { return resp.R400(c, "", nil, fmt.Errorf("namespace is required for Service")) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get service: %w", err)) } yamlData, err = yaml.Marshal(service) case "ConfigMap": var configmap *corev1.ConfigMap if namespace != "" { configmap, err = clientset.CoreV1().ConfigMaps(namespace).Get(c.Context(), name, metav1.GetOptions{}) } else { return resp.R400(c, "", nil, fmt.Errorf("namespace is required for ConfigMap")) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get configmap: %w", err)) } yamlData, err = yaml.Marshal(configmap) default: return resp.R400(c, "", nil, fmt.Errorf("unsupported resource kind: %s", kind)) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to marshal resource to yaml: %w", err)) } return resp.R200(c, map[string]any{ "yaml": string(yamlData), }) } } func getResourceName(kind string) string { kindToResource := map[string]string{ "Namespace": "namespaces", "Deployment": "deployments", "StatefulSet": "statefulsets", "Service": "services", "ConfigMap": "configmaps", "Pod": "pods", "PersistentVolume": "persistentvolumes", "PersistentVolumeClaim": "persistentvolumeclaims", "Secret": "secrets", "Ingress": "ingresses", "DaemonSet": "daemonsets", "Job": "jobs", "CronJob": "cronjobs", "ReplicaSet": "replicasets", "ServiceAccount": "serviceaccounts", "Role": "roles", "RoleBinding": "rolebindings", "ClusterRole": "clusterroles", "ClusterRoleBinding": "clusterrolebindings", } if resource, ok := kindToResource[kind]; ok { return resource } return kind + "s" } func K8sResourceApply(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { var req struct { Yaml string `json:"yaml"` } if err := json.Unmarshal(c.Body(), &req); err != nil { return resp.R400(c, "", nil, err) } if req.Yaml == "" { return resp.R400(c, "", nil, fmt.Errorf("yaml content is empty")) } var config model.ClusterConfig if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil { return resp.R500(c, "", nil, fmt.Errorf("kubeconfig not found: %w", err)) } if config.Value == "" { return resp.R500(c, "", nil, fmt.Errorf("kubeconfig is empty")) } clientConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(config.Value)) if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to parse kubeconfig: %w", err)) } // Force HTTP/1.1 to avoid stream closing issues clientConfig.TLSClientConfig.NextProtos = []string{"http/1.1"} clientConfig.Timeout = 0 dynamicClient, err := dynamic.NewForConfig(clientConfig) if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to create dynamic client: %w", err)) } var obj unstructured.Unstructured if err := yaml.Unmarshal([]byte(req.Yaml), &obj); err != nil { return resp.R400(c, "", nil, fmt.Errorf("failed to parse yaml: %w", err)) } gvk := obj.GroupVersionKind() namespace := obj.GetNamespace() name := obj.GetName() if name == "" { return resp.R400(c, "", nil, fmt.Errorf("resource name is required")) } gvr := schema.GroupVersionResource{ Group: gvk.Group, Version: gvk.Version, Resource: getResourceName(gvk.Kind), } var result *unstructured.Unstructured if namespace != "" { result, err = dynamicClient.Resource(gvr).Namespace(namespace).Create(c.Context(), &obj, metav1.CreateOptions{}) } else { result, err = dynamicClient.Resource(gvr).Create(c.Context(), &obj, metav1.CreateOptions{}) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to apply resource: %w", err)) } return resp.R200(c, map[string]any{ "name": result.GetName(), "namespace": result.GetNamespace(), "kind": result.GetKind(), }) } } func K8sResourceFetch(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { name := c.Query("name", "") namespace := c.Query("namespace", "") kind := c.Query("kind", "") if name == "" || kind == "" { return resp.R400(c, "", nil, fmt.Errorf("name and kind are required")) } clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } var yamlData []byte switch kind { case "Deployment": var deployment *appsv1.Deployment if namespace != "" { deployment, err = clientset.AppsV1().Deployments(namespace).Get(c.Context(), name, metav1.GetOptions{}) } else { return resp.R400(c, "", nil, fmt.Errorf("namespace is required for Deployment")) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get deployment: %w", err)) } // Clean up managed fields and other metadata deployment.ManagedFields = nil deployment.ResourceVersion = "" deployment.UID = "" deployment.CreationTimestamp = metav1.Time{} yamlData, err = yaml.Marshal(deployment) case "StatefulSet": var statefulset *appsv1.StatefulSet if namespace != "" { statefulset, err = clientset.AppsV1().StatefulSets(namespace).Get(c.Context(), name, metav1.GetOptions{}) } else { return resp.R400(c, "", nil, fmt.Errorf("namespace is required for StatefulSet")) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get statefulset: %w", err)) } // Clean up managed fields and other metadata statefulset.ManagedFields = nil statefulset.ResourceVersion = "" statefulset.UID = "" statefulset.CreationTimestamp = metav1.Time{} yamlData, err = yaml.Marshal(statefulset) case "Service": var service *corev1.Service if namespace != "" { service, err = clientset.CoreV1().Services(namespace).Get(c.Context(), name, metav1.GetOptions{}) } else { return resp.R400(c, "", nil, fmt.Errorf("namespace is required for Service")) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get service: %w", err)) } // Clean up managed fields and other metadata service.ManagedFields = nil service.ResourceVersion = "" service.UID = "" service.CreationTimestamp = metav1.Time{} yamlData, err = yaml.Marshal(service) case "ConfigMap": var configmap *corev1.ConfigMap if namespace != "" { configmap, err = clientset.CoreV1().ConfigMaps(namespace).Get(c.Context(), name, metav1.GetOptions{}) } else { return resp.R400(c, "", nil, fmt.Errorf("namespace is required for ConfigMap")) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get configmap: %w", err)) } // Clean up managed fields and other metadata configmap.ManagedFields = nil configmap.ResourceVersion = "" configmap.UID = "" configmap.CreationTimestamp = metav1.Time{} yamlData, err = yaml.Marshal(configmap) default: return resp.R400(c, "", nil, fmt.Errorf("unsupported resource kind: %s", kind)) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to marshal resource to yaml: %w", err)) } return resp.R200(c, map[string]any{ "yaml": string(yamlData), }) } } func K8sResourceUpdate(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { var req struct { Yaml string `json:"yaml"` } if err := json.Unmarshal(c.Body(), &req); err != nil { return resp.R400(c, "", nil, err) } if req.Yaml == "" { return resp.R400(c, "", nil, fmt.Errorf("yaml content is empty")) } var config model.ClusterConfig if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil { return resp.R500(c, "", nil, fmt.Errorf("kubeconfig not found: %w", err)) } if config.Value == "" { return resp.R500(c, "", nil, fmt.Errorf("kubeconfig is empty")) } clientConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(config.Value)) if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to parse kubeconfig: %w", err)) } // Force HTTP/1.1 to avoid stream closing issues clientConfig.TLSClientConfig.NextProtos = []string{"http/1.1"} clientConfig.Timeout = 0 dynamicClient, err := dynamic.NewForConfig(clientConfig) if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to create dynamic client: %w", err)) } var obj unstructured.Unstructured if err := yaml.Unmarshal([]byte(req.Yaml), &obj); err != nil { return resp.R400(c, "", nil, fmt.Errorf("failed to parse yaml: %w", err)) } gvk := obj.GroupVersionKind() namespace := obj.GetNamespace() name := obj.GetName() if name == "" { return resp.R400(c, "", nil, fmt.Errorf("resource name is required")) } gvr := schema.GroupVersionResource{ Group: gvk.Group, Version: gvk.Version, Resource: getResourceName(gvk.Kind), } // Update the resource var result *unstructured.Unstructured if namespace != "" { result, err = dynamicClient.Resource(gvr).Namespace(namespace).Update(c.Context(), &obj, metav1.UpdateOptions{}) } else { result, err = dynamicClient.Resource(gvr).Update(c.Context(), &obj, metav1.UpdateOptions{}) } if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to update resource: %w", err)) } return resp.R200(c, map[string]any{ "name": result.GetName(), "namespace": result.GetNamespace(), "kind": result.GetKind(), }) } } func K8sPodLogs(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { podName := c.Query("name", "") namespace := c.Query("namespace", "") tailLines := int64(1000) follow := c.Query("follow", "") == "true" if podName == "" || namespace == "" { return resp.R400(c, "", nil, fmt.Errorf("name and namespace are required")) } restConfig, err := getK8sConfig(db) if err != nil { return resp.R500(c, "", nil, err) } clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { return resp.R500(c, "", nil, err) } podLogOpts := &corev1.PodLogOptions{ TailLines: &tailLines, Follow: follow, } req := clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOpts) logCtx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := req.Stream(logCtx) if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to get pod logs: %w", err)) } defer stream.Close() // Use the existing SSE manager from resp package manager := resp.SSE(c, "pod-logs") // Start streaming logs in a goroutine go func() { defer manager.Close() reader := bufio.NewReader(stream) for { select { case <-logCtx.Done(): return default: line, err := reader.ReadString('\n') if err != nil { if err == io.EOF { manager.Send("[EOF]") return } manager.Send(fmt.Sprintf("error: %v", err)) return } manager.Send(line) } } }() // Return nil since we're handling the response directly c.Context().SetBodyStreamWriter(manager.Writer()) return nil } } func K8sPodDelete(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler { return func(c fiber.Ctx) error { var req struct { Name string `json:"name"` Namespace string `json:"namespace"` } if err := json.Unmarshal(c.Body(), &req); err != nil { return resp.R400(c, "", nil, err) } if req.Name == "" || req.Namespace == "" { return resp.R400(c, "", nil, fmt.Errorf("name and namespace are required")) } clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) } err = clientset.CoreV1().Pods(req.Namespace).Delete(c.Context(), req.Name, metav1.DeleteOptions{}) if err != nil { return resp.R500(c, "", nil, fmt.Errorf("failed to delete pod: %w", err)) } return resp.R200(c, map[string]any{ "name": req.Name, "namespace": req.Namespace, }) } }