package consume

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"applet/app/db"
	"applet/app/utils"
	"applet/app/utils/logx"
	"applet/consume/md"

	"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
	"github.com/streadway/amqp"
)

// zhiosPayloadError marks a message body that could not be decoded. Decoding
// is deterministic, so redelivering such a message would fail the same way;
// the consumer uses this type to decide that the message must not be requeued.
type zhiosPayloadError struct {
	cause error
}

// Error implements the error interface.
func (e *zhiosPayloadError) Error() string {
	return "invalid task total payload: " + e.cause.Error()
}

// ZhiosTaskTotal binds queue to its exchange and then consumes it forever,
// passing every message body to handleZhiosTaskTotal and settling the delivery
// according to the handler's result.
//
// It returns early (after logging) when no channel can be obtained from the
// pool, and panics when the delivery channel is closed.
func ZhiosTaskTotal(queue md.MqQueue) {
	fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
	ch, err := rabbit.Cfg.Pool.GetChannel()
	if err != nil {
		logx.Error(err)
		return
	}
	defer ch.Release()

	// 1. Bind the queue to the exchange.
	ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)

	// 2. Fetch messages and consume them. Deliveries are settled explicitly
	// below. NOTE(review): Qos(1) presumably limits the consumer to one
	// unacknowledged message at a time - confirm against the rabbit wrapper.
	ch.Qos(1)
	delivery := ch.Consume(queue.Name, false)
	for {
		res, ok := <-delivery
		if !ok {
			// NOTE(review): the panic is kept from the original; presumably the
			// process is expected to crash and be restarted - confirm.
			panic(errors.New("error getting message"))
		}
		fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
		settleZhiosTaskTotal(res, handleZhiosTaskTotal(res.Body))
	}
}

// settleZhiosTaskTotal acknowledges or rejects res depending on handleErr, the
// result of processing its body. Every delivery must be settled: leaving one
// unacknowledged would keep the broker from sending further messages while a
// prefetch limit is in effect.
func settleZhiosTaskTotal(res amqp.Delivery, handleErr error) {
	// Pause before requeueing so a persistent failure (for example an
	// unavailable database) does not turn into a tight redelivery loop.
	const retryDelay = time.Second

	if handleErr == nil {
		if err := res.Ack(false); err != nil {
			logx.Error(err)
		}
		return
	}
	logx.Error(handleErr)

	requeue := true
	if _, malformed := handleErr.(*zhiosPayloadError); malformed {
		// An undecodable body can never succeed; drop it instead of requeueing.
		requeue = false
	} else {
		time.Sleep(retryDelay)
	}
	if err := res.Reject(requeue); err != nil {
		logx.Error(err)
	}
}

// handleZhiosTaskTotal processes one message body.
//
// The body is a JSON string whose content is itself a JSON document describing
// an md.ZhiosAcquisition. The function looks up the database engine for the
// message's Mid, loads the task-center reward record identified by the
// message's Id, passes every non-zero reward entry to db.GetTaskTotal and
// finally sets the record's is_send column to 1, all inside one transaction.
//
// It returns nil when there is nothing to do (unknown Mid, missing record, or
// record already marked as sent), a *zhiosPayloadError when the body cannot be
// decoded, and the underlying error when any database step fails.
func handleZhiosTaskTotal(msg []byte) error {
	fmt.Println(string(msg))

	// 1. Decode the message that canal collected into the MQ queue. The outer
	// layer is a JSON string, the inner layer the actual object.
	var tmpString string
	if err := json.Unmarshal(msg, &tmpString); err != nil {
		fmt.Println(err.Error())
		return &zhiosPayloadError{cause: err}
	}
	fmt.Println(tmpString)

	var canalMsg *md.ZhiosAcquisition
	if err := json.Unmarshal([]byte(tmpString), &canalMsg); err != nil {
		return &zhiosPayloadError{cause: err}
	}
	if canalMsg == nil {
		// A JSON "null" decodes without error but leaves the pointer nil.
		return &zhiosPayloadError{cause: errors.New("payload is null")}
	}

	eg := db.DBs[canalMsg.Mid]
	if eg == nil {
		return nil
	}

	sess := eg.NewSession()
	defer sess.Close()
	if err := sess.Begin(); err != nil {
		return err
	}
	committed := false
	defer func() {
		if !committed {
			// Best effort: the error that caused the rollback is what the
			// caller needs to see, so a rollback failure is not reported.
			_ = sess.Rollback()
		}
	}()

	list := db.GetTaskCenterRewardList(sess, canalMsg.Id)
	if list == nil || list.IsSend == 1 {
		return nil
	}

	tmp := make(map[string]string)
	if err := json.Unmarshal([]byte(list.Reward), &tmp); err != nil {
		// Kept best-effort as before: entries that did decode are still
		// processed, but the failure is no longer silent.
		logx.Error(err)
	}

	created := time.Unix(int64(list.CreateTime), 0)
	day, month := created.Format("20060102"), created.Format("200601")
	uid := utils.IntToStr(list.Uid)
	for k, v := range tmp {
		if utils.StrToFloat64(v) == 0 {
			continue
		}
		if err := db.GetTaskTotal(sess, uid, day, month, k, list.Type, list.TaskType, list.Title, v); err != nil {
			return err
		}
	}

	list.IsSend = 1
	update, err := sess.Where("id=?", list.Id).Cols("is_send").Update(list)
	if err != nil {
		return err
	}
	if update == 0 {
		return errors.New("失败")
	}
	if err := sess.Commit(); err != nil {
		return err
	}
	committed = true
	return nil
}