package consume import ( "applet/app/db" "applet/app/db/model" "applet/app/utils" "applet/app/utils/logx" "applet/consume/md" "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" "encoding/json" "errors" "fmt" "github.com/streadway/amqp" "strings" "time" "xorm.io/xorm" ) func ZhiosUserRelate(queue md.MqQueue) { fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") ch, err := rabbit.Cfg.Pool.GetChannel() if err != nil { logx.Error(err) return } defer ch.Release() //1、将自己绑定到交换机上 ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) //2、取出数据进行消费 ch.Qos(1) delivery := ch.Consume(queue.Name, false) var res amqp.Delivery var ok bool for { res, ok = <-delivery if ok == true { //fmt.Println(string(res.Body)) fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") err = handleZhiosUserRelate(res.Body) //_ = res.Reject(false) if err != nil { _ = res.Reject(false) //TODO::重新推回队列末尾,避免造成队列堵塞 var msg *md.ZhiosOrderBuckle var tmpString string err := json.Unmarshal(res.Body, &tmpString) if err != nil { return } fmt.Println(tmpString) err = json.Unmarshal([]byte(tmpString), &msg) if err != nil { return } ch.Publish(queue.ExchangeName, utils.SerializeStr(msg), queue.RoutKey) } else { _ = res.Ack(true) } } else { panic(errors.New("error getting message")) } } fmt.Println("get msg done") } func handleZhiosUserRelate(msg []byte) error { time.Sleep(time.Microsecond * 20) // 等待500毫秒 //1、解析canal采集至mq中queue的数据结构体 var canalMsg *md.ZhiosOrderBuckle fmt.Println(string(msg)) var tmpString string err := json.Unmarshal(msg, &tmpString) if err != nil { fmt.Println("===with", err.Error()) return err } fmt.Println(tmpString) err = json.Unmarshal([]byte(tmpString), &canalMsg) if err != nil { fmt.Println("===with", err.Error()) return err } mid := canalMsg.Mid eg := db.DBs[mid] if eg == nil { return nil } profile, err := db.UserProfileFindByID(eg, canalMsg.Uid) if err != nil || profile == nil { return nil } if profile.ParentUid > 0 { ur := new(model.UserRelate) //如果有上级要加入关系链 initLV := 1 ur.ParentUid = profile.ParentUid ur.Uid = profile.Uid ur.Level = initLV ur.InviteTime = time.Now() _, err = db.UserRelateInsert(eg, ur) if err != nil && strings.Contains(err.Error(), "Duplicate") == false { return err } // 插入多级关联 RoutineMultiRelate1(eg, ur.ParentUid, ur.Uid, initLV) } return nil } //RoutineMultiRelate is 多级关联 func RoutineMultiRelate1(eg *xorm.Engine, pid int, uid int, lv int) { for { if pid == 0 { break } m, err := db.UserProfileFindByID(eg, pid) if err != nil { logx.Warn(err) break } if m != nil { if m.ParentUid == 0 { break } lv++ ur := new(model.UserRelate) ur.ParentUid = m.ParentUid ur.Uid = uid ur.Level = lv ur.InviteTime = time.Now() _, err := db.UserRelateInsert(eg, ur) if err != nil && strings.Contains(err.Error(), "Duplicate") == false { logx.Warn(err) break } if err != nil && strings.Contains(err.Error(), "Duplicate") { tmp, _, _ := db.UserRelateByUIDAndPUID(eg, ur.Uid, ur.ParentUid) if tmp != nil && tmp.Level != ur.Level { db.UserRelateUpdate(eg, ur) } } // 还要关联当前的用户的所有下级,注意关联等级 //go RoutineInsertUserRelate(c, m.ParentUid, uid, lv) // 下级关联上上级 // 继续查询 logx.Info(fmt.Sprintf("关联pid(%v) -> uid(%v),lv:%v", ur.ParentUid, ur.Uid, lv)) logx.Info("继续查询") pid = m.ParentUid } if m == nil { logx.Info("查询结束,退出") break } } }