diff --git a/consume/init.go b/consume/init.go index 2ba1f10..965738f 100644 --- a/consume/init.go +++ b/consume/init.go @@ -17,6 +17,8 @@ func Init() { // 增加消费任务队列 func initConsumes() { + + jobs[consumeMd.ZhiosOrderHjyFunName] = ZhiosOrderHjy jobs[consumeMd.ZhiosOrderBuckleFunName] = ZhiosOrderBuckle // jobs[consumeMd.ZhiosSupplierAfterOrderFunName] = ZhiosSupplierAfterOrder diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 4364743..1c8cbca 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -281,6 +281,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "ZhiosOrderBuckle", }, + { + ExchangeName: "zhios.order_hjy.exchange", + Name: "zhios_order_hjy", + Type: DirectQueueType, + IsPersistent: false, + RoutKey: "order_hjy", + BindKey: "", + ConsumeFunName: "ZhiosOrderHjy", + }, //{ // ExchangeName: "zhios.order_buckle.exchange", // Name: "zhios_order_buckle_dev", @@ -293,6 +302,7 @@ var RabbitMqQueueKeyList = []*MqQueue{ } const ( + ZhiosOrderHjyFunName = "ZhiosOrderHjy" ZhiosOrderBuckleFunName = "ZhiosOrderBuckle" ZhiosSupplierAfterOrderFunName = "ZhiosSupplierAfterOrder" CanalOrderConsumeFunName = "CanalOrderConsume" diff --git a/consume/md/md_hjy.go b/consume/md/md_hjy.go new file mode 100644 index 0000000..c73d624 --- /dev/null +++ b/consume/md/md_hjy.go @@ -0,0 +1,17 @@ +package md + +type HjyOrderCommisisonData struct { + Time int `json:"time"` + Msg string `json:"msg"` + Code int `json:"code"` + Data struct { + Total string `json:"total"` + List []HjyOrderCommisison `json:"list"` + } `json:"data"` +} + +type HjyOrderCommisison struct { + SettlementStatus string `json:"settlement_status"` + SettlementTime string `json:"settlement_time"` + OrderId string `json:"order_id"` +} diff --git a/consume/zhios_order_hjy.go b/consume/zhios_order_hjy.go new file mode 100644 index 0000000..a18d09b --- /dev/null +++ b/consume/zhios_order_hjy.go @@ -0,0 +1,354 @@ +package consume + +import ( + "applet/app/db" + "applet/app/db/model" + md2 "applet/app/md" + "applet/app/utils" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/lib/comm_plan" + md3 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/svc" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "github.com/syyongx/php2go" + "sort" + "time" + "xorm.io/xorm" +) + +func ZhiosOrderHjy(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name, false) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") + err = handleZhiosOrderHjy(res.Body) + //_ = res.Reject(false) + if err != nil { + _ = res.Reject(false) + //TODO::重新推回队列末尾,避免造成队列堵塞 + var msg *md.ZhiosOrderBuckle + var tmpString string + err := json.Unmarshal(res.Body, &tmpString) + if err != nil { + return + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &msg) + if err != nil { + return + } + ch.Publish(queue.ExchangeName, utils.SerializeStr(msg), queue.RoutKey) + } else { + _ = res.Ack(true) + } + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} +func GetHjyOrderCommission(eg *xorm.Engine, dbName, order_id string) (string, string) { + url := "https://app.openapi.dhcc.wang" + hjyAppkey := db.SysCfgGetWithDb(eg, dbName, "hjy_appkey") + hjyAppSecret := db.SysCfgGetWithDb(eg, dbName, "hjy_app_secret") + param := map[string]string{ + "app_key": hjyAppkey, + "time": utils.Int64ToStr(time.Now().Unix()), + "order_id": order_id, + "page": "1", + "api_name": "dhcc.oauth.order.commission", + "version": "v1", + "pagesize": "1", + } + keys := KsortToStr(param) + str := "" + for _, v := range keys { + if str == "" { + str += v + "=" + php2go.URLEncode(param[v]) + } else { + str += "&" + v + "=" + php2go.URLEncode(param[v]) + } + } + str += hjyAppSecret + param["sign"] = utils.Md5(str) + post, _ := utils.CurlPost(url, param, nil) + var data md.HjyOrderCommisisonData + json.Unmarshal(post, &data) + if data.Code != 0 { + return "0", "0" + } + for _, v := range data.Data.List { + if v.SettlementStatus == "已结" { + return "1", v.SettlementTime + } + } + return "0", "" +} +func KsortToStr(params map[string]string) []string { + keys := make([]string, len(params)) + i := 0 + for k, _ := range params { + keys[i] = k + i++ + } + sort.Strings(keys) + return keys +} +func handleZhiosOrderHjy(msg []byte) error { + //1、解析canal采集至mq中queue的数据结构体 + var canalMsg *md.ZhiosOrderBuckle + fmt.Println(string(msg)) + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println("===with", err.Error()) + return err + } + fmt.Println(tmpString) + err = json.Unmarshal([]byte(tmpString), &canalMsg) + if err != nil { + fmt.Println("===with", err.Error()) + return err + } + mid := canalMsg.Mid + eg := db.DBs[mid] + if eg == nil { + return nil + } + sess := eg.NewSession() + defer sess.Close() + sess.Begin() + + ordData, err := db.OrderListByPvdOid(sess, canalMsg.Oid) + if err != nil || ordData == nil { + return nil + } + if ordData.OrderFormType < 0 || ordData.State > 3 { + sess.Commit() + + return nil + } + user, _ := db.UserFindByID(eg, ordData.Uid) + if user == nil { + sess.Commit() + + return nil + } + if ordData.OrderFormType < 0 { + sess.Commit() + return nil + } + if ordData.State == 5 { + sess.Commit() + return nil + } + isSettle, settleTime := GetHjyOrderCommission(eg, canalMsg.Mid, utils.Int64ToStr(ordData.OrdId)) + if isSettle == "1" { + ordData.State = 5 + ordData.SettleAt = int(utils.TimeStdParseUnix(settleTime)) + } + fmt.Println(ordData) + //加入分佣关系链 + opts, commissionOpts, _ := svc.GetAllPlan(eg, canalMsg.Mid) + if opts == nil { + sess.Commit() + + return nil + } + BenefitAll := ordData.BenefitAll + pvd := ordData.Pvd + if ordData.OrderFormType == 1 { //京东自营 + pvd = md3.PVD_JDOwn + } + var rmd = md3.CommissionParam{IsTikTokTeamOrder: utils.IntToStr(ordData.IsTikTokTeamOrder)} + opt, err := svc.GetPlanCfg(eg, pvd, canalMsg.Mid, opts, commissionOpts, map[int]string{}, &rmd) + if err != nil { + sess.Commit() + return nil + } + + var ord = md2.OrderInfo{ + PvdOid: ordData.PvdOid, + Pvd: pvd, + ItemId: ordData.ItemId, + ItemNum: ordData.ItemNum, + ItemPrice: ordData.ItemPrice, + PaidPrice: ordData.PaidPrice, + OrderType: ordData.OrderType, + Commission: BenefitAll, + State: ordData.State, + } + //调用公共的分佣 + isShare := 0 + if ord.OrderType == 1 { + isShare = 1 + } + //计算每个用户的分佣 + req := md2.CommissionFirstParam{ + CommissionParam: md2.CommissionParam{ + Commission: utils.Float64ToStr(ord.Commission), + PaidPrice: utils.Float64ToStr(ord.PaidPrice), + OldPrice: utils.Float64ToStr(ord.PaidPrice), + IsTikTokTeamOrder: utils.IntToStr(ordData.IsTikTokTeamOrder), + }, + Uid: utils.IntToStr(ordData.Uid), + IsShare: isShare, + Provider: ord.Pvd, + IsAllLevelReturn: 1, + } + if utils.InArr(ord.Pvd, []string{md3.PVD_TB, md3.PVD_TM, md3.PVD_PDD, md3.PVD_SN, md3.PVD_KL, md3.PVD_JD, md3.PVD_JDOwn, md3.PVD_VIP}) == false { + req.CommissionParam.GoodsPrice = utils.Float64ToStr(ord.PaidPrice) + } + commissionList, err := GetCommissionByCommApi(eg, canalMsg.Mid, req) + if err != nil || commissionList.LvUser == nil { + sess.Commit() + return nil + } + pvdFee := commissionList.PvdFee + sysFee := commissionList.SysFee + subsidyFee := commissionList.SubsidyFee + lvUser := commissionList.LvUser + profit := commissionList.Profit + ordData.SubsidyFee = subsidyFee + ordData.PvdCommission = pvdFee + ordData.SysCommission = sysFee + ordData.BuckleCommission = utils.Float64ToStr(BenefitAll) + if ordData.OrderFormType == 2 { + ordData.OrderFormType = -2 + } else { + ordData.OrderFormType = -1 + + } + var selfRate float64 = 0 + var subsidyRate float64 = 0 + //处理记录佣金 + if opt != nil { + profit = lvUser.Profit + //判断下这个等级有么有设置 + ordData.UserCommission = profit + ordData.SubsidyRate = subsidyRate + ordData.UserCommissionRate = selfRate + ordData.PlanCommissionId = opt.PlanCommissionId + ordData.BenefitList = SerializeLvUser(NewCalcLvUserFee(lvUser)) + } + has, _ := db.OrderListByUpdateOrd(sess, ordData) + if has == false { + sess.Rollback() + return nil + } + + //批量写入 + if lvUser != nil { + err := OrderRelateInsert1(eg, sess, ordData.OrdId, ordData.Pvd, ordData.CreateAt, lvUser, ordData, canalMsg.Mid, true, opt.Mode, "1") + if err != nil { + sess.Rollback() + return err + } + } + sess.Commit() + return nil +} + +func OrderRelateInsert1(eg *xorm.Engine, sess *xorm.Session, oid int64, pvd string, createTime int, lvUser *comm_plan.LvUser, newOrd *model.OrdList, masterId string, isDelete bool, mode string, isNew string) error { + if lvUser == nil { + return nil + } + uid := lvUser.Uid + if uid == 0 { + return nil + } + oldLvUser := lvUser + oldLevel := 0 + data := OrderRelateInsertComm(eg, oid, pvd, createTime, lvUser, newOrd, masterId, mode) + fmt.Println(data) + + if data == nil || len(data) == 0 { + return nil + } + list, _ := db.OrderRelateFindByOid(sess, oid, pvd) + listMap := make(map[int]model.OrdListRelate) + if list != nil { + for _, v := range *list { + listMap[v.Uid] = v + } + } + fmt.Println(isDelete) + _, err2 := db.OrderRelateDeleteByOid(sess, oid, pvd) + if err2 != nil { + return err2 + } + _, err2 = db.VirtualCoinOrderRelateDeleteByOid(sess, oid, pvd) + if err2 != nil { + return err2 + } + //后写入 + err := db.DbInsertBatchSess(sess, data) + if err != nil { + return err + + } else if lvUser.ProfitList != nil { + // 插入虚拟币数据 + vcrData := CombineVirtualCoinRelateData(oldLvUser, oid, pvd, oldLevel, mode) + if len(vcrData) == 0 { + return nil + } + err := db.DbInsertBatchSess(sess, vcrData) + if err != nil { + return err + + } + } + if lvUser.TikTokOwnSubsidyFeeList != nil { + _, err2 := db.TikTokTeamOrderRelateDeleteByOid(sess, oid, pvd) + if err != nil { + return err2 + } + var teamData []model.TikTokTeamOrderRelate + TikTokTeamCommission := newOrd.TikTokTeamCommission + if utils.StrToFloat64(newOrd.TikTokTeamRealCommission) > 0 { + TikTokTeamCommission = newOrd.TikTokTeamRealCommission + } + for k, v := range lvUser.TikTokOwnSubsidyFeeList { + tmp := model.TikTokTeamOrderRelate{ + CoinId: utils.StrToInt(k), + Uid: uid, + Amount: utils.Float64ToStrByPrec(v, 8), + Oid: utils.Int64ToStr(oid), + Time: time.Now(), + Commission: TikTokTeamCommission, + Pvd: pvd, + } + teamData = append(teamData, tmp) + } + if len(teamData) == 0 { + return nil + } + err := db.DbInsertBatchSess(sess, &teamData) + if err != nil { + return err + + } + } + return nil +}