diff --git a/app/db/db_user_relate.go b/app/db/db_user_relate.go new file mode 100644 index 0000000..380b80d --- /dev/null +++ b/app/db/db_user_relate.go @@ -0,0 +1,249 @@ +package db + +import ( + "applet/app/db/model" + "applet/app/utils/logx" + + "xorm.io/xorm" +) + +// UserRelateInsert is 插入一条数据到用户关系表 +func UserRelateInsert(Db *xorm.Engine, userRelate *model.UserRelate) (int64, error) { + affected, err := Db.Insert(userRelate) + if err != nil { + return 0, err + } + return affected, nil +} +func UserRelateInsertWithSess(sess *xorm.Session, userRelate *model.UserRelate) (int64, error) { + affected, err := sess.Insert(userRelate) + if err != nil { + return 0, err + } + return affected, nil +} +func UserRelateUpdate(Db *xorm.Engine, userRelate *model.UserRelate) (int64, error) { + affected, err := Db.Where("parent_uid=? and uid=?", userRelate.ParentUid, userRelate.Uid).Cols("level,invite_time").Update(userRelate) + if err != nil { + return 0, err + } + return affected, nil +} + +//UserRelateByPuid is 获取用户关系列表 by puid +func UserRelatesByPuid(Db *xorm.Engine, puid interface{}, limit, start int) (*[]model.UserRelate, error) { + var m []model.UserRelate + if limit == 0 && start == 0 { + if err := Db.Where("parent_uid = ?", puid). + Cols(`id,parent_uid,uid,level,invite_time`). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil + } + if err := Db.Where("parent_uid = ?", puid). + Cols(`id,parent_uid,uid,level,invite_time`).Limit(limit, start). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + + return &m, nil +} + +//UserRelatesByPuidByLv is 获取用户关系列表 by puid 和lv +func UserRelatesByPuidByLv(Db *xorm.Engine, puid, lv interface{}, limit, start int) (*[]model.UserRelate, error) { + var m []model.UserRelate + if limit == 0 && start == 0 { + if err := Db.Where("parent_uid = ? AND level = ?", puid, lv). + Cols(`id,parent_uid,uid,level,invite_time`). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil + } + if err := Db.Where("parent_uid = ? AND level = ?", puid, lv). + Cols(`id,parent_uid,uid,level,invite_time`).Limit(limit, start). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + + return &m, nil +} + +//UserRelatesByPuidByLvByTime is 获取直属 level =1用户关系列表 by puid 和lv by time +func UserRelatesByPuidByLvByTime(Db *xorm.Engine, puid, lv, stime, etime interface{}, limit, start int) (*[]model.UserRelate, error) { + var m []model.UserRelate + if limit == 0 && start == 0 { + if err := Db.Where("parent_uid = ? AND level = ? AND invite_time > ? AND invite_time < ?", puid, lv, stime, etime). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil + } + if err := Db.Where("parent_uid = ? AND level = ? AND invite_time > ? AND invite_time < ?", puid, lv, stime, etime). + Limit(limit, start). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + + return &m, nil +} + +//UserRelatesByPuidByTime is 获取户关系列表 by puid 和lv by time +func UserRelatesByPuidByTime(Db *xorm.Engine, puid, stime, etime interface{}, limit, start int) (*[]model.UserRelate, error) { + var m []model.UserRelate + if limit == 0 && start == 0 { + if err := Db.Where("parent_uid = ? AND invite_time > ? AND invite_time < ?", puid, stime, etime). + Cols(`id,parent_uid,uid,level,invite_time`). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil + } + if err := Db.Where("parent_uid = ? AND invite_time > ? AND invite_time < ?", puid, stime, etime). + Cols(`id,parent_uid,uid,level,invite_time`).Limit(limit, start). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + + return &m, nil +} + +//UserRelatesByPuidExceptLv is 获取用户关系列表 by puid 和非 lv +func UserRelatesByPuidExceptLv(Db *xorm.Engine, puid, lv interface{}, limit, start int) (*[]model.UserRelate, error) { + var m []model.UserRelate + if limit == 0 && start == 0 { + if err := Db.Where("parent_uid = ? AND level != ?", puid, lv). + Cols(`id,parent_uid,uid,level,invite_time`). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil + } + if err := Db.Where("parent_uid = ? AND level != ?", puid, lv). + Cols(`id,parent_uid,uid,level,invite_time`).Limit(limit, start). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + + return &m, nil +} + +//UserRelateByUID is 获取用户关系表 by uid +func UserRelateByUID(Db *xorm.Engine, uid interface{}) (*model.UserRelate, bool, error) { + r := new(model.UserRelate) + has, err := Db.Where("uid=?", uid).Get(r) + if err != nil { + return nil, false, err + } + return r, has, nil +} + +//UserRelateByUIDByLv is 获取用户关系表 by uid +func UserRelateByUIDByLv(Db *xorm.Engine, uid, lv interface{}) (*model.UserRelate, bool, error) { + r := new(model.UserRelate) + has, err := Db.Where("uid=? AND level=?", uid, lv).Get(r) + if err != nil { + return nil, false, err + } + return r, has, nil +} + +//UserRelateByUIDAndPUID 根据 Puid 和uid 查找 ,用于确认关联 +func UserRelateByUIDAndPUID(Db *xorm.Engine, uid, puid interface{}) (*model.UserRelate, bool, error) { + r := new(model.UserRelate) + has, err := Db.Where("uid=? AND parent_uid=?", uid, puid).Get(r) + if err != nil { + return nil, false, err + } + return r, has, nil +} + +//UserRelatesByPuIDAndLv is 查询用户关系表 获取指定等级和puid的关系 +func UserRelatesByPuIDAndLv(Db *xorm.Engine, puid, lv interface{}) (*[]model.UserRelate, error) { + var m []model.UserRelate + if err := Db.Where("parent_uid = ? AND level = ?", puid, lv). + Cols(`id,parent_uid,uid,level,invite_time`). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil +} + +// UserRelateInByUID is In查询 +func UserRelateInByUID(Db *xorm.Engine, ids []int) (*[]model.UserRelate, error) { + var m []model.UserRelate + if err := Db.In("uid", ids). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil +} + +// UserRelatesByUIDDescLv is Where 查询 根据level 降序 +func UserRelatesByUIDDescLv(Db *xorm.Engine, id interface{}) (*[]model.UserRelate, error) { + var m []model.UserRelate + if err := Db.Where("uid = ?", id).Desc("level"). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil +} +func UserRelatesByInvite(Db *xorm.Engine, times interface{}) (*[]model.UserRelate, error) { + var m []model.UserRelate + if err := Db.Where("invite_time >= ?", times). + Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil +} + +//UserRelateCountByPUID is 根据puid 计数 +func UserRelateCountByPUID(Db *xorm.Engine, pid interface{}) (int64, error) { + + count, err := Db.Where("parent_uid = ?", pid).Count(model.UserRelate{}) + if err != nil { + return 0, nil + } + return count, nil +} + +//UserRelateDelete is 删除关联他上级的关系记录,以及删除他下级的关联记录 +func UserRelateDelete(Db *xorm.Engine, uid interface{}) (int64, error) { + // 删除与之上级的记录 + _, err := Db.Where("uid = ?", uid).Delete(model.UserRelate{}) + if err != nil { + return 0, err + } + // 删除与之下级的记录 + _, err = Db.Where("parent_uid = ?", uid).Delete(model.UserRelate{}) + if err != nil { + return 0, err + } + + return 1, nil +} +func UserRelateDeleteWithSession(sess *xorm.Session, uid interface{}) (int64, error) { + // 删除与之上级的记录 + _, err := sess.Where("uid = ?", uid).Delete(model.UserRelate{}) + if err != nil { + return 0, err + } + // 删除与之下级的记录 + _, err = sess.Where("parent_uid = ?", uid).Delete(model.UserRelate{}) + if err != nil { + return 0, err + } + + return 1, nil +} + +//UserRelateDelete is 删除关联他上级的关系记录 +func UserRelateExtendDelete(Db *xorm.Engine, uid interface{}) (int64, error) { + // 删除与之上级的记录 + _, err := Db.Where("uid = ?", uid).Delete(model.UserRelate{}) + if err != nil { + return 0, err + } + return 1, nil +} diff --git a/app/db/dbs_map.go b/app/db/dbs_map.go index 693d4ca..cb45884 100644 --- a/app/db/dbs_map.go +++ b/app/db/dbs_map.go @@ -21,7 +21,7 @@ func InitMapDbs(c *cfg.DBCfg, prd bool) { logx.Fatalf("db_mapping not exists : %v", err) } // tables := MapAllDatabases(debug) - if true { + if cfg.Prd { tables = GetAllDatabasePrd() //debug 获取生产 } else { tables = GetAllDatabaseDev() //debug 获取开发 diff --git a/app/db/model/user_relate.go b/app/db/model/user_relate.go new file mode 100644 index 0000000..375562f --- /dev/null +++ b/app/db/model/user_relate.go @@ -0,0 +1,13 @@ +package model + +import ( + "time" +) + +type UserRelate struct { + Id int64 `json:"id" xorm:"pk autoincr comment('主键') BIGINT(10)"` + ParentUid int `json:"parent_uid" xorm:"not null default 0 comment('上级会员ID') unique(idx_union_u_p_id) INT(20)"` + Uid int `json:"uid" xorm:"not null default 0 comment('关联UserID') unique(idx_union_u_p_id) INT(20)"` + Level int `json:"level" xorm:"not null default 1 comment('推广等级(1直属,大于1非直属)') INT(10)"` + InviteTime time.Time `json:"invite_time" xorm:"not null default CURRENT_TIMESTAMP comment('邀请时间') TIMESTAMP"` +} diff --git a/consume/init.go b/consume/init.go index 79f5e7f..521c2aa 100644 --- a/consume/init.go +++ b/consume/init.go @@ -17,48 +17,49 @@ func Init() { // 增加消费任务队列 func initConsumes() { - //jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge - //jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv - //jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume - //jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree - //jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal - //jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond - //// - //jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal - //jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy - //jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle - //// - //jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder - //jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder - // - //jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation - //jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser - // - //jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition - // - //jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial - //jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter - //jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender - //jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans - //jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv - // - //jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay - //jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess - //jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund - //jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond - // - //jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore - // - //jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail - // - //jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume - //jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate - //jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate + jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume + jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge + jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv + jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume + jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree + jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal + jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond // - //jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal - //jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail - //jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward + jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal + jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy + jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle // + jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder + jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder + + jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation + jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser + + jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition + + jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial + jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter + jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender + jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans + jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv + + jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay + jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess + jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund + jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond + + jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore + + jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail + + jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume + jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate + jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate + + jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal + jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail + jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward + //// ////////////////////////////////////// V1 ///////////////////////////////////////////////////// //jobs[consumeMd.CloudIssuanceMsgCallBackFunName] = CloudIssuanceMsgCallBackConsume @@ -70,10 +71,10 @@ func initConsumes() { //jobs[consumeMd.MallAddSupplyGoodsFunName] = MallAddSupplyGoodsConsume //////////////////////////////////////// bigData ///////////////////////////////////////////////////// - jobs[consumeMd.CanalOrderConsumeFunName] = CanalOrderConsume - jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume - jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume - jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume + //jobs[consumeMd.CanalOrderConsumeFunName] = CanalOrderConsume + //jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume + //jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume + //jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 8a1be54..6b9d7d8 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -380,6 +380,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "ZhiosIntegralProxyRecharge", }, + { + ExchangeName: "zhios.mall_green_coin_consume.exchange", + Name: "zhios_mall_green_coin_consume", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "mall_green_coin_consume", + BindKey: "", + ConsumeFunName: "ZhiosMallGreenCoinConsume", + }, { ExchangeName: "canal.topic", Name: "canal_user_virtual_coin_flow", @@ -392,7 +401,9 @@ var RabbitMqQueueKeyList = []*MqQueue{ } const ( + ZhiosUserRelateFunName = "ZhiosUserRelate" ZhiosIntegralProxyRechargeFunName = "ZhiosIntegralProxyRecharge" + ZhiosMallGreenCoinConsumeFunName = "ZhiosMallGreenCoinConsume" ZhiosUserUpLvFunName = "ZhiosUserUpLv" CanalGuideOrderByUserUpLvConsume = "CanalGuideOrderByUserUpLvConsume" ZhiosOrderFreeFunName = "ZhiosOrderFree" diff --git a/consume/md/md_zhios_capital_pool_order_total.go b/consume/md/md_zhios_capital_pool_order_total.go index b150348..f6053e4 100644 --- a/consume/md/md_zhios_capital_pool_order_total.go +++ b/consume/md/md_zhios_capital_pool_order_total.go @@ -1,5 +1,10 @@ package md +const ( + MallGreenCoinConsume = "zhios.mall_green_coin_consume.exchange" + MallGreenCoinConsumeKeyErr = "mall_green_coin_consume_err" +) + type ZhiosCapitalPoolOrderTotal struct { Uid []string `json:"uid"` Mid string `json:"mid"` @@ -23,6 +28,14 @@ type ZhiosOrderBuckle struct { Uid string `json:"uid"` Mid string `json:"mid"` } +type ZhiosMallConsume struct { + Uid string `json:"uid"` + Mid string `json:"mid"` + Oid string `json:"oid"` + Amount string `json:"amount"` + Err string `json:"err"` +} + type ZhiosOrderFree struct { ItemId string `json:"item_id"` OptPvd string `json:"opt_pvd"` diff --git a/consume/zhios_mall_green_coin_consume.go b/consume/zhios_mall_green_coin_consume.go new file mode 100644 index 0000000..0d71687 --- /dev/null +++ b/consume/zhios_mall_green_coin_consume.go @@ -0,0 +1,92 @@ +package consume + +import ( + "applet/app/cfg" + "applet/app/db" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" +) + +func ZhiosMallGreenCoinConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosMallGreenCoinConsume(res.Body) + //_ = res.Reject(false) + if err != nil { + var canalMsg *md.ZhiosMallConsume + var tmpString string + err1 := json.Unmarshal(res.Body, &tmpString) + if err1 == nil { + fmt.Println(tmpString) + err1 = json.Unmarshal([]byte(tmpString), &canalMsg) + if err1 == nil { + canalMsg.Err = err.Error() + ch.Publish(md.MallGreenCoinConsume, utils.SerializeStr(canalMsg), md.MallGreenCoinConsumeKeyErr) + } + } + + } + _ = res.Ack(true) + + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleZhiosMallGreenCoinConsume(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosMallConsume + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println("===with", err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + fmt.Println("===with", err.Error()) + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + if eg == nil { + return nil + } + rule.InitForGreenCoinDoubleChainIntegral(cfg.RedisAddr) + _, err = rule.DealUserGreenCoinDoubleChainIntegral(eg, utils.StrToInt(canalMsg.Uid), canalMsg.Amount, canalMsg.Oid, canalMsg.Mid) + fmt.Println(err) + if err != nil { + return err + } + + return nil +} diff --git a/consume/zhios_user_relate.go b/consume/zhios_user_relate.go new file mode 100644 index 0000000..a67d13b --- /dev/null +++ b/consume/zhios_user_relate.go @@ -0,0 +1,160 @@ +package consume + +import ( + "applet/app/db" + "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "strings" + "time" + "xorm.io/xorm" +) + +func ZhiosUserRelate(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosUserRelate(res.Body) + //_ = res.Reject(false) + if err != nil { + _ = res.Reject(false) + //TODO::重新推回队列末尾,避免造成队列堵塞 + var msg *md.ZhiosOrderBuckle + var tmpString string + err := json.Unmarshal(res.Body, &tmpString) + if err != nil { + return + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &msg) + if err != nil { + return + } + ch.Publish(queue.ExchangeName, utils.SerializeStr(msg), queue.RoutKey) + } else { + _ = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleZhiosUserRelate(msg []byte) error { + time.Sleep(time.Microsecond * 20) // 等待500毫秒 + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosOrderBuckle + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println("===with", err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + fmt.Println("===with", err.Error()) + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + if eg == nil { + return nil + } + profile, err := db.UserProfileFindByID(eg, canalMsg.Uid) + if err != nil || profile == nil { + + return nil + } + if profile.ParentUid > 0 { + ur := new(model.UserRelate) + //如果有上级要加入关系链 + initLV := 1 + ur.ParentUid = profile.ParentUid + ur.Uid = profile.Uid + ur.Level = initLV + ur.InviteTime = time.Now() + _, err = db.UserRelateInsert(eg, ur) + + if err != nil && strings.Contains(err.Error(), "Duplicate") == false { + return err + } + // 插入多级关联 + RoutineMultiRelate1(eg, ur.ParentUid, ur.Uid, initLV) + + } + return nil +} + +//RoutineMultiRelate is 多级关联 +func RoutineMultiRelate1(eg *xorm.Engine, pid int, uid int, lv int) { + + for { + if pid == 0 { + break + } + m, err := db.UserProfileFindByID(eg, pid) + if err != nil { + logx.Warn(err) + break + } + if m != nil { + if m.ParentUid == 0 { + break + } + lv++ + ur := new(model.UserRelate) + ur.ParentUid = m.ParentUid + ur.Uid = uid + ur.Level = lv + ur.InviteTime = time.Now() + _, err := db.UserRelateInsert(eg, ur) + if err != nil && strings.Contains(err.Error(), "Duplicate") == false { + logx.Warn(err) + break + } + if err != nil && strings.Contains(err.Error(), "Duplicate") { + tmp, _, _ := db.UserRelateByUIDAndPUID(eg, ur.Uid, ur.ParentUid) + if tmp != nil && tmp.Level != ur.Level { + db.UserRelateUpdate(eg, ur) + } + } + // 还要关联当前的用户的所有下级,注意关联等级 + //go RoutineInsertUserRelate(c, m.ParentUid, uid, lv) + // 下级关联上上级 + // 继续查询 + logx.Info(fmt.Sprintf("关联pid(%v) -> uid(%v),lv:%v", ur.ParentUid, ur.Uid, lv)) + logx.Info("继续查询") + pid = m.ParentUid + } + if m == nil { + logx.Info("查询结束,退出") + break + } + } +} diff --git a/go.mod b/go.mod index 3e1ddf9..9ee47af 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240222023917-c31b53f7e8cb code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0 code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.4 - code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240119104238-05c3962029ff + code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240301103805-f71bf8ac0ab3 code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240126015516-38ca248db2fd github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5