golang 的 rabbitmq 消费项目
Não pode escolher mais do que 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 

534 linhas
17 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "github.com/streadway/amqp"
  13. "time"
  14. "xorm.io/xorm"
  15. )
  16. func ZhiosAcquisitionCondition(queue md.MqQueue) {
  17. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  18. ch, err := rabbit.Cfg.Pool.GetChannel()
  19. if err != nil {
  20. logx.Error(err)
  21. return
  22. }
  23. defer ch.Release()
  24. //1、将自己绑定到交换机上
  25. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  26. //2、取出数据进行消费
  27. ch.Qos(300)
  28. delivery := ch.Consume(queue.Name, false)
  29. var res amqp.Delivery
  30. var ok bool
  31. for {
  32. res, ok = <-delivery
  33. if ok == true {
  34. //fmt.Println(string(res.Body))
  35. fmt.Println(">>>>>>>>>>>>>>>>ZhiosAcquisitionCondition<<<<<<<<<<<<<<<<<<<<<<<<<")
  36. err = handleZhiosAcquisition(res.Body)
  37. //_ = res.Reject(false)
  38. fmt.Println(err)
  39. if err == nil {
  40. _ = res.Ack(true)
  41. } else {
  42. var canalMsg *md.ZhiosAcquisition
  43. var tmpString string
  44. err := json.Unmarshal(res.Body, &tmpString)
  45. if err == nil {
  46. fmt.Println(tmpString)
  47. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  48. if err == nil {
  49. ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey)
  50. }
  51. }
  52. }
  53. } else {
  54. panic(errors.New("error getting message"))
  55. }
  56. }
  57. fmt.Println("get msg done")
  58. }
  59. func handleZhiosAcquisition(msg []byte) error {
  60. //1、解析canal采集至mq中queue的数据结构体
  61. var canalMsg *md.ZhiosAcquisition
  62. fmt.Println(string(msg))
  63. var tmpString string
  64. err := json.Unmarshal(msg, &tmpString)
  65. if err != nil {
  66. fmt.Println(err.Error())
  67. return err
  68. }
  69. fmt.Println(tmpString)
  70. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  71. if err != nil {
  72. return err
  73. }
  74. mid := canalMsg.Mid
  75. eg := db.DBs[mid]
  76. if eg == nil {
  77. return nil
  78. }
  79. if canalMsg.Uid == "" {
  80. return nil
  81. }
  82. userInfo, _ := db.UserFindByID(eg, canalMsg.Uid)
  83. if userInfo == nil {
  84. return nil
  85. }
  86. userProfile, _ := db.UserProfileFindByID(eg, canalMsg.Uid)
  87. cfg := db.GetAcquisitionCfg(eg, canalMsg.Id, userInfo.CreateAt)
  88. if cfg == nil {
  89. return nil
  90. }
  91. nextUserProfile, _ := db.UserProfileFindByID(eg, userProfile.ParentUid)
  92. var user = &md.User{Info: userInfo, Profile: userProfile}
  93. bools, str := checkAllCompleteTmp(eg, user, cfg)
  94. isFull := 0
  95. fullTime := 0
  96. toRewardTime := 0
  97. if bools {
  98. isFull = 1
  99. fullTime = int(time.Now().Unix())
  100. toRewardTime = int(time.Now().Unix()) + utils.StrToInt(cfg.RewardAccountDay)*86400
  101. }
  102. //写入奖励记录
  103. //新的设置 读第一个新注册的奖励
  104. lv := 0
  105. if len(cfg.RewardRule.LvRewardList) > 0 {
  106. isEnd := 0
  107. for k, v := range cfg.RewardRule.LvRewardList {
  108. if isEnd == 1 {
  109. continue
  110. }
  111. if utils.StrToInt(v.Id) == user.Info.Level {
  112. isEnd = 1
  113. }
  114. lv = utils.StrToInt(v.Id)
  115. InvitedReward := v.InvitedReward
  116. if cfg.RewardRule.RewardType == "1" {
  117. InvitedReward = Rands(v.InvitedReward, v.InvitedRewardMax)
  118. }
  119. //直推
  120. DirectSuccess := v.DirectSuccess
  121. if cfg.RewardRule.RewardType == "1" {
  122. DirectSuccess = Rands(v.DirectSuccess, v.DirectSuccessMax)
  123. }
  124. //间推
  125. IndirectSuccess := v.IndirectSuccess
  126. if cfg.RewardRule.RewardType == "1" {
  127. IndirectSuccess = Rands(v.IndirectSuccess, v.IndirectSuccessMax)
  128. }
  129. InvitedSource := 0
  130. DirectSource := 1
  131. IndirectSource := 2
  132. InvitedSourceStr := "注册奖励"
  133. DirectSourceStr := "直推好友"
  134. IndirectSourceStr := "间推好友"
  135. if k > 0 {
  136. InvitedSource = 3
  137. DirectSource = 4
  138. IndirectSource = 5
  139. InvitedSourceStr = "升级" + v.Name + "奖励"
  140. DirectSourceStr = "直推好友升级" + v.Name + "奖励"
  141. IndirectSourceStr = "间推好友升级" + v.Name + "奖励"
  142. }
  143. if utils.StrToFloat64(InvitedReward) > 0 {
  144. ownRewardLog, ownhas, _ := db.GetNewAcquisitionRewardLogWhere(eg, userProfile.Uid, user.Profile.Uid, lv)
  145. if !ownhas {
  146. ownRewardLog = &model.NewAcquisitionRewardLog{
  147. Uid: user.Profile.Uid,
  148. ToUid: user.Profile.Uid,
  149. Title: user.Info.Nickname,
  150. Source: InvitedSource,
  151. SourceText: InvitedSourceStr,
  152. Money: InvitedReward,
  153. CreatedAt: int(time.Now().Unix()),
  154. State: 0,
  155. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  156. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  157. InviteTime: int(userInfo.CreateAt.Unix()),
  158. Lv: lv,
  159. }
  160. db.InsertNewRewardLog(eg, ownRewardLog)
  161. }
  162. if ownRewardLog.GivenAt == 0 {
  163. ownRewardLog.CompleteCon = str
  164. ownRewardLog.IsFull = isFull
  165. if ownRewardLog.FullTime == 0 {
  166. ownRewardLog.FullTime = fullTime
  167. }
  168. if ownRewardLog.ToRewardTime == 0 {
  169. ownRewardLog.ToRewardTime = toRewardTime
  170. }
  171. eg.Where("id=?", ownRewardLog.Id).Update(ownRewardLog)
  172. }
  173. }
  174. if utils.StrToFloat64(DirectSuccess) > 0 {
  175. if userProfile.ParentUid > 0 {
  176. //写入奖励记录
  177. extendRewardLog, extendHas, _ := db.GetNewAcquisitionRewardLogWhere(eg, userProfile.ParentUid, userProfile.Uid, lv)
  178. if !extendHas {
  179. extendRewardLog = &model.NewAcquisitionRewardLog{
  180. Uid: user.Profile.ParentUid,
  181. ToUid: user.Profile.Uid,
  182. Title: user.Info.Nickname,
  183. Source: DirectSource,
  184. SourceText: DirectSourceStr,
  185. Money: DirectSuccess,
  186. CreatedAt: int(time.Now().Unix()),
  187. State: 0,
  188. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  189. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  190. InviteTime: int(userInfo.CreateAt.Unix()),
  191. Lv: lv,
  192. }
  193. db.InsertNewRewardLog(eg, extendRewardLog)
  194. }
  195. if extendRewardLog.GivenAt == 0 {
  196. extendRewardLog.CompleteCon = str
  197. extendRewardLog.IsFull = isFull
  198. if extendRewardLog.FullTime == 0 {
  199. extendRewardLog.FullTime = fullTime
  200. }
  201. if extendRewardLog.ToRewardTime == 0 {
  202. extendRewardLog.ToRewardTime = toRewardTime
  203. }
  204. eg.Where("id=?", extendRewardLog.Id).Update(extendRewardLog)
  205. }
  206. }
  207. }
  208. if utils.StrToFloat64(IndirectSuccess) > 0 {
  209. if nextUserProfile != nil && nextUserProfile.ParentUid > 0 {
  210. IndirectRewardLog, IndirectHas, _ := db.GetNewAcquisitionRewardLogWhere(eg, nextUserProfile.ParentUid, userProfile.Uid, lv)
  211. if !IndirectHas {
  212. IndirectRewardLog = &model.NewAcquisitionRewardLog{
  213. Uid: nextUserProfile.ParentUid,
  214. ToUid: user.Profile.Uid,
  215. Title: user.Info.Nickname,
  216. Source: IndirectSource,
  217. SourceText: IndirectSourceStr,
  218. Money: IndirectSuccess,
  219. CreatedAt: int(time.Now().Unix()),
  220. State: 0,
  221. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  222. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  223. InviteTime: int(userInfo.CreateAt.Unix()),
  224. Lv: lv,
  225. }
  226. db.InsertNewRewardLog(eg, IndirectRewardLog)
  227. }
  228. if IndirectRewardLog.GivenAt == 0 {
  229. IndirectRewardLog.CompleteCon = str
  230. IndirectRewardLog.IsFull = isFull
  231. if IndirectRewardLog.FullTime == 0 {
  232. IndirectRewardLog.FullTime = fullTime
  233. }
  234. if IndirectRewardLog.ToRewardTime == 0 {
  235. IndirectRewardLog.ToRewardTime = toRewardTime
  236. }
  237. eg.Where("id=?", IndirectRewardLog.Id).Update(IndirectRewardLog)
  238. }
  239. }
  240. }
  241. }
  242. } else {
  243. InvitedReward := cfg.RewardRule.InvitedReward
  244. if cfg.RewardRule.RewardType == "1" {
  245. InvitedReward = Rands(cfg.RewardRule.InvitedReward, cfg.RewardRule.InvitedRewardMax)
  246. }
  247. //直推
  248. DirectSuccess := cfg.RewardRule.DirectSuccess
  249. if cfg.RewardRule.RewardType == "1" {
  250. DirectSuccess = Rands(cfg.RewardRule.DirectSuccess, cfg.RewardRule.DirectSuccessMax)
  251. }
  252. //间推
  253. IndirectSuccess := cfg.RewardRule.IndirectSuccess
  254. if cfg.RewardRule.RewardType == "1" {
  255. IndirectSuccess = Rands(cfg.RewardRule.IndirectSuccess, cfg.RewardRule.IndirectSuccessMax)
  256. }
  257. if utils.StrToFloat64(InvitedReward) > 0 {
  258. ownRewardLog, ownhas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  259. Uid: user.Profile.Uid,
  260. ToUid: user.Profile.Uid,
  261. })
  262. if !ownhas {
  263. ownRewardLog = &model.NewAcquisitionRewardLog{
  264. Uid: user.Profile.Uid,
  265. ToUid: user.Profile.Uid,
  266. Title: user.Info.Nickname,
  267. Source: 0,
  268. SourceText: "注册奖励",
  269. Money: InvitedReward,
  270. CreatedAt: int(time.Now().Unix()),
  271. State: 0,
  272. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  273. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  274. InviteTime: int(userInfo.CreateAt.Unix()),
  275. Lv: lv,
  276. }
  277. db.InsertNewRewardLog(eg, ownRewardLog)
  278. }
  279. if ownRewardLog.GivenAt == 0 {
  280. ownRewardLog.CompleteCon = str
  281. ownRewardLog.IsFull = isFull
  282. if ownRewardLog.FullTime == 0 {
  283. ownRewardLog.FullTime = fullTime
  284. }
  285. if ownRewardLog.ToRewardTime == 0 {
  286. ownRewardLog.ToRewardTime = toRewardTime
  287. }
  288. eg.Where("id=?", ownRewardLog.Id).Update(ownRewardLog)
  289. }
  290. }
  291. if utils.StrToFloat64(DirectSuccess) > 0 {
  292. if userProfile.ParentUid > 0 {
  293. //写入奖励记录
  294. extendRewardLog, extendHas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  295. Uid: userProfile.ParentUid,
  296. ToUid: userProfile.Uid,
  297. })
  298. if !extendHas {
  299. extendRewardLog = &model.NewAcquisitionRewardLog{
  300. Uid: user.Profile.ParentUid,
  301. ToUid: user.Profile.Uid,
  302. Title: user.Info.Nickname,
  303. Source: 1,
  304. SourceText: "直推好友",
  305. Money: DirectSuccess,
  306. CreatedAt: int(time.Now().Unix()),
  307. State: 0,
  308. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  309. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  310. InviteTime: int(userInfo.CreateAt.Unix()),
  311. Lv: lv,
  312. }
  313. db.InsertNewRewardLog(eg, extendRewardLog)
  314. }
  315. if extendRewardLog.GivenAt == 0 {
  316. extendRewardLog.CompleteCon = str
  317. extendRewardLog.IsFull = isFull
  318. if extendRewardLog.FullTime == 0 {
  319. extendRewardLog.FullTime = fullTime
  320. }
  321. if extendRewardLog.ToRewardTime == 0 {
  322. extendRewardLog.ToRewardTime = toRewardTime
  323. }
  324. eg.Where("id=?", extendRewardLog.Id).Update(extendRewardLog)
  325. }
  326. }
  327. }
  328. if utils.StrToFloat64(IndirectSuccess) > 0 {
  329. if nextUserProfile != nil && nextUserProfile.ParentUid > 0 {
  330. IndirectRewardLog, IndirectHas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  331. Uid: nextUserProfile.ParentUid,
  332. ToUid: userProfile.Uid,
  333. })
  334. if !IndirectHas {
  335. IndirectRewardLog = &model.NewAcquisitionRewardLog{
  336. Uid: nextUserProfile.ParentUid,
  337. ToUid: user.Profile.Uid,
  338. Title: user.Info.Nickname,
  339. Source: 2,
  340. SourceText: "间推好友",
  341. Money: IndirectSuccess,
  342. CreatedAt: int(time.Now().Unix()),
  343. State: 0,
  344. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  345. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  346. InviteTime: int(userInfo.CreateAt.Unix()),
  347. Lv: lv,
  348. }
  349. db.InsertNewRewardLog(eg, IndirectRewardLog)
  350. }
  351. if IndirectRewardLog.GivenAt == 0 {
  352. IndirectRewardLog.CompleteCon = str
  353. IndirectRewardLog.IsFull = isFull
  354. if IndirectRewardLog.FullTime == 0 {
  355. IndirectRewardLog.FullTime = fullTime
  356. }
  357. if IndirectRewardLog.ToRewardTime == 0 {
  358. IndirectRewardLog.ToRewardTime = toRewardTime
  359. }
  360. eg.Where("id=?", IndirectRewardLog.Id).Update(IndirectRewardLog)
  361. }
  362. }
  363. }
  364. }
  365. return nil
  366. }
  367. func Rands(minVal, maxVal string) string {
  368. min := int(utils.StrToFloat64(minVal) * 100)
  369. max := int(utils.StrToFloat64(maxVal) * 100)
  370. return utils.Float64ToStrByPrec(float64(utils.RandIntRand(min, max))/100, 3)
  371. }
  372. //判断是否符合条件
  373. func checkAllCompleteTmp(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) (bool, string) {
  374. res := true
  375. str := ""
  376. if acqCfg.SuccessConditions.Register.Open == "1" {
  377. res = res && AcqRegisterTmp(user, acqCfg)
  378. if res {
  379. str += ",Register"
  380. }
  381. }
  382. if acqCfg.SuccessConditions.TaobaoAuthorization.Open == "1" {
  383. res = res && AcqTaoBaoAuthTmp(user, acqCfg)
  384. if res {
  385. str += ",TaobaoAuthorization"
  386. }
  387. }
  388. if acqCfg.SuccessConditions.FirstOrder.Open == "1" {
  389. res = res && AcqFirstOrder(eg, user, acqCfg)
  390. if res {
  391. str += ",FirstOrder"
  392. }
  393. }
  394. if acqCfg.SuccessConditions.SelfOrder.Open == "1" {
  395. res = res && AcqSelfOrder(eg, user, acqCfg)
  396. if res {
  397. str += ",SelfOrder"
  398. }
  399. }
  400. if acqCfg.SuccessConditions.OrderPay.Open == "1" {
  401. res = res && AcqOrderPay(eg, user, acqCfg)
  402. if res {
  403. str += ",OrderPay"
  404. }
  405. }
  406. if len(str) > 0 {
  407. str = str[1:]
  408. }
  409. return res, str
  410. }
  411. func AcqRegisterTmp(user *md.User, acqCfg *md.AcquisitionCfg) bool {
  412. var startTime = utils.TimeStdParseUnix(acqCfg.StartTime)
  413. var endTime = utils.TimeStdParseUnix(acqCfg.EndTime)
  414. if startTime == 0 || endTime == 0 {
  415. return false
  416. }
  417. //时间不在活动范围之内返回false
  418. if user.Info.CreateAt.Unix() < startTime {
  419. return false
  420. }
  421. if user.Info.CreateAt.Unix() > endTime {
  422. return false
  423. }
  424. return true
  425. }
  426. func AcqTaoBaoAuthTmp(user *md.User, acqCfg *md.AcquisitionCfg) bool {
  427. if user.Profile.AccTaobaoAuthTime > 0 {
  428. return true
  429. }
  430. return false
  431. }
  432. func AcqFirstOrder(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  433. endTime := int(user.Info.CreateAt.Unix()) + utils.StrToInt(acqCfg.SuccessConditions.FirstOrder.Day)*86400
  434. return commAmount(eg, utils.IntToStr(user.Info.Uid), endTime, 0, acqCfg)
  435. }
  436. func AcqSelfOrder(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  437. return commAmount(eg, utils.IntToStr(user.Info.Uid), 0, 2, acqCfg)
  438. }
  439. func AcqOrderPay(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  440. return commAmount(eg, utils.IntToStr(user.Info.Uid), 0, 1, acqCfg)
  441. }
  442. func sqlSelect(eg *xorm.Engine, uid string, endTime, types int, acqCfg *md.AcquisitionCfg, arr []string) int {
  443. sql := `SELECT COUNT(*) as count FROM %s ol
  444. LEFT JOIN %s olr on olr.oid=%s and olr.uid=%s
  445. WHERE ol.uid=? %s %s
  446. `
  447. str := ""
  448. if endTime > 0 {
  449. str += " AND olr.create_at<=" + utils.IntToStr(endTime)
  450. }
  451. if types == 2 {
  452. str += " AND olr.amount>=" + acqCfg.SuccessConditions.SelfOrder.Money
  453. }
  454. if types == 1 {
  455. str += " AND %s>=" + acqCfg.SuccessConditions.OrderPay.Money
  456. str = fmt.Sprintf(str, arr[0])
  457. }
  458. sqlOrd := fmt.Sprintf(sql, arr[1], arr[2], arr[3], arr[4], str, arr[5])
  459. ordResult, err := db.QueryNativeString(eg, sqlOrd, uid)
  460. fmt.Println(sqlOrd)
  461. fmt.Println(err)
  462. count := 0
  463. for _, v := range ordResult {
  464. count = utils.StrToInt(v["count"])
  465. }
  466. return count
  467. }
  468. func commAmount(eg *xorm.Engine, uid string, endTime, types int, acqCfg *md.AcquisitionCfg) bool {
  469. state := "0,1,2,3,5"
  470. psoState := "'订单付款','订单结算'"
  471. mallState := "1,2,3"
  472. o2oState := "1,2,3,4"
  473. b2cState := "1,2,3,4"
  474. if acqCfg.SuccessConditions.FirstOrder.Status == "1" {
  475. state = "1,2,3,5"
  476. mallState = "2,3"
  477. o2oState = "2,3,4"
  478. b2cState = "2,3,4"
  479. }
  480. if acqCfg.SuccessConditions.FirstOrder.Status == "2" {
  481. state = "2,3,5"
  482. mallState = "2,3"
  483. o2oState = "2,3,4"
  484. b2cState = "2,3,4"
  485. }
  486. if acqCfg.SuccessConditions.FirstOrder.Status == "3" {
  487. state = "3,5"
  488. mallState = "3"
  489. o2oState = "3,4"
  490. b2cState = "3,4"
  491. psoState = "'订单结算'"
  492. }
  493. arr := []string{"ol.paid_price", "ord_list", "ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + state + ")"}
  494. count := sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  495. arr = []string{"ol.paid_price", "privilege_card_ord", "ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state=1"}
  496. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  497. arr = []string{"ol.commission", "duoyou_ord_list", "ord_list_relate", "ol.oid", "ol.uid", " and ol.id>0"}
  498. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  499. arr = []string{"ol.amount", "recharge_order", "ord_list_relate", "ol.oid", "ol.uid", " and ol.status<>'已退款'"}
  500. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  501. arr = []string{"ol.amount", "playlet_sale_order", "ord_list_relate", "ol.custom_oid", "ol.uid", " and ol.status in(" + psoState + ")"}
  502. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  503. arr = []string{"ol.cost_price", "mall_ord", "mall_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + mallState + ")"}
  504. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  505. arr = []string{"ol.cost_price", "o2o_ord", "o2o_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + o2oState + ")"}
  506. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  507. arr = []string{"ol.actual_pay_amount", "o2o_pay_to_merchant", "o2o_ord_list_relate", "ol.pay_id", "ol.uid", " and ol.state >=1"}
  508. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  509. arr = []string{"ol.cost_price", "b2c_ord", "b2c_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + b2cState + ")"}
  510. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  511. if count > 0 {
  512. return true
  513. }
  514. return false
  515. }