package consume import ( "applet/app/cfg" "applet/app/db" utils2 "applet/app/utils" "applet/app/utils/logx" "applet/consume/md" "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" "code.fnuoos.com/EggPlanet/egg_models.git/src/model" "code.fnuoos.com/EggPlanet/egg_system_rules.git" "code.fnuoos.com/EggPlanet/egg_system_rules.git/enum" md3 "code.fnuoos.com/EggPlanet/egg_system_rules.git/md" "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule" md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" "code.fnuoos.com/EggPlanet/egg_system_rules.git/svc/sys_cfg" "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" "context" "encoding/json" "errors" "fmt" "github.com/go-pay/gopay" "github.com/go-pay/gopay/alipay" "github.com/go-pay/xlog" "github.com/jinzhu/copier" "github.com/streadway/amqp" "time" ) func EggFinWithdrawApplyDataConsume(queue md.MqQueue) { fmt.Println(">>>>>>>>>>>>EggFinWithdrawApplyDataConsume>>>>>>>>>>>>") ch, err := rabbit.Cfg.Pool.GetChannel() if err != nil { logx.Error(err) return } defer ch.Release() //1、将自己绑定到交换机上 ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) //2、取出数据进行消费 ch.Qos(1) delivery := ch.Consume(queue.Name, false) egg_system_rules.Init(cfg.RedisAddr) var alipayStruct *InitAlipayStruct client, err := InitAlipay(alipayStruct) if err != nil { fmt.Println("EggFinWithdrawApplyDataConsume:::::", err.Error()) utils2.FilePutContents("EggFinWithdrawApplyDataConsume", utils2.SerializeStr(map[string]interface{}{ "err": err.Error(), })) return } var res amqp.Delivery var ok bool for { res, ok = <-delivery if ok == true { err = handleEggFinWithdrawApplyDataConsume(res.Body, client, ch) if err != nil { fmt.Println("EggFinWithdrawApplyDataConsume:::::", err.Error()) utils2.FilePutContents("EggFinWithdrawApplyDataConsume", utils2.SerializeStr(map[string]interface{}{ "body": res.Body, "err": err.Error(), })) var data md2.EggFinWithdrawApplyErrorData data.ErrorInfo = err.Error() // 尝试解析数据结构体 var msg *md2.EggFinWithdrawApplyData err1 := json.Unmarshal(res.Body, &msg) if err1 != nil { // 无法解析 直接将[]byte推进队列 data.Ext = res.Body } else { // 解析成功 err2 := copier.Copy(&data, &msg) if err2 != nil { data.Ext = res.Body } } ch.Publish(md2.EggAppExchange, utils2.SerializeStr(data), md2.EggFinWithdrawApplyError) } //_ = res.Reject(false) err = res.Ack(true) fmt.Println("err ::: ", err) } else { panic(errors.New("error getting message")) } } } func handleEggFinWithdrawApplyDataConsume(msgData []byte, client *alipay.Client, ch *rabbit.Channel) error { time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒 // 1.解析mq中queue的数据结构体 var msg *md2.EggFinWithdrawApplyData err := json.Unmarshal(msgData, &msg) if err != nil { return err } applyDb := implement.NewFinWithdrawApplyDb(db.Db) apply, err := applyDb.FinWithdrawApplyGet(msg.Id) if err != nil { return err } if apply == nil { return errors.New("提现记录不存在~") } // 如果不在队列处理阶段或不是自动处理订单,直接返回 if apply.State != 4 && apply.Type != 2 { return errors.New("提现记录无需处理~") } session := db.Db.NewSession() defer func() { session.Close() if err := recover(); err != nil { _ = logx.Error(err) } }() session.Begin() // 2.3 更新 apply 信息 apply.State = 2 affected, err2 := applyDb.UpdateFinWithdrawApplyBySession(session, apply, "state") if err2 != nil { return err2 } if affected == 0 { return errors.New("更新 apply 状态失败") } if msg.WithdrawKind == 1 { // 3.支付宝提现 alipayUserInfoDb := implement.NewAlipayUserInfoDb(db.Db) userInfo, err1 := alipayUserInfoDb.GetAlipayUserInfo(msg.Uid) if err != nil { return err1 } bm := make(gopay.BodyMap) bm.Set("out_biz_no", msg.Id). Set("trans_amount", msg.Amount). Set("biz_scene", "DIRECT_TRANSFER"). Set("product_code", "TRANS_ACCOUNT_NO_PWD"). SetBodyMap("payee_info", func(bm gopay.BodyMap) { bm.Set("identity", userInfo.OpenId) bm.Set("identity_type", "ALIPAY_OPEN_ID") }) _, err = client.FundTransUniTransfer(context.Background(), bm) if err != nil { // 如果需要重试 推回队尾 if err.Error() == "Lock wait timeout exceeded; try restarting transaction" { ch.Publish(md2.EggAppExchange, msg, md2.EggFinWithdrawApply) return nil } //TODO::处理提现失败 err = dealFinWithdrawApplyStateForBad(apply) if err != nil { return err } session.Rollback() return err } } else if msg.WithdrawKind == 2 { } return session.Commit() } func dealFinWithdrawApplyStateForBad(finWithdrawApply *model.FinWithdrawApply) error { session := db.Db.NewSession() defer session.Close() session.Begin() finWithdrawApplyDb := implement.NewFinWithdrawApplyDb(db.Db) finWithdrawApply.State = int(enum.FinWithdrawApplyStateForBad) updateAffected, err1 := finWithdrawApplyDb.UpdateFinWithdrawApplyBySession(session, finWithdrawApply, "state") if err1 != nil { return err1 } if updateAffected <= 0 { return errors.New("更新提现单状态失败") } dealUserWalletReq := md3.DealUserWalletReq{ Direction: "sub", Kind: int(enum.UserWithdrawBad), Title: enum.UserWithdrawBad.String(), Uid: finWithdrawApply.Uid, Amount: utils2.StrToFloat64(finWithdrawApply.Amount), } err := rule.DealUserWallet(session, dealUserWalletReq) if err != nil { session.Rollback() return err } err = session.Commit() if err != nil { _ = session.Rollback() } return err } type InitAlipayStruct struct { IsProd bool `json:"is_prod" label:"是否生产环境"` AlipayAppId string `json:"alipay_app_id" label:"支付宝商家应用appid"` AlipayPrivateKey string `json:"alipay_private_key" label:"支付宝商家应用私钥"` AlipayPublicKey string `json:"alipay_public_key" label:"支付宝商家应用公钥"` AlipayPublicContentRSA2 []byte `json:"alipay_public_content_rsa_2" label:"支付宝公钥证书"` AlipayRootContent []byte `json:"alipay_root_content" label:"支付宝根证书"` AppPublicContent []byte `json:"app_public_content" label:"应用公钥证书"` } // InitAlipay 初始化支付宝客户端 // appid:应用ID // privateKey:应用私钥,支持PKCS1和PKCS8 // isProd:是否是正式环境,沙箱环境请选择新版沙箱应用。 func InitAlipay(initData *InitAlipayStruct) (client *alipay.Client, err error) { if initData == nil { sysCfgDb := sys_cfg.NewSysCfgDb(db.Db) sysCfgMap := sysCfgDb.SysCfgFindWithDb(enum.AlipayAppId, enum.AlipayPrivateKey, enum.AlipayPublicKey, enum.AlipayPublicContentRSA2, enum.AlipayRootContent, enum.AppPublicContent) initData = &InitAlipayStruct{ IsProd: true, AlipayAppId: sysCfgMap[enum.AlipayAppId], AlipayPrivateKey: sysCfgMap[enum.AlipayPrivateKey], AlipayPublicKey: sysCfgMap[enum.AlipayPublicKey], AlipayPublicContentRSA2: []byte(sysCfgMap[enum.AlipayPublicContentRSA2]), AlipayRootContent: []byte(sysCfgMap[enum.AlipayRootContent]), AppPublicContent: []byte(sysCfgMap[enum.AppPublicContent]), } } client, err = alipay.NewClient(initData.AlipayAppId, initData.AlipayPrivateKey, initData.IsProd) if err != nil { xlog.Error(err) return } // 自定义配置http请求接收返回结果body大小,默认 10MB client.SetBodySize(10) // 没有特殊需求,可忽略此配置 // 打开Debug开关,输出日志,默认关闭 client.DebugSwitch = gopay.DebugOn client.SetLocation(alipay.LocationShanghai). // 设置时区,不设置或出错均为默认服务器时间 SetCharset(alipay.UTF8). // 设置字符编码,不设置默认 utf-8 SetSignType(alipay.RSA2) // 设置签名类型,不设置默认 RSA2 // SetAppAuthToken("") //授权token //SetReturnUrl("https://www.fmm.ink"). // 设置返回URL // SetNotifyUrl("https://www.fmm.ink"). // 设置异步通知URL // 传入 支付宝公钥证书 alipayPublicCert.crt 内容 client.AutoVerifySign(initData.AlipayPublicContentRSA2) // 传入证书内容 err = client.SetCertSnByContent(initData.AppPublicContent, initData.AlipayRootContent, initData.AlipayPublicContentRSA2) if err != nil { xlog.Debug("SetCertSn:", err) return } return }