Fix SSE connection handling and optimize Dockerfile
- Fixed SSE connection not being properly closed when the pod logs dialog is closed
- Added proper cleanup for EventSource connections in K8sResourceList.tsx
- Added debugging logs to track the SSE connection lifecycle
- Optimized Dockerfile to avoid copying frontend files during the Go build stage
- Fixed backend handler to properly use the request's context for log streaming

🤖 Generated with [Qoder](https://qoder.com)
This commit is contained in:
@@ -15,6 +15,7 @@ import (
|
||||
"gorm.io/gorm"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
@@ -22,12 +23,11 @@ import (
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"sigs.k8s.io/yaml"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
func getK8sClient(db *gorm.DB) (*kubernetes.Clientset, error) {
|
||||
var config model.ClusterConfig
|
||||
|
||||
|
||||
if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil {
|
||||
return nil, fmt.Errorf("kubeconfig not found: %w", err)
|
||||
}
|
||||
@@ -53,7 +53,7 @@ func getK8sClient(db *gorm.DB) (*kubernetes.Clientset, error) {
|
||||
|
||||
func getK8sConfig(db *gorm.DB) (*rest.Config, error) {
|
||||
var config model.ClusterConfig
|
||||
|
||||
|
||||
if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil {
|
||||
return nil, fmt.Errorf("kubeconfig not found: %w", err)
|
||||
}
|
||||
@@ -96,7 +96,7 @@ func K8sDeploymentList(ctx context.Context, db *gorm.DB, store store.Store) fibe
|
||||
return func(c fiber.Ctx) error {
|
||||
namespace := c.Query("namespace", "")
|
||||
name := c.Query("name", "")
|
||||
|
||||
|
||||
clientset, err := getK8sClient(db)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
@@ -129,7 +129,7 @@ func K8sStatefulSetList(ctx context.Context, db *gorm.DB, store store.Store) fib
|
||||
return func(c fiber.Ctx) error {
|
||||
namespace := c.Query("namespace", "")
|
||||
name := c.Query("name", "")
|
||||
|
||||
|
||||
clientset, err := getK8sClient(db)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
@@ -162,7 +162,7 @@ func K8sConfigMapList(ctx context.Context, db *gorm.DB, store store.Store) fiber
|
||||
return func(c fiber.Ctx) error {
|
||||
namespace := c.Query("namespace", "")
|
||||
name := c.Query("name", "")
|
||||
|
||||
|
||||
clientset, err := getK8sClient(db)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
@@ -195,7 +195,7 @@ func K8sPodList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handl
|
||||
return func(c fiber.Ctx) error {
|
||||
namespace := c.Query("namespace", "")
|
||||
name := c.Query("name", "")
|
||||
|
||||
|
||||
clientset, err := getK8sClient(db)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
@@ -246,7 +246,7 @@ func K8sPVCList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handl
|
||||
return func(c fiber.Ctx) error {
|
||||
namespace := c.Query("namespace", "")
|
||||
name := c.Query("name", "")
|
||||
|
||||
|
||||
clientset, err := getK8sClient(db)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
@@ -279,7 +279,7 @@ func K8sServiceList(ctx context.Context, db *gorm.DB, store store.Store) fiber.H
|
||||
return func(c fiber.Ctx) error {
|
||||
namespace := c.Query("namespace", "")
|
||||
name := c.Query("name", "")
|
||||
|
||||
|
||||
clientset, err := getK8sClient(db)
|
||||
if err != nil {
|
||||
return resp.R500(c, "", nil, err)
|
||||
@@ -386,25 +386,25 @@ func K8sResourceGet(ctx context.Context, db *gorm.DB, store store.Store) fiber.H
|
||||
|
||||
func getResourceName(kind string) string {
|
||||
kindToResource := map[string]string{
|
||||
"Namespace": "namespaces",
|
||||
"Deployment": "deployments",
|
||||
"StatefulSet": "statefulsets",
|
||||
"Service": "services",
|
||||
"ConfigMap": "configmaps",
|
||||
"Pod": "pods",
|
||||
"PersistentVolume": "persistentvolumes",
|
||||
"PersistentVolumeClaim": "persistentvolumeclaims",
|
||||
"Secret": "secrets",
|
||||
"Ingress": "ingresses",
|
||||
"DaemonSet": "daemonsets",
|
||||
"Job": "jobs",
|
||||
"CronJob": "cronjobs",
|
||||
"ReplicaSet": "replicasets",
|
||||
"ServiceAccount": "serviceaccounts",
|
||||
"Role": "roles",
|
||||
"RoleBinding": "rolebindings",
|
||||
"ClusterRole": "clusterroles",
|
||||
"ClusterRoleBinding": "clusterrolebindings",
|
||||
"Namespace": "namespaces",
|
||||
"Deployment": "deployments",
|
||||
"StatefulSet": "statefulsets",
|
||||
"Service": "services",
|
||||
"ConfigMap": "configmaps",
|
||||
"Pod": "pods",
|
||||
"PersistentVolume": "persistentvolumes",
|
||||
"PersistentVolumeClaim": "persistentvolumeclaims",
|
||||
"Secret": "secrets",
|
||||
"Ingress": "ingresses",
|
||||
"DaemonSet": "daemonsets",
|
||||
"Job": "jobs",
|
||||
"CronJob": "cronjobs",
|
||||
"ReplicaSet": "replicasets",
|
||||
"ServiceAccount": "serviceaccounts",
|
||||
"Role": "roles",
|
||||
"RoleBinding": "rolebindings",
|
||||
"ClusterRole": "clusterroles",
|
||||
"ClusterRoleBinding": "clusterrolebindings",
|
||||
}
|
||||
|
||||
if resource, ok := kindToResource[kind]; ok {
|
||||
@@ -725,23 +725,24 @@ func K8sPodLogs(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handl
|
||||
}
|
||||
|
||||
req := clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOpts)
|
||||
|
||||
logCtx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
logCtx, cancel := context.WithCancel(c.Context())
|
||||
|
||||
stream, err := req.Stream(logCtx)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return resp.R500(c, "", nil, fmt.Errorf("failed to get pod logs: %w", err))
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
// Use the existing SSE manager from resp package
|
||||
manager := resp.SSE(c, "pod-logs")
|
||||
|
||||
|
||||
// Start streaming logs in a goroutine
|
||||
go func() {
|
||||
defer stream.Close()
|
||||
defer manager.Close()
|
||||
|
||||
defer cancel()
|
||||
|
||||
reader := bufio.NewReader(stream)
|
||||
for {
|
||||
select {
|
||||
@@ -751,20 +752,18 @@ func K8sPodLogs(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handl
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
manager.Send("[EOF]")
|
||||
manager.JSON(map[string]any{"type": "EOF"})
|
||||
return
|
||||
}
|
||||
manager.Send(fmt.Sprintf("error: %v", err))
|
||||
manager.JSON(map[string]any{"type": "error", "data": err.Error()})
|
||||
return
|
||||
}
|
||||
manager.Send(line)
|
||||
manager.JSON(map[string]any{"data": line, "type": "log"})
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Return nil since we're handling the response directly
|
||||
c.Context().SetBodyStreamWriter(manager.Writer())
|
||||
return nil
|
||||
return c.SendStreamWriter(manager.Writer())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user