huangjiajun 8 місяці тому
джерело
коміт
adafd116e4
9 змінених файлів з 585 додано та 46 видалено
  1. +249
    -0
      app/db/db_user_relate.go
  2. +1
    -1
      app/db/dbs_map.go
  3. +13
    -0
      app/db/model/user_relate.go
  4. +45
    -44
      consume/init.go
  5. +11
    -0
      consume/md/consume_key.go
  6. +13
    -0
      consume/md/md_zhios_capital_pool_order_total.go
  7. +92
    -0
      consume/zhios_mall_green_coin_consume.go
  8. +160
    -0
      consume/zhios_user_relate.go
  9. +1
    -1
      go.mod

+ 249
- 0
app/db/db_user_relate.go Переглянути файл

@@ -0,0 +1,249 @@
package db

import (
"applet/app/db/model"
"applet/app/utils/logx"

"xorm.io/xorm"
)

// UserRelateInsert is 插入一条数据到用户关系表
func UserRelateInsert(Db *xorm.Engine, userRelate *model.UserRelate) (int64, error) {
affected, err := Db.Insert(userRelate)
if err != nil {
return 0, err
}
return affected, nil
}
func UserRelateInsertWithSess(sess *xorm.Session, userRelate *model.UserRelate) (int64, error) {
affected, err := sess.Insert(userRelate)
if err != nil {
return 0, err
}
return affected, nil
}
func UserRelateUpdate(Db *xorm.Engine, userRelate *model.UserRelate) (int64, error) {
affected, err := Db.Where("parent_uid=? and uid=?", userRelate.ParentUid, userRelate.Uid).Cols("level,invite_time").Update(userRelate)
if err != nil {
return 0, err
}
return affected, nil
}

//UserRelateByPuid is 获取用户关系列表 by puid
func UserRelatesByPuid(Db *xorm.Engine, puid interface{}, limit, start int) (*[]model.UserRelate, error) {
var m []model.UserRelate
if limit == 0 && start == 0 {
if err := Db.Where("parent_uid = ?", puid).
Cols(`id,parent_uid,uid,level,invite_time`).
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}
if err := Db.Where("parent_uid = ?", puid).
Cols(`id,parent_uid,uid,level,invite_time`).Limit(limit, start).
Find(&m); err != nil {
return nil, logx.Warn(err)
}

return &m, nil
}

//UserRelatesByPuidByLv is 获取用户关系列表 by puid 和lv
func UserRelatesByPuidByLv(Db *xorm.Engine, puid, lv interface{}, limit, start int) (*[]model.UserRelate, error) {
var m []model.UserRelate
if limit == 0 && start == 0 {
if err := Db.Where("parent_uid = ? AND level = ?", puid, lv).
Cols(`id,parent_uid,uid,level,invite_time`).
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}
if err := Db.Where("parent_uid = ? AND level = ?", puid, lv).
Cols(`id,parent_uid,uid,level,invite_time`).Limit(limit, start).
Find(&m); err != nil {
return nil, logx.Warn(err)
}

return &m, nil
}

//UserRelatesByPuidByLvByTime is 获取直属 level =1用户关系列表 by puid 和lv by time
func UserRelatesByPuidByLvByTime(Db *xorm.Engine, puid, lv, stime, etime interface{}, limit, start int) (*[]model.UserRelate, error) {
var m []model.UserRelate
if limit == 0 && start == 0 {
if err := Db.Where("parent_uid = ? AND level = ? AND invite_time > ? AND invite_time < ?", puid, lv, stime, etime).
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}
if err := Db.Where("parent_uid = ? AND level = ? AND invite_time > ? AND invite_time < ?", puid, lv, stime, etime).
Limit(limit, start).
Find(&m); err != nil {
return nil, logx.Warn(err)
}

return &m, nil
}

//UserRelatesByPuidByTime is 获取户关系列表 by puid 和lv by time
func UserRelatesByPuidByTime(Db *xorm.Engine, puid, stime, etime interface{}, limit, start int) (*[]model.UserRelate, error) {
var m []model.UserRelate
if limit == 0 && start == 0 {
if err := Db.Where("parent_uid = ? AND invite_time > ? AND invite_time < ?", puid, stime, etime).
Cols(`id,parent_uid,uid,level,invite_time`).
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}
if err := Db.Where("parent_uid = ? AND invite_time > ? AND invite_time < ?", puid, stime, etime).
Cols(`id,parent_uid,uid,level,invite_time`).Limit(limit, start).
Find(&m); err != nil {
return nil, logx.Warn(err)
}

return &m, nil
}

//UserRelatesByPuidExceptLv is 获取用户关系列表 by puid 和非 lv
func UserRelatesByPuidExceptLv(Db *xorm.Engine, puid, lv interface{}, limit, start int) (*[]model.UserRelate, error) {
var m []model.UserRelate
if limit == 0 && start == 0 {
if err := Db.Where("parent_uid = ? AND level != ?", puid, lv).
Cols(`id,parent_uid,uid,level,invite_time`).
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}
if err := Db.Where("parent_uid = ? AND level != ?", puid, lv).
Cols(`id,parent_uid,uid,level,invite_time`).Limit(limit, start).
Find(&m); err != nil {
return nil, logx.Warn(err)
}

return &m, nil
}

//UserRelateByUID is 获取用户关系表 by uid
func UserRelateByUID(Db *xorm.Engine, uid interface{}) (*model.UserRelate, bool, error) {
r := new(model.UserRelate)
has, err := Db.Where("uid=?", uid).Get(r)
if err != nil {
return nil, false, err
}
return r, has, nil
}

//UserRelateByUIDByLv is 获取用户关系表 by uid
func UserRelateByUIDByLv(Db *xorm.Engine, uid, lv interface{}) (*model.UserRelate, bool, error) {
r := new(model.UserRelate)
has, err := Db.Where("uid=? AND level=?", uid, lv).Get(r)
if err != nil {
return nil, false, err
}
return r, has, nil
}

//UserRelateByUIDAndPUID 根据 Puid 和uid 查找 ,用于确认关联
func UserRelateByUIDAndPUID(Db *xorm.Engine, uid, puid interface{}) (*model.UserRelate, bool, error) {
r := new(model.UserRelate)
has, err := Db.Where("uid=? AND parent_uid=?", uid, puid).Get(r)
if err != nil {
return nil, false, err
}
return r, has, nil
}

//UserRelatesByPuIDAndLv is 查询用户关系表 获取指定等级和puid的关系
func UserRelatesByPuIDAndLv(Db *xorm.Engine, puid, lv interface{}) (*[]model.UserRelate, error) {
var m []model.UserRelate
if err := Db.Where("parent_uid = ? AND level = ?", puid, lv).
Cols(`id,parent_uid,uid,level,invite_time`).
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}

// UserRelateInByUID is In查询
func UserRelateInByUID(Db *xorm.Engine, ids []int) (*[]model.UserRelate, error) {
var m []model.UserRelate
if err := Db.In("uid", ids).
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}

// UserRelatesByUIDDescLv is Where 查询 根据level 降序
func UserRelatesByUIDDescLv(Db *xorm.Engine, id interface{}) (*[]model.UserRelate, error) {
var m []model.UserRelate
if err := Db.Where("uid = ?", id).Desc("level").
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}
func UserRelatesByInvite(Db *xorm.Engine, times interface{}) (*[]model.UserRelate, error) {
var m []model.UserRelate
if err := Db.Where("invite_time >= ?", times).
Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
}

//UserRelateCountByPUID is 根据puid 计数
func UserRelateCountByPUID(Db *xorm.Engine, pid interface{}) (int64, error) {

count, err := Db.Where("parent_uid = ?", pid).Count(model.UserRelate{})
if err != nil {
return 0, nil
}
return count, nil
}

//UserRelateDelete is 删除关联他上级的关系记录,以及删除他下级的关联记录
func UserRelateDelete(Db *xorm.Engine, uid interface{}) (int64, error) {
// 删除与之上级的记录
_, err := Db.Where("uid = ?", uid).Delete(model.UserRelate{})
if err != nil {
return 0, err
}
// 删除与之下级的记录
_, err = Db.Where("parent_uid = ?", uid).Delete(model.UserRelate{})
if err != nil {
return 0, err
}

return 1, nil
}
func UserRelateDeleteWithSession(sess *xorm.Session, uid interface{}) (int64, error) {
// 删除与之上级的记录
_, err := sess.Where("uid = ?", uid).Delete(model.UserRelate{})
if err != nil {
return 0, err
}
// 删除与之下级的记录
_, err = sess.Where("parent_uid = ?", uid).Delete(model.UserRelate{})
if err != nil {
return 0, err
}

return 1, nil
}

//UserRelateDelete is 删除关联他上级的关系记录
func UserRelateExtendDelete(Db *xorm.Engine, uid interface{}) (int64, error) {
// 删除与之上级的记录
_, err := Db.Where("uid = ?", uid).Delete(model.UserRelate{})
if err != nil {
return 0, err
}
return 1, nil
}

+ 1
- 1
app/db/dbs_map.go Переглянути файл

@@ -21,7 +21,7 @@ func InitMapDbs(c *cfg.DBCfg, prd bool) {
logx.Fatalf("db_mapping not exists : %v", err)
}
// tables := MapAllDatabases(debug)
if true {
if cfg.Prd {
tables = GetAllDatabasePrd() //debug 获取生产
} else {
tables = GetAllDatabaseDev() //debug 获取开发


+ 13
- 0
app/db/model/user_relate.go Переглянути файл

@@ -0,0 +1,13 @@
package model

import (
"time"
)

type UserRelate struct {
Id int64 `json:"id" xorm:"pk autoincr comment('主键') BIGINT(10)"`
ParentUid int `json:"parent_uid" xorm:"not null default 0 comment('上级会员ID') unique(idx_union_u_p_id) INT(20)"`
Uid int `json:"uid" xorm:"not null default 0 comment('关联UserID') unique(idx_union_u_p_id) INT(20)"`
Level int `json:"level" xorm:"not null default 1 comment('推广等级(1直属,大于1非直属)') INT(10)"`
InviteTime time.Time `json:"invite_time" xorm:"not null default CURRENT_TIMESTAMP comment('邀请时间') TIMESTAMP"`
}

+ 45
- 44
consume/init.go Переглянути файл

@@ -17,48 +17,49 @@ func Init() {

// 增加消费任务队列
func initConsumes() {
//jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge
//jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv
//jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume
//jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree
//jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal
//jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond
////
//jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal
//jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy
//jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle
////
//jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder
//jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder
//
//jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation
//jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser
//
//jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition
//
//jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial
//jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter
//jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender
//jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans
//jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv
//
//jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay
//jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess
//jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund
//jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond
//
//jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore
//
//jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail
//
//jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume
//jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate
//jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate
jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume
jobs[consumeMd.ZhiosIntegralProxyRechargeFunName] = ZhiosIntegralProxyRecharge
jobs[consumeMd.ZhiosUserUpLvFunName] = ZhiosUserUpLv
jobs[consumeMd.CanalGuideOrderByUserUpLvConsume] = CanalGuideOrderByUserUpLvConsume
jobs[consumeMd.ZhiosOrderFreeFunName] = ZhiosOrderFree
jobs[consumeMd.ZhiosOrderTotalFunName] = ZhiosOrderTotal
jobs[consumeMd.ZhiosOrderTotalSecondFunName] = ZhiosOrderTotalSecond
//
//jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal
//jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail
//jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward
jobs[consumeMd.ZhiosOrderSettleTotalFunName] = ZhiosSettleTotal
jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy
jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle
//
jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder
jobs[consumeMd.ZhiosGuideStoreOrderFunName] = ZhiosGuideStoreOrder

jobs[consumeMd.ZhiosAppreciationFunName] = ZhiosAppreciation
jobs[consumeMd.ZhiosValidUserFunName] = ZhiosValidUser

jobs[consumeMd.ZhiosAcquisitionConditionFunName] = ZhiosAcquisitionCondition

jobs[consumeMd.DouShenUserRegisterConsumeForOfficialFunName] = DouShenUserRegisterConsumeForOfficial
jobs[consumeMd.DouShenUserRegisterConsumeForOperationCenterFunName] = DouShenUserRegisterConsumeForOperationCenter
jobs[consumeMd.DouShenUserRegisterConsumeForMyRecommenderFunName] = DouShenUserRegisterConsumeForMyRecommender
jobs[consumeMd.DouShenUserRegisterConsumeForMyFansFunName] = DouShenUserRegisterConsumeForMyFans
jobs[consumeMd.DouShenUserRegisterConsumeForUserRegisterUpLvFunName] = DouShenUserRegisterConsumeForUserRegisterUpLv

jobs[consumeMd.ZhiosFastReturnOrderPayFunName] = ZhiosFastReturnOrderPay
jobs[consumeMd.ZhiosFastReturnOrderSuccessFunName] = ZhiosFastReturnOrderSuccess
jobs[consumeMd.ZhiosFastReturnOrderRefundFunName] = ZhiosFastReturnOrderRefund
jobs[consumeMd.ZhiosFastReturnOrderRefundSecondFunName] = ZhiosFastReturnOrderRefundSecond

jobs[consumeMd.YoumishangExchangeStoreFunName] = YoumishangExchangeStore

jobs[consumeMd.ZhiosRechargeOrderFailFunName] = ZhiosRechargeOrderFail

jobs[consumeMd.CloudIssuanceAsyncMLoginFunName] = CloudIssuanceAsyncMLoginConsume
jobs[consumeMd.ZhiosTikTokUpdateFunName] = ZhiosTikTokUpdate
jobs[consumeMd.ZhiosTikTokAllUpdateFunName] = ZhiosTikTokAllUpdate

jobs[consumeMd.ZhiosCapitalPoolOrderTotalFunName] = ZhiosCapitalPoolOrderTotal
jobs[consumeMd.ZhiosExpressOrderFail] = ZhiosExpressOrderFail
jobs[consumeMd.ZhiosWithdrawReward] = ZhiosWithdrawReward
////

////////////////////////////////////// V1 /////////////////////////////////////////////////////
//jobs[consumeMd.CloudIssuanceMsgCallBackFunName] = CloudIssuanceMsgCallBackConsume
@@ -70,10 +71,10 @@ func initConsumes() {
//jobs[consumeMd.MallAddSupplyGoodsFunName] = MallAddSupplyGoodsConsume

//////////////////////////////////////// bigData /////////////////////////////////////////////////////
jobs[consumeMd.CanalOrderConsumeFunName] = CanalOrderConsume
jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume
jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume
jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume
//jobs[consumeMd.CanalOrderConsumeFunName] = CanalOrderConsume
//jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume
//jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume
//jobs[consumeMd.CanalUserVirtualCcoinFlowFunName] = CanalUserVirtualCoinFlowConsume
}

func Run() {


+ 11
- 0
consume/md/consume_key.go Переглянути файл

@@ -380,6 +380,15 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "ZhiosIntegralProxyRecharge",
},
{
ExchangeName: "zhios.mall_green_coin_consume.exchange",
Name: "zhios_mall_green_coin_consume",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "mall_green_coin_consume",
BindKey: "",
ConsumeFunName: "ZhiosMallGreenCoinConsume",
},
{
ExchangeName: "canal.topic",
Name: "canal_user_virtual_coin_flow",
@@ -392,7 +401,9 @@ var RabbitMqQueueKeyList = []*MqQueue{
}

const (
ZhiosUserRelateFunName = "ZhiosUserRelate"
ZhiosIntegralProxyRechargeFunName = "ZhiosIntegralProxyRecharge"
ZhiosMallGreenCoinConsumeFunName = "ZhiosMallGreenCoinConsume"
ZhiosUserUpLvFunName = "ZhiosUserUpLv"
CanalGuideOrderByUserUpLvConsume = "CanalGuideOrderByUserUpLvConsume"
ZhiosOrderFreeFunName = "ZhiosOrderFree"


+ 13
- 0
consume/md/md_zhios_capital_pool_order_total.go Переглянути файл

@@ -1,5 +1,10 @@
package md

const (
MallGreenCoinConsume = "zhios.mall_green_coin_consume.exchange"
MallGreenCoinConsumeKeyErr = "mall_green_coin_consume_err"
)

type ZhiosCapitalPoolOrderTotal struct {
Uid []string `json:"uid"`
Mid string `json:"mid"`
@@ -23,6 +28,14 @@ type ZhiosOrderBuckle struct {
Uid string `json:"uid"`
Mid string `json:"mid"`
}
type ZhiosMallConsume struct {
Uid string `json:"uid"`
Mid string `json:"mid"`
Oid string `json:"oid"`
Amount string `json:"amount"`
Err string `json:"err"`
}

type ZhiosOrderFree struct {
ItemId string `json:"item_id"`
OptPvd string `json:"opt_pvd"`


+ 92
- 0
consume/zhios_mall_green_coin_consume.go Переглянути файл

@@ -0,0 +1,92 @@
package consume

import (
"applet/app/cfg"
"applet/app/db"
"applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/rule"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
)

func ZhiosMallGreenCoinConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
err = handleZhiosMallGreenCoinConsume(res.Body)
//_ = res.Reject(false)
if err != nil {
var canalMsg *md.ZhiosMallConsume
var tmpString string
err1 := json.Unmarshal(res.Body, &tmpString)
if err1 == nil {
fmt.Println(tmpString)
err1 = json.Unmarshal([]byte(tmpString), &canalMsg)
if err1 == nil {
canalMsg.Err = err.Error()
ch.Publish(md.MallGreenCoinConsume, utils.SerializeStr(canalMsg), md.MallGreenCoinConsumeKeyErr)
}
}

}
_ = res.Ack(true)

} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleZhiosMallGreenCoinConsume(msg []byte) error {
//1、解析canal采集至mq中queue的数据结构体
var canalMsg *md.ZhiosMallConsume
fmt.Println(string(msg))
var tmpString string
err := json.Unmarshal(msg, &tmpString)
if err != nil {
fmt.Println("===with", err.Error())
return err
}
fmt.Println(tmpString)
err = json.Unmarshal([]byte(tmpString), &canalMsg)
if err != nil {
fmt.Println("===with", err.Error())
return err
}
mid := canalMsg.Mid
eg := db.DBs[mid]
if eg == nil {
return nil
}
rule.InitForGreenCoinDoubleChainIntegral(cfg.RedisAddr)
_, err = rule.DealUserGreenCoinDoubleChainIntegral(eg, utils.StrToInt(canalMsg.Uid), canalMsg.Amount, canalMsg.Oid, canalMsg.Mid)
fmt.Println(err)
if err != nil {
return err
}

return nil
}

+ 160
- 0
consume/zhios_user_relate.go Переглянути файл

@@ -0,0 +1,160 @@
package consume

import (
"applet/app/db"
"applet/app/db/model"
"applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"strings"
"time"
"xorm.io/xorm"
)

func ZhiosUserRelate(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
err = handleZhiosUserRelate(res.Body)
//_ = res.Reject(false)
if err != nil {
_ = res.Reject(false)
//TODO::重新推回队列末尾,避免造成队列堵塞
var msg *md.ZhiosOrderBuckle
var tmpString string
err := json.Unmarshal(res.Body, &tmpString)
if err != nil {
return
}
fmt.Println(tmpString)
err = json.Unmarshal([]byte(tmpString), &msg)
if err != nil {
return
}
ch.Publish(queue.ExchangeName, utils.SerializeStr(msg), queue.RoutKey)
} else {
_ = res.Ack(true)
}
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleZhiosUserRelate(msg []byte) error {
time.Sleep(time.Microsecond * 20) // 等待500毫秒
//1、解析canal采集至mq中queue的数据结构体
var canalMsg *md.ZhiosOrderBuckle
fmt.Println(string(msg))
var tmpString string
err := json.Unmarshal(msg, &tmpString)
if err != nil {
fmt.Println("===with", err.Error())
return err
}
fmt.Println(tmpString)
err = json.Unmarshal([]byte(tmpString), &canalMsg)
if err != nil {
fmt.Println("===with", err.Error())
return err
}
mid := canalMsg.Mid
eg := db.DBs[mid]
if eg == nil {
return nil
}
profile, err := db.UserProfileFindByID(eg, canalMsg.Uid)
if err != nil || profile == nil {

return nil
}
if profile.ParentUid > 0 {
ur := new(model.UserRelate)
//如果有上级要加入关系链
initLV := 1
ur.ParentUid = profile.ParentUid
ur.Uid = profile.Uid
ur.Level = initLV
ur.InviteTime = time.Now()
_, err = db.UserRelateInsert(eg, ur)

if err != nil && strings.Contains(err.Error(), "Duplicate") == false {
return err
}
// 插入多级关联
RoutineMultiRelate1(eg, ur.ParentUid, ur.Uid, initLV)

}
return nil
}

//RoutineMultiRelate is 多级关联
func RoutineMultiRelate1(eg *xorm.Engine, pid int, uid int, lv int) {

for {
if pid == 0 {
break
}
m, err := db.UserProfileFindByID(eg, pid)
if err != nil {
logx.Warn(err)
break
}
if m != nil {
if m.ParentUid == 0 {
break
}
lv++
ur := new(model.UserRelate)
ur.ParentUid = m.ParentUid
ur.Uid = uid
ur.Level = lv
ur.InviteTime = time.Now()
_, err := db.UserRelateInsert(eg, ur)
if err != nil && strings.Contains(err.Error(), "Duplicate") == false {
logx.Warn(err)
break
}
if err != nil && strings.Contains(err.Error(), "Duplicate") {
tmp, _, _ := db.UserRelateByUIDAndPUID(eg, ur.Uid, ur.ParentUid)
if tmp != nil && tmp.Level != ur.Level {
db.UserRelateUpdate(eg, ur)
}
}
// 还要关联当前的用户的所有下级,注意关联等级
//go RoutineInsertUserRelate(c, m.ParentUid, uid, lv)
// 下级关联上上级
// 继续查询
logx.Info(fmt.Sprintf("关联pid(%v) -> uid(%v),lv:%v", ur.ParentUid, ur.Uid, lv))
logx.Info("继续查询")
pid = m.ParentUid
}
if m == nil {
logx.Info("查询结束,退出")
break
}
}
}

+ 1
- 1
go.mod Переглянути файл

@@ -6,7 +6,7 @@ require (
code.fnuoos.com/go_rely_warehouse/zyos_go_condition_statistics.git v1.1.2-0.20240222023917-c31b53f7e8cb
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.0
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.4
code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240119104238-05c3962029ff
code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git v1.9.10-0.20240301103805-f71bf8ac0ab3
code.fnuoos.com/go_rely_warehouse/zyos_go_pay.git v1.6.2-0.20231116085701-9ba6e19f877b
code.fnuoos.com/go_rely_warehouse/zyos_go_third_party_api.git v1.1.21-0.20240126015516-38ca248db2fd
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5


Завантаження…
Відмінити
Зберегти