From 7c089bbe841a0479765bef332249f06bf45f4404 Mon Sep 17 00:00:00 2001 From: loveuer Date: Fri, 16 May 2025 16:14:40 +0800 Subject: [PATCH] wip: 0.2.2 1. rtc init --- frontend/src/page/local.tsx | 14 ++--- frontend/src/store/local.ts | 108 +++++++++++++++++++++++++++++++----- frontend/src/store/rtc.ts | 50 ----------------- internal/controller/room.go | 27 ++++++++- internal/handler/local.go | 19 ++++++- internal/model/ws.go | 14 +++++ 6 files changed, 154 insertions(+), 78 deletions(-) delete mode 100644 frontend/src/store/rtc.ts create mode 100644 internal/model/ws.go diff --git a/frontend/src/page/local.tsx b/frontend/src/page/local.tsx index 9c15fbf..bf520bf 100644 --- a/frontend/src/page/local.tsx +++ b/frontend/src/page/local.tsx @@ -1,7 +1,6 @@ import {CloudBackground} from "../component/fluid/cloud.tsx"; import {useEffect} from "react"; import {createUseStyles} from "react-jss"; -import {useRTC} from "../store/rtc.ts"; import {Client, useRoom} from "../store/local.ts"; const useClass = createUseStyles({ @@ -71,7 +70,6 @@ interface Bubble { export const LocalSharing: React.FC = () => { const classes = useClass(); const {register, enter, list, cleanup, client, clients} = useRoom(); - const {connect, create} = useRTC(); // 生成随机颜色 const generateColor = () => { @@ -147,20 +145,18 @@ export const LocalSharing: React.FC = () => { useEffect(() => { register().then(() => { - enter().then(() => { - list().then() - }) + setTimeout(() => { + enter().then(() => { + list().then() + }) + }, 600) }); - connect().then(() => { - console.log("[D] rtc create!!!") - }) return () => cleanup(); }, []); // 气泡点击处理 const handleBubbleClick = async (id: string) => { console.log('[D] click bubble!!!', id) - await create() }; return
diff --git a/frontend/src/store/local.ts b/frontend/src/store/local.ts index 219c20f..0da4367 100644 --- a/frontend/src/store/local.ts +++ b/frontend/src/store/local.ts @@ -15,6 +15,10 @@ type RoomState = { conn: WebSocket | null client: Client | null clients: Client[] + pc: RTCPeerConnection | null + ch: RTCDataChannel | null + candidate: RTCIceCandidate | null + offer: RTCSessionDescription | null retryCount: number reconnectTimer: number | null } @@ -23,6 +27,7 @@ type RoomActions = { register: () => Promise enter: () => Promise list: () => Promise + send: (file: File) => Promise cleanup: () => void } @@ -32,6 +37,17 @@ interface Message { body: any; } +function setupDataChannel(ch: RTCDataChannel) { + ch.onopen = () => console.log('通道已打开!'); + ch.onmessage = (e) => handleFileChunk(e.data); + ch.onclose = () => console.log('通道关闭'); +} + +// 接收文件块 +function handleFileChunk(chunk: any) { + console.log("[D] rtc file chunk =", chunk) +} + const MAX_RETRY_DELAY = 30000 // 最大重试间隔30秒 const NORMAL_CLOSE_CODE = 1000 // 正常关闭的状态码 @@ -39,14 +55,67 @@ export const useRoom = create()((set, get) => ({ conn: null, client: null, clients: [], + pc: null, + ch: null, + candidate: null, + offer: null, retryCount: 0, reconnectTimer: null, register: async () => { - const api = `/api/ulocal/register` - const res = await fetch(api, {method: 'POST'}) - const jes = await res.json() as Resp - return set(state => { - return {...state, client: jes.data} + let candidate: RTCIceCandidate; + let offer: RTCSessionDescription | null; + const rtc = new RTCPeerConnection({iceServers: [{urls: "stun:stun.qq.com:3478"}]}) + // 处理接收方DataChannel + rtc.ondatachannel = (e) => { + setupDataChannel(e.channel); + }; + + const waitCandidate = new Promise(resolve => { + rtc.onicecandidate = (e) => { + if (e.candidate) { + console.log('[D] candidate =', {candidate: e.candidate}) + candidate = e.candidate + } + resolve(); + } + }) + + // rtc.onicecandidate = (e) => { + // if (e.candidate) { + // console.log('[D] candidate =', {candidate: e.candidate}) + // candidate = e.candidate + // } + // } + + const waitOffer = new Promise(resolve => { + rtc.onnegotiationneeded = async () => { + await rtc.setLocalDescription(await rtc.createOffer()); + console.log("[D] offer =", {offer: rtc.localDescription}) + offer = rtc.localDescription + resolve(); + }; + }) + + // rtc.onnegotiationneeded = async () => { + // await rtc.setLocalDescription(await rtc.createOffer()); + // console.log("[D] offer =", {offer: rtc.localDescription}) + // offer = rtc.localDescription + // }; + + const ch = rtc.createDataChannel("fileTransfer", {ordered: true}) + + setupDataChannel(ch) + + + Promise.all([waitCandidate, waitOffer]).then(() => { + const api = `/api/ulocal/register` + fetch(api, { + method: 'POST', + headers: {"Content-Type": "application/json"}, + body: JSON.stringify({candidate: candidate, offer: offer}) + }).then(res => {return res.json() as unknown as Resp}).then(jes => { + set({client: jes.data, candidate: candidate, offer: offer}) + }) }) }, enter: async () => { @@ -57,11 +126,11 @@ export const useRoom = create()((set, get) => ({ if (conn) conn.close() const api = `${window.location.protocol === 'https' ? 'wss' : 'ws'}://${window.location.host}/api/ulocal/ws?id=${get().client?.id}` - console.log('[D] websocket api =',api) + console.log('[D] websocket api =', api) const newConn = new WebSocket(api) newConn.onopen = () => { - set({conn: newConn, retryCount: 0}) // 重置重试计数器 + } newConn.onerror = (error) => { @@ -75,7 +144,7 @@ export const useRoom = create()((set, get) => ({ switch (msg.type) { case "enter": nc = msg.body as Client - if(nc.id && nc.name && nc.id !== get().client?.id) { + if (nc.id && nc.name && nc.id !== get().client?.id) { console.log('[D] enter new client =', nc) set(state => { return {...state, clients: [...get().clients, nc]} @@ -84,7 +153,7 @@ export const useRoom = create()((set, get) => ({ break case "leave": nc = msg.body as Client - if(nc.id) { + if (nc.id) { let idx = 0; let items = get().clients; for (const item of items) { @@ -93,7 +162,7 @@ export const useRoom = create()((set, get) => ({ set(state => { return {...state, clients: items} }) - break; + break; } idx++; } @@ -127,9 +196,22 @@ export const useRoom = create()((set, get) => ({ const api = "/api/ulocal/clients?room=" const res = await fetch(api + get().client?.room) const jes = await res.json() as Resp - set(state => { - return {...state, clients: jes.data} - }) + set({clients: jes.data}) + }, + send: async (file: File) => { + const reader = new FileReader(); + const channel = get().ch!; + reader.onload = (e) => { + const chunkSize = 16384; // 16KB每块 + const buffer = e.target!.result! as ArrayBuffer; + let offset = 0; + while (offset < buffer.byteLength) { + const chunk = buffer.slice(offset, offset + chunkSize); + channel.send(chunk); + offset += chunkSize; + } + }; + reader.readAsArrayBuffer(file); }, cleanup: () => { const {conn, reconnectTimer} = get() diff --git a/frontend/src/store/rtc.ts b/frontend/src/store/rtc.ts deleted file mode 100644 index 746c905..0000000 --- a/frontend/src/store/rtc.ts +++ /dev/null @@ -1,50 +0,0 @@ -import {create} from 'zustand' - -type RTCState = { - conn: RTCPeerConnection | null -} - -type RTCAction = { - connect: () => Promise - create: () => Promise - cleanup: () => void -} - -export const useRTC = create()((set, get) => ({ - conn: null, - connect: async () => { - const conn = new RTCPeerConnection() - - const ch = conn.createDataChannel("fileTransfer", {ordered: true}) - - console.log('[D] channel =', ch) - - ch.onopen = (event) => { - console.log('🚀🚀🚀 / rtc open event', event) - } - - ch.onclose = (event) => { - } - - ch.onerror = (event) => { - } - - ch.onmessage = (event) => { - console.log('🚀🚀🚀 / rtc message event', event) - } - - set((state) => { - return {...state, conn: conn} - }) - }, - create: async () => { - const conn = get().conn - if (conn) conn.onicecandidate = async (event) => { - console.log('[D] rtc local desc =', conn.localDescription) - const offer = await conn.createOffer() - await conn.setLocalDescription(offer) - } - }, - cleanup: () => { - }, -})) \ No newline at end of file diff --git a/internal/controller/room.go b/internal/controller/room.go index 66835c5..90fbea1 100644 --- a/internal/controller/room.go +++ b/internal/controller/room.go @@ -39,6 +39,7 @@ const ( ) type roomClient struct { + sync.Mutex controller *roomController conn *websocket.Conn ClientType RoomClientType `json:"client_type"` @@ -48,6 +49,8 @@ type roomClient struct { Name string `json:"name"` Id string `json:"id"` RegisterAt time.Time `json:"register_at"` + Offer any `json:"offer"` + Candidate any `json:"candidate"` msgChan chan any } @@ -90,10 +93,26 @@ func (rc *roomClient) start(ctx context.Context) { rc.controller.Unregister(rc) return case websocket.TextMessage: - log.Info("RoomClient: received text message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs)) + log.Debug("RoomClient: received text message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs)) case websocket.BinaryMessage: + log.Debug("RoomClient: received bytes message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs)) // todo - log.Info("RoomClient: received bytes message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs)) + //msg := new(model.Message) + //if err = json.Unmarshal(bs, msg); err != nil { + // log.Error("RoomClient: unmarshal message failed, id = %s, name = %s, err = %s", rc.Id, rc.Name, err.Error()) + // continue + //} + // + //switch msg.Type { + //case model.WSMessageTypeOffer: + // rc.Lock() + // rc.Offer = msg.Body + // rc.Unlock() + //case model.WSMessageTypeCandidate: + // rc.Lock() + // rc.Candidate = msg.Body + // rc.Unlock() + //} } } }() @@ -134,7 +153,7 @@ func (rc *roomController) Start(ctx context.Context) { }() } -func (rc *roomController) Register(ip, userAgent string) *roomClient { +func (rc *roomController) Register(ip, userAgent string, candidate, offer any) *roomClient { nrc := &roomClient{ controller: rc, ClientType: ClientTypeDesktop, @@ -144,6 +163,8 @@ func (rc *roomController) Register(ip, userAgent string) *roomClient { Name: tool.RandomName(), msgChan: make(chan any, 1), RegisterAt: time.Now(), + Candidate: candidate, + Offer: offer, } ua := useragent.Parse(userAgent) diff --git a/internal/handler/local.go b/internal/handler/local.go index f452880..38b89ca 100644 --- a/internal/handler/local.go +++ b/internal/handler/local.go @@ -11,10 +11,23 @@ import ( func LocalRegister() nf.HandlerFunc { return func(c *nf.Ctx) error { - ip := c.IP(true) - ua := c.Get("User-Agent") + type Req struct { + Candidate any `json:"candidate"` + Offer any `json:"offer"` + } - client := controller.RoomController.Register(ip, ua) + var ( + err error + req = new(Req) + ip = c.IP(true) + ua = c.Get("User-Agent") + ) + + if err = c.BodyParser(req); err != nil { + return c.Status(http.StatusBadRequest).JSON(map[string]interface{}{"msg": err.Error()}) + } + + client := controller.RoomController.Register(ip, ua, req.Candidate, req.Offer) return resp.Resp200(c, client) } diff --git a/internal/model/ws.go b/internal/model/ws.go new file mode 100644 index 0000000..d315e76 --- /dev/null +++ b/internal/model/ws.go @@ -0,0 +1,14 @@ +package model + +type WSMessageType string + +const ( + WSMessageTypeOffer WSMessageType = "offer" + WSMessageTypeCandidate WSMessageType = "candidate" +) + +type Message struct { + Type WSMessageType `json:"type"` + Time int64 `json:"time"` + Body any `json:"body"` +}