From 704d0fe0bf7d0c5ffd31f25b2f62789a15d0b20b Mon Sep 17 00:00:00 2001 From: loveuer Date: Mon, 17 Nov 2025 10:33:45 +0800 Subject: [PATCH] Fix SSE connection handling and optimize Dockerfile MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fixed SSE connection not being properly closed when pod logs dialog is closed - Added proper cleanup for EventSource connections in K8sResourceList.tsx - Added debugging logs to track SSE connection lifecycle - Optimized Dockerfile to avoid copying frontend files during Go build stage - Fixed backend handler to properly use context from request for log streaming 🤖 Generated with [Qoder](https://qoder.com) --- Dockerfile | 63 ++++++++++++++++++++ docker-entrypoint.sh | 10 ++++ frontend/src/pages/K8sResourceList.tsx | 66 ++++++++++++++++++--- internal/module/k8s/handler.resource.go | 79 ++++++++++++------------- nginx.conf | 51 ++++++++++++++++ 5 files changed, 221 insertions(+), 48 deletions(-) create mode 100644 Dockerfile create mode 100755 docker-entrypoint.sh create mode 100644 nginx.conf diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4742a22 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,63 @@ +# Multi-stage build for Cluster application with Go backend and React frontend + +# Frontend build stage +FROM node:18 AS frontend-build + +WORKDIR /app + +# Copy package files +COPY frontend/package.json frontend/pnpm-lock.yaml ./ + +# Install pnpm globally +RUN npm install -g pnpm + +# Install frontend dependencies +RUN pnpm install --frozen-lockfile + +# Copy frontend source +COPY frontend/ . + +# Build frontend +RUN pnpm run build + +# Backend build stage +FROM golang:1.22 AS backend-build + +WORKDIR /app + +# Copy go mod files +COPY go.mod go.sum ./ + +# Download dependencies +RUN go mod download + +# Copy only backend source code +COPY main.go ./ +COPY internal/ ./internal/ +COPY pkg/ ./pkg/ + +# Build backend +RUN go build -o cluster . 
+ +# Final stage - Nginx server +FROM nginx:latest + +# Copy nginx configuration +COPY nginx.conf /etc/nginx/nginx.conf + +# Copy backend binary +COPY --from=backend-build /app/cluster /app/cluster + +# Copy frontend build +COPY --from=frontend-build /app/dist /usr/share/nginx/html + +# Create data directory +RUN mkdir -p /app/x-storage + +# Expose ports +EXPOSE 80 + +# Start backend and nginx +COPY docker-entrypoint.sh /docker-entrypoint.sh +RUN chmod +x /docker-entrypoint.sh +ENTRYPOINT ["/docker-entrypoint.sh"] \ No newline at end of file diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh new file mode 100755 index 0000000..ba9d518 --- /dev/null +++ b/docker-entrypoint.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +# Start the Go backend in the background +/app/cluster -address 127.0.0.1:9119 -data-dir /data & + +# Wait a moment for backend to start +sleep 2 + +# Start nginx in the foreground +nginx -g 'daemon off;' diff --git a/frontend/src/pages/K8sResourceList.tsx b/frontend/src/pages/K8sResourceList.tsx index 8700951..478333a 100644 --- a/frontend/src/pages/K8sResourceList.tsx +++ b/frontend/src/pages/K8sResourceList.tsx @@ -76,6 +76,7 @@ export default function K8sResourceList() { const [logsDialogOpen, setLogsDialogOpen] = useState(false) const [logs, setLogs] = useState([]) const [selectedPod, setSelectedPod] = useState<{ name: string; namespace: string } | null>(null) + const eventSourceRef = useRef(null) const [deleteDialogOpen, setDeleteDialogOpen] = useState(false) const [deleteTarget, setDeleteTarget] = useState<{ name: string; namespace: string } | null>(null) const [deleting, setDeleting] = useState(false) @@ -98,6 +99,17 @@ export default function K8sResourceList() { } }, [selectedKind, namespace, nameFilter]) + // Clean up SSE connection on component unmount + useEffect(() => { + return () => { + if (eventSourceRef.current) { + console.log('Cleaning up SSE connection on component unmount') + eventSourceRef.current.close() + eventSourceRef.current = null 
+ } + } + }, []) + const fetchKubeconfig = async () => { try { const res = await fetch('/api/v1/k8s/config') @@ -171,24 +183,62 @@ export default function K8sResourceList() { } const handleViewLogs = (podName: string, podNamespace: string) => { + console.log('handleViewLogs called with:', { podName, podNamespace }) setSelectedPod({ name: podName, namespace: podNamespace }) setLogs([]) setLogsDialogOpen(true) + // Close any existing connection + if (eventSourceRef.current) { + console.log('Closing existing EventSource connection') + eventSourceRef.current.close() + eventSourceRef.current = null + } + const eventSource = new EventSource( `/api/v1/k8s/pod/logs?name=${encodeURIComponent(podName)}&namespace=${encodeURIComponent(podNamespace)}&tail=1000&follow=true` ) + + // Save reference to the EventSource + eventSourceRef.current = eventSource - eventSource.onmessage = (event) => { - setLogs((prev) => [...prev, event.data]) - setTimeout(() => logsEndRef.current?.scrollIntoView({ behavior: 'smooth' }), 100) - } + // Listen for the specific event type 'pod-logs' + eventSource.addEventListener('pod-logs', (event: MessageEvent) => { + try { + const message = JSON.parse(event.data) + if (message.type === 'log') { + setLogs((prev) => [...prev, message.data]) + setTimeout(() => logsEndRef.current?.scrollIntoView({ behavior: 'smooth' }), 100) + } else if (message.type === 'EOF') { + // Handle end of stream if needed + } else if (message.type === 'error') { + setLogs((prev) => [...prev, `Error: ${message.data}`]) + } + } catch (e) { + // If parsing fails, treat as plain text (fallback) + setLogs((prev) => [...prev, event.data]) + setTimeout(() => logsEndRef.current?.scrollIntoView({ behavior: 'smooth' }), 100) + } + }) eventSource.onerror = () => { - eventSource.close() + console.log('EventSource error occurred') + if (eventSourceRef.current) { + eventSourceRef.current.close() + eventSourceRef.current = null + } } + } - return () => eventSource.close() + const 
handleCloseLogsDialog = () => { + console.log('handleCloseLogsDialog called') + // Close the EventSource connection if it exists + if (eventSourceRef.current) { + console.log('Closing EventSource connection') + eventSourceRef.current.close() + eventSourceRef.current = null + } + setLogsDialogOpen(false) } const handleDeleteResource = async () => { @@ -843,7 +893,7 @@ export default function K8sResourceList() { setLogsDialogOpen(false)} + onClose={handleCloseLogsDialog} maxWidth="lg" fullWidth > @@ -852,7 +902,7 @@ export default function K8sResourceList() { Pod 日志: {selectedPod?.name} ({selectedPod?.namespace}) - setLogsDialogOpen(false)}> + diff --git a/internal/module/k8s/handler.resource.go b/internal/module/k8s/handler.resource.go index 8fcb4a1..039f8eb 100644 --- a/internal/module/k8s/handler.resource.go +++ b/internal/module/k8s/handler.resource.go @@ -15,6 +15,7 @@ import ( "gorm.io/gorm" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -22,12 +23,11 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "sigs.k8s.io/yaml" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func getK8sClient(db *gorm.DB) (*kubernetes.Clientset, error) { var config model.ClusterConfig - + if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil { return nil, fmt.Errorf("kubeconfig not found: %w", err) } @@ -53,7 +53,7 @@ func getK8sClient(db *gorm.DB) (*kubernetes.Clientset, error) { func getK8sConfig(db *gorm.DB) (*rest.Config, error) { var config model.ClusterConfig - + if err := db.Where("key = ?", "kubeconfig").First(&config).Error; err != nil { return nil, fmt.Errorf("kubeconfig not found: %w", err) } @@ -96,7 +96,7 @@ func K8sDeploymentList(ctx context.Context, db *gorm.DB, store store.Store) fibe return func(c fiber.Ctx) error { namespace := 
c.Query("namespace", "") name := c.Query("name", "") - + clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) @@ -129,7 +129,7 @@ func K8sStatefulSetList(ctx context.Context, db *gorm.DB, store store.Store) fib return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") - + clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) @@ -162,7 +162,7 @@ func K8sConfigMapList(ctx context.Context, db *gorm.DB, store store.Store) fiber return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") - + clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) @@ -195,7 +195,7 @@ func K8sPodList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handl return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") - + clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) @@ -246,7 +246,7 @@ func K8sPVCList(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handl return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") - + clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) @@ -279,7 +279,7 @@ func K8sServiceList(ctx context.Context, db *gorm.DB, store store.Store) fiber.H return func(c fiber.Ctx) error { namespace := c.Query("namespace", "") name := c.Query("name", "") - + clientset, err := getK8sClient(db) if err != nil { return resp.R500(c, "", nil, err) @@ -386,25 +386,25 @@ func K8sResourceGet(ctx context.Context, db *gorm.DB, store store.Store) fiber.H func getResourceName(kind string) string { kindToResource := map[string]string{ - "Namespace": "namespaces", - "Deployment": "deployments", - "StatefulSet": "statefulsets", - "Service": "services", - "ConfigMap": "configmaps", - "Pod": "pods", - "PersistentVolume": "persistentvolumes", - 
"PersistentVolumeClaim": "persistentvolumeclaims", - "Secret": "secrets", - "Ingress": "ingresses", - "DaemonSet": "daemonsets", - "Job": "jobs", - "CronJob": "cronjobs", - "ReplicaSet": "replicasets", - "ServiceAccount": "serviceaccounts", - "Role": "roles", - "RoleBinding": "rolebindings", - "ClusterRole": "clusterroles", - "ClusterRoleBinding": "clusterrolebindings", + "Namespace": "namespaces", + "Deployment": "deployments", + "StatefulSet": "statefulsets", + "Service": "services", + "ConfigMap": "configmaps", + "Pod": "pods", + "PersistentVolume": "persistentvolumes", + "PersistentVolumeClaim": "persistentvolumeclaims", + "Secret": "secrets", + "Ingress": "ingresses", + "DaemonSet": "daemonsets", + "Job": "jobs", + "CronJob": "cronjobs", + "ReplicaSet": "replicasets", + "ServiceAccount": "serviceaccounts", + "Role": "roles", + "RoleBinding": "rolebindings", + "ClusterRole": "clusterroles", + "ClusterRoleBinding": "clusterrolebindings", } if resource, ok := kindToResource[kind]; ok { @@ -725,23 +725,24 @@ func K8sPodLogs(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handl } req := clientset.CoreV1().Pods(namespace).GetLogs(podName, podLogOpts) - - logCtx, cancel := context.WithCancel(context.Background()) - defer cancel() + + logCtx, cancel := context.WithCancel(c.Context()) stream, err := req.Stream(logCtx) if err != nil { + cancel() return resp.R500(c, "", nil, fmt.Errorf("failed to get pod logs: %w", err)) } - defer stream.Close() // Use the existing SSE manager from resp package manager := resp.SSE(c, "pod-logs") - + // Start streaming logs in a goroutine go func() { + defer stream.Close() defer manager.Close() - + defer cancel() + reader := bufio.NewReader(stream) for { select { @@ -751,20 +752,18 @@ func K8sPodLogs(ctx context.Context, db *gorm.DB, store store.Store) fiber.Handl line, err := reader.ReadString('\n') if err != nil { if err == io.EOF { - manager.Send("[EOF]") + manager.JSON(map[string]any{"type": "EOF"}) return } - 
manager.Send(fmt.Sprintf("error: %v", err)) + manager.JSON(map[string]any{"type": "error", "data": err.Error()}) return } - manager.Send(line) + manager.JSON(map[string]any{"data": line, "type": "log"}) } } }() - // Return nil since we're handling the response directly - c.Context().SetBodyStreamWriter(manager.Writer()) - return nil + return c.SendStreamWriter(manager.Writer()) } } diff --git a/nginx.conf b/nginx.conf new file mode 100644 index 0000000..9ca20be --- /dev/null +++ b/nginx.conf @@ -0,0 +1,51 @@ +events { + worker_connections 1024; +} + +http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + upstream backend { + server 127.0.0.1:9119; + } + + server { + listen 80; + server_name localhost; + + # Serve static files + location / { + root /usr/share/nginx/html; + index index.html; + try_files $uri $uri/ /index.html; + } + + # Proxy API requests to backend + location /api/ { + proxy_pass http://backend; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + + # Proxy OCI registry v2 requests to backend + location /v2/ { + proxy_pass http://backend; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + + # Proxy registry requests to backend + location /registry/ { + proxy_pass http://backend; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + } +} \ No newline at end of file