Golang 的 RabbitMQ 消费项目
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

2 年之前
1 年之前
2 年之前
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package consume
  2. import (
  3. "applet/app/utils"
  4. "applet/app/utils/logx"
  5. "bytes"
  6. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  7. "encoding/json"
  8. jsoniter "github.com/json-iterator/go"
  9. "github.com/streadway/amqp"
  10. "log"
  11. "strings"
  12. )
  13. var Json = jsoniter.ConfigCompatibleWithStandardLibrary
  // Message is the JSON envelope that WorkReceive decodes each delivery body
  // into. MessageType selects the handler; Data carries an integer payload.
  type Message struct {
  	MessageType string `json:"message_type"`
  	Data        int    `json:"data"`
  }
  18. func WorkReceive(name string) {
  19. ch, err := rabbit.Cfg.Pool.GetChannel()
  20. if err != nil {
  21. logx.Error(err)
  22. return
  23. }
  24. defer ch.Release()
  25. //接收消息时,指定
  26. msgs := ch.Consume(name, false)
  27. for msg := range msgs {
  28. var message2 Message
  29. jsonStr := string(msg.Body)
  30. jsonStr = strings.Trim(jsonStr, "\"")
  31. jsonStr = strings.ReplaceAll(jsonStr, "\\", "")
  32. utils.Unserialize([]byte(jsonStr), &message2)
  33. switch message2.MessageType {
  34. case "test":
  35. go func(msg *amqp.Delivery) {
  36. log.Printf("recevie1 Received a message: %s", msg.Body)
  37. msg.Ack(true)
  38. }(&msg)
  39. }
  40. }
  41. }
  42. func TestWorkSend() {
  43. // 推入rabbitMq
  44. ch, err := rabbit.Cfg.Pool.GetChannel()
  45. if err != nil {
  46. logx.Error(err)
  47. }
  48. defer ch.Release()
  49. var message struct {
  50. MessageType string `json:"message_type"`
  51. Data int `json:"data"`
  52. }
  53. message.MessageType = "test"
  54. message.Data = 1
  55. for message.Data < 2 {
  56. ch.Publish("test_work_queue_processor", utils.SerializeStr(message), "")
  57. message.Data += 1
  58. //time.Sleep(time.Second * 5)
  59. }
  60. }
  // disableEscapeHtml JSON-encodes data with HTML escaping turned off, so the
  // characters <, > and & are emitted literally instead of as \u003c, \u003e
  // and \u0026.
  //
  // The returned string ends with the newline that json.Encoder.Encode appends.
  // A non-nil error is returned when data cannot be encoded.
  func disableEscapeHtml(data interface{}) (string, error) {
  	var buf bytes.Buffer
  	enc := json.NewEncoder(&buf)
  	// Escaping is on by default; it must be switched off explicitly.
  	enc.SetEscapeHTML(false)
  	if err := enc.Encode(data); err != nil {
  		return "", err
  	}
  	return buf.String(), nil
  }