@@ -0,0 +1,16 @@ | |||
package db | |||
import ( | |||
"applet/app/db/model" | |||
"xorm.io/xorm" | |||
) | |||
// | |||
func TaskCenterBase(Db *xorm.Engine) (*model.OneOrengeTaskBase, error) { | |||
var PineappleTaskBase model.OneOrengeTaskBase | |||
has, err := Db.Get(&PineappleTaskBase) | |||
if has == false || err != nil { | |||
return nil, err | |||
} | |||
return &PineappleTaskBase, nil | |||
} |
@@ -119,6 +119,15 @@ func VirtualCoinListInUse(Db *xorm.Engine, masterId, isFreeze string) ([]*model. | |||
return m, nil | |||
} | |||
func VirtualCoinListInUseSess(sess *xorm.Session, masterId, isFreeze string) ([]*model.VirtualCoin, error) { | |||
var m []*model.VirtualCoin | |||
err := sess.Where("is_use=1").Asc("id").Find(&m) | |||
if err != nil { | |||
return nil, err | |||
} | |||
return m, nil | |||
} | |||
func VirtualCoinMapInUse(Db *xorm.Engine, masterId, isFreeze string) (map[string]model.VirtualCoin, error) { | |||
virtualCoinMap := make(map[string]model.VirtualCoin) | |||
@@ -131,6 +140,17 @@ func VirtualCoinMapInUse(Db *xorm.Engine, masterId, isFreeze string) (map[string | |||
} | |||
return virtualCoinMap, nil | |||
} | |||
func VirtualCoinMapInUseSess(sess *xorm.Session, masterId, isFreeze string) (map[string]model.VirtualCoin, error) { | |||
virtualCoinMap := make(map[string]model.VirtualCoin) | |||
listInUse, err := VirtualCoinListInUseSess(sess, masterId, isFreeze) | |||
if err != nil { | |||
return nil, err | |||
} | |||
for _, coin := range listInUse { | |||
virtualCoinMap[utils.AnyToString(coin.Id)] = *coin | |||
} | |||
return virtualCoinMap, nil | |||
} | |||
func VirtualCoinByIds(eg *xorm.Engine, ids []string) map[string]model.VirtualCoin { | |||
var data []model.VirtualCoin | |||
@@ -0,0 +1,15 @@ | |||
package model | |||
type OneOrengeTaskBase struct { | |||
Id int `json:"id" xorm:"not null pk autoincr INT(11)"` | |||
CoinId int `json:"coin_id" xorm:"default 0 INT(11)"` | |||
SignCondition int `json:"sign_condition" xorm:"default 0 comment('0无条件 1看完视频') INT(1)"` | |||
SignWayType int `json:"sign_way_type" xorm:"default 0 comment('签到玩法 0连续签到 1累计签到') INT(1)"` | |||
SignWay string `json:"sign_way" xorm:"default '' comment('签到玩法') VARCHAR(255)"` | |||
MustSign int `json:"must_sign" xorm:"default 0 comment('强制签到') INT(1)"` | |||
WithdrawClearDay int `json:"withdraw_clear_day" xorm:"default 0 comment('收益清空 未进入app天数') INT(11)"` | |||
BaseSignReward string `json:"base_sign_reward" xorm:"default 0.00 comment('签到基础奖励') DECIMAL(20,2)"` | |||
AdvId int `json:"adv_id" xorm:"default 0 INT(11)"` | |||
VideoTotal int `json:"video_total" xorm:"default 0 INT(11)"` | |||
VideoReward string `json:"video_reward" xorm:"default 0.00 comment('签到基础奖励') DECIMAL(20,2)"` | |||
} |
@@ -74,3 +74,69 @@ func virtualCoinFlowInsert(session *xorm.Session, uid, coinId, coinIdTo int, mon | |||
} | |||
return data.Id, nil | |||
} | |||
func UpdateUserFinValidAndInterFlowWithSession(session *xorm.Session, money, Title, ordType string, types, orderAction, uid, id int, ordId, otherId int64) error { | |||
if utils.StrToFloat64(money) <= 0 { | |||
return nil | |||
} | |||
userProfile, err := db.UserProfileFindByIdWithSession(session, uid) | |||
if err != nil || userProfile == nil { | |||
_ = session.Rollback() | |||
if err == nil { | |||
err = errors.New("获取用户余额信息失败") | |||
} | |||
return err | |||
} | |||
beforeAmount := userProfile.FinValid | |||
if types == 0 { | |||
userProfile.FinValid = utils.AnyToString(utils.AnyToFloat64(userProfile.FinValid) + utils.StrToFloat64(money)) | |||
} else if types == 1 { | |||
userProfile.FinValid = utils.AnyToString(utils.AnyToFloat64(userProfile.FinValid) - utils.StrToFloat64(money)) | |||
} | |||
afterAmount := userProfile.FinValid | |||
userProfile.FinTotal = userProfile.FinTotal + utils.StrToFloat32(money) | |||
affected, err := db.UserProfileUpdateWithSession(session, uid, userProfile, "fin_valid,fin_total") | |||
if err != nil || affected == 0 { | |||
if err == nil { | |||
err = errors.New("更新用户余额信息失败") | |||
} | |||
return err | |||
} | |||
err = flowInsert(session, uid, money, orderAction, ordId, otherId, id, Title, ordType, types, beforeAmount, afterAmount) | |||
if err != nil { | |||
return err | |||
} | |||
return nil | |||
} | |||
// 开始写入流水 | |||
//uid:用户id,paidPrice金额,ordId订单id,id其他关联订单,具体根据订单类型判断,goodsId(OrdDetail记录商品ID或提现账号),ItemTitle订单标题 | |||
//type:0收入,1支出,beforeAmount变更前金额,afterAmount变更后 | |||
//orderAction:10自购,11推广,12团队,13免单,20提现,21消费,22退款,23拼团返佣,24资金池分红 | |||
//ordType:订单类型taobao,jd,pdd,vip,suning,kaola,own自营,withdraw提现,vip_refund会员升级退款,vip_order会员升级余额支付流水,group_buy拼团 | |||
func flowInsert(session *xorm.Session, uid int, paidPrice string, orderAction int, ordId int64, id int64, goodsId int, ItemTitle string, ordType string, types int, beforeAmount string, afterAmount string) error { | |||
now := time.Now() | |||
if err := db.FinUserFlowInsertOneWithSession( | |||
session, | |||
&model.FinUserFlow{ | |||
Type: types, | |||
Uid: uid, | |||
Amount: paidPrice, | |||
BeforeAmount: beforeAmount, | |||
AfterAmount: afterAmount, | |||
OrdType: ordType, | |||
OrdId: utils.Int64ToStr(ordId), | |||
OrdAction: orderAction, | |||
OrdDetail: utils.IntToStr(goodsId), | |||
State: 2, | |||
OtherId: id, | |||
OrdTitle: ItemTitle, | |||
OrdTime: int(now.Unix()), | |||
CreateAt: now, | |||
UpdateAt: now, | |||
}); err != nil { | |||
_ = logx.Warn(err) | |||
return err | |||
} | |||
return nil | |||
} |
@@ -88,7 +88,8 @@ func initConsumes() { | |||
//jobs[consumeMd.ZhiosMallGreenCoinConsumeFunName] = ZhiosMallGreenCoinConsume //绿色双链积分 | |||
//一个橘子 | |||
jobs[consumeMd.CancalUserMoneyConsumeFunName] = CancalUserMoneyConsume //余额 | |||
jobs[consumeMd.CancalUserMoneyConsumeFunName] = CancalUserMoneyConsume //余额 | |||
jobs[consumeMd.CancalUserIntegralExchangeConsumeFunName] = CancalUserIntegralExchange //兑换 | |||
} | |||
func Run() { | |||
@@ -434,6 +434,15 @@ var RabbitMqQueueKeyList = []*MqQueue{ | |||
BindKey: "", | |||
ConsumeFunName: "WithdrawConsume", | |||
}, | |||
{ | |||
ExchangeName: "zhios.one.orenge.exchange", | |||
Name: "zhios_one_orenge_exchange", | |||
Type: FanOutQueueType, | |||
IsPersistent: false, | |||
RoutKey: "integral_exchange", | |||
BindKey: "", | |||
ConsumeFunName: "CancalUserIntegralExchange", | |||
}, | |||
{ | |||
ExchangeName: "canal.topic", // | |||
Name: "canal_fin_user_flow", | |||
@@ -495,5 +504,6 @@ const ( | |||
OneCirclesSignInCopyGreenEnergyFunName = "OneCirclesSignInCopyGreenEnergyConsume" | |||
WithdrawConsumeFunName = "WithdrawConsume" | |||
CancalUserMoneyConsumeFunName = "CancalUserMoneyConsume" | |||
CancalUserMoneyConsumeFunName = "CancalUserMoneyConsume" | |||
CancalUserIntegralExchangeConsumeFunName = "CancalUserIntegralExchange" | |||
) |
@@ -0,0 +1,155 @@ | |||
package consume | |||
import ( | |||
"applet/app/db" | |||
"applet/app/e" | |||
"applet/app/svc" | |||
"applet/app/utils" | |||
"applet/app/utils/logx" | |||
"applet/consume/md" | |||
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" | |||
db2 "code.fnuoos.com/go_rely_warehouse/zyos_go_order_relate_rule.git/db" | |||
"encoding/json" | |||
"errors" | |||
"fmt" | |||
"github.com/streadway/amqp" | |||
"xorm.io/xorm" | |||
) | |||
// | |||
func CancalUserIntegralExchange(queue md.MqQueue) { | |||
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") | |||
ch, err := rabbit.Cfg.Pool.GetChannel() | |||
if err != nil { | |||
logx.Error(err) | |||
return | |||
} | |||
defer ch.Release() | |||
//1、将自己绑定到交换机上 | |||
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) | |||
//2、取出数据进行消费 | |||
ch.Qos(1) | |||
delivery := ch.Consume(queue.Name, false) | |||
var res amqp.Delivery | |||
var ok bool | |||
for { | |||
res, ok = <-delivery | |||
if ok == true { | |||
//fmt.Println(string(res.Body)) | |||
fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<") | |||
err = handleCancalUserIntegralExchange(res.Body) | |||
//_ = res.Reject(false) | |||
fmt.Println(err) | |||
if err == nil { | |||
_ = res.Ack(true) | |||
} else { | |||
var canalMsg *md.ZhiosAcquisition | |||
var tmpString string | |||
err := json.Unmarshal(res.Body, &tmpString) | |||
if err == nil { | |||
fmt.Println(tmpString) | |||
err = json.Unmarshal([]byte(tmpString), &canalMsg) | |||
if err == nil { | |||
ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey) | |||
} | |||
} | |||
} | |||
} else { | |||
panic(errors.New("error getting message")) | |||
} | |||
} | |||
fmt.Println("get msg done") | |||
} | |||
func handleCancalUserIntegralExchange(msg []byte) error { | |||
//1、解析canal采集至mq中queue的数据结构体 | |||
var canalMsg *md.ZhiosAcquisition | |||
fmt.Println(string(msg)) | |||
var tmpString string | |||
err := json.Unmarshal(msg, &tmpString) | |||
if err != nil { | |||
fmt.Println(err.Error()) | |||
return err | |||
} | |||
fmt.Println(tmpString) | |||
err = json.Unmarshal([]byte(tmpString), &canalMsg) | |||
if err != nil { | |||
return err | |||
} | |||
mid := canalMsg.Mid | |||
eg := db.DBs[mid] | |||
if eg == nil { | |||
return nil | |||
} | |||
uid := canalMsg.Uid | |||
base, _ := db.TaskCenterBase(eg) | |||
if base == nil { | |||
return nil | |||
} | |||
sess := eg.NewSession() | |||
defer sess.Close() | |||
sess.Begin() | |||
amount, _ := db.UserVirtualAmountFindById(sess, utils.StrToInt(uid), base.CoinId) | |||
if amount != nil && utils.StrToFloat64(amount.Amount) > 0 { | |||
err := CoinExchange(eg, sess, utils.StrToInt(uid), utils.IntToStr(base.CoinId), "cny", amount.Amount, mid) | |||
if err != nil { | |||
sess.Rollback() | |||
return err | |||
} | |||
} | |||
sess.Commit() | |||
return nil | |||
} | |||
func CoinExchange(eg *xorm.Engine, sess *xorm.Session, uid int, coinId, coinIdTo, amount, masterId string) error { | |||
//虚拟币转换 | |||
//获取目前可以兑换的虚拟币 | |||
coinMapInUse, err := db.VirtualCoinMapInUseSess(sess, masterId, "") | |||
if err != nil { | |||
return nil | |||
} | |||
fromCoin, ok := coinMapInUse[coinId] | |||
toCoin, ok2 := coinMapInUse[coinIdTo] | |||
//判断兑换的虚拟币是否可以兑换 | |||
if !ok || (!ok2 && coinIdTo != "cny") { | |||
return nil | |||
} | |||
//被兑换虚拟币金额比例,兑换虚拟币金额比例,手续费比例 | |||
var ( | |||
fromRate, toRate float64 | |||
) | |||
//获取用户两个虚拟币的余额数据 | |||
fromWallet, err := db2.GetUserVirtualWalletWithSession(sess, uid, utils.StrToInt(coinId)) | |||
if err != nil { | |||
return nil | |||
} | |||
amount = fromWallet.Amount | |||
fromRate = utils.AnyToFloat64(fromCoin.ExchangeRatio) | |||
toRate = utils.AnyToFloat64(toCoin.ExchangeRatio) | |||
if coinIdTo == "cny" { | |||
toRate = 1 | |||
} | |||
//兑换比例 | |||
rate := fromRate / toRate | |||
//计算兑换的总值 | |||
toValue := utils.AnyToFloat64(amount) / rate | |||
toAmount := toValue | |||
//先扣 | |||
title := coinMapInUse[coinId].Name + "兑换" + coinMapInUse[coinIdTo].Name | |||
_, err = svc.ExchangeUserVirFinValidAndInterFlowWithSession(sess, | |||
utils.StrToFloat64(amount), title, "0", 2, 5, uid, utils.StrToInt(coinId), utils.StrToInt(coinIdTo), -1, "", 0, 0) | |||
if err != nil { | |||
return e.NewErrCode(e.ERR_BAD_REQUEST) | |||
} | |||
//再给 | |||
err = svc.UpdateUserFinValidAndInterFlowWithSession(sess, | |||
utils.Float64ToStr(toAmount), title, "money_exchange", 0, 35, uid, 0, utils.StrToInt64(coinId), 0) | |||
if err != nil { | |||
return err | |||
} | |||
return nil | |||
} |