// Package consume contains a RabbitMQ work-queue consumer (WorkReceive) and a
// small publisher (TestWorkSend) that pushes a sample message for it.
package consume

import (
	"bytes"
	"encoding/json"
	"log"
	"strings"

	"applet/app/utils"
	"applet/app/utils/logx"
	"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
	jsoniter "github.com/json-iterator/go"
	"github.com/streadway/amqp"
)

// Json is a jsoniter codec configured to be compatible with the standard
// library's encoding/json.
var Json = jsoniter.ConfigCompatibleWithStandardLibrary

// Message is the envelope decoded from every delivery consumed by WorkReceive.
type Message struct {
	// MessageType selects the handler that processes the delivery.
	MessageType string `json:"message_type"`
	// Data is the payload carried by the message.
	Data int `json:"data"`
}

// WorkReceive consumes deliveries from the queue called name and dispatches
// each one according to the MessageType field of its decoded body.
//
// It takes a channel from the shared rabbit pool, releases it on return, and
// loops until the delivery stream returned by Consume is closed. If no channel
// can be obtained the error is logged and the function returns immediately.
func WorkReceive(name string) {
	ch, err := rabbit.Cfg.Pool.GetChannel()
	if err != nil {
		logx.Error(err)
		return
	}
	defer ch.Release()

	// The second argument is false; the handler below acknowledges deliveries
	// itself, so this is presumably the auto-ack flag — TODO confirm against
	// the rabbit package.
	msgs := ch.Consume(name, false)
	for msg := range msgs {
		// Strip surrounding double quotes and drop every backslash before
		// decoding. Presumably the producer sends the JSON document wrapped in
		// a JSON string (double encoded) — verify against the publisher.
		// NOTE(review): removing all backslashes also destroys legitimate
		// escapes inside string values.
		jsonStr := strings.Trim(string(msg.Body), "\"")
		jsonStr = strings.ReplaceAll(jsonStr, "\\", "")

		var message2 Message
		utils.Unserialize([]byte(jsonStr), &message2)

		switch message2.MessageType {
		case "test":
			// Hand the delivery to the goroutine by value. Passing a pointer
			// to the range variable would let the goroutine observe a later
			// delivery on toolchains where the loop variable is shared across
			// iterations (before Go 1.22).
			go func(d amqp.Delivery) {
				log.Printf("recevie1 Received a message: %s", d.Body)
				// NOTE(review): multiple=true also acknowledges every earlier
				// unacknowledged delivery on this channel, including ones that
				// matched no case above — confirm that this is intended.
				if err := d.Ack(true); err != nil {
					logx.Error(err)
				}
			}(msg)
		}
	}
}

// TestWorkSend publishes a single sample Message of type "test" with the
// routing target "test_work_queue_processor" so that WorkReceive can be
// exercised by hand.
//
// If no channel can be obtained from the pool the error is logged and nothing
// is published.
func TestWorkSend() {
	// Push into RabbitMQ.
	ch, err := rabbit.Cfg.Pool.GetChannel()
	if err != nil {
		logx.Error(err)
		// Without a channel there is nothing to release or publish on.
		return
	}
	defer ch.Release()

	message := Message{MessageType: "test", Data: 1}
	// With the current bounds this loop publishes exactly one message
	// (Data == 1); raise the upper bound to send more.
	for message.Data < 2 {
		ch.Publish("test_work_queue_processor", utils.SerializeStr(message), "")
		message.Data++
	}
}

// disableEscapeHtml encodes data as JSON without escaping the HTML-sensitive
// characters &, < and > inside strings.
//
// The returned string ends with the newline that json.Encoder.Encode appends
// after every value. A non-nil error means data could not be encoded, in which
// case the returned string is empty.
func disableEscapeHtml(data interface{}) (string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	// Escaping is on by default; it has to be switched off explicitly.
	enc.SetEscapeHTML(false)
	if err := enc.Encode(data); err != nil {
		return "", err
	}
	return buf.String(), nil
}