|
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- package consume
-
- import (
- "applet/app/utils"
- "applet/app/utils/logx"
- "bytes"
- "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
- "encoding/json"
- jsoniter "github.com/json-iterator/go"
- "github.com/streadway/amqp"
- "log"
- "strings"
- )
-
- var Json = jsoniter.ConfigCompatibleWithStandardLibrary
-
- type Message struct {
- MessageType string `json:"message_type"`
- Data int `json:"data"`
- }
-
- func WorkReceive(name string) {
- ch, err := rabbit.Cfg.Pool.GetChannel()
- if err != nil {
- logx.Error(err)
- return
- }
- defer ch.Release()
- //接收消息时,指定
- msgs := ch.Consume(name, false)
- for msg := range msgs {
- var message2 Message
- jsonStr := string(msg.Body)
- jsonStr = strings.Trim(jsonStr, "\"")
- jsonStr = strings.ReplaceAll(jsonStr, "\\", "")
- utils.Unserialize([]byte(jsonStr), &message2)
- switch message2.MessageType {
- case "test":
- go func(msg *amqp.Delivery) {
- log.Printf("recevie1 Received a message: %s", msg.Body)
- msg.Ack(true)
- }(&msg)
- }
-
- }
- }
-
- func TestWorkSend() {
- // 推入rabbitMq
- ch, err := rabbit.Cfg.Pool.GetChannel()
- if err != nil {
- logx.Error(err)
- }
- defer ch.Release()
- var message struct {
- MessageType string `json:"message_type"`
- Data int `json:"data"`
- }
- message.MessageType = "test"
- message.Data = 1
- for message.Data < 2 {
- ch.Publish("test_work_queue_processor", utils.SerializeStr(message), "")
- message.Data += 1
- //time.Sleep(time.Second * 5)
- }
- }
-
- // 去除json中的转义字符
- func disableEscapeHtml(data interface{}) (string, error) {
- bf := bytes.NewBuffer([]byte{})
- jsonEncoder := json.NewEncoder(bf)
- jsonEncoder.SetEscapeHTML(true)
- if err := jsonEncoder.Encode(data); err != nil {
- return "", err
- }
- return bf.String(), nil
- }
|