rabbitmq 操作库
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

446 行
10 KiB

  1. package rabbit
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/streadway/amqp"
  7. "log"
  8. "os"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. const (
  14. defaultLogPrefix = "[rabbit-pool]"
  15. )
  16. var (
  17. ErrInvalidConfig = errors.New("invalid pool config\n")
  18. ErrFailedConnection = errors.New("failed to establish connection\n")
  19. ErrConnectionMaximum = errors.New("the number of connections exceeds the maximum\n")
  20. ErrChannelMaximum = errors.New("the number of channels exceeds the maximum\n")
  21. ErrGetChannelTimeOut = errors.New("get channel timeout\n")
  22. )
  23. type LoggerInter interface {
  24. Print(v ...interface{})
  25. }
  26. type Config struct {
  27. Host string // MQ的地址
  28. MinConn int // 最少建立的连接数
  29. MaxConn int // 最大建立的连接数
  30. MaxChannelPerConn int // 每个连接最多建立的信道数量
  31. MaxLifetime time.Duration
  32. }
  33. // 连接池
  34. type Pool struct {
  35. mu *sync.Mutex
  36. conf *Config
  37. logger LoggerInter
  38. connectionNum int32
  39. connections map[int64]*Connection
  40. connectionSerialNumber int64
  41. idleChannels chan *Channel
  42. }
  43. func NewPool(conf *Config, logger ...LoggerInter) (*Pool, error) {
  44. if conf.MaxConn <= 0 || conf.MinConn > conf.MaxConn {
  45. return nil, ErrInvalidConfig
  46. }
  47. p := &Pool{
  48. mu: new(sync.Mutex),
  49. connections: make(map[int64]*Connection),
  50. idleChannels: make(chan *Channel, conf.MaxConn*conf.MaxChannelPerConn),
  51. }
  52. if conf.MaxLifetime == 0 {
  53. conf.MaxLifetime = time.Duration(3600)
  54. }
  55. if len(logger) > 0 {
  56. p.SetLogger(logger[0])
  57. } else {
  58. p.SetLogger(log.New(os.Stdout, defaultLogPrefix, log.LstdFlags))
  59. }
  60. p.conf = conf
  61. var conn *Connection
  62. var err error
  63. // 建立最少连接数
  64. for i := 0; i < conf.MinConn; i++ {
  65. conn, err = p.NewConnection()
  66. if err != nil {
  67. p.GetLogger().Print(ErrFailedConnection.Error())
  68. return nil, ErrFailedConnection
  69. }
  70. p.connections[conn.connIdentity] = conn
  71. }
  72. return p, nil
  73. }
  74. func (p *Pool) SetConfig(conf *Config) *Pool {
  75. p.conf = conf
  76. return p
  77. }
  78. func (p *Pool) GetConfig() *Config {
  79. return p.conf
  80. }
  81. func (p *Pool) SetLogger(logger LoggerInter) *Pool {
  82. p.logger = logger
  83. return p
  84. }
  85. func (p *Pool) GetLogger() LoggerInter {
  86. return p.logger
  87. }
  88. func (p *Pool) NewConnection() (*Connection, error) {
  89. // 判断连接是否达到最大值
  90. if atomic.AddInt32(&p.connectionNum, 1) > int32(p.conf.MaxConn) {
  91. atomic.AddInt32(&p.connectionNum, -1)
  92. return nil, ErrConnectionMaximum
  93. }
  94. conn, err := amqp.Dial(p.conf.Host)
  95. if err != nil {
  96. atomic.AddInt32(&p.connectionNum, -1)
  97. return nil, err
  98. }
  99. return &Connection{
  100. mu: new(sync.Mutex),
  101. conn: conn,
  102. pool: p,
  103. channelNum: 0,
  104. expireTime: time.Duration(time.Now().Unix()) + p.conf.MaxLifetime,
  105. connIdentity: atomic.AddInt64(&p.connectionSerialNumber, 1),
  106. }, nil
  107. }
  108. func (p *Pool) CloseConnection(c *Connection) error {
  109. p.mu.Lock()
  110. defer p.mu.Unlock()
  111. atomic.AddInt32(&p.connectionNum, -1)
  112. delete(p.connections, c.connIdentity)
  113. return c.conn.Close()
  114. }
  115. func (p *Pool) GetChannel() (*Channel, error) {
  116. ch, _ := p.getOrCreate()
  117. if ch != nil {
  118. return ch, nil
  119. }
  120. C := time.After(time.Second * 10)
  121. for {
  122. ch, _ := p.getOrCreate()
  123. if ch != nil {
  124. return ch, nil
  125. }
  126. select {
  127. case <-C:
  128. p.GetLogger().Print(ErrGetChannelTimeOut.Error())
  129. return nil, ErrGetChannelTimeOut
  130. default:
  131. }
  132. }
  133. }
  134. func (p *Pool) getOrCreate() (*Channel, error) {
  135. // 池中是否有空闲channel
  136. var (
  137. ch *Channel
  138. err error
  139. )
  140. select {
  141. case ch = <-p.idleChannels:
  142. return ch, nil
  143. default:
  144. }
  145. p.mu.Lock()
  146. defer p.mu.Unlock()
  147. // 池中已有连接是否可以建立新的channel
  148. for _, conn := range p.connections {
  149. if conn.CheckExpire() {
  150. continue
  151. }
  152. ch, err = conn.NewChannel()
  153. if ch != nil {
  154. return ch, nil
  155. }
  156. }
  157. // 新建连接获取新的channel
  158. var conn *Connection
  159. conn, err = p.NewConnection()
  160. if err != nil {
  161. return nil, err
  162. }
  163. p.connections[conn.connIdentity] = conn
  164. ch, err = conn.NewChannel()
  165. if err != nil {
  166. return nil, err
  167. }
  168. return ch, nil
  169. }
  170. func (p *Pool) ReleaseChannel(ch *Channel) error {
  171. p.idleChannels <- ch
  172. return nil
  173. }
  174. type Connection struct {
  175. mu *sync.Mutex
  176. conn *amqp.Connection
  177. pool *Pool
  178. expireTime time.Duration
  179. isExpire bool
  180. connIdentity int64 // 连接标记
  181. channelNum int32 // 该连接的信道数量
  182. channelSerialNumber int64 // 第几个channel
  183. }
  184. func (c *Connection) NewChannel() (*Channel, error) {
  185. c.mu.Lock()
  186. defer c.mu.Unlock()
  187. if atomic.AddInt32(&c.channelNum, 1) > int32(c.pool.conf.MaxChannelPerConn) {
  188. atomic.AddInt32(&c.channelNum, -1)
  189. return nil, ErrChannelMaximum
  190. }
  191. ch, err := c.conn.Channel()
  192. if err != nil {
  193. atomic.AddInt32(&c.channelNum, -1)
  194. return nil, err
  195. }
  196. return &Channel{
  197. Channel: ch,
  198. conn: c,
  199. chanIdentity: atomic.AddInt64(&c.channelSerialNumber, 1),
  200. }, nil
  201. }
  202. func (c *Connection) ReleaseChannel(ch *Channel) error {
  203. if c.CheckExpire() {
  204. return c.CloseChannel(ch)
  205. }
  206. return c.pool.ReleaseChannel(ch)
  207. }
  208. func (c *Connection) CloseChannel(ch *Channel) error {
  209. c.mu.Lock()
  210. defer c.mu.Unlock()
  211. atomic.AddInt32(&c.channelNum, -1)
  212. var err = ch.Channel.Close()
  213. if atomic.LoadInt32(&c.channelNum) <= 0 && c.CheckExpire() {
  214. return c.pool.CloseConnection(c)
  215. }
  216. return err
  217. }
  218. // 检查是否过期
  219. func (c *Connection) CheckExpire() bool {
  220. if c.isExpire {
  221. return true
  222. }
  223. if time.Duration(time.Now().Unix()) > c.expireTime {
  224. c.isExpire = true
  225. }
  226. return c.isExpire
  227. }
  228. /************************************************************************************************************/
  229. type Channel struct {
  230. *amqp.Channel
  231. conn *Connection
  232. chanIdentity int64 // 该连接的第几个channel
  233. Name string
  234. exchange string
  235. }
  236. func (ch *Channel) Release() error {
  237. return ch.conn.ReleaseChannel(ch)
  238. }
  239. func (ch *Channel) Close() error {
  240. return ch.conn.CloseChannel(ch)
  241. }
  242. // QueueDeclare 声明交换机
  243. func (ch *Channel) QueueDeclare(queue string) {
  244. _, e := ch.Channel.QueueDeclare(queue, false, true, false, false, nil)
  245. failOnError(e, "声明交换机!")
  246. }
  247. // QueueDelete 删除交换机
  248. func (ch *Channel) QueueDelete(queue string) {
  249. _, e := ch.Channel.QueueDelete(queue, false, true, false)
  250. failOnError(e, "删除队列失败!")
  251. }
  252. //初始化队列
  253. func (ch *Channel) NewQueue(name string, args ...bool) {
  254. var durable, autoDelete, exclusive, noWait = true, false, false, false
  255. if len(args) > 0 {
  256. durable = args[0]
  257. }
  258. if len(args) > 1 {
  259. autoDelete = args[1]
  260. }
  261. if len(args) > 2 {
  262. exclusive = args[2]
  263. }
  264. if len(args) > 3 {
  265. noWait = args[3]
  266. }
  267. q, e := ch.Channel.QueueDeclare(
  268. name, //队列名
  269. durable, //是否开启持久化
  270. autoDelete, //不使用时删除
  271. exclusive, //排他
  272. noWait, //不等待
  273. nil, //参数
  274. )
  275. failOnError(e, "初始化队列失败!")
  276. ch.Name = q.Name
  277. }
  278. // NewExchange 初始化交换机
  279. //s:rabbitmq服务器的链接,name:交换机名字,typename:交换机类型
  280. func (ch *Channel) NewExchange(name string, typename string, args ...bool) {
  281. var durable, autoDelete, internal, noWait = true, false, false, false
  282. if len(args) > 0 {
  283. durable = args[0]
  284. }
  285. if len(args) > 1 {
  286. autoDelete = args[1]
  287. }
  288. if len(args) > 2 {
  289. internal = args[2]
  290. }
  291. if len(args) > 3 {
  292. noWait = args[3]
  293. }
  294. e := ch.ExchangeDeclare(
  295. name, // name
  296. typename, // type
  297. durable, // durable
  298. autoDelete, // auto-deleted
  299. internal, // 是否只在rabbitmq server内部使用
  300. noWait, // no-wait
  301. nil, // arguments
  302. )
  303. failOnError(e, "初始化交换机失败!")
  304. }
  305. // ExchangeDelete 删除交换机
  306. func (ch *Channel) ExchangeDelete(exchange string) {
  307. e := ch.Channel.ExchangeDelete(exchange, false, true)
  308. failOnError(e, "绑定队列失败!")
  309. }
  310. // Bind 绑定消息队列到哪个exchange
  311. func (ch *Channel) Bind(queueName string, exchange string, key string) {
  312. e := ch.Channel.QueueBind(
  313. queueName,
  314. key,
  315. exchange,
  316. false,
  317. nil,
  318. )
  319. failOnError(e, "绑定队列失败!")
  320. ch.exchange = exchange
  321. }
  322. // Qos 配置队列参数
  323. func (ch *Channel) Qos(prefetchCount int) {
  324. e := ch.Channel.Qos(prefetchCount, 0, false)
  325. failOnError(e, "无法设置QoS")
  326. }
  327. //Send 向消息队列发送消息
  328. //可以往某个消息队列发送消息
  329. func (ch *Channel) Send(queue string, body interface{}) {
  330. str, e := json.Marshal(body)
  331. failOnError(e, "消息序列化失败!")
  332. e = ch.Channel.Publish(
  333. "", //交换机
  334. queue, //路由键
  335. false, //必填
  336. false, //立即
  337. amqp.Publishing{
  338. ReplyTo: ch.Name,
  339. Body: []byte(str),
  340. })
  341. msg := "向队列:" + ch.Name + "发送消息失败!"
  342. failOnError(e, msg)
  343. }
  344. // Publish 向exchange发送消息
  345. // 可以往某个exchange发送消息
  346. func (ch *Channel) Publish(exchange string, body interface{}, key string) {
  347. str, e := json.Marshal(body)
  348. failOnError(e, "消息序列化失败!")
  349. e = ch.Channel.Publish(
  350. exchange,
  351. key,
  352. false,
  353. false,
  354. amqp.Publishing{
  355. ContentType: "text/plain",
  356. DeliveryMode: amqp.Transient,
  357. Priority: 0,
  358. Body: []byte(str)},
  359. )
  360. failOnError(e, "向路由发送消息失败!")
  361. }
  362. // PublishV2 向exchange发送消息
  363. // 可以往某个exchange发送消息
  364. func (ch *Channel) PublishV2(exchange string, body interface{}, key string) error {
  365. str, e := json.Marshal(body)
  366. if e != nil {
  367. return e
  368. }
  369. e = ch.Channel.Publish(
  370. exchange,
  371. key,
  372. false,
  373. false,
  374. amqp.Publishing{
  375. ContentType: "text/plain",
  376. DeliveryMode: amqp.Transient,
  377. Priority: 0,
  378. Body: []byte(str)},
  379. )
  380. return e
  381. }
  382. // 接收某个消息队列的消息
  383. func (ch *Channel) Consume(name string, autoAck bool) <-chan amqp.Delivery {
  384. c, e := ch.Channel.Consume(
  385. name, // 指定从哪个队列中接收消息
  386. "", // 用来区分多个消费者
  387. autoAck, // 是否自动应答
  388. false, // 是否独有
  389. false, // 如果设置为true,表示不能将同一个connection中发送的消息传递给这个connection中的消费者
  390. false, // 列是否阻塞
  391. nil,
  392. )
  393. failOnError(e, "接收消息失败!")
  394. return c
  395. }
  396. //错误处理函数
  397. func failOnError(err error, msg string) {
  398. if err != nil {
  399. log.Fatalf("%s: %s", msg, err)
  400. panic(fmt.Sprintf("%s:%s", msg, err))
  401. }
  402. }