|
- package consume
-
- import (
- "applet/app/cfg"
- "applet/app/db"
- utils2 "applet/app/utils"
- "applet/app/utils/logx"
- "applet/consume/md"
- "code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
- "code.fnuoos.com/EggPlanet/egg_models.git/src/model"
- "code.fnuoos.com/EggPlanet/egg_system_rules.git"
- "code.fnuoos.com/EggPlanet/egg_system_rules.git/enum"
- md3 "code.fnuoos.com/EggPlanet/egg_system_rules.git/md"
- "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule"
- md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
- "code.fnuoos.com/EggPlanet/egg_system_rules.git/svc/sys_cfg"
- "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/go-pay/gopay"
- "github.com/go-pay/gopay/alipay"
- "github.com/go-pay/xlog"
- "github.com/streadway/amqp"
- "time"
- )
-
- func EggFinWithdrawApplyDataConsume(queue md.MqQueue) {
- fmt.Println(">>>>>>>>>>>>EggFinWithdrawApplyDataConsume>>>>>>>>>>>>")
- ch, err := rabbit.Cfg.Pool.GetChannel()
- if err != nil {
- logx.Error(err)
- return
- }
- defer ch.Release()
- //1、将自己绑定到交换机上
- ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
- //2、取出数据进行消费
- ch.Qos(1)
- delivery := ch.Consume(queue.Name, false)
- egg_system_rules.Init(cfg.RedisAddr)
- var alipayStruct *InitAlipayStruct
- client, err := InitAlipay(alipayStruct)
- if err != nil {
- fmt.Println("EggFinWithdrawApplyDataConsume:::::", err.Error())
- utils2.FilePutContents("EggFinWithdrawApplyDataConsume", utils2.SerializeStr(map[string]interface{}{
- "err": err.Error(),
- }))
- return
- }
- var res amqp.Delivery
- var ok bool
- for {
- res, ok = <-delivery
- if ok == true {
- err = handleEggFinWithdrawApplyDataConsume(res.Body, client, ch)
- if err != nil {
- fmt.Println("EggFinWithdrawApplyDataConsume:::::", err.Error())
- utils2.FilePutContents("EggFinWithdrawApplyDataConsume", utils2.SerializeStr(map[string]interface{}{
- "body": res.Body,
- "err": err.Error(),
- }))
-
- var data md2.EggFinWithdrawApplyErrorData
- data.ErrorInfo = err.Error()
- // 尝试解析数据结构体
- var msg *md2.EggFinWithdrawApplyData
- err1 := json.Unmarshal(res.Body, &msg)
- if err1 != nil {
- // 无法解析 直接将[]byte推进队列
- data.Ext = res.Body
- } else {
- // 解析成功
- data.EggFinWithdrawApplyData = *msg
- }
- ch.Publish(md2.EggAppExchange, data, md2.EggFinWithdrawApplyError)
- }
- //_ = res.Reject(false)
- err = res.Ack(true)
- fmt.Println("err ::: ", err)
- } else {
- panic(errors.New("error getting message"))
- }
- }
- }
-
- func handleEggFinWithdrawApplyDataConsume(msgData []byte, client *alipay.Client, ch *rabbit.Channel) error {
- time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
- // 1.解析mq中queue的数据结构体
- var msg *md2.EggFinWithdrawApplyData
- err := json.Unmarshal(msgData, &msg)
- if err != nil {
- return err
- }
-
- applyDb := implement.NewFinWithdrawApplyDb(db.Db)
- apply, err := applyDb.FinWithdrawApplyGet(msg.Id)
- if err != nil {
- return err
- }
- if apply == nil {
- return errors.New("提现记录不存在~")
- }
-
- // 如果不在队列处理阶段或不是自动处理订单,直接返回
- if apply.State != 4 && apply.Type != 2 {
- return errors.New("提现记录无需处理~")
- }
-
- session := db.Db.NewSession()
- defer func() {
- session.Close()
- if err := recover(); err != nil {
- _ = logx.Error(err)
- }
- }()
- session.Begin()
- // 2.3 更新 apply 信息
- apply.State = 2
- affected, err2 := applyDb.UpdateFinWithdrawApplyBySession(session, apply, "state")
- if err2 != nil {
- return err2
- }
- if affected == 0 {
- return errors.New("更新 apply 状态失败")
- }
-
- if msg.WithdrawKind == 1 {
- // 3.支付宝提现
- alipayUserInfoDb := implement.NewAlipayUserInfoDb(db.Db)
- userInfo, err1 := alipayUserInfoDb.GetAlipayUserInfo(msg.Uid)
- if err != nil {
- return err1
- }
- bm := make(gopay.BodyMap)
- bm.Set("out_biz_no", msg.Id).
- Set("trans_amount", msg.Amount).
- Set("biz_scene", "DIRECT_TRANSFER").
- Set("product_code", "TRANS_ACCOUNT_NO_PWD").
- SetBodyMap("payee_info", func(bm gopay.BodyMap) {
- bm.Set("identity", userInfo.OpenId)
- bm.Set("identity_type", "ALIPAY_OPEN_ID")
- })
- _, err = client.FundTransUniTransfer(context.Background(), bm)
- if err != nil {
- // 如果需要重试 推回队尾
- if err.Error() == "Lock wait timeout exceeded; try restarting transaction" {
- ch.Publish(md2.EggAppExchange, msg, md2.EggFinWithdrawApply)
- return nil
- }
- //TODO::处理提现失败
- err = dealFinWithdrawApplyStateForBad(apply)
- if err != nil {
- return err
- }
- session.Rollback()
- return err
- }
- } else if msg.WithdrawKind == 2 {
-
- }
-
- return session.Commit()
- }
-
- func dealFinWithdrawApplyStateForBad(finWithdrawApply *model.FinWithdrawApply) error {
- session := db.Db.NewSession()
- defer session.Close()
- session.Begin()
-
- finWithdrawApplyDb := implement.NewFinWithdrawApplyDb(db.Db)
- finWithdrawApply.State = int(enum.FinWithdrawApplyStateForBad)
- updateAffected, err1 := finWithdrawApplyDb.UpdateFinWithdrawApplyBySession(session, finWithdrawApply, "state")
- if err1 != nil {
- return err1
- }
- if updateAffected <= 0 {
- return errors.New("更新提现单状态失败")
- }
- dealUserWalletReq := md3.DealUserWalletReq{
- Direction: "sub",
- Kind: int(enum.UserWithdrawBad),
- Title: enum.UserWithdrawBad.String(),
- Uid: finWithdrawApply.Uid,
- Amount: utils2.StrToFloat64(finWithdrawApply.Amount),
- }
-
- err := rule.DealUserWallet(session, dealUserWalletReq)
- if err != nil {
- session.Rollback()
- return err
- }
-
- err = session.Commit()
- if err != nil {
- _ = session.Rollback()
- }
- return err
- }
-
- type InitAlipayStruct struct {
- IsProd bool `json:"is_prod" label:"是否生产环境"`
- AlipayAppId string `json:"alipay_app_id" label:"支付宝商家应用appid"`
- AlipayPrivateKey string `json:"alipay_private_key" label:"支付宝商家应用私钥"`
- AlipayPublicKey string `json:"alipay_public_key" label:"支付宝商家应用公钥"`
- AlipayPublicContentRSA2 []byte `json:"alipay_public_content_rsa_2" label:"支付宝公钥证书"`
- AlipayRootContent []byte `json:"alipay_root_content" label:"支付宝根证书"`
- AppPublicContent []byte `json:"app_public_content" label:"应用公钥证书"`
- }
-
- // InitAlipay 初始化支付宝客户端
- // appid:应用ID
- // privateKey:应用私钥,支持PKCS1和PKCS8
- // isProd:是否是正式环境,沙箱环境请选择新版沙箱应用。
- func InitAlipay(initData *InitAlipayStruct) (client *alipay.Client, err error) {
- if initData == nil {
- sysCfgDb := sys_cfg.NewSysCfgDb(db.Db)
- sysCfgMap := sysCfgDb.SysCfgFindWithDb(enum.AlipayAppId, enum.AlipayPrivateKey, enum.AlipayPublicKey, enum.AlipayPublicContentRSA2, enum.AlipayRootContent, enum.AppPublicContent)
- initData = &InitAlipayStruct{
- IsProd: true,
- AlipayAppId: sysCfgMap[enum.AlipayAppId],
- AlipayPrivateKey: sysCfgMap[enum.AlipayPrivateKey],
- AlipayPublicKey: sysCfgMap[enum.AlipayPublicKey],
- AlipayPublicContentRSA2: []byte(sysCfgMap[enum.AlipayPublicContentRSA2]),
- AlipayRootContent: []byte(sysCfgMap[enum.AlipayRootContent]),
- AppPublicContent: []byte(sysCfgMap[enum.AppPublicContent]),
- }
- }
- client, err = alipay.NewClient(initData.AlipayAppId, initData.AlipayPrivateKey, initData.IsProd)
- if err != nil {
- xlog.Error(err)
- return
- }
-
- // 自定义配置http请求接收返回结果body大小,默认 10MB
- client.SetBodySize(10) // 没有特殊需求,可忽略此配置
-
- // 打开Debug开关,输出日志,默认关闭
- client.DebugSwitch = gopay.DebugOn
-
- client.SetLocation(alipay.LocationShanghai). // 设置时区,不设置或出错均为默认服务器时间
- SetCharset(alipay.UTF8). // 设置字符编码,不设置默认 utf-8
- SetSignType(alipay.RSA2) // 设置签名类型,不设置默认 RSA2
-
- // SetAppAuthToken("") //授权token
-
- //SetReturnUrl("https://www.fmm.ink"). // 设置返回URL
- // SetNotifyUrl("https://www.fmm.ink"). // 设置异步通知URL
-
- // 传入 支付宝公钥证书 alipayPublicCert.crt 内容
- client.AutoVerifySign(initData.AlipayPublicContentRSA2)
-
- // 传入证书内容
- err = client.SetCertSnByContent(initData.AppPublicContent, initData.AlipayRootContent, initData.AlipayPublicContentRSA2)
- if err != nil {
- xlog.Debug("SetCertSn:", err)
- return
- }
-
- return
- }
|