Compare commits

..

5 Commits

Author SHA1 Message Date
loveuer
16e9d663f4 wip: 0.2.3
1. websocket hook
  2. rtc init ok
2025-05-20 18:04:37 +08:00
loveuer
becbc137c5 Merge tag 'v0.1.4' 2025-05-16 16:52:17 +08:00
loveuer
d41d516f19 fix: 0.1.4
All checks were successful
/ build ushare (push) Successful in 36s
/ clean (push) Successful in 0s
1. meta clean goroutine walk error
  2. clean interval to args(--clean)
2025-05-16 16:50:22 +08:00
loveuer
0a24393dcb fix: 0.1.4
All checks were successful
/ build ushare (push) Successful in 51s
/ clean (push) Successful in 0s
1. meta clean goroutine walk error
  2. clean interval to args(--clean)
2025-05-16 16:48:28 +08:00
loveuer
7c089bbe84 wip: 0.2.2
1. rtc init
2025-05-16 16:14:40 +08:00
14 changed files with 297 additions and 226 deletions

View File

@ -1,5 +1,5 @@
import {useState} from "react"; import {useState} from "react";
import {message} from "../component/message/u-message.tsx"; import {message} from "../hook/message/u-message.tsx";
export interface User { export interface User {
id: number; id: number;

View File

@ -0,0 +1,77 @@
import { useCallback, useRef } from 'react';
export interface Prop {
/** 事件处理函数(可选) */
fn?: (event: MessageEvent) => Promise<void>;
/** 最大重试次数(可选) */
retry?: number;
}
export const useWebsocket = (prop?: Prop) => {
const wsRef = useRef<WebSocket | null>(null);
const retryCountRef = useRef(0);
const reconnectTimerRef = useRef<number>(0);
const currentPropRef = useRef(prop);
// 更新最新 prop
currentPropRef.current = prop;
const connect = useCallback((url: string, connectProp?: Prop) => {
// 合并 prop 优先级connectProp > hook prop
const mergedProp = { ...currentPropRef.current, ...connectProp };
// 清理现有连接
if (wsRef.current) {
wsRef.current.close();
wsRef.current = null;
}
clearTimeout(reconnectTimerRef.current);
const createConnection = () => {
const ws = new WebSocket(url);
ws.onopen = () => {
retryCountRef.current = 0;
};
ws.onmessage = (event) => {
mergedProp?.fn?.(event).catch(error => {
console.error('WebSocket message handler error:', error);
});
};
ws.onclose = (event) => {
const maxRetries = mergedProp?.retry ?? 0;
if (!event.wasClean && retryCountRef.current < maxRetries) {
retryCountRef.current += 1;
const retryDelay = Math.pow(2, retryCountRef.current) * 1000;
reconnectTimerRef.current = setTimeout(() => {
createConnection();
}, retryDelay);
}
};
ws.onerror = (error) => {
console.error('WebSocket error:', error);
ws.close();
};
wsRef.current = ws;
};
createConnection();
}, []);
const close = useCallback(() => {
wsRef.current?.close();
clearTimeout(reconnectTimerRef.current);
retryCountRef.current = currentPropRef.current?.retry || 0;
}, []);
return {
connect,
close
};
};

View File

@ -2,7 +2,7 @@ import {createUseStyles} from "react-jss";
import {UButton} from "../../component/button/u-button.tsx"; import {UButton} from "../../component/button/u-button.tsx";
import React, {useState} from "react"; import React, {useState} from "react";
import {useStore} from "../../store/share.ts"; import {useStore} from "../../store/share.ts";
import {message} from "../../component/message/u-message.tsx"; import {message} from "../../hook/message/u-message.tsx";
import {useFileUpload} from "../../api/upload.ts"; import {useFileUpload} from "../../api/upload.ts";
const useUploadStyle = createUseStyles({ const useUploadStyle = createUseStyles({

View File

@ -1,8 +1,8 @@
import {CloudBackground} from "../component/fluid/cloud.tsx"; import {CloudBackground} from "../component/fluid/cloud.tsx";
import {useEffect} from "react"; import {useEffect, useState} from "react";
import {createUseStyles} from "react-jss"; import {createUseStyles} from "react-jss";
import {useRTC} from "../store/rtc.ts"; import {useWebsocket} from "../hook/websocket/u-ws.tsx";
import {Client, useRoom} from "../store/local.ts"; import {Resp} from "../interface/response.ts";
const useClass = createUseStyles({ const useClass = createUseStyles({
'@global': { '@global': {
@ -68,10 +68,40 @@ interface Bubble {
angle: number; // 新增角度属性 angle: number; // 新增角度属性
} }
interface Client {
client_type: 'desktop' | 'mobile' | 'tablet';
app_type: 'web';
room: string;
ip: number;
name: string;
id: string;
register_at: string;
offer: RTCSessionDescription;
}
interface Store {
client: Client | null
clients: Client[]
rtc: RTCPeerConnection | null
offer: RTCSessionDescription | null
candidate: RTCIceCandidate | null
}
function setupDataChannel(ch: RTCDataChannel) {
ch.onopen = () => console.log('通道已打开!');
ch.onmessage = (e) => handleFileChunk(e.data);
ch.onclose = () => console.log('通道关闭');
}
// 接收文件块
function handleFileChunk(chunk: any) {
console.log("[D] rtc file chunk =", chunk)
}
export const LocalSharing: React.FC = () => { export const LocalSharing: React.FC = () => {
const classes = useClass(); const classes = useClass();
const {register, enter, list, cleanup, client, clients} = useRoom(); const [rtcStore, setRTCStore] = useState<Store>({} as Store)
const {connect, create} = useRTC(); const {connect, close} = useWebsocket({})
// 生成随机颜色 // 生成随机颜色
const generateColor = () => { const generateColor = () => {
@ -97,7 +127,7 @@ export const LocalSharing: React.FC = () => {
let attempt = 0; let attempt = 0;
let validPosition = false; let validPosition = false;
if (cs[index].id == client?.id) { if (cs[index].id == rtcStore.client?.id) {
continue continue
} }
@ -146,29 +176,68 @@ export const LocalSharing: React.FC = () => {
}; };
useEffect(() => { useEffect(() => {
register().then(() => { const fn = async () => {
enter().then(() => { const rtc = new RTCPeerConnection({iceServers: [{urls: "stun:stun.qq.com:3478"}]})
list().then() const dataChannel = rtc.createDataChannel('fileTransfer', {ordered: true});
setupDataChannel(dataChannel);
const waitCandidate = new Promise<RTCIceCandidate | null>(resolve => {
rtc.onicecandidate = (e) => {
resolve(e.candidate)
}
}) })
}); const waitNegotiationneeded = new Promise<void>(resolve => {
connect().then(() => { rtc.onnegotiationneeded = async () => {
console.log("[D] rtc create!!!") const _offer = await rtc.createOffer()
await rtc.setLocalDescription(_offer)
resolve()
}
}) })
return () => cleanup();
console.log('[D] rtc step 1')
await waitNegotiationneeded
const candidate: RTCIceCandidate | null = await waitCandidate;
if (!candidate) throw new Error("candidate is null")
console.log('[D] rtc step 2')
const res = await fetch("/api/ulocal/register", {
method: "POST",
headers: {"Content-Type": "application/json"},
body: JSON.stringify({candidate: candidate, offer: rtc.localDescription})
})
const jes = await res.json() as Resp<Client>;
setRTCStore(val => {
return {...val, client: jes.data, candidate: candidate, offer: rtc.localDescription}
})
const api = `${window.location.protocol === 'https' ? 'wss' : 'ws'}://${window.location.host}/api/ulocal/ws?id=${jes.data.id}`
console.log('[D] websocker url =', api)
connect(api, {})
const res2 = await fetch(`/api/ulocal/clients?room=${jes.data.room}`)
const jes2 = await res2.json() as Resp<Client[]>
setRTCStore(val => {
return {...val, clients: jes2.data}
})
}
fn()
return () => close();
}, []); }, []);
// 气泡点击处理 // 气泡点击处理
const handleBubbleClick = async (id: string) => { const handleBubbleClick = async (id: string) => {
console.log('[D] click bubble!!!', id) console.log('[D] click bubble!!!', id)
await create() // await link(id)
}; };
return <div className={classes.container}> return <div className={classes.container}>
<CloudBackground/> <CloudBackground/>
<h1 className={classes.title}>{client?.name}</h1> <h1 className={classes.title}>{rtcStore.client?.name}</h1>
{clients && generateBubbles(clients).map(bubble => { {rtcStore.clients && generateBubbles(rtcStore.clients).map(bubble => {
// const client = clients.find(c => c.id === bubble.id); // const client = clients.find(c => c.id === bubble.id);
return client ? ( return rtcStore.client ? (
<div <div
key={bubble.id} key={bubble.id}
className={classes.bubble} className={classes.bubble}

View File

@ -1,140 +0,0 @@
import {create} from 'zustand'
import {Resp} from "../interface/response.ts";
export interface Client {
client_type: 'desktop' | 'mobile' | 'tablet';
app_type: 'web';
room: string;
ip: number;
name: string;
id: string;
register_at: string;
}
type RoomState = {
conn: WebSocket | null
client: Client | null
clients: Client[]
retryCount: number
reconnectTimer: number | null
}
type RoomActions = {
register: () => Promise<void>
enter: () => Promise<void>
list: () => Promise<void>
cleanup: () => void
}
interface Message {
type: 'ping' | 'self' | 'enter' | 'leave';
time: number;
body: any;
}
const MAX_RETRY_DELAY = 30000 // 最大重试间隔30秒
const NORMAL_CLOSE_CODE = 1000 // 正常关闭的状态码
export const useRoom = create<RoomState & RoomActions>()((set, get) => ({
conn: null,
client: null,
clients: [],
retryCount: 0,
reconnectTimer: null,
register: async () => {
const api = `/api/ulocal/register`
const res = await fetch(api, {method: 'POST'})
const jes = await res.json() as Resp<Client>
return set(state => {
return {...state, client: jes.data}
})
},
enter: async () => {
const {conn, reconnectTimer} = get()
// 清理旧连接和定时器
if (reconnectTimer) clearTimeout(reconnectTimer)
if (conn) conn.close()
const api = `${window.location.protocol === 'https' ? 'wss' : 'ws'}://${window.location.host}/api/ulocal/ws?id=${get().client?.id}`
console.log('[D] websocket api =',api)
const newConn = new WebSocket(api)
newConn.onopen = () => {
set({conn: newConn, retryCount: 0}) // 重置重试计数器
}
newConn.onerror = (error) => {
console.error('WebSocket error:', error)
}
newConn.onmessage = (event) => {
const msg = JSON.parse(event.data) as Message;
console.log('[D] ws msg =', msg)
let nc: Client
switch (msg.type) {
case "enter":
nc = msg.body as Client
if(nc.id && nc.name && nc.id !== get().client?.id) {
console.log('[D] enter new client =', nc)
set(state => {
return {...state, clients: [...get().clients, nc]}
})
}
break
case "leave":
nc = msg.body as Client
if(nc.id) {
let idx = 0;
let items = get().clients;
for (const item of items) {
if (item.id === nc.id) {
items.splice(idx, 1)
set(state => {
return {...state, clients: items}
})
break;
}
idx++;
}
}
break
}
}
newConn.onclose = (event) => {
// 非正常关闭时触发重连
if (event.code !== NORMAL_CLOSE_CODE) {
const {retryCount} = get()
const nextRetry = retryCount + 1
const delay = Math.min(1000 * Math.pow(2, nextRetry), MAX_RETRY_DELAY)
const timer = setTimeout(() => {
get().register()
}, delay)
set({
retryCount: nextRetry,
reconnectTimer: timer,
conn: null
})
}
}
set({conn: newConn, reconnectTimer: null})
},
list: async () => {
const api = "/api/ulocal/clients?room="
const res = await fetch(api + get().client?.room)
const jes = await res.json() as Resp<Client[]>
set(state => {
return {...state, clients: jes.data}
})
},
cleanup: () => {
const {conn, reconnectTimer} = get()
if (reconnectTimer) clearTimeout(reconnectTimer)
if (conn) conn.close()
set({conn: null, retryCount: 0, reconnectTimer: null})
}
}))

View File

@ -1,50 +0,0 @@
import {create} from 'zustand'
type RTCState = {
conn: RTCPeerConnection | null
}
type RTCAction = {
connect: () => Promise<void>
create: () => Promise<void>
cleanup: () => void
}
export const useRTC = create<RTCState & RTCAction>()((set, get) => ({
conn: null,
connect: async () => {
const conn = new RTCPeerConnection()
const ch = conn.createDataChannel("fileTransfer", {ordered: true})
console.log('[D] channel =', ch)
ch.onopen = (event) => {
console.log('🚀🚀🚀 / rtc open event', event)
}
ch.onclose = (event) => {
}
ch.onerror = (event) => {
}
ch.onmessage = (event) => {
console.log('🚀🚀🚀 / rtc message event', event)
}
set((state) => {
return {...state, conn: conn}
})
},
create: async () => {
const conn = get().conn
if (conn) conn.onicecandidate = async (event) => {
console.log('[D] rtc local desc =', conn.localDescription)
const offer = await conn.createOffer()
await conn.setLocalDescription(offer)
}
},
cleanup: () => {
},
}))

View File

@ -26,6 +26,7 @@ func Start(ctx context.Context) <-chan struct{} {
{ {
api := app.Group("/api/ulocal") api := app.Group("/api/ulocal")
api.Post("/register", handler.LocalRegister()) api.Post("/register", handler.LocalRegister())
api.Post("/offer", handler.LocalOffer())
api.Get("/clients", handler.LocalClients()) api.Get("/clients", handler.LocalClients())
api.Get("/ws", handler.LocalWS()) api.Get("/ws", handler.LocalWS())
} }

View File

@ -135,14 +135,25 @@ func (m *meta) Start(ctx context.Context) {
// 清理一天前的文件 // 清理一天前的文件
go func() { go func() {
if opt.Cfg.CleanInterval <= 0 {
log.Warn("meta.Clean: no clean interval set, plz clean manual!!!")
return
}
ticker := time.NewTicker(5 * time.Minute) ticker := time.NewTicker(5 * time.Minute)
duration := time.Duration(opt.Cfg.CleanInterval) * time.Hour
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case now := <-ticker.C: case now := <-ticker.C:
//log.Debug("meta.Clean: 开始清理过期文件 = %v", duration)
_ = filepath.Walk(opt.Cfg.DataPath, func(path string, info os.FileInfo, err error) error { _ = filepath.Walk(opt.Cfg.DataPath, func(path string, info os.FileInfo, err error) error {
if info == nil {
return nil
}
if info.IsDir() { if info.IsDir() {
return nil return nil
} }
@ -168,12 +179,16 @@ func (m *meta) Start(ctx context.Context) {
code := strings.TrimPrefix(name, ".meta.") code := strings.TrimPrefix(name, ".meta.")
if now.Sub(time.UnixMilli(mi.CreatedAt)) > 24*time.Hour { if now.Sub(time.UnixMilli(mi.CreatedAt)) > duration {
log.Debug("controller.meta: file out of date, code = %s, user_key = %s", code, mi.Uploader) log.Debug("controller.meta: file out of date, code = %s, user_key = %s", code, mi.Uploader)
os.RemoveAll(opt.FilePath(code)) if err = os.RemoveAll(opt.FilePath(code)); err != nil {
os.RemoveAll(path) log.Warn("meta.Clean: remove file failed, file = %s, err = %s", opt.FilePath(code), err.Error())
}
if err = os.RemoveAll(path); err != nil {
log.Warn("meta.Clean: remove file failed, file = %s, err = %s", path, err.Error())
}
m.Lock() m.Lock()
delete(m.m, code) delete(m.m, code)

View File

@ -32,13 +32,24 @@ const (
type RoomMessageType string type RoomMessageType string
const ( const (
RoomMessageTypePing RoomMessageType = "ping"
RoomMessageTypeSelf RoomMessageType = "self"
RoomMessageTypeEnter RoomMessageType = "enter" RoomMessageTypeEnter RoomMessageType = "enter"
RoomMessageTypeLeave RoomMessageType = "leave" RoomMessageTypeLeave RoomMessageType = "leave"
) )
type RoomOffer struct {
SDP string `json:"sdp"`
Type string `json:"type"`
}
type RoomCandidate struct {
Candidate string `json:"candidate"`
SdpMid string `json:"sdpMid"`
SdpMLineIndex int `json:"sdpMLineIndex"`
UsernameFragment string `json:"usernameFragment"`
}
type roomClient struct { type roomClient struct {
sync.Mutex
controller *roomController controller *roomController
conn *websocket.Conn conn *websocket.Conn
ClientType RoomClientType `json:"client_type"` ClientType RoomClientType `json:"client_type"`
@ -48,6 +59,8 @@ type roomClient struct {
Name string `json:"name"` Name string `json:"name"`
Id string `json:"id"` Id string `json:"id"`
RegisterAt time.Time `json:"register_at"` RegisterAt time.Time `json:"register_at"`
Offer *RoomOffer `json:"offer"`
Candidate *RoomCandidate `json:"candidate"`
msgChan chan any msgChan chan any
} }
@ -90,10 +103,26 @@ func (rc *roomClient) start(ctx context.Context) {
rc.controller.Unregister(rc) rc.controller.Unregister(rc)
return return
case websocket.TextMessage: case websocket.TextMessage:
log.Info("RoomClient: received text message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs)) log.Debug("RoomClient: received text message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs))
case websocket.BinaryMessage: case websocket.BinaryMessage:
log.Debug("RoomClient: received bytes message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs))
// todo // todo
log.Info("RoomClient: received bytes message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs)) //msg := new(model.Message)
//if err = json.Unmarshal(bs, msg); err != nil {
// log.Error("RoomClient: unmarshal message failed, id = %s, name = %s, err = %s", rc.Id, rc.Name, err.Error())
// continue
//}
//
//switch msg.Type {
//case model.WSMessageTypeOffer:
// rc.Lock()
// rc.Offer = msg.Body
// rc.Unlock()
//case model.WSMessageTypeCandidate:
// rc.Lock()
// rc.Candidate = msg.Body
// rc.Unlock()
//}
} }
} }
}() }()
@ -134,7 +163,7 @@ func (rc *roomController) Start(ctx context.Context) {
}() }()
} }
func (rc *roomController) Register(ip, userAgent string) *roomClient { func (rc *roomController) Register(ip, userAgent string, candidate *RoomCandidate, offer *RoomOffer) *roomClient {
nrc := &roomClient{ nrc := &roomClient{
controller: rc, controller: rc,
ClientType: ClientTypeDesktop, ClientType: ClientTypeDesktop,
@ -144,6 +173,8 @@ func (rc *roomController) Register(ip, userAgent string) *roomClient {
Name: tool.RandomName(), Name: tool.RandomName(),
msgChan: make(chan any, 1), msgChan: make(chan any, 1),
RegisterAt: time.Now(), RegisterAt: time.Now(),
Candidate: candidate,
Offer: offer,
} }
ua := useragent.Parse(userAgent) ua := useragent.Parse(userAgent)
@ -240,3 +271,18 @@ func (rc *roomController) Unregister(client *roomClient) {
rc.Broadcast(key, map[string]any{"type": RoomMessageTypeLeave, "time": time.Now().UnixMilli(), "body": client}) rc.Broadcast(key, map[string]any{"type": RoomMessageTypeLeave, "time": time.Now().UnixMilli(), "body": client})
} }
func (rc *roomController) Offer(room, id string, offer *RoomOffer) {
if _, ok := rc.rooms[room]; !ok {
return
}
if _, ok := rc.rooms[room][id]; !ok {
return
}
rc.rooms[room][id].msgChan <- map[string]any{
"type": "offer",
"offer": offer,
}
}

View File

@ -11,10 +11,23 @@ import (
func LocalRegister() nf.HandlerFunc { func LocalRegister() nf.HandlerFunc {
return func(c *nf.Ctx) error { return func(c *nf.Ctx) error {
ip := c.IP(true) type Req struct {
ua := c.Get("User-Agent") Candidate *controller.RoomCandidate `json:"candidate"`
Offer *controller.RoomOffer `json:"offer"`
}
client := controller.RoomController.Register(ip, ua) var (
err error
req = new(Req)
ip = c.IP(true)
ua = c.Get("User-Agent")
)
if err = c.BodyParser(req); err != nil {
return c.Status(http.StatusBadRequest).JSON(map[string]interface{}{"msg": err.Error()})
}
client := controller.RoomController.Register(ip, ua, req.Candidate, req.Offer)
return resp.Resp200(c, client) return resp.Resp200(c, client)
} }
@ -61,3 +74,26 @@ func LocalWS() nf.HandlerFunc {
return nil return nil
} }
} }
func LocalOffer() nf.HandlerFunc {
return func(c *nf.Ctx) error {
type Req struct {
Room string `json:"room"`
Id string `json:"id"`
Offer *controller.RoomOffer `json:"offer"`
}
var (
err error
req = new(Req)
)
if err = c.BodyParser(req); err != nil {
return c.Status(http.StatusBadRequest).JSON(map[string]string{"err": err.Error()})
}
controller.RoomController.Offer(req.Room, req.Id, req.Offer)
return resp.Resp200(c, req.Offer)
}
}

14
internal/model/ws.go Normal file
View File

@ -0,0 +1,14 @@
package model
type WSMessageType string
const (
WSMessageTypeOffer WSMessageType = "offer"
WSMessageTypeCandidate WSMessageType = "candidate"
)
type Message struct {
Type WSMessageType `json:"type"`
Time int64 `json:"time"`
Body any `json:"body"`
}

View File

@ -11,6 +11,7 @@ type config struct {
Address string Address string
DataPath string DataPath string
Auth string Auth string
CleanInterval int
} }
var ( var (

View File

@ -7,6 +7,7 @@ import (
"github.com/loveuer/ushare/internal/api" "github.com/loveuer/ushare/internal/api"
"github.com/loveuer/ushare/internal/controller" "github.com/loveuer/ushare/internal/controller"
"github.com/loveuer/ushare/internal/opt" "github.com/loveuer/ushare/internal/opt"
"github.com/loveuer/ushare/internal/pkg/tool"
"os/signal" "os/signal"
"syscall" "syscall"
) )
@ -16,11 +17,12 @@ func init() {
flag.StringVar(&opt.Cfg.Address, "address", "0.0.0.0:9119", "") flag.StringVar(&opt.Cfg.Address, "address", "0.0.0.0:9119", "")
flag.StringVar(&opt.Cfg.DataPath, "data", "/data", "") flag.StringVar(&opt.Cfg.DataPath, "data", "/data", "")
flag.StringVar(&opt.Cfg.Auth, "auth", "", "auth required(admin, password)") flag.StringVar(&opt.Cfg.Auth, "auth", "", "auth required(admin, password)")
flag.IntVar(&opt.Cfg.CleanInterval, "clean", 24, "清理文件的周期, 单位: 小时, 0 则表示不自动清理")
flag.Parse() flag.Parse()
if opt.Cfg.Debug { if opt.Cfg.Debug {
log.SetLogLevel(log.LogLevelDebug) log.SetLogLevel(log.LogLevelDebug)
log.Debug("start server with debug mode") tool.TablePrinter(opt.Cfg)
} }
} }