commit 640318cd24fe0b859f3266255e42b48d3f5eac66 Author: RichZDS <3388214266@qq.com> Date: Tue Feb 3 23:45:27 2026 +0800 init diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..9d80972 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,14 @@ +.git +.idea +.vscode +*.exe +*.test +bin +dist +tmp +vendor +node_modules +Dockerfile +docker-compose.yml +.dockerignore +README.md diff --git a/DEPLOY.md b/DEPLOY.md new file mode 100644 index 0000000..e3a58b6 --- /dev/null +++ b/DEPLOY.md @@ -0,0 +1,59 @@ +# ChatRoom 部署说明 + +服务端口:**2779** + +## 一、本地构建部署 + +```bash +# 构建并启动(含 RabbitMQ) +docker compose up -d --build + +# 或使用部署脚本 +chmod +x deploy.sh +./deploy.sh +``` + +## 二、从镜像仓库部署(生产环境) + +### 1. 构建并推送镜像 + +```bash +# 构建 +docker build -t your-registry/chatroom:latest . + +# 推送(示例:Docker Hub / 阿里云 ACR) +docker push your-registry/chatroom:latest +``` + +### 2. 服务器拉取并部署 + +```bash +./deploy.sh your-registry/chatroom:latest +``` + +## 三、自动更新部署 + +### 方式 A:Cron 定时拉取(推荐) + +在服务器上添加 crontab,每 5 分钟检查并更新: + +```bash +crontab -e +# 添加(替换为你的镜像地址) +*/5 * * * * cd /path/to/ChatRoom && CHATROOM_IMAGE=your-registry/chatroom:latest docker compose pull app && docker compose up -d --no-build +``` + +### 方式 B:Watchtower 自动监控 + +```bash +# 先设置环境变量 +export CHATROOM_IMAGE=your-registry/chatroom:latest + +# 启动应用 + Watchtower(每 5 分钟检查镜像更新) +docker compose -f docker-compose.yml -f docker-compose.watchtower.yml --profile watchtower up -d +``` + +## 四、访问地址 + +- 前端页面:http://服务器IP:2779 +- WebSocket:ws://服务器IP:2779/ws diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..72d59e4 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,37 @@ +# 第一阶段:构建 +FROM golang:1.23-alpine AS builder + +# 设置工作目录 +WORKDIR /app + +# 安装必要的构建工具 +RUN apk add --no-cache gcc musl-dev + +# 复制依赖文件并下载 +COPY go.mod go.sum ./ +RUN go mod download + +# 复制源代码 +COPY . . + +# 编译应用 +# CGO_ENABLED=0 用于生成静态二进制文件,适合在 alpine 运行 +RUN CGO_ENABLED=0 GOOS=linux go build -o server ./cmd/server/main.go + +# 第二阶段:运行 +FROM alpine:latest + +WORKDIR /app + +# 从构建阶段复制二进制文件 +COPY --from=builder /app/server . + +# 复制静态资源文件 +COPY --from=builder /app/web ./web + +# 暴露端口(默认 2779) +EXPOSE 2779 + +# 运行应用(通过 PORT 环境变量可覆盖) +ENV PORT=2779 +CMD ["./server"] diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..a27fa49 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "ChatRoom/internal/rabbitmq" + "ChatRoom/internal/ws" + "log" + "net/http" + "os" + "path/filepath" +) + +func main() { + // 1. 初始化 RabbitMQ + rabbitmqURL := os.Getenv("RABBITMQ_URL") + if rabbitmqURL == "" { + rabbitmqURL = "amqp://guest:guest@localhost:5672/" + } + rmq, err := rabbitmq.NewClient(rabbitmqURL) + if err != nil { + log.Fatalf("RabbitMQ 连接失败: %v", err) + } + defer rmq.Close() + + // 2. WebSocket 路由 + http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + ws.NewConnection(w, r, rmq) // 传入 RabbitMQ 客户端 + }) + + // 3. 静态文件服务(用于测试前端) + webDir := "./web" + if _, err := os.Stat(webDir); os.IsNotExist(err) { + // 从 cmd/server 目录运行时,web 在项目根目录 + webDir = filepath.Join("..", "..", "web") + } + http.Handle("/", http.FileServer(http.Dir(webDir))) + + // 4. 启动服务 + port := os.Getenv("PORT") + if port == "" { + port = "8080" + } + log.Printf("服务启动: http://localhost:%s", port) + log.Fatal(http.ListenAndServe(":"+port, nil)) +} diff --git a/deploy.sh b/deploy.sh new file mode 100644 index 0000000..8cf3c3d --- /dev/null +++ b/deploy.sh @@ -0,0 +1,36 @@ +#!/bin/bash +# ChatRoom 自动部署脚本 +# 用法: ./deploy.sh [镜像地址] +# 示例: ./deploy.sh # 本地构建部署 +# ./deploy.sh registry.cn-hangzhou.aliyuncs.com/xxx/chatroom:latest # 从镜像仓库拉取部署 + +set -e + +COMPOSE_FILE="docker-compose.yml" +PORT=2779 + +echo "==========================================" +echo " ChatRoom 部署脚本 - 端口 $PORT" +echo "==========================================" + +if [ -n "$1" ]; then + # 从远程仓库拉取镜像并部署 + export CHATROOM_IMAGE="$1" + echo ">>> 拉取镜像: $CHATROOM_IMAGE" + docker compose -f "$COMPOSE_FILE" pull app + echo ">>> 启动容器..." + docker compose -f "$COMPOSE_FILE" up -d --no-build +else + # 本地构建并部署 + echo ">>> 构建并启动..." + docker compose -f "$COMPOSE_FILE" up -d --build +fi + +echo "" +echo ">>> 部署完成!" +echo " 服务地址: http://localhost:$PORT" +echo " WebSocket: ws://localhost:$PORT/ws" +echo "" +echo ">>> 查看日志: docker compose logs -f app" +echo ">>> 停止服务: docker compose down" +echo "==========================================" diff --git a/docker-compose.watchtower.yml b/docker-compose.watchtower.yml new file mode 100644 index 0000000..6bdbace --- /dev/null +++ b/docker-compose.watchtower.yml @@ -0,0 +1,21 @@ +# 配合 Watchtower 实现自动拉取镜像并重新部署 +# 用法: docker compose -f docker-compose.yml -f docker-compose.watchtower.yml up -d +# +# 推送新镜像后,Watchtower 会检测到更新并自动重启 chatroom-app 容器 + +services: + watchtower: + image: containrrr/watchtower + container_name: chatroom-watchtower + volumes: + - /var/run/docker.sock:/var/run/docker.sock + environment: + # 每 5 分钟检查一次镜像更新 + - WATCHTOWER_POLL_INTERVAL=300 + # 只监控带 com.centurylinklabs.watchtower.enable=true 标签的容器 + - WATCHTOWER_LABEL_ENABLE=true + # 清理旧镜像 + - WATCHTOWER_CLEANUP=true + # 仅当使用远程镜像时启用,本地构建请注释掉此服务 + profiles: + - watchtower diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..eb3ad3f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,28 @@ +services: + app: + build: . + # 生产环境可设置 CHATROOM_IMAGE 从镜像仓库拉取,如: registry.cn-hangzhou.aliyuncs.com/xxx/chatroom:latest + image: ${CHATROOM_IMAGE:-chatroom:latest} + container_name: chatroom-app + labels: + - "com.centurylinklabs.watchtower.enable=true" # Watchtower 监控此容器 + ports: + - "2779:2779" + environment: + - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/ + - PORT=2779 + depends_on: + rabbitmq: + condition: service_healthy + + rabbitmq: + image: rabbitmq:3-management + container_name: chatroom-rabbitmq + ports: + - "5672:5672" + - "15672:15672" + healthcheck: + test: ["CMD", "rabbitmq-diagnostics", "-q", "check_running"] + interval: 5s + timeout: 15s + retries: 5 diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0c5035b --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module ChatRoom + +go 1.24.2 + +require ( + github.com/gorilla/websocket v1.5.3 + github.com/rabbitmq/amqp091-go v1.10.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0b2bb22 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/internal/models/message.go b/internal/models/message.go new file mode 100644 index 0000000..61f9547 --- /dev/null +++ b/internal/models/message.go @@ -0,0 +1,58 @@ +package models + +import ( + "time" +) + +// Message 核心变更:增加 Room 字段,移除歧义字段 +type Message struct { + Type string `json:"type"` // 消息类型(严格按常量) + User string `json:"user"` // 发送者(客户端填,服务端可校验) + Content string `json:"content"` // 消息内容 + Time string `json:"time"` // 服务端统一覆盖为 RFC3339(防客户端篡改) + To string `json:"to,omitempty"` // 私聊目标(仅 type=private 时有效) + Room string `json:"room,omitempty"` // 房间ID(仅 type=room 时有效) +} + +// 消息类型常量 +const ( + MsgTypeLogin = "login" // 服务端生成:用户上线事件 + MsgTypeLogout = "logout" // 服务端生成:用户下线事件 + MsgTypeBroadcast = "broadcast" // 全体用户广播(无视房间) + MsgTypeRoom = "room" // 房间消息(必须带 Room 字段) + MsgTypePrivate = "private" // 私聊(必须带 To 字段) + MsgTypeSystem = "system" // 服务端生成:系统通知 + MsgTypeError = "error" // 服务端生成:定向错误提示 +) + +// 发送广播消息 +func SendBroadcastMessage(user, content string) *Message { + return &Message{ + Type: MsgTypeBroadcast, + User: user, + Content: content, + Time: time.Now().UTC().Format(time.RFC3339), + } +} + +// 发送房间消息 +func SendRoomMessage(user, roomID, content string) *Message { + return &Message{ + Type: MsgTypeRoom, + User: user, + Room: roomID, // 显式绑定房间 + Content: content, + Time: time.Now().UTC().Format(time.RFC3339), + } +} + +// 发送私聊消息 +func SendPrivateMessage(sender, recipient, content string) *Message { + return &Message{ + Type: MsgTypePrivate, + User: sender, + To: recipient, // 显式指定接收者 + Content: content, + Time: time.Now().UTC().Format(time.RFC3339), + } +} diff --git a/internal/rabbitmq/client.go b/internal/rabbitmq/client.go new file mode 100644 index 0000000..8956b2f --- /dev/null +++ b/internal/rabbitmq/client.go @@ -0,0 +1,115 @@ +package rabbitmq + +import ( + "fmt" + + "github.com/rabbitmq/amqp091-go" +) + +// Client 封装 RabbitMQ 连接和操作 +type Client struct { + conn *amqp091.Connection // 连接 + channel *amqp091.Channel // 通道 + ExchangeName string // 交换机名称 +} + +// Publish 向交换机发布消息 +func (c *Client) Publish(exchange, routingKey string, body []byte) error { + return c.channel.Publish( + exchange, + routingKey, + false, // mandatory + false, // immediate + amqp091.Publishing{ + ContentType: "application/json", + Body: body, + }, + ) +} + +// NewClient 创建 RabbitMQ 客户端 +func NewClient(uri string) (*Client, error) { + conn, err := amqp091.Dial(uri) + if err != nil { + return nil, err + } + + // 交换机名称 + exchangeName := "chat.exchange" + ch, err := conn.Channel() + if err != nil { + conn.Close() + return nil, err + } + // 声明交换机 + err = ch.ExchangeDeclare( + exchangeName, + "topic", + true, + false, + false, + false, + nil, + ) + if err != nil { + fmt.Println("ExchangeDeclare error:", err) + conn.Close() + return nil, err + } + return &Client{ + conn: conn, + channel: ch, + ExchangeName: exchangeName, // 记录名称 + }, nil +} + +// Close 关闭连接 +func (c *Client) Close() { + c.channel.Close() + c.conn.Close() +} + +// DeclareQueue 创建唯一队列(自动删除) +func (c *Client) DeclareQueue() (string, error) { + // 队列名格式: chat.queue.{timestamp}.{random} + queue, err := c.channel.QueueDeclare( + "", // 队列名 空 → 自动生成唯一名称 + false, // 持久化 + true, // 自动删除 + true, // 独占 + false, // 是否等待确认 + nil, // 参数 + ) + if err != nil { + return fmt.Sprintf("error: %v", err), err + } + return queue.Name, nil +} + +// BindQueue 绑定队列到交换机 +func (c *Client) BindQueue(queueName, routingKey string) error { + err := c.channel.QueueBind( + queueName, + routingKey, + c.ExchangeName, + false, + nil, + ) + if err != nil { + return fmt.Errorf("error: %v", err) + } + return nil +} + +// ConsumeMessages 消费消息 +func (c *Client) Consume(queueName string) (<-chan amqp091.Delivery, error) { + return c.channel.Consume( + queueName, // Queue + "", // Consumer tag (自动生成) + false, // Auto-ack: 手动 ACK(关键!) + false, // 独占 + false, // 不本地 + false, // 不等待 + nil, // 参数 + ) +} diff --git a/internal/ws/connection.go b/internal/ws/connection.go new file mode 100644 index 0000000..b8e9acf --- /dev/null +++ b/internal/ws/connection.go @@ -0,0 +1,233 @@ +package ws + +import ( + "ChatRoom/internal/models" + "ChatRoom/internal/rabbitmq" + "encoding/json" + "log" + "net/http" + "time" + + "github.com/gorilla/websocket" +) + +// 常量 +const ( + writeWait = 10 * time.Second //写入等待时间 + pongWait = 60 * time.Second //pong等待时间 + pingPeriod = (pongWait * 9) / 10 //ping周期 + maxMessageSize = 512 //最大消息大小 +) + +// 升级器 +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, //读取缓冲区大小 + WriteBufferSize: 1024, //写入缓冲区大小 + CheckOrigin: func(r *http.Request) bool { + return true // 允许跨域(开发阶段)允许所有来源 + }, +} + +// 连接 +type Connection struct { + wsConn *websocket.Conn //websocket连接 + rmqClient *rabbitmq.Client //rabbitmq客户端 + queueName string //队列名称 + userID string //用户ID + send chan []byte //发送通道 +} + +func NewConnection(w http.ResponseWriter, r *http.Request, rmq *rabbitmq.Client) { + // 1. 升级 HTTP 到 WebSocket + wsConn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("WebSocket 升级失败: %v", err) + return + } + // 2. 从 URL 获取用户ID + userID := r.URL.Query().Get("user") + if userID == "" { + wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(4001, "user required")) + wsConn.Close() + return + } + // 3. 为用户创建 RabbitMQ 队列 + queueName, err := rmq.DeclareQueue() + if err != nil { + log.Printf("RabbitMQ 队列创建失败: %v", err) + wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(5000, "queue error")) + wsConn.Close() + return + } + // 4. 绑定基础路由(全局/私聊/事件) + bindings := []string{ + "chat.global", // 全体广播 + "chat.user." + userID, // 私聊 + "chat.event.*", // 上下线事件 + "chat.system", // 系统通知 + } + for _, rk := range bindings { + if err := rmq.BindQueue(queueName, rk); err != nil { + log.Printf("绑定失败 [%s]: %v", rk, err) + // 可选择继续或关闭连接 + } + } + + // 5. 创建 Connection 对象 + conn := &Connection{ + wsConn: wsConn, + rmqClient: rmq, + queueName: queueName, + userID: userID, + send: make(chan []byte, 256), + } + + // 发送登录成功消息 + systemMsg := &models.Message{ + Type: models.MsgTypeSystem, + User: "system", + Content: "登录成功", + Time: time.Now().UTC().Format(time.RFC3339), // 服务端覆盖 Time + } + if data, _ := json.Marshal(systemMsg); len(data) > 0 { + conn.send <- data + } + + go conn.writePump() + go conn.readPump() + go conn.consumeFromRabbitMQ() +} + +func (c *Connection) readPump() { + defer c.wsConn.Close() + + c.wsConn.SetReadLimit(maxMessageSize) + c.wsConn.SetReadDeadline(time.Now().Add(pongWait)) + c.wsConn.SetPongHandler(func(string) error { + c.wsConn.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + _, rawMsg, err := c.wsConn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) { + log.Printf("WebSocket 读取错误: %v", err) + } + break + } + + var msg models.Message + if err := json.Unmarshal(rawMsg, &msg); err != nil { + c.sendError("无效消息格式") + continue + } + + // 校验必填字段:content 必填,user 由服务端覆盖(防伪造) + if msg.Content == "" { + c.sendError("缺少 content 字段") + continue + } + + msg.User = c.userID + msg.Time = time.Now().UTC().Format(time.RFC3339) // 关键!服务端统一时间 + + // 生成路由键 + routingKey := c.getRoutingKey(&msg) + if routingKey == "" { + c.sendError("不支持的消息类型或缺少必要字段") + continue + } + + // 序列化并发布 + body, _ := json.Marshal(msg) + if err := c.rmqClient.Publish(c.rmqClient.ExchangeName, routingKey, body); err != nil { + log.Printf("RabbitMQ 发布失败 [%s]: %v", routingKey, err) + c.sendError("消息发送失败") + continue + } + } +} + +func (c *Connection) getRoutingKey(msg *models.Message) string { + switch msg.Type { + case models.MsgTypeBroadcast: + return "chat.global" + case models.MsgTypeRoom: + if msg.Room == "" { + return "" + } + return "chat.room." + msg.Room + case models.MsgTypePrivate: + if msg.To == "" { + return "" + } + return "chat.user." + msg.To + default: + return "" + } +} + +func (c *Connection) sendError(content string) { + errMsg := &models.Message{ + Type: models.MsgTypeError, + User: "system", + Content: content, + Time: time.Now().UTC().Format(time.RFC3339), // 服务端覆盖 Time + } + if data, _ := json.Marshal(errMsg); len(data) > 0 { + select { + case c.send <- data: + default: + } + } +} + +func (c *Connection) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.wsConn.Close() + }() + + for { + select { + case message, ok := <-c.send: + c.wsConn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + c.wsConn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + w, err := c.wsConn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.wsConn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.wsConn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +func (c *Connection) consumeFromRabbitMQ() { + deliveries, err := c.rmqClient.Consume(c.queueName) + if err != nil { + log.Printf("RabbitMQ 消费启动失败: %v", err) + return + } + + for delivery := range deliveries { + select { + case c.send <- delivery.Body: // 消息体已是 JSON(含服务端设置的 Time) + delivery.Ack(false) + default: + delivery.Nack(false, true) + } + } +} diff --git a/web/index.html b/web/index.html new file mode 100644 index 0000000..da16dea --- /dev/null +++ b/web/index.html @@ -0,0 +1,106 @@ + + + + Chat Test + + + +

多人聊天测试

+ +
+ + +
+ +
+ +
+ + +
+ +
+ + + +
+ +
+ + + +
+ + + + \ No newline at end of file