rabbitmq 操作库

tools.go 4.6 KiB

há 2 anos
há 2 anos
há 2 anos
há 2 anos
há 2 anos
há 2 anos
há 2 anos
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package rabbitmq
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/streadway/amqp"
  6. "log"
  7. )
  8. // RabbitMQ 声明队列类型
  9. type RabbitMQ struct {
  10. conn *amqp.Connection
  11. channel *amqp.Channel
  12. Name string
  13. exchange string
  14. }
  15. // Connect 连接服务器
  16. func Connect(s string) *RabbitMQ {
  17. //连接rabbitmq
  18. conn, e := amqp.Dial(s)
  19. failOnError(e, "连接Rabbitmq服务器失败!")
  20. ch, e := conn.Channel()
  21. failOnError(e, "无法打开频道!")
  22. mq := new(RabbitMQ)
  23. mq.conn = conn
  24. mq.channel = ch
  25. return mq
  26. }
  27. //New 初始化单个消息队列
  28. //s: rabbitmq服务器的链接 name:队列名字
  29. func New(s string, name string) *RabbitMQ {
  30. //连接rabbitmq
  31. conn, e := amqp.Dial(s)
  32. failOnError(e, "连接Rabbitmq服务器失败!")
  33. ch, e := conn.Channel()
  34. failOnError(e, "无法打开频道!")
  35. q, e := ch.QueueDeclare(
  36. name, //队列名
  37. false, //是否开启持久化
  38. false, //不使用时删除
  39. false, //排他
  40. false, //不等待
  41. nil, //参数
  42. )
  43. failOnError(e, "初始化队列失败!")
  44. mq := new(RabbitMQ)
  45. mq.channel = ch
  46. mq.conn = conn
  47. mq.Name = q.Name
  48. return mq
  49. }
  50. // QueueDeclare 声明交换机
  51. func (q *RabbitMQ) QueueDeclare(queue string) {
  52. _, e := q.channel.QueueDeclare(queue, false, false, false, false, nil)
  53. failOnError(e, "声明交换机!")
  54. }
  55. // QueueDelete 删除交换机
  56. func (q *RabbitMQ) QueueDelete(queue string) {
  57. _, e := q.channel.QueueDelete(queue, false, true, false)
  58. failOnError(e, "删除队列失败!")
  59. }
  60. // Qos 配置队列参数
  61. func (q *RabbitMQ) Qos(prefetchCount int) {
  62. e := q.channel.Qos(prefetchCount, 0, false)
  63. failOnError(e, "无法设置QoS")
  64. }
  65. //配置交换机参数
  66. // NewExchange 初始化交换机
  67. //s:rabbitmq服务器的链接,name:交换机名字,typename:交换机类型
  68. func NewExchange(s string, name string, typename string) {
  69. //连接rabbitmq
  70. conn, e := amqp.Dial(s)
  71. failOnError(e, "连接Rabbitmq服务器失败!")
  72. ch, e := conn.Channel()
  73. failOnError(e, "无法打开频道!")
  74. e = ch.ExchangeDeclare(
  75. name, // name
  76. typename, // type
  77. true, // durable
  78. false, // auto-deleted
  79. false, // internal
  80. false, // no-wait
  81. nil, // arguments
  82. )
  83. failOnError(e, "初始化交换机失败!")
  84. }
  85. // ExchangeDelete 删除交换机
  86. func (q *RabbitMQ) ExchangeDelete(exchange string) {
  87. e := q.channel.ExchangeDelete(exchange, false, true)
  88. failOnError(e, "绑定队列失败!")
  89. }
  90. // Bind 绑定消息队列到哪个exchange
  91. func (q *RabbitMQ) Bind(exchange string, key string) {
  92. e := q.channel.QueueBind(
  93. q.Name,
  94. key,
  95. exchange,
  96. false,
  97. nil,
  98. )
  99. failOnError(e, "绑定队列失败!")
  100. q.exchange = exchange
  101. }
  102. //Send 向消息队列发送消息
  103. //可以往某个消息队列发送消息
  104. func (q *RabbitMQ) Send(queue string, body interface{}) {
  105. str, e := json.Marshal(body)
  106. failOnError(e, "消息序列化失败!")
  107. e = q.channel.Publish(
  108. "", //交换
  109. queue, //路由键
  110. false, //必填
  111. false, //立即
  112. amqp.Publishing{
  113. ReplyTo: q.Name,
  114. Body: []byte(str),
  115. })
  116. msg := "向队列:" + q.Name + "发送消息失败!"
  117. failOnError(e, msg)
  118. }
  119. // Publish 向exchange发送消息
  120. // 可以往某个exchange发送消息
  121. func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
  122. str, e := json.Marshal(body)
  123. failOnError(e, "消息序列化失败!")
  124. e = q.channel.Publish(
  125. exchange,
  126. key,
  127. false,
  128. false,
  129. amqp.Publishing{ReplyTo: q.Name,
  130. Body: []byte(str)},
  131. )
  132. failOnError(e, "向路由发送消息失败!")
  133. }
  134. // Consume 接收某个消息队列的消息
  135. func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
  136. c, e := q.channel.Consume(
  137. q.Name, // 指定从哪个队列中接收消息
  138. "", // 用来区分多个消费者
  139. false, // 是否自动应答
  140. false, // 是否独有
  141. false, // 如果设置为true,表示不能将同一个connection中发送的消息传递给这个connection中的消费者
  142. false, // 列是否阻塞
  143. nil,
  144. )
  145. failOnError(e, "接收消息失败!")
  146. return c
  147. }
  148. //拉取某个消息队列的消息
  149. func (q *RabbitMQ) Get() (messages amqp.Delivery, ok bool, err error) {
  150. messages, ok, err = q.channel.Get(q.Name, false)
  151. failOnError(err, "拉取消息失败!")
  152. return
  153. }
  154. // 关闭信道
  155. func (q *RabbitMQ) CloseChannel() {
  156. q.channel.Close()
  157. }
  158. // 关闭连接
  159. func (q *RabbitMQ) CloseConn() {
  160. q.conn.Close()
  161. }
  162. // 销毁(断开channel和connection)
  163. func (q *RabbitMQ) Destroy() {
  164. q.channel.Close() //关闭信道资源
  165. q.conn.Close() //关闭链接资源
  166. }
  167. //错误处理函数
  168. func failOnError(err error, msg string) {
  169. if err != nil {
  170. log.Fatalf("%s: %s", msg, err)
  171. panic(fmt.Sprintf("%s:%s", msg, err))
  172. }
  173. }