init
This commit is contained in:
58
internal/models/message.go
Normal file
58
internal/models/message.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// Message 核心变更:增加 Room 字段,移除歧义字段
|
||||
type Message struct {
|
||||
Type string `json:"type"` // 消息类型(严格按常量)
|
||||
User string `json:"user"` // 发送者(客户端填,服务端可校验)
|
||||
Content string `json:"content"` // 消息内容
|
||||
Time string `json:"time"` // 服务端统一覆盖为 RFC3339(防客户端篡改)
|
||||
To string `json:"to,omitempty"` // 私聊目标(仅 type=private 时有效)
|
||||
Room string `json:"room,omitempty"` // 房间ID(仅 type=room 时有效)
|
||||
}
|
||||
|
||||
// 消息类型常量
|
||||
const (
|
||||
MsgTypeLogin = "login" // 服务端生成:用户上线事件
|
||||
MsgTypeLogout = "logout" // 服务端生成:用户下线事件
|
||||
MsgTypeBroadcast = "broadcast" // 全体用户广播(无视房间)
|
||||
MsgTypeRoom = "room" // 房间消息(必须带 Room 字段)
|
||||
MsgTypePrivate = "private" // 私聊(必须带 To 字段)
|
||||
MsgTypeSystem = "system" // 服务端生成:系统通知
|
||||
MsgTypeError = "error" // 服务端生成:定向错误提示
|
||||
)
|
||||
|
||||
// 发送广播消息
|
||||
func SendBroadcastMessage(user, content string) *Message {
|
||||
return &Message{
|
||||
Type: MsgTypeBroadcast,
|
||||
User: user,
|
||||
Content: content,
|
||||
Time: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
}
|
||||
|
||||
// 发送房间消息
|
||||
func SendRoomMessage(user, roomID, content string) *Message {
|
||||
return &Message{
|
||||
Type: MsgTypeRoom,
|
||||
User: user,
|
||||
Room: roomID, // 显式绑定房间
|
||||
Content: content,
|
||||
Time: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
}
|
||||
|
||||
// 发送私聊消息
|
||||
func SendPrivateMessage(sender, recipient, content string) *Message {
|
||||
return &Message{
|
||||
Type: MsgTypePrivate,
|
||||
User: sender,
|
||||
To: recipient, // 显式指定接收者
|
||||
Content: content,
|
||||
Time: time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
}
|
||||
115
internal/rabbitmq/client.go
Normal file
115
internal/rabbitmq/client.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
// Client 封装 RabbitMQ 连接和操作
|
||||
type Client struct {
|
||||
conn *amqp091.Connection // 连接
|
||||
channel *amqp091.Channel // 通道
|
||||
ExchangeName string // 交换机名称
|
||||
}
|
||||
|
||||
// Publish 向交换机发布消息
|
||||
func (c *Client) Publish(exchange, routingKey string, body []byte) error {
|
||||
return c.channel.Publish(
|
||||
exchange,
|
||||
routingKey,
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp091.Publishing{
|
||||
ContentType: "application/json",
|
||||
Body: body,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// NewClient 创建 RabbitMQ 客户端
|
||||
func NewClient(uri string) (*Client, error) {
|
||||
conn, err := amqp091.Dial(uri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 交换机名称
|
||||
exchangeName := "chat.exchange"
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
// 声明交换机
|
||||
err = ch.ExchangeDeclare(
|
||||
exchangeName,
|
||||
"topic",
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
fmt.Println("ExchangeDeclare error:", err)
|
||||
conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
return &Client{
|
||||
conn: conn,
|
||||
channel: ch,
|
||||
ExchangeName: exchangeName, // 记录名称
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close 关闭连接
|
||||
func (c *Client) Close() {
|
||||
c.channel.Close()
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
// DeclareQueue 创建唯一队列(自动删除)
|
||||
func (c *Client) DeclareQueue() (string, error) {
|
||||
// 队列名格式: chat.queue.{timestamp}.{random}
|
||||
queue, err := c.channel.QueueDeclare(
|
||||
"", // 队列名 空 → 自动生成唯一名称
|
||||
false, // 持久化
|
||||
true, // 自动删除
|
||||
true, // 独占
|
||||
false, // 是否等待确认
|
||||
nil, // 参数
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("error: %v", err), err
|
||||
}
|
||||
return queue.Name, nil
|
||||
}
|
||||
|
||||
// BindQueue 绑定队列到交换机
|
||||
func (c *Client) BindQueue(queueName, routingKey string) error {
|
||||
err := c.channel.QueueBind(
|
||||
queueName,
|
||||
routingKey,
|
||||
c.ExchangeName,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConsumeMessages 消费消息
|
||||
func (c *Client) Consume(queueName string) (<-chan amqp091.Delivery, error) {
|
||||
return c.channel.Consume(
|
||||
queueName, // Queue
|
||||
"", // Consumer tag (自动生成)
|
||||
false, // Auto-ack: 手动 ACK(关键!)
|
||||
false, // 独占
|
||||
false, // 不本地
|
||||
false, // 不等待
|
||||
nil, // 参数
|
||||
)
|
||||
}
|
||||
233
internal/ws/connection.go
Normal file
233
internal/ws/connection.go
Normal file
@@ -0,0 +1,233 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"ChatRoom/internal/models"
|
||||
"ChatRoom/internal/rabbitmq"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// 常量
|
||||
const (
|
||||
writeWait = 10 * time.Second //写入等待时间
|
||||
pongWait = 60 * time.Second //pong等待时间
|
||||
pingPeriod = (pongWait * 9) / 10 //ping周期
|
||||
maxMessageSize = 512 //最大消息大小
|
||||
)
|
||||
|
||||
// 升级器
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024, //读取缓冲区大小
|
||||
WriteBufferSize: 1024, //写入缓冲区大小
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
return true // 允许跨域(开发阶段)允许所有来源
|
||||
},
|
||||
}
|
||||
|
||||
// 连接
|
||||
type Connection struct {
|
||||
wsConn *websocket.Conn //websocket连接
|
||||
rmqClient *rabbitmq.Client //rabbitmq客户端
|
||||
queueName string //队列名称
|
||||
userID string //用户ID
|
||||
send chan []byte //发送通道
|
||||
}
|
||||
|
||||
func NewConnection(w http.ResponseWriter, r *http.Request, rmq *rabbitmq.Client) {
|
||||
// 1. 升级 HTTP 到 WebSocket
|
||||
wsConn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Printf("WebSocket 升级失败: %v", err)
|
||||
return
|
||||
}
|
||||
// 2. 从 URL 获取用户ID
|
||||
userID := r.URL.Query().Get("user")
|
||||
if userID == "" {
|
||||
wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(4001, "user required"))
|
||||
wsConn.Close()
|
||||
return
|
||||
}
|
||||
// 3. 为用户创建 RabbitMQ 队列
|
||||
queueName, err := rmq.DeclareQueue()
|
||||
if err != nil {
|
||||
log.Printf("RabbitMQ 队列创建失败: %v", err)
|
||||
wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(5000, "queue error"))
|
||||
wsConn.Close()
|
||||
return
|
||||
}
|
||||
// 4. 绑定基础路由(全局/私聊/事件)
|
||||
bindings := []string{
|
||||
"chat.global", // 全体广播
|
||||
"chat.user." + userID, // 私聊
|
||||
"chat.event.*", // 上下线事件
|
||||
"chat.system", // 系统通知
|
||||
}
|
||||
for _, rk := range bindings {
|
||||
if err := rmq.BindQueue(queueName, rk); err != nil {
|
||||
log.Printf("绑定失败 [%s]: %v", rk, err)
|
||||
// 可选择继续或关闭连接
|
||||
}
|
||||
}
|
||||
|
||||
// 5. 创建 Connection 对象
|
||||
conn := &Connection{
|
||||
wsConn: wsConn,
|
||||
rmqClient: rmq,
|
||||
queueName: queueName,
|
||||
userID: userID,
|
||||
send: make(chan []byte, 256),
|
||||
}
|
||||
|
||||
// 发送登录成功消息
|
||||
systemMsg := &models.Message{
|
||||
Type: models.MsgTypeSystem,
|
||||
User: "system",
|
||||
Content: "登录成功",
|
||||
Time: time.Now().UTC().Format(time.RFC3339), // 服务端覆盖 Time
|
||||
}
|
||||
if data, _ := json.Marshal(systemMsg); len(data) > 0 {
|
||||
conn.send <- data
|
||||
}
|
||||
|
||||
go conn.writePump()
|
||||
go conn.readPump()
|
||||
go conn.consumeFromRabbitMQ()
|
||||
}
|
||||
|
||||
func (c *Connection) readPump() {
|
||||
defer c.wsConn.Close()
|
||||
|
||||
c.wsConn.SetReadLimit(maxMessageSize)
|
||||
c.wsConn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
c.wsConn.SetPongHandler(func(string) error {
|
||||
c.wsConn.SetReadDeadline(time.Now().Add(pongWait))
|
||||
return nil
|
||||
})
|
||||
|
||||
for {
|
||||
_, rawMsg, err := c.wsConn.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) {
|
||||
log.Printf("WebSocket 读取错误: %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
var msg models.Message
|
||||
if err := json.Unmarshal(rawMsg, &msg); err != nil {
|
||||
c.sendError("无效消息格式")
|
||||
continue
|
||||
}
|
||||
|
||||
// 校验必填字段:content 必填,user 由服务端覆盖(防伪造)
|
||||
if msg.Content == "" {
|
||||
c.sendError("缺少 content 字段")
|
||||
continue
|
||||
}
|
||||
|
||||
msg.User = c.userID
|
||||
msg.Time = time.Now().UTC().Format(time.RFC3339) // 关键!服务端统一时间
|
||||
|
||||
// 生成路由键
|
||||
routingKey := c.getRoutingKey(&msg)
|
||||
if routingKey == "" {
|
||||
c.sendError("不支持的消息类型或缺少必要字段")
|
||||
continue
|
||||
}
|
||||
|
||||
// 序列化并发布
|
||||
body, _ := json.Marshal(msg)
|
||||
if err := c.rmqClient.Publish(c.rmqClient.ExchangeName, routingKey, body); err != nil {
|
||||
log.Printf("RabbitMQ 发布失败 [%s]: %v", routingKey, err)
|
||||
c.sendError("消息发送失败")
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) getRoutingKey(msg *models.Message) string {
|
||||
switch msg.Type {
|
||||
case models.MsgTypeBroadcast:
|
||||
return "chat.global"
|
||||
case models.MsgTypeRoom:
|
||||
if msg.Room == "" {
|
||||
return ""
|
||||
}
|
||||
return "chat.room." + msg.Room
|
||||
case models.MsgTypePrivate:
|
||||
if msg.To == "" {
|
||||
return ""
|
||||
}
|
||||
return "chat.user." + msg.To
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) sendError(content string) {
|
||||
errMsg := &models.Message{
|
||||
Type: models.MsgTypeError,
|
||||
User: "system",
|
||||
Content: content,
|
||||
Time: time.Now().UTC().Format(time.RFC3339), // 服务端覆盖 Time
|
||||
}
|
||||
if data, _ := json.Marshal(errMsg); len(data) > 0 {
|
||||
select {
|
||||
case c.send <- data:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) writePump() {
|
||||
ticker := time.NewTicker(pingPeriod)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.wsConn.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.wsConn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if !ok {
|
||||
c.wsConn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
w, err := c.wsConn.NextWriter(websocket.TextMessage)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write(message)
|
||||
if err := w.Close(); err != nil {
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
c.wsConn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
if err := c.wsConn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) consumeFromRabbitMQ() {
|
||||
deliveries, err := c.rmqClient.Consume(c.queueName)
|
||||
if err != nil {
|
||||
log.Printf("RabbitMQ 消费启动失败: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for delivery := range deliveries {
|
||||
select {
|
||||
case c.send <- delivery.Body: // 消息体已是 JSON(含服务端设置的 Time)
|
||||
delivery.Ack(false)
|
||||
default:
|
||||
delivery.Nack(false, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user