feat: add pod logs and delete operations
- Add pod logs button with SSE streaming (WIP: SSE connection issues with HTTP/2) - Add pod delete button with confirmation dialog - Use existing resp.SSE package for log streaming - Force HTTP/1.1 for k8s client to avoid stream closing issues - Update frontend to handle pod actions and dialogs 🤖 Generated with [Qoder][https://qoder.com]
This commit is contained in:
@@ -1,19 +1,23 @@
|
||||
package k8s
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"gitea.loveuer.com/loveuer/cluster/internal/model"
|
||||
"gitea.loveuer.com/loveuer/cluster/pkg/resp"
|
||||
"gitea.loveuer.com/loveuer/cluster/pkg/store"
|
||||
"github.com/gofiber/fiber/v3"
|
||||
"gorm.io/gorm"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"sigs.k8s.io/yaml"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -35,6 +39,8 @@ func getK8sClient(db *gorm.DB) (*kubernetes.Clientset, error) {
|
||||
return nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
|
||||
}
|
||||
|
||||
clientConfig.Timeout = 0
|
||||
|
||||
clientset, err := kubernetes.NewForConfig(clientConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create k8s client: %w", err)
|
||||
@@ -43,6 +49,29 @@ func getK8sClient(db *gorm.DB) (*kubernetes.Clientset, error) {
|
||||
return clientset, nil
|
||||
}
|
||||
|
||||
func getK8sConfig(db *gorm.DB) (*rest.Config, error) {
|
||||
var config model.ClusterConfig
|
||||
|
||||
if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil {
|
||||
return nil, fmt.Errorf("kubeconfig not found: %w", err)
|
||||
}
|
||||
|
||||
if config.Value == "" {
|
||||
return nil, fmt.Errorf("kubeconfig is empty")
|
||||
}
|
||||
|
||||
clientConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(config.Value))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
|
||||
}
|
||||
|
||||
// Disable HTTP/2 to avoid stream closing issues
|
||||
clientConfig.TLSClientConfig.NextProtos = []string{"http/1.1"}
|
||||
clientConfig.Timeout = 0
|
||||
|
||||
return clientConfig, nil
|
||||
}
|
||||
|
||||
func K8sNamespaceList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
|
||||
return func(c fiber.Ctx) error {
|
||||
clientset, err := getK8sClient(db)
|
||||
@@ -299,3 +328,105 @@ func getResourceName(kind string) string {
|
||||
|
||||
return kind + "s"
|
||||
}
|
||||
|
||||
func K8sPodLogs(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
|
||||
return func(c fiber.Ctx) error {
|
||||
podName := c.Query("name", "")
|
||||
namespace := c.Query("namespace", "")
|
||||
tailLines := int64(1000)
|
||||
follow := c.Query("follow", "") == "true"
|
||||
|
||||
if podName == "" || namespace == "" {
|
||||
return resp.R400(c, "", nil, fmt.Errorf("name and namespace are required"))
|
||||
}
|
||||
|
||||
restConfig, err := getK8sConfig(db)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
clientset, err := kubernetes.NewForConfig(restConfig)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
podLogOpts := &corev1.PodLogOptions{
|
||||
TailLines: &tailLines,
|
||||
Follow: follow,
|
||||
}
|
||||
|
||||
req := clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOpts)
|
||||
|
||||
logCtx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
stream, err := req.Stream(logCtx)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, fmt.Errorf("failed to get pod logs: %w", err))
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
// Use the existing SSE manager from resp package
|
||||
manager := resp.SSE(c, "pod-logs")
|
||||
|
||||
// Start streaming logs in a goroutine
|
||||
go func() {
|
||||
defer manager.Close()
|
||||
|
||||
reader := bufio.NewReader(stream)
|
||||
for {
|
||||
select {
|
||||
case <-logCtx.Done():
|
||||
return
|
||||
default:
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
manager.Send("[EOF]")
|
||||
return
|
||||
}
|
||||
manager.Send(fmt.Sprintf("error: %v", err))
|
||||
return
|
||||
}
|
||||
manager.Send(line)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Return nil since we're handling the response directly
|
||||
c.Context().SetBodyStreamWriter(manager.Writer())
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func K8sPodDelete(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handler {
|
||||
return func(c fiber.Ctx) error {
|
||||
var req struct {
|
||||
Name string `json:"name"`
|
||||
Namespace string `json:"namespace"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(c.Body(), &req); err != nil {
|
||||
return resp.R400(c, "", nil, err)
|
||||
}
|
||||
|
||||
if req.Name == "" || req.Namespace == "" {
|
||||
return resp.R400(c, "", nil, fmt.Errorf("name and namespace are required"))
|
||||
}
|
||||
|
||||
clientset, err := getK8sClient(db)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
}
|
||||
|
||||
err = clientset.CoreV1().Pods(req.Namespace).Delete(c.Context(), req.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, fmt.Errorf("failed to delete pod: %w", err))
|
||||
}
|
||||
|
||||
return resp.R200(c, map[string]any{
|
||||
"name": req.Name,
|
||||
"namespace": req.Namespace,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user