golang 的 rabbitmq 消费项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

537 lines
17 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "github.com/streadway/amqp"
  13. "time"
  14. "xorm.io/xorm"
  15. )
  16. func ZhiosAcquisitionCondition(queue md.MqQueue) {
  17. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  18. ch, err := rabbit.Cfg.Pool.GetChannel()
  19. if err != nil {
  20. logx.Error(err)
  21. return
  22. }
  23. defer ch.Release()
  24. //1、将自己绑定到交换机上
  25. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  26. //2、取出数据进行消费
  27. ch.Qos(300)
  28. delivery := ch.Consume(queue.Name, false)
  29. var res amqp.Delivery
  30. var ok bool
  31. for {
  32. res, ok = <-delivery
  33. if ok == true {
  34. //fmt.Println(string(res.Body))
  35. fmt.Println(">>>>>>>>>>>>>>>>ZhiosAcquisitionCondition<<<<<<<<<<<<<<<<<<<<<<<<<")
  36. err = handleZhiosAcquisition(res.Body)
  37. //_ = res.Reject(false)
  38. fmt.Println(err)
  39. if err == nil {
  40. _ = res.Ack(true)
  41. } else {
  42. var canalMsg *md.ZhiosAcquisition
  43. var tmpString string
  44. err := json.Unmarshal(res.Body, &tmpString)
  45. if err == nil {
  46. fmt.Println(tmpString)
  47. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  48. if err == nil {
  49. ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey)
  50. }
  51. }
  52. }
  53. } else {
  54. panic(errors.New("error getting message"))
  55. }
  56. }
  57. fmt.Println("get msg done")
  58. }
  59. func handleZhiosAcquisition(msg []byte) error {
  60. //1、解析canal采集至mq中queue的数据结构体
  61. var canalMsg *md.ZhiosAcquisition
  62. fmt.Println(string(msg))
  63. var tmpString string
  64. err := json.Unmarshal(msg, &tmpString)
  65. if err != nil {
  66. fmt.Println(err.Error())
  67. return err
  68. }
  69. fmt.Println(tmpString)
  70. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  71. if err != nil {
  72. return err
  73. }
  74. mid := canalMsg.Mid
  75. eg := db.DBs[mid]
  76. if eg == nil {
  77. return nil
  78. }
  79. if canalMsg.Uid == "" {
  80. return nil
  81. }
  82. userInfo, _ := db.UserFindByID(eg, canalMsg.Uid)
  83. if userInfo == nil {
  84. return nil
  85. }
  86. userProfile, _ := db.UserProfileFindByID(eg, canalMsg.Uid)
  87. if userProfile == nil {
  88. return nil
  89. }
  90. cfg := db.GetAcquisitionCfg(eg, canalMsg.Id, userInfo.CreateAt)
  91. if cfg == nil {
  92. return nil
  93. }
  94. nextUserProfile, _ := db.UserProfileFindByID(eg, userProfile.ParentUid)
  95. var user = &md.User{Info: userInfo, Profile: userProfile}
  96. bools, str := checkAllCompleteTmp(eg, user, cfg)
  97. isFull := 0
  98. fullTime := 0
  99. toRewardTime := 0
  100. if bools {
  101. isFull = 1
  102. fullTime = int(time.Now().Unix())
  103. toRewardTime = int(time.Now().Unix()) + utils.StrToInt(cfg.RewardAccountDay)*86400
  104. }
  105. //写入奖励记录
  106. //新的设置 读第一个新注册的奖励
  107. lv := 0
  108. if len(cfg.RewardRule.LvRewardList) > 0 {
  109. isEnd := 0
  110. for k, v := range cfg.RewardRule.LvRewardList {
  111. if isEnd == 1 {
  112. continue
  113. }
  114. if utils.StrToInt(v.Id) == user.Info.Level {
  115. isEnd = 1
  116. }
  117. lv = utils.StrToInt(v.Id)
  118. InvitedReward := v.InvitedReward
  119. if cfg.RewardRule.RewardType == "1" {
  120. InvitedReward = Rands(v.InvitedReward, v.InvitedRewardMax)
  121. }
  122. //直推
  123. DirectSuccess := v.DirectSuccess
  124. if cfg.RewardRule.RewardType == "1" {
  125. DirectSuccess = Rands(v.DirectSuccess, v.DirectSuccessMax)
  126. }
  127. //间推
  128. IndirectSuccess := v.IndirectSuccess
  129. if cfg.RewardRule.RewardType == "1" {
  130. IndirectSuccess = Rands(v.IndirectSuccess, v.IndirectSuccessMax)
  131. }
  132. InvitedSource := 0
  133. DirectSource := 1
  134. IndirectSource := 2
  135. InvitedSourceStr := "注册奖励"
  136. DirectSourceStr := "直推好友"
  137. IndirectSourceStr := "间推好友"
  138. if k > 0 {
  139. InvitedSource = 3
  140. DirectSource = 4
  141. IndirectSource = 5
  142. InvitedSourceStr = "升级" + v.Name + "奖励"
  143. DirectSourceStr = "直推好友升级" + v.Name + "奖励"
  144. IndirectSourceStr = "间推好友升级" + v.Name + "奖励"
  145. }
  146. if utils.StrToFloat64(InvitedReward) > 0 {
  147. ownRewardLog, ownhas, _ := db.GetNewAcquisitionRewardLogWhere(eg, userProfile.Uid, user.Profile.Uid, lv)
  148. if !ownhas {
  149. ownRewardLog = &model.NewAcquisitionRewardLog{
  150. Uid: user.Profile.Uid,
  151. ToUid: user.Profile.Uid,
  152. Title: user.Info.Nickname,
  153. Source: InvitedSource,
  154. SourceText: InvitedSourceStr,
  155. Money: InvitedReward,
  156. CreatedAt: int(time.Now().Unix()),
  157. State: 0,
  158. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  159. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  160. InviteTime: int(userInfo.CreateAt.Unix()),
  161. Lv: lv,
  162. }
  163. db.InsertNewRewardLog(eg, ownRewardLog)
  164. }
  165. if ownRewardLog.GivenAt == 0 {
  166. ownRewardLog.CompleteCon = str
  167. ownRewardLog.IsFull = isFull
  168. if ownRewardLog.FullTime == 0 {
  169. ownRewardLog.FullTime = fullTime
  170. }
  171. if ownRewardLog.ToRewardTime == 0 {
  172. ownRewardLog.ToRewardTime = toRewardTime
  173. }
  174. eg.Where("id=?", ownRewardLog.Id).Update(ownRewardLog)
  175. }
  176. }
  177. if utils.StrToFloat64(DirectSuccess) > 0 {
  178. if userProfile.ParentUid > 0 {
  179. //写入奖励记录
  180. extendRewardLog, extendHas, _ := db.GetNewAcquisitionRewardLogWhere(eg, userProfile.ParentUid, userProfile.Uid, lv)
  181. if !extendHas {
  182. extendRewardLog = &model.NewAcquisitionRewardLog{
  183. Uid: user.Profile.ParentUid,
  184. ToUid: user.Profile.Uid,
  185. Title: user.Info.Nickname,
  186. Source: DirectSource,
  187. SourceText: DirectSourceStr,
  188. Money: DirectSuccess,
  189. CreatedAt: int(time.Now().Unix()),
  190. State: 0,
  191. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  192. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  193. InviteTime: int(userInfo.CreateAt.Unix()),
  194. Lv: lv,
  195. }
  196. db.InsertNewRewardLog(eg, extendRewardLog)
  197. }
  198. if extendRewardLog.GivenAt == 0 {
  199. extendRewardLog.CompleteCon = str
  200. extendRewardLog.IsFull = isFull
  201. if extendRewardLog.FullTime == 0 {
  202. extendRewardLog.FullTime = fullTime
  203. }
  204. if extendRewardLog.ToRewardTime == 0 {
  205. extendRewardLog.ToRewardTime = toRewardTime
  206. }
  207. eg.Where("id=?", extendRewardLog.Id).Update(extendRewardLog)
  208. }
  209. }
  210. }
  211. if utils.StrToFloat64(IndirectSuccess) > 0 {
  212. if nextUserProfile != nil && nextUserProfile.ParentUid > 0 {
  213. IndirectRewardLog, IndirectHas, _ := db.GetNewAcquisitionRewardLogWhere(eg, nextUserProfile.ParentUid, userProfile.Uid, lv)
  214. if !IndirectHas {
  215. IndirectRewardLog = &model.NewAcquisitionRewardLog{
  216. Uid: nextUserProfile.ParentUid,
  217. ToUid: user.Profile.Uid,
  218. Title: user.Info.Nickname,
  219. Source: IndirectSource,
  220. SourceText: IndirectSourceStr,
  221. Money: IndirectSuccess,
  222. CreatedAt: int(time.Now().Unix()),
  223. State: 0,
  224. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  225. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  226. InviteTime: int(userInfo.CreateAt.Unix()),
  227. Lv: lv,
  228. }
  229. db.InsertNewRewardLog(eg, IndirectRewardLog)
  230. }
  231. if IndirectRewardLog.GivenAt == 0 {
  232. IndirectRewardLog.CompleteCon = str
  233. IndirectRewardLog.IsFull = isFull
  234. if IndirectRewardLog.FullTime == 0 {
  235. IndirectRewardLog.FullTime = fullTime
  236. }
  237. if IndirectRewardLog.ToRewardTime == 0 {
  238. IndirectRewardLog.ToRewardTime = toRewardTime
  239. }
  240. eg.Where("id=?", IndirectRewardLog.Id).Update(IndirectRewardLog)
  241. }
  242. }
  243. }
  244. }
  245. } else {
  246. InvitedReward := cfg.RewardRule.InvitedReward
  247. if cfg.RewardRule.RewardType == "1" {
  248. InvitedReward = Rands(cfg.RewardRule.InvitedReward, cfg.RewardRule.InvitedRewardMax)
  249. }
  250. //直推
  251. DirectSuccess := cfg.RewardRule.DirectSuccess
  252. if cfg.RewardRule.RewardType == "1" {
  253. DirectSuccess = Rands(cfg.RewardRule.DirectSuccess, cfg.RewardRule.DirectSuccessMax)
  254. }
  255. //间推
  256. IndirectSuccess := cfg.RewardRule.IndirectSuccess
  257. if cfg.RewardRule.RewardType == "1" {
  258. IndirectSuccess = Rands(cfg.RewardRule.IndirectSuccess, cfg.RewardRule.IndirectSuccessMax)
  259. }
  260. if utils.StrToFloat64(InvitedReward) > 0 {
  261. ownRewardLog, ownhas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  262. Uid: user.Profile.Uid,
  263. ToUid: user.Profile.Uid,
  264. })
  265. if !ownhas {
  266. ownRewardLog = &model.NewAcquisitionRewardLog{
  267. Uid: user.Profile.Uid,
  268. ToUid: user.Profile.Uid,
  269. Title: user.Info.Nickname,
  270. Source: 0,
  271. SourceText: "注册奖励",
  272. Money: InvitedReward,
  273. CreatedAt: int(time.Now().Unix()),
  274. State: 0,
  275. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  276. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  277. InviteTime: int(userInfo.CreateAt.Unix()),
  278. Lv: lv,
  279. }
  280. db.InsertNewRewardLog(eg, ownRewardLog)
  281. }
  282. if ownRewardLog.GivenAt == 0 {
  283. ownRewardLog.CompleteCon = str
  284. ownRewardLog.IsFull = isFull
  285. if ownRewardLog.FullTime == 0 {
  286. ownRewardLog.FullTime = fullTime
  287. }
  288. if ownRewardLog.ToRewardTime == 0 {
  289. ownRewardLog.ToRewardTime = toRewardTime
  290. }
  291. eg.Where("id=?", ownRewardLog.Id).Update(ownRewardLog)
  292. }
  293. }
  294. if utils.StrToFloat64(DirectSuccess) > 0 {
  295. if userProfile.ParentUid > 0 {
  296. //写入奖励记录
  297. extendRewardLog, extendHas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  298. Uid: userProfile.ParentUid,
  299. ToUid: userProfile.Uid,
  300. })
  301. if !extendHas {
  302. extendRewardLog = &model.NewAcquisitionRewardLog{
  303. Uid: user.Profile.ParentUid,
  304. ToUid: user.Profile.Uid,
  305. Title: user.Info.Nickname,
  306. Source: 1,
  307. SourceText: "直推好友",
  308. Money: DirectSuccess,
  309. CreatedAt: int(time.Now().Unix()),
  310. State: 0,
  311. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  312. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  313. InviteTime: int(userInfo.CreateAt.Unix()),
  314. Lv: lv,
  315. }
  316. db.InsertNewRewardLog(eg, extendRewardLog)
  317. }
  318. if extendRewardLog.GivenAt == 0 {
  319. extendRewardLog.CompleteCon = str
  320. extendRewardLog.IsFull = isFull
  321. if extendRewardLog.FullTime == 0 {
  322. extendRewardLog.FullTime = fullTime
  323. }
  324. if extendRewardLog.ToRewardTime == 0 {
  325. extendRewardLog.ToRewardTime = toRewardTime
  326. }
  327. eg.Where("id=?", extendRewardLog.Id).Update(extendRewardLog)
  328. }
  329. }
  330. }
  331. if utils.StrToFloat64(IndirectSuccess) > 0 {
  332. if nextUserProfile != nil && nextUserProfile.ParentUid > 0 {
  333. IndirectRewardLog, IndirectHas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  334. Uid: nextUserProfile.ParentUid,
  335. ToUid: userProfile.Uid,
  336. })
  337. if !IndirectHas {
  338. IndirectRewardLog = &model.NewAcquisitionRewardLog{
  339. Uid: nextUserProfile.ParentUid,
  340. ToUid: user.Profile.Uid,
  341. Title: user.Info.Nickname,
  342. Source: 2,
  343. SourceText: "间推好友",
  344. Money: IndirectSuccess,
  345. CreatedAt: int(time.Now().Unix()),
  346. State: 0,
  347. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  348. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  349. InviteTime: int(userInfo.CreateAt.Unix()),
  350. Lv: lv,
  351. }
  352. db.InsertNewRewardLog(eg, IndirectRewardLog)
  353. }
  354. if IndirectRewardLog.GivenAt == 0 {
  355. IndirectRewardLog.CompleteCon = str
  356. IndirectRewardLog.IsFull = isFull
  357. if IndirectRewardLog.FullTime == 0 {
  358. IndirectRewardLog.FullTime = fullTime
  359. }
  360. if IndirectRewardLog.ToRewardTime == 0 {
  361. IndirectRewardLog.ToRewardTime = toRewardTime
  362. }
  363. eg.Where("id=?", IndirectRewardLog.Id).Update(IndirectRewardLog)
  364. }
  365. }
  366. }
  367. }
  368. return nil
  369. }
  370. func Rands(minVal, maxVal string) string {
  371. min := int(utils.StrToFloat64(minVal) * 100)
  372. max := int(utils.StrToFloat64(maxVal) * 100)
  373. return utils.Float64ToStrByPrec(float64(utils.RandIntRand(min, max))/100, 3)
  374. }
  375. //判断是否符合条件
  376. func checkAllCompleteTmp(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) (bool, string) {
  377. res := true
  378. str := ""
  379. if acqCfg.SuccessConditions.Register.Open == "1" {
  380. res = res && AcqRegisterTmp(user, acqCfg)
  381. if res {
  382. str += ",Register"
  383. }
  384. }
  385. if acqCfg.SuccessConditions.TaobaoAuthorization.Open == "1" {
  386. res = res && AcqTaoBaoAuthTmp(user, acqCfg)
  387. if res {
  388. str += ",TaobaoAuthorization"
  389. }
  390. }
  391. if acqCfg.SuccessConditions.FirstOrder.Open == "1" {
  392. res = res && AcqFirstOrder(eg, user, acqCfg)
  393. if res {
  394. str += ",FirstOrder"
  395. }
  396. }
  397. if acqCfg.SuccessConditions.SelfOrder.Open == "1" {
  398. res = res && AcqSelfOrder(eg, user, acqCfg)
  399. if res {
  400. str += ",SelfOrder"
  401. }
  402. }
  403. if acqCfg.SuccessConditions.OrderPay.Open == "1" {
  404. res = res && AcqOrderPay(eg, user, acqCfg)
  405. if res {
  406. str += ",OrderPay"
  407. }
  408. }
  409. if len(str) > 0 {
  410. str = str[1:]
  411. }
  412. return res, str
  413. }
  414. func AcqRegisterTmp(user *md.User, acqCfg *md.AcquisitionCfg) bool {
  415. var startTime = utils.TimeStdParseUnix(acqCfg.StartTime)
  416. var endTime = utils.TimeStdParseUnix(acqCfg.EndTime)
  417. if startTime == 0 || endTime == 0 {
  418. return false
  419. }
  420. //时间不在活动范围之内返回false
  421. if user.Info.CreateAt.Unix() < startTime {
  422. return false
  423. }
  424. if user.Info.CreateAt.Unix() > endTime {
  425. return false
  426. }
  427. return true
  428. }
  429. func AcqTaoBaoAuthTmp(user *md.User, acqCfg *md.AcquisitionCfg) bool {
  430. if user.Profile.AccTaobaoAuthTime > 0 {
  431. return true
  432. }
  433. return false
  434. }
  435. func AcqFirstOrder(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  436. endTime := int(user.Info.CreateAt.Unix()) + utils.StrToInt(acqCfg.SuccessConditions.FirstOrder.Day)*86400
  437. return commAmount(eg, utils.IntToStr(user.Info.Uid), endTime, 0, acqCfg)
  438. }
  439. func AcqSelfOrder(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  440. return commAmount(eg, utils.IntToStr(user.Info.Uid), 0, 2, acqCfg)
  441. }
  442. func AcqOrderPay(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  443. return commAmount(eg, utils.IntToStr(user.Info.Uid), 0, 1, acqCfg)
  444. }
  445. func sqlSelect(eg *xorm.Engine, uid string, endTime, types int, acqCfg *md.AcquisitionCfg, arr []string) int {
  446. sql := `SELECT COUNT(*) as count FROM %s ol
  447. LEFT JOIN %s olr on olr.oid=%s and olr.uid=%s
  448. WHERE ol.uid=? %s %s
  449. `
  450. str := ""
  451. if endTime > 0 {
  452. str += " AND olr.create_at<=" + utils.IntToStr(endTime)
  453. }
  454. if types == 2 {
  455. str += " AND olr.amount>=" + acqCfg.SuccessConditions.SelfOrder.Money
  456. }
  457. if types == 1 {
  458. str += " AND %s>=" + acqCfg.SuccessConditions.OrderPay.Money
  459. str = fmt.Sprintf(str, arr[0])
  460. }
  461. sqlOrd := fmt.Sprintf(sql, arr[1], arr[2], arr[3], arr[4], str, arr[5])
  462. ordResult, err := db.QueryNativeString(eg, sqlOrd, uid)
  463. fmt.Println(sqlOrd)
  464. fmt.Println(err)
  465. count := 0
  466. for _, v := range ordResult {
  467. count = utils.StrToInt(v["count"])
  468. }
  469. return count
  470. }
  471. func commAmount(eg *xorm.Engine, uid string, endTime, types int, acqCfg *md.AcquisitionCfg) bool {
  472. state := "0,1,2,3,5"
  473. psoState := "'订单付款','订单结算'"
  474. mallState := "1,2,3"
  475. o2oState := "1,2,3,4"
  476. b2cState := "1,2,3,4"
  477. if acqCfg.SuccessConditions.FirstOrder.Status == "1" {
  478. state = "1,2,3,5"
  479. mallState = "2,3"
  480. o2oState = "2,3,4"
  481. b2cState = "2,3,4"
  482. }
  483. if acqCfg.SuccessConditions.FirstOrder.Status == "2" {
  484. state = "2,3,5"
  485. mallState = "2,3"
  486. o2oState = "2,3,4"
  487. b2cState = "2,3,4"
  488. }
  489. if acqCfg.SuccessConditions.FirstOrder.Status == "3" {
  490. state = "3,5"
  491. mallState = "3"
  492. o2oState = "3,4"
  493. b2cState = "3,4"
  494. psoState = "'订单结算'"
  495. }
  496. arr := []string{"ol.paid_price", "ord_list", "ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + state + ")"}
  497. count := sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  498. arr = []string{"ol.paid_price", "privilege_card_ord", "ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state=1"}
  499. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  500. arr = []string{"ol.commission", "duoyou_ord_list", "ord_list_relate", "ol.oid", "ol.uid", " and ol.id>0"}
  501. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  502. arr = []string{"ol.amount", "recharge_order", "ord_list_relate", "ol.oid", "ol.uid", " and ol.status<>'已退款'"}
  503. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  504. arr = []string{"ol.amount", "playlet_sale_order", "ord_list_relate", "ol.custom_oid", "ol.uid", " and ol.status in(" + psoState + ")"}
  505. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  506. arr = []string{"ol.cost_price", "mall_ord", "mall_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + mallState + ")"}
  507. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  508. arr = []string{"ol.cost_price", "o2o_ord", "o2o_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + o2oState + ")"}
  509. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  510. arr = []string{"ol.actual_pay_amount", "o2o_pay_to_merchant", "o2o_ord_list_relate", "ol.pay_id", "ol.uid", " and ol.state >=1"}
  511. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  512. arr = []string{"ol.cost_price", "b2c_ord", "b2c_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + b2cState + ")"}
  513. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  514. if count > 0 {
  515. return true
  516. }
  517. return false
  518. }