package consume import ( "applet/app/cfg" "applet/app/db" "applet/app/e" utils2 "applet/app/utils" "applet/app/utils/logx" "applet/consume/md" db2 "code.fnuoos.com/EggPlanet/egg_models.git/src" "code.fnuoos.com/EggPlanet/egg_models.git/src/model" "code.fnuoos.com/EggPlanet/egg_system_rules.git" "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" "encoding/json" "errors" "fmt" "github.com/jinzhu/copier" "github.com/streadway/amqp" "time" "xorm.io/xorm" ) func UserDeleteConsume(queue md.MqQueue) { var backCfg db2.BackUpDBCfg copier.Copy(&backCfg, &cfg.BackUpDb) backUpDb, err := db2.InitBackUpDB(&backCfg) if err != nil { logx.Error(err) return } fmt.Println(">>>>>>>>>>>>UserDeleteConsume>>>>>>>>>>>>") ch, err := rabbit.Cfg.Pool.GetChannel() if err != nil { logx.Error(err) return } defer ch.Release() //1、将自己绑定到交换机上 ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) //2、取出数据进行消费 ch.Qos(100) delivery := ch.Consume(queue.Name, false) egg_system_rules.Init(cfg.RedisAddr) var res amqp.Delivery var ok bool for { res, ok = <-delivery if ok == true { err = handleUserDeleteConsume(backUpDb, ch, res.Body) if err != nil { fmt.Println("UserDeleteConsume_ERR:::::", err.Error()) utils2.FilePutContents("UserDeleteConsume_ERR", utils2.SerializeStr(map[string]interface{}{ "body": res.Body, "err": err.Error(), })) _ = res.Reject(false) //TODO::重新推回队列末尾,避免造成队列堵塞 var msg *md.CommUserId json.Unmarshal(res.Body, &msg) ch.Publish(queue.ExchangeName, msg, queue.RoutKey) } else { //_ = res.Reject(false) err = res.Ack(true) } fmt.Println("err ::: ", err) } else { panic(errors.New("error getting message")) } } fmt.Println("get msg done") } func handleUserDeleteConsume(backEg *xorm.Engine, ch *rabbit.Channel, msgData []byte) error { if backEg == nil { return nil } time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 // 1.解析mq中queue的数据结构体 var msg *md.CommUserId err := json.Unmarshal(msgData, &msg) if err != nil { return err } eg := db.Db //1.用户信息 var user model.User exist, err := eg.Where("id=?", msg.Uid).Get(&user) if exist { backEg.Insert(&user) has, err := eg.Where("id=?", msg.Uid).Delete(&model.User{}) if has == 0 || err != nil { return e.NewErr(400, "注销失败") } } //2.用户关系链 //查出所有下级 var relate []model.UserRelate err = eg.Where("parent_uid=?", msg.Uid).Find(&relate) if len(relate) > 0 { backEg.Insert(&relate) } var ids = make([]int64, 0) for _, v := range relate { ids = append(ids, v.Uid) } //读出所有下级重新处理 var allRelate []model.UserRelate err = eg.In("uid", ids).Asc("level").Find(&allRelate) var relateMap = make(map[int64]int) for _, v := range allRelate { if v.ParentUid != utils2.StrToInt64(msg.Uid) { //如果相等就删掉 relateMap[v.Uid]++ eg.Where("id=?", v.Id).Cols("level").Update(&model.UserRelate{Level: relateMap[v.Uid]}) } } _, err = eg.Where("parent_uid=?", msg.Uid).Delete(&model.UserRelate{}) var relateParent []model.UserRelate err = eg.Where("uid=?", msg.Uid).Find(&relateParent) if len(relateParent) > 0 { backEg.Insert(&relateParent) } _, err = eg.Where("uid=?", msg.Uid).Delete(&model.UserRelate{}) //3.公排位置 var publicPlatoonUserRelation model.PublicPlatoonUserRelation exist, err = eg.Where("uid=?", msg.Uid).Get(&publicPlatoonUserRelation) if exist { backEg.Insert(&publicPlatoonUserRelation) } eg.Where("uid=?", msg.Uid).Delete(&model.PublicPlatoonUserRelation{}) //4.用户余额 var UserWallet model.UserWallet exist, err = eg.Where("uid=?", msg.Uid).Get(&UserWallet) if exist { backEg.Insert(&UserWallet) } eg.Where("uid=?", msg.Uid).Delete(&model.UserWallet{}) //5.用户积分 var UserVirtualAmount []model.UserVirtualAmount err = eg.Where("uid=?", msg.Uid).Find(&UserVirtualAmount) if len(UserVirtualAmount) > 0 { backEg.Insert(&UserVirtualAmount) } eg.Where("uid=?", msg.Uid).Delete(&model.UserVirtualAmount{}) //6.用户实名信息 var UserRealNameAuth model.UserRealNameAuth exist, err = eg.Where("uid=?", msg.Uid).Get(&UserRealNameAuth) if exist { backEg.Insert(&UserRealNameAuth) } eg.Where("uid=?", msg.Uid).Delete(&model.UserRealNameAuth{}) return nil }