rabbitmq 操作库
No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.

pool.go 9.3 KiB

hace 2 años
hace 2 años
hace 2 años
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. package rabbit
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/streadway/amqp"
  7. "log"
  8. "os"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. const (
  14. defaultLogPrefix = "[rabbit-pool]"
  15. )
  16. var (
  17. ErrInvalidConfig = errors.New("invalid pool config\n")
  18. ErrFailedConnection = errors.New("failed to establish connection\n")
  19. ErrConnectionMaximum = errors.New("the number of connections exceeds the maximum\n")
  20. ErrChannelMaximum = errors.New("the number of channels exceeds the maximum\n")
  21. ErrGetChannelTimeOut = errors.New("get channel timeout\n")
  22. )
// LoggerInter is the minimal logging interface the pool writes to.
// NewPool installs a *log.Logger from the standard library when the caller
// does not provide an implementation.
type LoggerInter interface {
	// Print logs the given values.
	Print(v ...interface{})
}
// Config holds the settings used to build a Pool.
type Config struct {
	Host              string // address of the MQ server; passed to amqp.Dial
	MinConn           int    // minimum number of connections to establish
	MaxConn           int    // maximum number of connections to establish
	MaxChannelPerConn int    // maximum number of channels opened on each connection
	// MaxLifetime bounds how long a connection is considered usable.
	// NOTE(review): NewConnection adds this value to time.Now().Unix() and
	// CheckExpire compares the sum against Unix seconds, so the raw number is
	// effectively interpreted as seconds, not nanoseconds. NewPool substitutes
	// time.Duration(3600) when the field is zero.
	MaxLifetime time.Duration
}
// Pool is a connection pool. It owns a set of AMQP connections and a buffered
// queue of idle channels that have been released for reuse.
type Pool struct {
	// mu is locked by CloseConnection and getOrCreate around their access to
	// the connections map.
	mu *sync.Mutex
	// conf is the configuration supplied to NewPool or SetConfig.
	conf *Config
	// logger receives the pool's log output.
	logger LoggerInter
	// connectionNum counts open connections; it is updated with sync/atomic.
	connectionNum int32
	// connections maps a connection's connIdentity to the connection.
	connections map[int64]*Connection
	// connectionSerialNumber is atomically incremented to produce the
	// connIdentity of each new connection.
	connectionSerialNumber int64
	// idleChannels holds released channels; NewPool sizes its buffer as
	// MaxConn*MaxChannelPerConn.
	idleChannels chan *Channel
}
  43. func NewPool(conf *Config, logger ...LoggerInter) (*Pool, error) {
  44. if conf.MaxConn <= 0 || conf.MinConn > conf.MaxConn {
  45. return nil, ErrInvalidConfig
  46. }
  47. p := &Pool{
  48. mu: new(sync.Mutex),
  49. connections: make(map[int64]*Connection),
  50. idleChannels: make(chan *Channel, conf.MaxConn*conf.MaxChannelPerConn),
  51. }
  52. if conf.MaxLifetime == 0 {
  53. conf.MaxLifetime = time.Duration(3600)
  54. }
  55. if len(logger) > 0 {
  56. p.SetLogger(logger[0])
  57. } else {
  58. p.SetLogger(log.New(os.Stdout, defaultLogPrefix, log.LstdFlags))
  59. }
  60. p.conf = conf
  61. var conn *Connection
  62. var err error
  63. // 建立最少连接数
  64. for i := 0; i < conf.MinConn; i++ {
  65. conn, err = p.NewConnection()
  66. if err != nil {
  67. p.GetLogger().Print(ErrFailedConnection.Error())
  68. return nil, ErrFailedConnection
  69. }
  70. p.connections[conn.connIdentity] = conn
  71. }
  72. return p, nil
  73. }
// SetConfig replaces the pool's configuration and returns the pool so calls
// can be chained. Only the conf field is assigned; nothing else in the pool is
// rebuilt from the new values.
func (p *Pool) SetConfig(conf *Config) *Pool {
	p.conf = conf
	return p
}
// GetConfig returns the configuration currently held by the pool. The pointer
// is returned as stored, not copied.
func (p *Pool) GetConfig() *Config {
	return p.conf
}
  81. func (p *Pool) SetLogger(logger LoggerInter) *Pool {
  82. p.logger = logger
  83. return p
  84. }
// GetLogger returns the logger the pool currently writes to.
func (p *Pool) GetLogger() LoggerInter {
	return p.logger
}
  88. func (p *Pool) NewConnection() (*Connection, error) {
  89. // 判断连接是否达到最大值
  90. if atomic.AddInt32(&p.connectionNum, 1) > int32(p.conf.MaxConn) {
  91. atomic.AddInt32(&p.connectionNum, -1)
  92. return nil, ErrConnectionMaximum
  93. }
  94. conn, err := amqp.Dial(p.conf.Host)
  95. if err != nil {
  96. atomic.AddInt32(&p.connectionNum, -1)
  97. return nil, err
  98. }
  99. return &Connection{
  100. mu: new(sync.Mutex),
  101. conn: conn,
  102. pool: p,
  103. channelNum: 0,
  104. expireTime: time.Duration(time.Now().Unix()) + p.conf.MaxLifetime,
  105. connIdentity: atomic.AddInt64(&p.connectionSerialNumber, 1),
  106. }, nil
  107. }
  108. func (p *Pool) CloseConnection(c *Connection) error {
  109. p.mu.Lock()
  110. defer p.mu.Unlock()
  111. atomic.AddInt32(&p.connectionNum, -1)
  112. delete(p.connections, c.connIdentity)
  113. return c.conn.Close()
  114. }
  115. func (p *Pool) GetChannel() (*Channel, error) {
  116. ch, _ := p.getOrCreate()
  117. if ch != nil {
  118. return ch, nil
  119. }
  120. C := time.After(time.Second * 10)
  121. for {
  122. ch, _ := p.getOrCreate()
  123. if ch != nil {
  124. return ch, nil
  125. }
  126. select {
  127. case <-C:
  128. p.GetLogger().Print(ErrGetChannelTimeOut.Error())
  129. return nil, ErrGetChannelTimeOut
  130. default:
  131. }
  132. }
  133. }
  134. func (p *Pool) getOrCreate() (*Channel, error) {
  135. // 池中是否有空闲channel
  136. var (
  137. ch *Channel
  138. err error
  139. )
  140. select {
  141. case ch = <-p.idleChannels:
  142. return ch, nil
  143. default:
  144. }
  145. p.mu.Lock()
  146. defer p.mu.Unlock()
  147. // 池中已有连接是否可以建立新的channel
  148. for _, conn := range p.connections {
  149. if conn.CheckExpire() {
  150. continue
  151. }
  152. ch, err = conn.NewChannel()
  153. if ch != nil {
  154. return ch, nil
  155. }
  156. }
  157. // 新建连接获取新的channel
  158. var conn *Connection
  159. conn, err = p.NewConnection()
  160. if err != nil {
  161. return nil, err
  162. }
  163. p.connections[conn.connIdentity] = conn
  164. ch, err = conn.NewChannel()
  165. if err != nil {
  166. return nil, err
  167. }
  168. return ch, nil
  169. }
  170. func (p *Pool) ReleaseChannel(ch *Channel) error {
  171. p.idleChannels <- ch
  172. return nil
  173. }
// Connection wraps one AMQP connection owned by a Pool and tracks the
// channels opened on it.
type Connection struct {
	// mu is locked by NewChannel and CloseChannel.
	mu *sync.Mutex
	// conn is the underlying AMQP connection.
	conn *amqp.Connection
	// pool is the Pool that created this connection.
	pool *Pool
	// expireTime is set by Pool.NewConnection to the creation time in Unix
	// seconds plus Config.MaxLifetime, stored as a time.Duration.
	expireTime time.Duration
	// isExpire caches a positive result of CheckExpire.
	isExpire            bool
	connIdentity        int64 // identifier of this connection within the pool
	channelNum          int32 // number of channels currently open on this connection
	channelSerialNumber int64 // running counter used to number this connection's channels
}
  184. func (c *Connection) NewChannel() (*Channel, error) {
  185. c.mu.Lock()
  186. defer c.mu.Unlock()
  187. if atomic.AddInt32(&c.channelNum, 1) > int32(c.pool.conf.MaxChannelPerConn) {
  188. atomic.AddInt32(&c.channelNum, -1)
  189. return nil, ErrChannelMaximum
  190. }
  191. ch, err := c.conn.Channel()
  192. if err != nil {
  193. atomic.AddInt32(&c.channelNum, -1)
  194. return nil, err
  195. }
  196. return &Channel{
  197. Channel: ch,
  198. conn: c,
  199. chanIdentity: atomic.AddInt64(&c.channelSerialNumber, 1),
  200. }, nil
  201. }
  202. func (c *Connection) ReleaseChannel(ch *Channel) error {
  203. if c.CheckExpire() {
  204. return c.CloseChannel(ch)
  205. }
  206. return c.pool.ReleaseChannel(ch)
  207. }
  208. func (c *Connection) CloseChannel(ch *Channel) error {
  209. c.mu.Lock()
  210. defer c.mu.Unlock()
  211. atomic.AddInt32(&c.channelNum, -1)
  212. var err = ch.Channel.Close()
  213. if atomic.LoadInt32(&c.channelNum) <= 0 && c.CheckExpire() {
  214. return c.pool.CloseConnection(c)
  215. }
  216. return err
  217. }
  218. // 检查是否过期
  219. func (c *Connection) CheckExpire() bool {
  220. if c.isExpire {
  221. return true
  222. }
  223. if time.Duration(time.Now().Unix()) > c.expireTime {
  224. c.isExpire = true
  225. }
  226. return c.isExpire
  227. }
  228. /************************************************************************************************************/
// Channel wraps an *amqp.Channel together with the pooled Connection it was
// opened on. The embedded *amqp.Channel's methods remain available unless
// shadowed by a method declared on Channel.
type Channel struct {
	*amqp.Channel
	conn         *Connection // connection this channel was opened on
	chanIdentity int64       // ordinal of this channel within its connection
	Name         string      // set by NewQueue to the declared queue's name; used as ReplyTo by Send
	exchange     string      // set by Bind to the exchange the queue was bound to
}
// Release hands the channel back to its connection by calling
// Connection.ReleaseChannel and returns that call's error.
func (ch *Channel) Release() error {
	return ch.conn.ReleaseChannel(ch)
}
// Close closes the channel through its owning connection by calling
// Connection.CloseChannel, so the connection's channel count stays accurate.
// It shadows the Close method of the embedded *amqp.Channel.
func (ch *Channel) Close() error {
	return ch.conn.CloseChannel(ch)
}
  242. // QueueDeclare 声明交换机
  243. func (ch *Channel) QueueDeclare(queue string) {
  244. _, e := ch.Channel.QueueDeclare(queue, false, true, false, false, nil)
  245. failOnError(e, "声明交换机!")
  246. }
  247. // QueueDelete 删除交换机
  248. func (ch *Channel) QueueDelete(queue string) {
  249. _, e := ch.Channel.QueueDelete(queue, false, true, false)
  250. failOnError(e, "删除队列失败!")
  251. }
  252. //初始化队列
  253. func (ch *Channel) NewQueue(name string, typename string) {
  254. q, e := ch.Channel.QueueDeclare(
  255. name, //队列名
  256. false, //是否开启持久化
  257. false, //不使用时删除
  258. false, //排他
  259. false, //不等待
  260. nil, //参数
  261. )
  262. failOnError(e, "初始化队列失败!")
  263. ch.Name = q.Name
  264. }
  265. // NewExchange 初始化交换机
  266. //s:rabbitmq服务器的链接,name:交换机名字,typename:交换机类型
  267. func NewExchange(s string, name string, typename string) {
  268. //连接rabbitmq
  269. conn, e := amqp.Dial(s)
  270. failOnError(e, "连接Rabbitmq服务器失败!")
  271. ch, e := conn.Channel()
  272. failOnError(e, "无法打开频道!")
  273. e = ch.ExchangeDeclare(
  274. name, // name
  275. typename, // type
  276. true, // durable
  277. false, // auto-deleted
  278. false, // internal
  279. false, // no-wait
  280. nil, // arguments
  281. )
  282. failOnError(e, "初始化交换机失败!")
  283. }
  284. // ExchangeDelete 删除交换机
  285. func (ch *Channel) ExchangeDelete(exchange string) {
  286. e := ch.Channel.ExchangeDelete(exchange, false, true)
  287. failOnError(e, "绑定队列失败!")
  288. }
  289. // Bind 绑定消息队列到哪个exchange
  290. func (ch *Channel) Bind(queueName string, exchange string, key string) {
  291. e := ch.Channel.QueueBind(
  292. queueName,
  293. key,
  294. exchange,
  295. false,
  296. nil,
  297. )
  298. failOnError(e, "绑定队列失败!")
  299. ch.exchange = exchange
  300. }
  301. // Qos 配置队列参数
  302. func (ch *Channel) Qos(prefetchCount int) {
  303. e := ch.Channel.Qos(prefetchCount, 0, false)
  304. failOnError(e, "无法设置QoS")
  305. }
  306. //Send 向消息队列发送消息
  307. //可以往某个消息队列发送消息
  308. func (ch *Channel) Send(queue string, body interface{}) {
  309. str, e := json.Marshal(body)
  310. failOnError(e, "消息序列化失败!")
  311. e = ch.Channel.Publish(
  312. "", //交换
  313. queue, //路由键
  314. false, //必填
  315. false, //立即
  316. amqp.Publishing{
  317. ReplyTo: ch.Name,
  318. Body: []byte(str),
  319. })
  320. msg := "向队列:" + ch.Name + "发送消息失败!"
  321. failOnError(e, msg)
  322. }
  323. // Publish 向exchange发送消息
  324. // 可以往某个exchange发送消息
  325. func (ch *Channel) Publish(exchange string, body interface{}, key string) {
  326. str, e := json.Marshal(body)
  327. failOnError(e, "消息序列化失败!")
  328. e = ch.Channel.Publish(
  329. exchange,
  330. key,
  331. false,
  332. false,
  333. amqp.Publishing{
  334. ContentType: "text/plain",
  335. DeliveryMode: amqp.Transient,
  336. Priority: 0,
  337. Body: []byte(str)},
  338. )
  339. failOnError(e, "向路由发送消息失败!")
  340. }
  341. // 接收某个消息队列的消息
  342. func (ch *Channel) Consume(name string) <-chan amqp.Delivery {
  343. c, e := ch.Channel.Consume(
  344. name, // 指定从哪个队列中接收消息
  345. "", // 用来区分多个消费者
  346. false, // 是否自动应答
  347. false, // 是否独有
  348. false, // 如果设置为true,表示不能将同一个connection中发送的消息传递给这个connection中的消费者
  349. false, // 列是否阻塞
  350. nil,
  351. )
  352. failOnError(e, "接收消息失败!")
  353. return c
  354. }
  355. //错误处理函数
  356. func failOnError(err error, msg string) {
  357. if err != nil {
  358. log.Fatalf("%s: %s", msg, err)
  359. panic(fmt.Sprintf("%s:%s", msg, err))
  360. }
  361. }