diff --git a/frontend/src/api/auth.ts b/frontend/src/api/auth.ts index 5b29f78..7f9b084 100644 --- a/frontend/src/api/auth.ts +++ b/frontend/src/api/auth.ts @@ -1,5 +1,5 @@ import {useState} from "react"; -import {message} from "../component/message/u-message.tsx"; +import {message} from "../hook/message/u-message.tsx"; export interface User { id: number; diff --git a/frontend/src/component/message/u-message.tsx b/frontend/src/hook/message/u-message.tsx similarity index 100% rename from frontend/src/component/message/u-message.tsx rename to frontend/src/hook/message/u-message.tsx diff --git a/frontend/src/hook/websocket/u-ws.tsx b/frontend/src/hook/websocket/u-ws.tsx new file mode 100644 index 0000000..b8b0974 --- /dev/null +++ b/frontend/src/hook/websocket/u-ws.tsx @@ -0,0 +1,77 @@ +import { useCallback, useRef } from 'react'; + +export interface Prop { + /** 事件处理函数(可选) */ + fn?: (event: MessageEvent) => Promise; + /** 最大重试次数(可选) */ + retry?: number; +} + +export const useWebsocket = (prop?: Prop) => { + const wsRef = useRef(null); + const retryCountRef = useRef(0); + const reconnectTimerRef = useRef(0); + const currentPropRef = useRef(prop); + + // 更新最新 prop + currentPropRef.current = prop; + + const connect = useCallback((url: string, connectProp?: Prop) => { + // 合并 prop 优先级:connectProp > hook prop + const mergedProp = { ...currentPropRef.current, ...connectProp }; + + // 清理现有连接 + if (wsRef.current) { + wsRef.current.close(); + wsRef.current = null; + } + clearTimeout(reconnectTimerRef.current); + + const createConnection = () => { + const ws = new WebSocket(url); + + ws.onopen = () => { + retryCountRef.current = 0; + }; + + ws.onmessage = (event) => { + mergedProp?.fn?.(event).catch(error => { + console.error('WebSocket message handler error:', error); + }); + }; + + ws.onclose = (event) => { + const maxRetries = mergedProp?.retry ?? 0; + + if (!event.wasClean && retryCountRef.current < maxRetries) { + retryCountRef.current += 1; + const retryDelay = Math.pow(2, retryCountRef.current) * 1000; + + reconnectTimerRef.current = setTimeout(() => { + createConnection(); + }, retryDelay); + } + }; + + ws.onerror = (error) => { + console.error('WebSocket error:', error); + ws.close(); + }; + + wsRef.current = ws; + }; + + createConnection(); + }, []); + + const close = useCallback(() => { + wsRef.current?.close(); + clearTimeout(reconnectTimerRef.current); + retryCountRef.current = currentPropRef.current?.retry || 0; + }, []); + + return { + connect, + close + }; +}; \ No newline at end of file diff --git a/frontend/src/page/component/panel-left.tsx b/frontend/src/page/component/panel-left.tsx index 2b23240..5936f1b 100644 --- a/frontend/src/page/component/panel-left.tsx +++ b/frontend/src/page/component/panel-left.tsx @@ -2,7 +2,7 @@ import {createUseStyles} from "react-jss"; import {UButton} from "../../component/button/u-button.tsx"; import React, {useState} from "react"; import {useStore} from "../../store/share.ts"; -import {message} from "../../component/message/u-message.tsx"; +import {message} from "../../hook/message/u-message.tsx"; import {useFileUpload} from "../../api/upload.ts"; const useUploadStyle = createUseStyles({ diff --git a/frontend/src/page/local.tsx b/frontend/src/page/local.tsx index bf520bf..bd8281c 100644 --- a/frontend/src/page/local.tsx +++ b/frontend/src/page/local.tsx @@ -1,7 +1,8 @@ import {CloudBackground} from "../component/fluid/cloud.tsx"; -import {useEffect} from "react"; +import {useEffect, useState} from "react"; import {createUseStyles} from "react-jss"; -import {Client, useRoom} from "../store/local.ts"; +import {useWebsocket} from "../hook/websocket/u-ws.tsx"; +import {Resp} from "../interface/response.ts"; const useClass = createUseStyles({ '@global': { @@ -67,9 +68,40 @@ interface Bubble { angle: number; // 新增角度属性 } +interface Client { + client_type: 'desktop' | 'mobile' | 'tablet'; + app_type: 'web'; + room: string; + ip: number; + name: string; + id: string; + register_at: string; + offer: RTCSessionDescription; +} + +interface Store { + client: Client | null + clients: Client[] + rtc: RTCPeerConnection | null + offer: RTCSessionDescription | null + candidate: RTCIceCandidate | null +} + +function setupDataChannel(ch: RTCDataChannel) { + ch.onopen = () => console.log('通道已打开!'); + ch.onmessage = (e) => handleFileChunk(e.data); + ch.onclose = () => console.log('通道关闭'); +} + +// 接收文件块 +function handleFileChunk(chunk: any) { + console.log("[D] rtc file chunk =", chunk) +} + export const LocalSharing: React.FC = () => { const classes = useClass(); - const {register, enter, list, cleanup, client, clients} = useRoom(); + const [rtcStore, setRTCStore] = useState({} as Store) + const {connect, close} = useWebsocket({}) // 生成随机颜色 const generateColor = () => { @@ -95,7 +127,7 @@ export const LocalSharing: React.FC = () => { let attempt = 0; let validPosition = false; - if (cs[index].id == client?.id) { + if (cs[index].id == rtcStore.client?.id) { continue } @@ -144,27 +176,68 @@ export const LocalSharing: React.FC = () => { }; useEffect(() => { - register().then(() => { - setTimeout(() => { - enter().then(() => { - list().then() - }) - }, 600) - }); - return () => cleanup(); + const fn = async () => { + const rtc = new RTCPeerConnection({iceServers: [{urls: "stun:stun.qq.com:3478"}]}) + const dataChannel = rtc.createDataChannel('fileTransfer', {ordered: true}); + setupDataChannel(dataChannel); + const waitCandidate = new Promise(resolve => { + rtc.onicecandidate = (e) => { + resolve(e.candidate) + } + }) + const waitNegotiationneeded = new Promise(resolve => { + rtc.onnegotiationneeded = async () => { + const _offer = await rtc.createOffer() + await rtc.setLocalDescription(_offer) + resolve() + } + }) + + console.log('[D] rtc step 1') + + await waitNegotiationneeded + const candidate: RTCIceCandidate | null = await waitCandidate; + if (!candidate) throw new Error("candidate is null") + console.log('[D] rtc step 2') + + const res = await fetch("/api/ulocal/register", { + method: "POST", + headers: {"Content-Type": "application/json"}, + body: JSON.stringify({candidate: candidate, offer: rtc.localDescription}) + }) + const jes = await res.json() as Resp; + setRTCStore(val => { + return {...val, client: jes.data, candidate: candidate, offer: rtc.localDescription} + }) + + const api = `${window.location.protocol === 'https' ? 'wss' : 'ws'}://${window.location.host}/api/ulocal/ws?id=${jes.data.id}` + console.log('[D] websocker url =', api) + connect(api, {}) + + const res2 = await fetch(`/api/ulocal/clients?room=${jes.data.room}`) + const jes2 = await res2.json() as Resp + setRTCStore(val => { + return {...val, clients: jes2.data} + }) + } + + fn() + + return () => close(); }, []); // 气泡点击处理 const handleBubbleClick = async (id: string) => { console.log('[D] click bubble!!!', id) + // await link(id) }; return
-

{client?.name}

- {clients && generateBubbles(clients).map(bubble => { +

{rtcStore.client?.name}

+ {rtcStore.clients && generateBubbles(rtcStore.clients).map(bubble => { // const client = clients.find(c => c.id === bubble.id); - return client ? ( + return rtcStore.client ? (
Promise - enter: () => Promise - list: () => Promise - send: (file: File) => Promise - cleanup: () => void -} - -interface Message { - type: 'ping' | 'self' | 'enter' | 'leave'; - time: number; - body: any; -} - -function setupDataChannel(ch: RTCDataChannel) { - ch.onopen = () => console.log('通道已打开!'); - ch.onmessage = (e) => handleFileChunk(e.data); - ch.onclose = () => console.log('通道关闭'); -} - -// 接收文件块 -function handleFileChunk(chunk: any) { - console.log("[D] rtc file chunk =", chunk) -} - -const MAX_RETRY_DELAY = 30000 // 最大重试间隔30秒 -const NORMAL_CLOSE_CODE = 1000 // 正常关闭的状态码 - -export const useRoom = create()((set, get) => ({ - conn: null, - client: null, - clients: [], - pc: null, - ch: null, - candidate: null, - offer: null, - retryCount: 0, - reconnectTimer: null, - register: async () => { - let candidate: RTCIceCandidate; - let offer: RTCSessionDescription | null; - const rtc = new RTCPeerConnection({iceServers: [{urls: "stun:stun.qq.com:3478"}]}) - // 处理接收方DataChannel - rtc.ondatachannel = (e) => { - setupDataChannel(e.channel); - }; - - const waitCandidate = new Promise(resolve => { - rtc.onicecandidate = (e) => { - if (e.candidate) { - console.log('[D] candidate =', {candidate: e.candidate}) - candidate = e.candidate - } - resolve(); - } - }) - - // rtc.onicecandidate = (e) => { - // if (e.candidate) { - // console.log('[D] candidate =', {candidate: e.candidate}) - // candidate = e.candidate - // } - // } - - const waitOffer = new Promise(resolve => { - rtc.onnegotiationneeded = async () => { - await rtc.setLocalDescription(await rtc.createOffer()); - console.log("[D] offer =", {offer: rtc.localDescription}) - offer = rtc.localDescription - resolve(); - }; - }) - - // rtc.onnegotiationneeded = async () => { - // await rtc.setLocalDescription(await rtc.createOffer()); - // console.log("[D] offer =", {offer: rtc.localDescription}) - // offer = rtc.localDescription - // }; - - const ch = rtc.createDataChannel("fileTransfer", {ordered: true}) - - setupDataChannel(ch) - - - Promise.all([waitCandidate, waitOffer]).then(() => { - const api = `/api/ulocal/register` - fetch(api, { - method: 'POST', - headers: {"Content-Type": "application/json"}, - body: JSON.stringify({candidate: candidate, offer: offer}) - }).then(res => {return res.json() as unknown as Resp}).then(jes => { - set({client: jes.data, candidate: candidate, offer: offer}) - }) - }) - }, - enter: async () => { - const {conn, reconnectTimer} = get() - - // 清理旧连接和定时器 - if (reconnectTimer) clearTimeout(reconnectTimer) - if (conn) conn.close() - - const api = `${window.location.protocol === 'https' ? 'wss' : 'ws'}://${window.location.host}/api/ulocal/ws?id=${get().client?.id}` - console.log('[D] websocket api =', api) - const newConn = new WebSocket(api) - - newConn.onopen = () => { - - } - - newConn.onerror = (error) => { - console.error('WebSocket error:', error) - } - - newConn.onmessage = (event) => { - const msg = JSON.parse(event.data) as Message; - console.log('[D] ws msg =', msg) - let nc: Client - switch (msg.type) { - case "enter": - nc = msg.body as Client - if (nc.id && nc.name && nc.id !== get().client?.id) { - console.log('[D] enter new client =', nc) - set(state => { - return {...state, clients: [...get().clients, nc]} - }) - } - break - case "leave": - nc = msg.body as Client - if (nc.id) { - let idx = 0; - let items = get().clients; - for (const item of items) { - if (item.id === nc.id) { - items.splice(idx, 1) - set(state => { - return {...state, clients: items} - }) - break; - } - idx++; - } - } - break - } - } - - newConn.onclose = (event) => { - // 非正常关闭时触发重连 - if (event.code !== NORMAL_CLOSE_CODE) { - const {retryCount} = get() - const nextRetry = retryCount + 1 - const delay = Math.min(1000 * Math.pow(2, nextRetry), MAX_RETRY_DELAY) - - const timer = setTimeout(() => { - get().register() - }, delay) - - set({ - retryCount: nextRetry, - reconnectTimer: timer, - conn: null - }) - } - } - - set({conn: newConn, reconnectTimer: null}) - }, - list: async () => { - const api = "/api/ulocal/clients?room=" - const res = await fetch(api + get().client?.room) - const jes = await res.json() as Resp - set({clients: jes.data}) - }, - send: async (file: File) => { - const reader = new FileReader(); - const channel = get().ch!; - reader.onload = (e) => { - const chunkSize = 16384; // 16KB每块 - const buffer = e.target!.result! as ArrayBuffer; - let offset = 0; - while (offset < buffer.byteLength) { - const chunk = buffer.slice(offset, offset + chunkSize); - channel.send(chunk); - offset += chunkSize; - } - }; - reader.readAsArrayBuffer(file); - }, - cleanup: () => { - const {conn, reconnectTimer} = get() - if (reconnectTimer) clearTimeout(reconnectTimer) - if (conn) conn.close() - set({conn: null, retryCount: 0, reconnectTimer: null}) - } -})) diff --git a/internal/api/api.go b/internal/api/api.go index be94128..0097892 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -26,6 +26,7 @@ func Start(ctx context.Context) <-chan struct{} { { api := app.Group("/api/ulocal") api.Post("/register", handler.LocalRegister()) + api.Post("/offer", handler.LocalOffer()) api.Get("/clients", handler.LocalClients()) api.Get("/ws", handler.LocalWS()) } diff --git a/internal/controller/room.go b/internal/controller/room.go index 90fbea1..a462b3f 100644 --- a/internal/controller/room.go +++ b/internal/controller/room.go @@ -32,12 +32,22 @@ const ( type RoomMessageType string const ( - RoomMessageTypePing RoomMessageType = "ping" - RoomMessageTypeSelf RoomMessageType = "self" RoomMessageTypeEnter RoomMessageType = "enter" RoomMessageTypeLeave RoomMessageType = "leave" ) +type RoomOffer struct { + SDP string `json:"sdp"` + Type string `json:"type"` +} + +type RoomCandidate struct { + Candidate string `json:"candidate"` + SdpMid string `json:"sdpMid"` + SdpMLineIndex int `json:"sdpMLineIndex"` + UsernameFragment string `json:"usernameFragment"` +} + type roomClient struct { sync.Mutex controller *roomController @@ -49,8 +59,8 @@ type roomClient struct { Name string `json:"name"` Id string `json:"id"` RegisterAt time.Time `json:"register_at"` - Offer any `json:"offer"` - Candidate any `json:"candidate"` + Offer *RoomOffer `json:"offer"` + Candidate *RoomCandidate `json:"candidate"` msgChan chan any } @@ -153,7 +163,7 @@ func (rc *roomController) Start(ctx context.Context) { }() } -func (rc *roomController) Register(ip, userAgent string, candidate, offer any) *roomClient { +func (rc *roomController) Register(ip, userAgent string, candidate *RoomCandidate, offer *RoomOffer) *roomClient { nrc := &roomClient{ controller: rc, ClientType: ClientTypeDesktop, @@ -261,3 +271,18 @@ func (rc *roomController) Unregister(client *roomClient) { rc.Broadcast(key, map[string]any{"type": RoomMessageTypeLeave, "time": time.Now().UnixMilli(), "body": client}) } + +func (rc *roomController) Offer(room, id string, offer *RoomOffer) { + if _, ok := rc.rooms[room]; !ok { + return + } + + if _, ok := rc.rooms[room][id]; !ok { + return + } + + rc.rooms[room][id].msgChan <- map[string]any{ + "type": "offer", + "offer": offer, + } +} diff --git a/internal/handler/local.go b/internal/handler/local.go index 38b89ca..48d95a0 100644 --- a/internal/handler/local.go +++ b/internal/handler/local.go @@ -12,8 +12,8 @@ import ( func LocalRegister() nf.HandlerFunc { return func(c *nf.Ctx) error { type Req struct { - Candidate any `json:"candidate"` - Offer any `json:"offer"` + Candidate *controller.RoomCandidate `json:"candidate"` + Offer *controller.RoomOffer `json:"offer"` } var ( @@ -74,3 +74,26 @@ func LocalWS() nf.HandlerFunc { return nil } } + +func LocalOffer() nf.HandlerFunc { + return func(c *nf.Ctx) error { + type Req struct { + Room string `json:"room"` + Id string `json:"id"` + Offer *controller.RoomOffer `json:"offer"` + } + + var ( + err error + req = new(Req) + ) + + if err = c.BodyParser(req); err != nil { + return c.Status(http.StatusBadRequest).JSON(map[string]string{"err": err.Error()}) + } + + controller.RoomController.Offer(req.Room, req.Id, req.Offer) + + return resp.Resp200(c, req.Offer) + } +}