loveuer 244e749f54
Some checks failed
/ build ushare (push) Failing after 12s
/ clean (push) Successful in 0s
feat:
1. local msg/file share by webrtc
fix:
  1. meta clean goroutine walk error
  2. clean interval to args(--clean)
2025-06-23 23:03:27 +08:00

278 lines
6.6 KiB
Go

// room controller:
// local share websocket room controller
// same remote IP as a
package controller
import (
"context"
"encoding/json"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/loveuer/nf/nft/log"
"github.com/loveuer/ushare/internal/pkg/tool"
"github.com/mileusna/useragent"
"sync"
"time"
)
type RoomClientType string
const (
ClientTypeDesktop RoomClientType = "desktop"
ClientTypeMobile RoomClientType = "mobile"
ClientTypeTablet RoomClientType = "tablet"
)
type RoomAppType string
const (
RoomAppTypeWeb = "web"
)
type RoomMessageType string
const (
RoomMessageTypeEnter RoomMessageType = "enter"
RoomMessageTypeLeave RoomMessageType = "leave"
)
type RoomOffer struct {
SDP string `json:"sdp"`
Type string `json:"type"`
}
type RoomCandidate struct {
Candidate string `json:"candidate"`
SdpMid string `json:"sdpMid"`
SdpMLineIndex int `json:"sdpMLineIndex"`
UsernameFragment string `json:"usernameFragment"`
}
type roomClient struct {
sync.Mutex
controller *roomController
conn *websocket.Conn
ClientType RoomClientType `json:"client_type"`
AppType RoomAppType `json:"app_type"`
IP string `json:"ip"`
Name string `json:"name"`
Id string `json:"id"`
RegisterAt time.Time `json:"register_at"`
msgChan chan any
}
func (rc *roomClient) start(ctx context.Context) {
// start write
go func() {
for {
select {
case <-ctx.Done():
_ = rc.conn.Close()
return
case msg, _ := <-rc.msgChan:
err := rc.conn.WriteJSON(msg)
log.Debug("RoomClient: write json message, IP = %s, Id = %s, Name = %s, err = %v", rc.IP, rc.Id, rc.Name, err)
if err != nil {
log.Error("RoomClient: write json message failed, IP = %s, Id = %s, Name = %s, err = %s", rc.IP, rc.Id, rc.Name, err.Error())
}
}
}
}()
// start read
go func() {
for {
mt, bs, err := rc.conn.ReadMessage()
if err != nil {
log.Error("RoomClient: read message failed, IP = %s, Id = %s, Name = %s, err = %s", rc.IP, rc.Id, rc.Name, err.Error())
rc.controller.Unregister(rc)
return
}
switch mt {
case websocket.PingMessage:
rs, _ := json.Marshal(map[string]any{"type": "pong", "time": time.Now().UnixMilli(), "Id": rc.Id, "Name": rc.Name})
if err := rc.conn.WriteMessage(websocket.PongMessage, rs); err != nil {
log.Error("RoomClient: response ping message failed, IP = %s, Id = %s, Name = %s, err = %s", rc.IP, rc.Id, rc.Name, err.Error())
}
case websocket.CloseMessage:
log.Debug("RoomClient: received close message, unregister IP = %s Id = %s, Name = %s", rc.IP, rc.Id, rc.Name)
rc.controller.Unregister(rc)
return
case websocket.TextMessage:
log.Debug("RoomClient: received text message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs))
case websocket.BinaryMessage:
log.Debug("RoomClient: received bytes message, IP = %s, Id = %s, Name = %s, text = %s", rc.IP, rc.Id, rc.Name, string(bs))
// todo
//msg := new(model.Message)
//if err = json.Unmarshal(bs, msg); err != nil {
// log.Error("RoomClient: unmarshal message failed, id = %s, name = %s, err = %s", rc.Id, rc.Name, err.Error())
// continue
//}
//
//switch msg.Type {
//case model.WSMessageTypeOffer:
// rc.Lock()
// rc.Offer = msg.Body
// rc.Unlock()
//case model.WSMessageTypeCandidate:
// rc.Lock()
// rc.Candidate = msg.Body
// rc.Unlock()
//}
}
}
}()
}
type roomController struct {
sync.Mutex
ctx context.Context
//rooms map[string]map[string]*roomClient // map[room_id(remote-IP)][Id]
pre map[string]*roomClient
clients map[string]*roomClient
}
var (
RoomController = &roomController{
pre: make(map[string]*roomClient),
clients: make(map[string]*roomClient),
}
)
func (rc *roomController) Start(ctx context.Context) {
rc.ctx = ctx
}
func (rc *roomController) Register(ip, userAgent string) *roomClient {
nrc := &roomClient{
controller: rc,
ClientType: ClientTypeDesktop,
AppType: RoomAppTypeWeb,
IP: ip,
Id: uuid.Must(uuid.NewV7()).String(),
Name: tool.RandomName(),
msgChan: make(chan any, 1),
RegisterAt: time.Now(),
}
ua := useragent.Parse(userAgent)
switch {
case ua.Mobile:
nrc.ClientType = ClientTypeMobile
case ua.Tablet:
nrc.ClientType = ClientTypeTablet
}
rc.Lock()
defer rc.Unlock()
rc.pre[nrc.Id] = nrc
return nrc
}
func (rc *roomController) Enter(conn *websocket.Conn, id string) *roomClient {
log.Debug("controller.room: registry client, id = %s", id)
rc.Lock()
defer rc.Unlock()
nrc, ok := rc.pre[id]
if !ok {
return nil
}
nrc.conn = conn
nrc.start(rc.ctx)
rc.Broadcast(map[string]any{"type": "enter", "time": time.Now().UnixMilli(), "body": nrc})
delete(rc.pre, nrc.Id)
rc.clients[nrc.Id] = nrc
return nrc
}
func (rc *roomController) List() []*roomClient {
clientList := make([]*roomClient, 0)
for _, client := range rc.clients {
clientList = append(clientList, client)
}
return clientList
}
func (rc *roomController) Broadcast(msg any) {
for _, client := range rc.clients {
select {
case client.msgChan <- msg:
case <-time.After(2 * time.Second):
log.Warn("RoomController: broadcast timeout, client Id = %s, IP = %s", client.Id, client.IP)
}
}
}
func (rc *roomController) Unregister(client *roomClient) {
log.Debug("controller.room: unregister client, IP = %s, Id = %s, Name = %s", client.IP, client.Id, client.Name)
rc.Lock()
delete(rc.clients, client.Id)
rc.Unlock()
rc.Broadcast(map[string]any{"type": RoomMessageTypeLeave, "time": time.Now().UnixMilli(), "body": client})
}
func (rc *roomController) Offer(id, from string, offer *RoomOffer) {
if _, ok := rc.clients[id]; !ok {
return
}
rc.clients[id].msgChan <- map[string]any{
"type": "offer",
"time": time.Now().UnixMilli(),
"data": map[string]any{
"id": id,
"from": from,
"offer": offer,
},
}
}
func (rc *roomController) Answer(id string, answer *RoomOffer) {
if _, ok := rc.clients[id]; !ok {
return
}
rc.clients[id].msgChan <- map[string]any{
"type": "answer",
"time": time.Now().UnixMilli(),
"data": map[string]any{
"id": id,
"answer": answer,
},
}
}
func (rc *roomController) Candidate(id string, candidate *RoomCandidate) {
if _, ok := rc.clients[id]; !ok {
return
}
for _, client := range rc.clients {
if client.Id == id {
continue
}
client.msgChan <- map[string]any{
"type": "candidate",
"time": time.Now().UnixMilli(),
"data": map[string]any{
"id": client.Id,
"candidate": candidate,
},
}
}
}