From 09e24eee71dc53bbd55ddfce4f6ab0ec044b37d3 Mon Sep 17 00:00:00 2001 From: huangjiajun <582604932@qq.com> Date: Tue, 10 Dec 2024 15:35:16 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=A8=E6=88=B7=E6=B3=A8=E9=94=80=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/cfg/cfg_app.go | 2 +- consume/init.go | 16 ++-- consume/md/consume_key.go | 10 +++ consume/md/mq_com.go | 5 ++ consume/user_delete_consume.go | 154 +++++++++++++++++++++++++++++++++ main.go | 1 + 6 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 consume/md/mq_com.go create mode 100644 consume/user_delete_consume.go diff --git a/app/cfg/cfg_app.go b/app/cfg/cfg_app.go index cdef809..d312e99 100644 --- a/app/cfg/cfg_app.go +++ b/app/cfg/cfg_app.go @@ -12,7 +12,7 @@ type Config struct { RedisPassword string `yaml:"redis_password"` DB DBCfg `yaml:"db"` ImDB DBCfg `yaml:"im_db"` - BackUpDb BackUpDBCfg `yaml:"back_up_db"` + BackUpDb BackUpDBCfg `yaml:"db_back"` MQ MQCfg `yaml:"mq"` Log LogCfg `yaml:"log"` ImBusinessRpc ImBusinessRpcCfg `yaml:"im_business_rpc"` diff --git a/consume/init.go b/consume/init.go index 476dc04..543c328 100644 --- a/consume/init.go +++ b/consume/init.go @@ -29,13 +29,13 @@ func initConsumes() { jobs[consumeMd.EggEnergySettlementPublicGiveActivityCoinFunName] = EggEnergySettlementPublicGiveActivityCoinConsume jobs[consumeMd.EggEnergyStartExchangeGreenEnergyFunName] = EggEnergyStartExchangeGreenEnergyConsume jobs[consumeMd.EggEnergyAutoExchangeGreenEnergyFunName] = EggEnergyAutoExchangeGreenEnergyConsume - jobs[consumeMd.EggEnergyNewUserRegisterDataFunName] = EggEnergyNewUserRegisterDataConsume // 新用户注册 - jobs[consumeMd.EggEnergyDealUserECPMFunName] = EggEnergyDealUserECPMConsume // 处理给用户Ecpm值 - jobs[consumeMd.EggCanalInviteUserNumsFunName] = EggCanalInviteUserNumsConsume // 处理拉新人数 - jobs[consumeMd.EggCanalViolateNumsFunName] = EggCanalViolateNumsConsume // 处理违规次数 - jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数 - jobs[consumeMd.VideoRewardFunName] = VideoRewardConsume // 短视频奖励 - jobs[consumeMd.PlayletRewardFunName] = PlayletRewardConsume // 短剧奖励 + jobs[consumeMd.EggEnergyNewUserRegisterDataFunName] = EggEnergyNewUserRegisterDataConsume // 新用户注册 + jobs[consumeMd.EggEnergyDealUserECPMFunName] = EggEnergyDealUserECPMConsume // 处理给用户Ecpm值 + jobs[consumeMd.EggCanalInviteUserNumsFunName] = EggCanalInviteUserNumsConsume // 处理拉新人数 + jobs[consumeMd.EggCanalViolateNumsFunName] = EggCanalViolateNumsConsume // 处理违规次数 + jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数 + jobs[consumeMd.VideoRewardFunName] = VideoRewardConsume // 短视频奖励 + jobs[consumeMd.PlayletRewardFunName] = PlayletRewardConsume // 短剧奖励 jobs[consumeMd.EggEnergyNewUserRegisterDataFunName] = EggEnergyNewUserRegisterDataConsume // 新用户注册 jobs[consumeMd.EggEnergyDealUserECPMFunName] = EggEnergyDealUserECPMConsume // 处理给用户Ecpm值 jobs[consumeMd.EggCanalInviteUserNumsFunName] = EggCanalInviteUserNumsConsume // 处理拉新人数 @@ -43,6 +43,8 @@ func initConsumes() { jobs[consumeMd.IMEggEnergySendRedPackageFunName] = IMEggEnergySendRedPackageConsume // 处理用户发送红包次数 jobs[consumeMd.EggRecordActiveDataFunName] = EggRecordActiveDataConsume // 用户签到后更新活跃数据 jobs[consumeMd.EggCanalPersonAddActivityValueFunName] = EggCanalPersonAddActivityValueConsume // 用户活跃积分变更时更新es + jobs[consumeMd.UserDeleteFunName] = UserDeleteConsume // 用户注销处理 + } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 0de66ab..bf60f39 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -218,6 +218,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "PlayletRewardFunName", }, + { + ExchangeName: "egg.user", + Name: "egg_user_delete", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "user_delete", + BindKey: "", + ConsumeFunName: "UserDeleteConsume", + }, } const ( @@ -244,4 +253,5 @@ const ( EggCanalPersonAddActivityValueFunName = "EggCanalPersonAddActivityValueConsume" VideoRewardFunName = "VideoRewardFunName" PlayletRewardFunName = "PlayletRewardFunName" + UserDeleteFunName = "UserDeleteConsume" ) diff --git a/consume/md/mq_com.go b/consume/md/mq_com.go new file mode 100644 index 0000000..7649e9c --- /dev/null +++ b/consume/md/mq_com.go @@ -0,0 +1,5 @@ +package md + +type CommUserId struct { + Uid string `json:"uid"` +} diff --git a/consume/user_delete_consume.go b/consume/user_delete_consume.go new file mode 100644 index 0000000..837211b --- /dev/null +++ b/consume/user_delete_consume.go @@ -0,0 +1,154 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + "applet/app/e" + utils2 "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + db2 "code.fnuoos.com/EggPlanet/egg_models.git/src" + "code.fnuoos.com/EggPlanet/egg_models.git/src/model" + "code.fnuoos.com/EggPlanet/egg_system_rules.git" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/jinzhu/copier" + "github.com/streadway/amqp" + "time" + "xorm.io/xorm" +) + +func UserDeleteConsume(queue md.MqQueue) { + var backCfg db2.BackUpDBCfg + copier.Copy(&backCfg, &cfg.BackUpDb) + backUpDb, err := db2.InitBackUpDB(&backCfg) + if err != nil { + logx.Error(err) + return + } + fmt.Println(">>>>>>>>>>>>UserDeleteConsume>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(100) + delivery := ch.Consume(queue.Name, false) + + egg_system_rules.Init(cfg.RedisAddr) + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + err = handleUserDeleteConsume(backUpDb, ch, res.Body) + if err != nil { + fmt.Println("UserDeleteConsume_ERR:::::", err.Error()) + utils2.FilePutContents("UserDeleteConsume_ERR", utils2.SerializeStr(map[string]interface{}{ + "body": res.Body, + "err": err.Error(), + })) + _ = res.Reject(false) + //TODO::重新推回队列末尾,避免造成队列堵塞 + var msg *md.CommUserId + json.Unmarshal(res.Body, &msg) + ch.Publish(queue.ExchangeName, msg, queue.RoutKey) + } else { + //_ = res.Reject(false) + err = res.Ack(true) + } + + fmt.Println("err ::: ", err) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleUserDeleteConsume(backEg *xorm.Engine, ch *rabbit.Channel, msgData []byte) error { + if backEg == nil { + return nil + } + time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 + // 1.解析mq中queue的数据结构体 + var msg *md.CommUserId + err := json.Unmarshal(msgData, &msg) + if err != nil { + return err + } + eg := db.Db + //1.用户信息 + var user model.User + exist, err := eg.Where("id=?", msg.Uid).Get(&user) + if exist { + backEg.Insert(&user) + has, err := eg.Where("id=?", msg.Uid).Delete(&model.User{}) + if has == 0 || err != nil { + return e.NewErr(400, "注销失败") + } + } + //2.用户关系链 + //查出所有下级 + var relate []model.UserRelate + err = eg.Where("parent_uid=?", msg.Uid).Find(&relate) + if len(relate) > 0 { + backEg.Insert(&relate) + } + var ids = make([]int64, 0) + for _, v := range relate { + ids = append(ids, v.Uid) + } + //读出所有下级重新处理 + var allRelate []model.UserRelate + err = eg.In("uid", ids).Asc("level").Find(&allRelate) + var relateMap = make(map[int64]int) + for _, v := range allRelate { + if v.ParentUid != utils2.StrToInt64(msg.Uid) { //如果相等就删掉 + relateMap[v.Uid]++ + eg.Where("id=?", v.Id).Cols("level").Update(&model.UserRelate{Level: relateMap[v.Uid]}) + } + } + _, err = eg.Where("parent_uid=?", msg.Uid).Delete(&model.UserRelate{}) + var relateParent []model.UserRelate + err = eg.Where("uid=?", msg.Uid).Find(&relateParent) + if len(relateParent) > 0 { + backEg.Insert(&relateParent) + } + _, err = eg.Where("uid=?", msg.Uid).Delete(&model.UserRelate{}) + //3.公排位置 + var publicPlatoonUserRelation model.PublicPlatoonUserRelation + exist, err = eg.Where("uid=?", msg.Uid).Get(&publicPlatoonUserRelation) + if exist { + backEg.Insert(&publicPlatoonUserRelation) + } + eg.Where("uid=?", msg.Uid).Delete(&model.PublicPlatoonUserRelation{}) + //4.用户余额 + var UserWallet model.UserWallet + exist, err = eg.Where("uid=?", msg.Uid).Get(&UserWallet) + if exist { + backEg.Insert(&UserWallet) + } + eg.Where("uid=?", msg.Uid).Delete(&model.UserWallet{}) + //5.用户积分 + var UserVirtualAmount []model.UserVirtualAmount + err = eg.Where("uid=?", msg.Uid).Find(&UserVirtualAmount) + if len(UserVirtualAmount) > 0 { + backEg.Insert(&UserVirtualAmount) + } + eg.Where("uid=?", msg.Uid).Delete(&model.UserVirtualAmount{}) + //6.用户实名信息 + var UserRealNameAuth model.UserRealNameAuth + exist, err = eg.Where("uid=?", msg.Uid).Get(&UserRealNameAuth) + if exist { + backEg.Insert(&UserRealNameAuth) + } + eg.Where("uid=?", msg.Uid).Delete(&model.UserRealNameAuth{}) + return nil +} diff --git a/main.go b/main.go index e9b2bda..b123d75 100644 --- a/main.go +++ b/main.go @@ -26,6 +26,7 @@ func init() { if err := db.InitImDB(cfg.IMDB); err != nil { // IM主数据库初始化 panic(err) } + } fmt.Println("init success") }