package consume

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"applet/app/cfg"
	"applet/app/db"
	utils2 "applet/app/utils"
	"applet/app/utils/logx"
	"applet/consume/md"
	"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
	"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
	"code.fnuoos.com/EggPlanet/egg_system_rules.git"
	"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
	"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum"
	md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
	"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/svc"
	"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
	"github.com/shopspring/decimal"
	"github.com/streadway/amqp"
)

// EggEnergyDealFundDataConsume binds queue to its exchange and then consumes
// deliveries forever, one at a time (prefetch 1), handing each body to
// handleEggEnergyDealFundDataConsume.
//
// A failed message is logged to stdout and to the
// "EggEnergyDealFundDataConsume_ERR" file and is still acknowledged, so it is
// not redelivered. The function returns only when no channel can be obtained
// from the pool; it panics when the delivery channel is closed.
func EggEnergyDealFundDataConsume(queue md.MqQueue) {
	fmt.Println(">>>>>>>>>>>>EggEnergyDealFundDataConsume>>>>>>>>>>>>")
	ch, err := rabbit.Cfg.Pool.GetChannel()
	if err != nil {
		logx.Error(err)
		return
	}
	defer ch.Release()

	// 1. Bind this consumer's queue to the exchange.
	ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)

	// 2. Fetch messages one at a time and consume them.
	ch.Qos(1)
	delivery := ch.Consume(queue.Name, false)

	egg_system_rules.Init(cfg.RedisAddr)

	var (
		res amqp.Delivery
		ok  bool
	)
	for {
		if res, ok = <-delivery; !ok {
			// The delivery channel was closed underneath us; there is nothing
			// left to consume on this channel.
			panic(errors.New("error getting message"))
		}

		if err = handleEggEnergyDealFundDataConsume(res.Body); err != nil {
			fmt.Println("EggEnergyDealFundDataConsume_ERR:::::", err.Error())
			utils2.FilePutContents("EggEnergyDealFundDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{
				"body": res.Body,
				"err":  err.Error(),
			}))
		}

		// Acknowledge regardless of the handler outcome: failures are recorded
		// above instead of being requeued.
		if err = res.Ack(true); err != nil {
			fmt.Println("err ::: ", err)
		}
	}
}

// handleEggEnergyDealFundDataConsume processes a single fund-data message.
//
// msgData is a JSON string whose content is itself the JSON encoding of
// md2.EggEnergyStructForEggEnergyFundData. The handler loads the referenced
// fund-data row and the core data, runs the price-increase formula for the
// message amount, and then, inside one transaction, applies the available
// energy change, inserts a fund-data record and updates the fund-data row
// (balance amount reduced by the message amount, balance times reduced by 1).
//
// It returns a non-nil error when decoding, any lookup, the calculation or any
// write fails; in that case the transaction is rolled back.
func handleEggEnergyDealFundDataConsume(msgData []byte) error {
	const timeLayout = "2006-01-02 15:04:05"

	time.Sleep(100 * time.Millisecond) // throttle: pause 100ms per message

	// 1. Decode the queue payload (a JSON string wrapping a JSON object).
	var msgStr string
	if err := json.Unmarshal(msgData, &msgStr); err != nil {
		return fmt.Errorf("decode message envelope: %w", err)
	}
	var msg *md2.EggEnergyStructForEggEnergyFundData
	if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
		return fmt.Errorf("decode fund data message: %w", err)
	}
	if msg == nil {
		// A literal JSON "null" decodes without error but leaves msg nil.
		return errors.New("fund data message is empty")
	}

	now := time.Now().Format(timeLayout)
	engine := db.Db

	// 2. Load the fund_data row referenced by the message.
	fundDataDb := implement.NewEggEnergyFundDataDb(engine)
	fundDataRecordsDb := implement.NewEggEnergyFundDataRecordsDb(engine)
	data, err := fundDataDb.EggEnergyFundDataGetOneByParams(map[string]interface{}{
		"key":   "id",
		"value": msg.ID,
	})
	if err != nil {
		return fmt.Errorf("load fund data: %w", err)
	}
	if data == nil {
		return fmt.Errorf("fund data %v not found", msg.ID)
	}

	// 3. Load the core data; cb, when non-nil, releases the lock taken by the
	// lookup and must run after the transaction below has finished.
	eggEnergyCoreData, cb, err := svc.GetEggEnergyCoreData(engine)
	if err != nil {
		return fmt.Errorf("load egg energy core data: %w", err)
	}
	if cb != nil {
		defer cb()
	}

	// 4. Run the price-increase formula for the injected amount.
	err, priceResp := egg_energy.CalcPriceIncreaseFormula(utils2.AnyToString(msg.Amount), eggEnergyCoreData)
	if err != nil {
		return fmt.Errorf("calculate price increase: %w", err)
	}

	// Open the transaction only once all read-only work has succeeded. The
	// deferred calls run in reverse order: roll back (unless committed), close
	// the session, and finally release the core-data lock.
	session := engine.NewSession()
	defer session.Close()
	if err = session.Begin(); err != nil {
		return fmt.Errorf("begin transaction: %w", err)
	}
	committed := false
	defer func() {
		if !committed {
			_ = session.Rollback() // best effort; the original error is what matters
		}
	}()

	// 5. Apply the change to the available energy.
	coinReq := md2.DealAvailableEggEnergyCoinReq{
		Amount:                 priceResp.GetEggEnergyNums,
		AmountFee:              "",
		BeforePrice:            priceResp.BeforePrice,
		AfterPrice:             priceResp.AfterPrice,
		BeforePlanetTotalValue: priceResp.BeforePlanetTotalValue,
		AfterPlanetTotalValue:  priceResp.AfterPlanetTotalValue,
		BeforeEnergyTotalNums:  priceResp.BeforeEnergyTotalNums,
		AfterEnergyTotalNums:   priceResp.AfterEnergyTotalNums,
	}
	if err = egg_energy.DealAvailableEggEnergyCoin(session, int(enum.CapitalInjection), eggEnergyCoreData, coinReq); err != nil {
		return fmt.Errorf("deal available egg energy coin: %w", err)
	}

	// 6. Remaining balance = current balance - amount injected by this message.
	// (Previously the balance was subtracted from itself, always yielding 0.)
	balanceAmount := decimal.NewFromFloat(utils2.AnyToFloat64(data.BalanceAmount)).
		Sub(decimal.NewFromFloat(utils2.AnyToFloat64(msg.Amount))).
		String()
	balanceTimes := data.BalanceTimes - 1

	// 7. Insert the detailed fund-data record.
	record := model.EggEnergyFundDataRecords{
		RecordsId:              data.Id,
		TotalAmount:            data.TotalAmount,
		BalanceAmount:          balanceAmount,
		BalanceTimes:           balanceTimes,
		BeforePrice:            priceResp.BeforePrice,
		AfterPrice:             priceResp.AfterPrice,
		BeforePlanetTotalValue: priceResp.BeforePlanetTotalValue,
		AfterPlanetTotalValue:  priceResp.AfterPlanetTotalValue,
		CreateAt:               now,
		UpdateAt:               now,
	}
	if _, err = fundDataRecordsDb.EggEnergyFundDataRecordsInsertBySession(session, record); err != nil {
		return fmt.Errorf("insert fund data record: %w", err)
	}

	// 8. Update the fund_data row; the listed columns are passed explicitly as
	// force columns alongside the populated fields.
	fundData := model.EggEnergyFundData{
		Id:            data.Id,
		Kind:          data.Kind,
		TotalAmount:   data.TotalAmount,
		BalanceAmount: balanceAmount,
		Hours:         data.Hours,
		BalanceTimes:  balanceTimes,
		Frequency:     data.Frequency,
		Memo:          data.Memo,
		CreateAt:      data.CreateAt,
		UpdateAt:      now,
	}
	forceColumns := []string{
		"balance_times",
		"balance_amount",
	}
	if _, err = fundDataDb.EggEnergyFundDataUpdateBySession(session, fundData, forceColumns...); err != nil {
		return fmt.Errorf("update fund data: %w", err)
	}

	if err = session.Commit(); err != nil {
		return fmt.Errorf("commit transaction: %w", err)
	}
	committed = true
	return nil
}