|
- package consume
-
- import (
- "applet/app/db"
- "applet/app/db/model"
- "applet/app/utils"
- "applet/app/utils/logx"
- "applet/consume/md"
- "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/streadway/amqp"
- "strings"
- "time"
- "xorm.io/xorm"
- )
-
- func ZhiosUserRelate(queue md.MqQueue) {
- fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
- ch, err := rabbit.Cfg.Pool.GetChannel()
- if err != nil {
- logx.Error(err)
- return
- }
- defer ch.Release()
- //1、将自己绑定到交换机上
- ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
- //2、取出数据进行消费
- ch.Qos(1)
- delivery := ch.Consume(queue.Name, false)
-
- var res amqp.Delivery
- var ok bool
- for {
- res, ok = <-delivery
- if ok == true {
- //fmt.Println(string(res.Body))
- fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
- err = handleZhiosUserRelate(res.Body)
- //_ = res.Reject(false)
- if err != nil {
- _ = res.Reject(false)
- //TODO::重新推回队列末尾,避免造成队列堵塞
- var msg *md.ZhiosOrderBuckle
- var tmpString string
- err := json.Unmarshal(res.Body, &tmpString)
- if err != nil {
- return
- }
- fmt.Println(tmpString)
- err = json.Unmarshal([]byte(tmpString), &msg)
- if err != nil {
- return
- }
- ch.Publish(queue.ExchangeName, utils.SerializeStr(msg), queue.RoutKey)
- } else {
- _ = res.Ack(true)
- }
- } else {
- panic(errors.New("error getting message"))
- }
- }
- fmt.Println("get msg done")
- }
-
- func handleZhiosUserRelate(msg []byte) error {
- time.Sleep(time.Microsecond * 20) // 等待500毫秒
- //1、解析canal采集至mq中queue的数据结构体
- var canalMsg *md.ZhiosOrderBuckle
- fmt.Println(string(msg))
- var tmpString string
- err := json.Unmarshal(msg, &tmpString)
- if err != nil {
- fmt.Println("===with", err.Error())
- return err
- }
- fmt.Println(tmpString)
- err = json.Unmarshal([]byte(tmpString), &canalMsg)
- if err != nil {
- fmt.Println("===with", err.Error())
- return err
- }
- mid := canalMsg.Mid
- eg := db.DBs[mid]
- if eg == nil {
- return nil
- }
- profile, err := db.UserProfileFindByID(eg, canalMsg.Uid)
- if err != nil || profile == nil {
-
- return nil
- }
- if profile.ParentUid > 0 {
- ur := new(model.UserRelate)
- //如果有上级要加入关系链
- initLV := 1
- ur.ParentUid = profile.ParentUid
- ur.Uid = profile.Uid
- ur.Level = initLV
- ur.InviteTime = time.Now()
- _, err = db.UserRelateInsert(eg, ur)
-
- if err != nil && strings.Contains(err.Error(), "Duplicate") == false {
- return err
- }
- // 插入多级关联
- RoutineMultiRelate1(eg, ur.ParentUid, ur.Uid, initLV)
-
- }
- return nil
- }
-
- //RoutineMultiRelate is 多级关联
- func RoutineMultiRelate1(eg *xorm.Engine, pid int, uid int, lv int) {
-
- for {
- if pid == 0 {
- break
- }
- m, err := db.UserProfileFindByID(eg, pid)
- if err != nil {
- logx.Warn(err)
- break
- }
- if m != nil {
- if m.ParentUid == 0 {
- break
- }
- lv++
- ur := new(model.UserRelate)
- ur.ParentUid = m.ParentUid
- ur.Uid = uid
- ur.Level = lv
- ur.InviteTime = time.Now()
- _, err := db.UserRelateInsert(eg, ur)
- if err != nil && strings.Contains(err.Error(), "Duplicate") == false {
- logx.Warn(err)
- break
- }
- if err != nil && strings.Contains(err.Error(), "Duplicate") {
- tmp, _, _ := db.UserRelateByUIDAndPUID(eg, ur.Uid, ur.ParentUid)
- if tmp != nil && tmp.Level != ur.Level {
- db.UserRelateUpdate(eg, ur)
- }
- }
- // 还要关联当前的用户的所有下级,注意关联等级
- //go RoutineInsertUserRelate(c, m.ParentUid, uid, lv)
- // 下级关联上上级
- // 继续查询
- logx.Info(fmt.Sprintf("关联pid(%v) -> uid(%v),lv:%v", ur.ParentUid, ur.Uid, lv))
- logx.Info("继续查询")
- pid = m.ParentUid
- }
- if m == nil {
- logx.Info("查询结束,退出")
- break
- }
- }
- }
|