Files

116 lines
2.3 KiB
Go
Raw Permalink Normal View History

2026-02-03 23:45:27 +08:00
package rabbitmq
import (
"fmt"
"github.com/rabbitmq/amqp091-go"
)
// Client 封装 RabbitMQ 连接和操作
type Client struct {
conn *amqp091.Connection // 连接
channel *amqp091.Channel // 通道
ExchangeName string // 交换机名称
}
// Publish 向交换机发布消息
func (c *Client) Publish(exchange, routingKey string, body []byte) error {
return c.channel.Publish(
exchange,
routingKey,
false, // mandatory
false, // immediate
amqp091.Publishing{
ContentType: "application/json",
Body: body,
},
)
}
// NewClient 创建 RabbitMQ 客户端
func NewClient(uri string) (*Client, error) {
conn, err := amqp091.Dial(uri)
if err != nil {
return nil, err
}
// 交换机名称
exchangeName := "chat.exchange"
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, err
}
// 声明交换机
err = ch.ExchangeDeclare(
exchangeName,
"topic",
true,
false,
false,
false,
nil,
)
if err != nil {
fmt.Println("ExchangeDeclare error:", err)
conn.Close()
return nil, err
}
return &Client{
conn: conn,
channel: ch,
ExchangeName: exchangeName, // 记录名称
}, nil
}
// Close 关闭连接
func (c *Client) Close() {
c.channel.Close()
c.conn.Close()
}
// DeclareQueue 创建唯一队列(自动删除)
func (c *Client) DeclareQueue() (string, error) {
// 队列名格式: chat.queue.{timestamp}.{random}
queue, err := c.channel.QueueDeclare(
"", // 队列名 空 → 自动生成唯一名称
false, // 持久化
true, // 自动删除
true, // 独占
false, // 是否等待确认
nil, // 参数
)
if err != nil {
return fmt.Sprintf("error: %v", err), err
}
return queue.Name, nil
}
// BindQueue 绑定队列到交换机
func (c *Client) BindQueue(queueName, routingKey string) error {
err := c.channel.QueueBind(
queueName,
routingKey,
c.ExchangeName,
false,
nil,
)
if err != nil {
return fmt.Errorf("error: %v", err)
}
return nil
}
// ConsumeMessages 消费消息
func (c *Client) Consume(queueName string) (<-chan amqp091.Delivery, error) {
return c.channel.Consume(
queueName, // Queue
"", // Consumer tag (自动生成)
false, // Auto-ack: 手动 ACK关键
false, // 独占
false, // 不本地
false, // 不等待
nil, // 参数
)
}