diff --git a/app/db/dbs_map.go b/app/db/dbs_map.go index 4c5a323..ad8b420 100644 --- a/app/db/dbs_map.go +++ b/app/db/dbs_map.go @@ -111,7 +111,7 @@ func GetAllDatabaseDev() *[]model.DbMapping { if cfg.Local { // 本地调试 加快速度 fmt.Println("notice:LOCAL TEST, only masterId:** 123456 ** available!") err = Db.Where("deleted_at != ? AND is_dev = '1' AND db_master_id= ?", 1, 123456). - Or("db_master_id = ?", 25616402).Find(&m) + Or("db_master_id = ?", 71353282).Find(&m) } else { err = Db.Where("deleted_at != ? AND is_dev = '1' ", 1).Find(&m) } diff --git a/consume/canal_service_award_dividend_relation_consume.go b/consume/canal_service_award_dividend_relation_consume.go new file mode 100644 index 0000000..c260a67 --- /dev/null +++ b/consume/canal_service_award_dividend_relation_consume.go @@ -0,0 +1,101 @@ +package consume + +import ( + "applet/app/db" + "applet/app/utils/logx" + "applet/consume/md" + zhios_order_relate_utils "code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/utils" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/service_award_dividend" + md2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule/service_award_dividend/md" + "code.fnuoos.com/go_rely_warehouse/zyos_model.git/src/models" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "strings" +) + +func CanalServiceAwardDividendRelationConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1000) + delivery := ch.Consume(queue.Name, true) //设置自动应答 + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>>>CanalServiceAwardDividendRelationConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + handleCanalServiceAwardDividendRelationConsume(res.Body) + //_ = res.Reject + //_ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleCanalServiceAwardDividendRelationConsume(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.CanalServiceAwardDividendRelationMessage[md.CanalServiceAwardDividendRelation] + err := json.Unmarshal(msg, &canalMsg) + if err != nil { + fmt.Println("err::::", err.Error()) + return err + } + if canalMsg.Type != md.CanalMsgUpdateSqlType { + return nil + } + masterId := strings.Split(canalMsg.Database, "_")[1] + if masterId != "71353282" { + return nil + } + + if len(canalMsg.Old) > 0 && canalMsg.Old[0].VipLevel != "" && canalMsg.Old[0].VipLevel != canalMsg.Data[0].VipLevel { + eg := db.DBs[masterId] + //TODO::删除旧的记录 + var parentServiceAwardDividendRelation models.ServiceAwardDividendRelation + _, err1 := eg.Where("uid = ?", canalMsg.Data[0].BindUid).Get(&parentServiceAwardDividendRelation) + if err1 != nil { + return err1 + } + parentServiceAwardDividendRelation.AllocationNums-- + _, err1 = eg.Where("id=?", parentServiceAwardDividendRelation.Id).Cols("allocation_nums").Update(&parentServiceAwardDividendRelation) + if err1 != nil { + return err1 + } + + _, err = eg.Where("id = ?", canalMsg.Data[0].Id).Delete(models.ServiceAwardDividendRelation{}) + if err != nil { + fmt.Println("err::::", err.Error()) + return err + } + + err = service_award_dividend.JoinServiceAwardDividendRelation(eg, md2.JoinServiceAwardDividendRelationReq{ + MasterId: masterId, + Uid: zhios_order_relate_utils.StrToInt(canalMsg.Data[0].Uid), + ParentUid: zhios_order_relate_utils.StrToInt(canalMsg.Data[0].ParentUid), + RegionId: zhios_order_relate_utils.StrToInt(canalMsg.Data[0].RegionId), + CreateAt: canalMsg.Data[0].CreateAt, + AllocationNums: zhios_order_relate_utils.StrToInt(canalMsg.Data[0].AllocationNums), + }) + if err != nil { + fmt.Println("err::::", err.Error()) + return err + } + } + + return nil +} diff --git a/consume/init.go b/consume/init.go index 3a735e8..9b6cffd 100644 --- a/consume/init.go +++ b/consume/init.go @@ -99,8 +99,9 @@ func initConsumes() { jobs[consumeMd.FlexibleEmploymentWithdrawForPupiaoConsumeFunName] = FlexibleEmploymentWithdrawForPupiaoConsume jobs[consumeMd.NewFlexibleEmploymentWithdrawForPupiaoConsumeFunName] = NewFlexibleEmploymentWithdrawForPupiaoConsume jobs[consumeMd.ServiceAwardDividendRelationConsumeFunName] = ServiceAwardDividendRelationConsume - jobs[consumeMd.DividendRewardConsumeFunName] = DividendRewardConsume //蜂蜜分红 - jobs[consumeMd.ContributionExchangeConsumeFunName] = ContributionExchangeConsume //蜂蜜分红-兑换贡献值 + jobs[consumeMd.CanalServiceAwardDividendRelationConsumeFunName] = CanalServiceAwardDividendRelationConsume //服务奖消费 + jobs[consumeMd.DividendRewardConsumeFunName] = DividendRewardConsume //蜂蜜分红 + jobs[consumeMd.ContributionExchangeConsumeFunName] = ContributionExchangeConsume //蜂蜜分红-兑换贡献值 //jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume //绿色双链积分 //jobs[consumeMd.ZhiosOneCirclesCoinConsumeFunName] = ZhiosOneCirclesCoinConsume //一个圈圈虚拟币变化 diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index ad6f665..f1c48fb 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -506,6 +506,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "CanalUserVirtualCoinFlowConsume", }, + { + ExchangeName: "canal.topic", + Name: "canal_service_award_dividend_relation", + Type: TopicQueueType, + IsPersistent: false, + RoutKey: "canal_service_award_dividend_relation", + BindKey: "", + ConsumeFunName: "CanalServiceAwardDividendRelationConsume", + }, { ExchangeName: "one.circles", Name: "one_circles_sign_in_green_energy", @@ -713,6 +722,7 @@ const ( InstallmentPaymentAutoRepaidConsumeFunName = "InstallmentPaymentAutoRepaidConsume" CanalGimMessageConsumeFunName = "CanalGimMessageConsume" ServiceAwardDividendRelationConsumeFunName = "ServiceAwardDividendRelationConsume" + CanalServiceAwardDividendRelationConsumeFunName = "CanalServiceAwardDividendRelationConsume" DividendRewardConsumeFunName = "DividendRewardConsume" ContributionExchangeConsumeFunName = "ContributionExchangeConsume" ) diff --git a/consume/md/md_user.go b/consume/md/md_user.go new file mode 100644 index 0000000..336ad24 --- /dev/null +++ b/consume/md/md_user.go @@ -0,0 +1,29 @@ +package md + +type CanalServiceAwardDividendRelation struct { + Id string `json:"id"` + Uid string `json:"uid"` + ParentUid string `json:"parent_uid"` + BindUid string `json:"bind_uid"` + VipLevel string `json:"vip_level"` + AllocationNums string `json:"allocation_nums"` + RegionId string `json:"region_id"` + CreateAt string `json:"create_at"` + UpdateAt string `json:"update_at"` +} + +type CanalServiceAwardDividendRelationMessage[T any] struct { + Data []T `json:"data"` + Database string `json:"database"` + ES int64 `json:"es"` + ID int64 `json:"id"` + IsDdl bool `json:"isDdl"` + Old []struct { + VipLevel string `json:"vip_level"` + UpdateAt string `json:"update_at"` + } `json:"old"` + PkNames []string `json:"pkNames"` + Table string `json:"table"` + TS int64 `json:"ts"` + Type string `json:"type"` +} diff --git a/go.mod b/go.mod index ed4e3a2..35b4d15 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240911091427-ab0c3e964b56 code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0 code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5 - code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240913101023-fa0fc0ae7803 + code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240914112555-5fc5d98aa38e code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240730013105-a13176f971e9 code.fnuoos.com/go_rely_warehouse/zyos_model.git v0.0.4-0.20240830093306-94cbf0dfb7bd