116 lines
2.3 KiB
Go
116 lines
2.3 KiB
Go
package rabbitmq
|
||
|
||
import (
|
||
"fmt"
|
||
|
||
"github.com/rabbitmq/amqp091-go"
|
||
)
|
||
|
||
// Client 封装 RabbitMQ 连接和操作
|
||
type Client struct {
|
||
conn *amqp091.Connection // 连接
|
||
channel *amqp091.Channel // 通道
|
||
ExchangeName string // 交换机名称
|
||
}
|
||
|
||
// Publish 向交换机发布消息
|
||
func (c *Client) Publish(exchange, routingKey string, body []byte) error {
|
||
return c.channel.Publish(
|
||
exchange,
|
||
routingKey,
|
||
false, // mandatory
|
||
false, // immediate
|
||
amqp091.Publishing{
|
||
ContentType: "application/json",
|
||
Body: body,
|
||
},
|
||
)
|
||
}
|
||
|
||
// NewClient 创建 RabbitMQ 客户端
|
||
func NewClient(uri string) (*Client, error) {
|
||
conn, err := amqp091.Dial(uri)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 交换机名称
|
||
exchangeName := "chat.exchange"
|
||
ch, err := conn.Channel()
|
||
if err != nil {
|
||
conn.Close()
|
||
return nil, err
|
||
}
|
||
// 声明交换机
|
||
err = ch.ExchangeDeclare(
|
||
exchangeName,
|
||
"topic",
|
||
true,
|
||
false,
|
||
false,
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
fmt.Println("ExchangeDeclare error:", err)
|
||
conn.Close()
|
||
return nil, err
|
||
}
|
||
return &Client{
|
||
conn: conn,
|
||
channel: ch,
|
||
ExchangeName: exchangeName, // 记录名称
|
||
}, nil
|
||
}
|
||
|
||
// Close 关闭连接
|
||
func (c *Client) Close() {
|
||
c.channel.Close()
|
||
c.conn.Close()
|
||
}
|
||
|
||
// DeclareQueue 创建唯一队列(自动删除)
|
||
func (c *Client) DeclareQueue() (string, error) {
|
||
// 队列名格式: chat.queue.{timestamp}.{random}
|
||
queue, err := c.channel.QueueDeclare(
|
||
"", // 队列名 空 → 自动生成唯一名称
|
||
false, // 持久化
|
||
true, // 自动删除
|
||
true, // 独占
|
||
false, // 是否等待确认
|
||
nil, // 参数
|
||
)
|
||
if err != nil {
|
||
return fmt.Sprintf("error: %v", err), err
|
||
}
|
||
return queue.Name, nil
|
||
}
|
||
|
||
// BindQueue 绑定队列到交换机
|
||
func (c *Client) BindQueue(queueName, routingKey string) error {
|
||
err := c.channel.QueueBind(
|
||
queueName,
|
||
routingKey,
|
||
c.ExchangeName,
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("error: %v", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ConsumeMessages 消费消息
|
||
func (c *Client) Consume(queueName string) (<-chan amqp091.Delivery, error) {
|
||
return c.channel.Consume(
|
||
queueName, // Queue
|
||
"", // Consumer tag (自动生成)
|
||
false, // Auto-ack: 手动 ACK(关键!)
|
||
false, // 独占
|
||
false, // 不本地
|
||
false, // 不等待
|
||
nil, // 参数
|
||
)
|
||
}
|