golang 的 rabbitmq 消费项目
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.
 
 
 

378 рядки
12 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "github.com/streadway/amqp"
  13. "time"
  14. "xorm.io/xorm"
  15. )
  16. func ZhiosAcquisitionCondition(queue md.MqQueue) {
  17. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  18. ch, err := rabbit.Cfg.Pool.GetChannel()
  19. if err != nil {
  20. logx.Error(err)
  21. return
  22. }
  23. defer ch.Release()
  24. //1、将自己绑定到交换机上
  25. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  26. //2、取出数据进行消费
  27. ch.Qos(100)
  28. delivery := ch.Consume(queue.Name, false)
  29. var res amqp.Delivery
  30. var ok bool
  31. for {
  32. res, ok = <-delivery
  33. if ok == true {
  34. //fmt.Println(string(res.Body))
  35. fmt.Println(">>>>>>>>>>>>>>>>ZhiosAcquisitionCondition<<<<<<<<<<<<<<<<<<<<<<<<<")
  36. err = handleZhiosAcquisition(res.Body)
  37. //_ = res.Reject(false)
  38. fmt.Println(err)
  39. if err == nil {
  40. _ = res.Ack(true)
  41. } else {
  42. var canalMsg *md.ZhiosAcquisition
  43. var tmpString string
  44. err := json.Unmarshal(res.Body, &tmpString)
  45. if err == nil {
  46. fmt.Println(tmpString)
  47. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  48. if err == nil {
  49. ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey)
  50. }
  51. }
  52. }
  53. } else {
  54. panic(errors.New("error getting message"))
  55. }
  56. }
  57. fmt.Println("get msg done")
  58. }
  59. func handleZhiosAcquisition(msg []byte) error {
  60. //1、解析canal采集至mq中queue的数据结构体
  61. var canalMsg *md.ZhiosAcquisition
  62. fmt.Println(string(msg))
  63. var tmpString string
  64. err := json.Unmarshal(msg, &tmpString)
  65. if err != nil {
  66. fmt.Println(err.Error())
  67. return err
  68. }
  69. fmt.Println(tmpString)
  70. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  71. if err != nil {
  72. return err
  73. }
  74. mid := canalMsg.Mid
  75. eg := db.DBs[mid]
  76. if eg == nil {
  77. return nil
  78. }
  79. userInfo, _ := db.UserFindByID(eg, canalMsg.Uid)
  80. if userInfo == nil {
  81. return nil
  82. }
  83. userProfile, _ := db.UserProfileFindByID(eg, canalMsg.Uid)
  84. cfg := db.GetAcquisitionCfg(eg, canalMsg.Id, userInfo.CreateAt)
  85. if cfg == nil {
  86. return nil
  87. }
  88. nextUserProfile, _ := db.UserProfileFindByID(eg, userProfile.ParentUid)
  89. var user = &md.User{Info: userInfo, Profile: userProfile}
  90. bools, str := checkAllCompleteTmp(eg, user, cfg)
  91. isFull := 0
  92. fullTime := 0
  93. toRewardTime := 0
  94. if bools {
  95. isFull = 1
  96. fullTime = int(time.Now().Unix())
  97. toRewardTime = int(time.Now().Unix()) + utils.StrToInt(cfg.RewardAccountDay)*86400
  98. }
  99. //写入奖励记录
  100. InvitedReward := cfg.RewardRule.InvitedReward
  101. if cfg.RewardRule.RewardType == "1" {
  102. InvitedReward = Rands(cfg.RewardRule.InvitedReward, cfg.RewardRule.InvitedRewardMax)
  103. }
  104. if utils.StrToFloat64(InvitedReward) > 0 {
  105. ownRewardLog, ownhas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  106. Uid: userProfile.Uid,
  107. ToUid: 0,
  108. })
  109. if !ownhas {
  110. ownRewardLog = &model.NewAcquisitionRewardLog{
  111. Uid: user.Profile.Uid,
  112. ToUid: user.Profile.Uid,
  113. Title: user.Info.Nickname,
  114. Source: 0,
  115. SourceText: "注册奖励",
  116. Money: InvitedReward,
  117. CreatedAt: int(time.Now().Unix()),
  118. State: 0,
  119. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  120. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  121. InviteTime: int(userInfo.CreateAt.Unix()),
  122. }
  123. db.InsertNewRewardLog(eg, ownRewardLog)
  124. }
  125. ownRewardLog.CompleteCon = str
  126. ownRewardLog.IsFull = isFull
  127. if ownRewardLog.FullTime == 0 {
  128. ownRewardLog.FullTime = fullTime
  129. }
  130. if ownRewardLog.ToRewardTime == 0 {
  131. ownRewardLog.ToRewardTime = toRewardTime
  132. }
  133. eg.Where("id=?", ownRewardLog.Id).Update(ownRewardLog)
  134. }
  135. //直推
  136. DirectSuccess := cfg.RewardRule.DirectSuccess
  137. if cfg.RewardRule.RewardType == "1" {
  138. DirectSuccess = Rands(cfg.RewardRule.DirectSuccess, cfg.RewardRule.DirectSuccessMax)
  139. }
  140. if utils.StrToFloat64(DirectSuccess) > 0 {
  141. if userProfile.ParentUid > 0 {
  142. //写入奖励记录
  143. extendRewardLog, extendHas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  144. Uid: userProfile.ParentUid,
  145. ToUid: userProfile.Uid,
  146. })
  147. if !extendHas {
  148. extendRewardLog = &model.NewAcquisitionRewardLog{
  149. Uid: user.Profile.ParentUid,
  150. ToUid: user.Profile.Uid,
  151. Title: user.Info.Nickname,
  152. Source: 1,
  153. SourceText: "直推好友",
  154. Money: DirectSuccess,
  155. CreatedAt: int(time.Now().Unix()),
  156. State: 0,
  157. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  158. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  159. InviteTime: int(userInfo.CreateAt.Unix()),
  160. }
  161. db.InsertNewRewardLog(eg, extendRewardLog)
  162. }
  163. extendRewardLog.CompleteCon = str
  164. extendRewardLog.IsFull = isFull
  165. if extendRewardLog.FullTime == 0 {
  166. extendRewardLog.FullTime = fullTime
  167. }
  168. if extendRewardLog.ToRewardTime == 0 {
  169. extendRewardLog.ToRewardTime = toRewardTime
  170. }
  171. eg.Where("id=?", extendRewardLog.Id).Update(extendRewardLog)
  172. }
  173. }
  174. //间推
  175. IndirectSuccess := cfg.RewardRule.IndirectSuccess
  176. if cfg.RewardRule.RewardType == "1" {
  177. IndirectSuccess = Rands(cfg.RewardRule.IndirectSuccess, cfg.RewardRule.IndirectSuccessMax)
  178. }
  179. if utils.StrToFloat64(IndirectSuccess) > 0 {
  180. if nextUserProfile != nil && nextUserProfile.ParentUid > 0 {
  181. IndirectRewardLog, IndirectHas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  182. Uid: nextUserProfile.ParentUid,
  183. ToUid: userProfile.Uid,
  184. })
  185. if !IndirectHas {
  186. IndirectRewardLog = &model.NewAcquisitionRewardLog{
  187. Uid: nextUserProfile.ParentUid,
  188. ToUid: user.Profile.Uid,
  189. Title: user.Info.Nickname,
  190. Source: 2,
  191. SourceText: "间推好友",
  192. Money: IndirectSuccess,
  193. CreatedAt: int(time.Now().Unix()),
  194. State: 0,
  195. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  196. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  197. InviteTime: int(userInfo.CreateAt.Unix()),
  198. }
  199. db.InsertNewRewardLog(eg, IndirectRewardLog)
  200. }
  201. IndirectRewardLog.CompleteCon = str
  202. IndirectRewardLog.IsFull = isFull
  203. if IndirectRewardLog.FullTime == 0 {
  204. IndirectRewardLog.FullTime = fullTime
  205. }
  206. if IndirectRewardLog.ToRewardTime == 0 {
  207. IndirectRewardLog.ToRewardTime = toRewardTime
  208. }
  209. eg.Where("id=?", IndirectRewardLog.Id).Update(IndirectRewardLog)
  210. }
  211. }
  212. return nil
  213. }
  214. func Rands(minVal, maxVal string) string {
  215. min := int(utils.StrToFloat64(minVal) * 100)
  216. max := int(utils.StrToFloat64(maxVal) * 100)
  217. return utils.Float64ToStrByPrec(float64(utils.RandIntRand(min, max))/100, 3)
  218. }
  219. //判断是否符合条件
  220. func checkAllCompleteTmp(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) (bool, string) {
  221. res := true
  222. str := ""
  223. if acqCfg.SuccessConditions.Register.Open == "1" {
  224. res = res && AcqRegisterTmp(user, acqCfg)
  225. if res {
  226. str += ",Register"
  227. }
  228. }
  229. if acqCfg.SuccessConditions.TaobaoAuthorization.Open == "1" {
  230. res = res && AcqTaoBaoAuthTmp(user, acqCfg)
  231. if res {
  232. str += ",TaobaoAuthorization"
  233. }
  234. }
  235. if acqCfg.SuccessConditions.FirstOrder.Open == "1" {
  236. res = res && AcqFirstOrder(eg, user, acqCfg)
  237. if res {
  238. str += ",FirstOrder"
  239. }
  240. }
  241. if acqCfg.SuccessConditions.SelfOrder.Open == "1" {
  242. res = res && AcqSelfOrder(eg, user, acqCfg)
  243. if res {
  244. str += ",SelfOrder"
  245. }
  246. }
  247. if acqCfg.SuccessConditions.OrderPay.Open == "1" {
  248. res = res && AcqOrderPay(eg, user, acqCfg)
  249. if res {
  250. str += ",OrderPay"
  251. }
  252. }
  253. if len(str) > 0 {
  254. str = str[1:]
  255. }
  256. return res, str
  257. }
  258. func AcqRegisterTmp(user *md.User, acqCfg *md.AcquisitionCfg) bool {
  259. var startTime = utils.TimeStdParseUnix(acqCfg.StartTime)
  260. var endTime = utils.TimeStdParseUnix(acqCfg.EndTime)
  261. if startTime == 0 || endTime == 0 {
  262. return false
  263. }
  264. //时间不在活动范围之内返回false
  265. if user.Info.CreateAt.Unix() < startTime {
  266. return false
  267. }
  268. if user.Info.CreateAt.Unix() > endTime {
  269. return false
  270. }
  271. return true
  272. }
  273. func AcqTaoBaoAuthTmp(user *md.User, acqCfg *md.AcquisitionCfg) bool {
  274. if user.Profile.AccTaobaoAuthTime > 0 {
  275. return true
  276. }
  277. return false
  278. }
  279. func AcqFirstOrder(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  280. endTime := int(user.Info.CreateAt.Unix()) + utils.StrToInt(acqCfg.SuccessConditions.FirstOrder.Day)*86400
  281. return commAmount(eg, utils.IntToStr(user.Info.Uid), endTime, 0, acqCfg)
  282. }
  283. func AcqSelfOrder(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  284. return commAmount(eg, utils.IntToStr(user.Info.Uid), 0, 2, acqCfg)
  285. }
  286. func AcqOrderPay(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  287. return commAmount(eg, utils.IntToStr(user.Info.Uid), 0, 1, acqCfg)
  288. }
  289. func sqlSelect(eg *xorm.Engine, uid string, endTime, types int, acqCfg *md.AcquisitionCfg, arr []string) int {
  290. sql := `SELECT COUNT(*) as count FROM %s ol
  291. LEFT JOIN %s olr on olr.oid=%s and olr.uid=%s
  292. WHERE ol.uid=? %s %s
  293. `
  294. str := ""
  295. if endTime > 0 {
  296. str += " AND olr.create_at<=" + utils.IntToStr(endTime)
  297. }
  298. if types == 2 {
  299. str += " AND olr.amount>=" + acqCfg.SuccessConditions.SelfOrder.Money
  300. }
  301. if types == 1 {
  302. str += " AND %s>=" + acqCfg.SuccessConditions.OrderPay.Money
  303. str = fmt.Sprintf(str, arr[0])
  304. }
  305. sqlOrd := fmt.Sprintf(sql, arr[1], arr[2], arr[3], arr[4], str, arr[5])
  306. ordResult, err := db.QueryNativeString(eg, sqlOrd, uid)
  307. fmt.Println(sqlOrd)
  308. fmt.Println(err)
  309. count := 0
  310. for _, v := range ordResult {
  311. count = utils.StrToInt(v["count"])
  312. }
  313. return count
  314. }
  315. func commAmount(eg *xorm.Engine, uid string, endTime, types int, acqCfg *md.AcquisitionCfg) bool {
  316. state := "0,1,2,3,5"
  317. psoState := "'订单付款','订单结算'"
  318. mallState := "1,2,3"
  319. o2oState := "1,2,3,4"
  320. b2cState := "1,2,3,4"
  321. if acqCfg.SuccessConditions.FirstOrder.Status == "1" {
  322. state = "1,2,3,5"
  323. mallState = "2,3"
  324. o2oState = "2,3,4"
  325. b2cState = "2,3,4"
  326. }
  327. if acqCfg.SuccessConditions.FirstOrder.Status == "2" {
  328. state = "2,3,5"
  329. mallState = "2,3"
  330. o2oState = "2,3,4"
  331. b2cState = "2,3,4"
  332. }
  333. if acqCfg.SuccessConditions.FirstOrder.Status == "3" {
  334. state = "3,5"
  335. mallState = "3"
  336. o2oState = "3,4"
  337. b2cState = "3,4"
  338. psoState = "'订单结算'"
  339. }
  340. arr := []string{"ol.paid_price", "ord_list", "ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + state + ")"}
  341. count := sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  342. arr = []string{"ol.paid_price", "privilege_card_ord", "ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state=1"}
  343. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  344. arr = []string{"ol.commission", "duoyou_ord_list", "ord_list_relate", "ol.oid", "ol.uid", " and ol.id>0"}
  345. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  346. arr = []string{"ol.amount", "recharge_order", "ord_list_relate", "ol.oid", "ol.uid", " and ol.status<>'已退款'"}
  347. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  348. arr = []string{"ol.amount", "playlet_sale_order", "ord_list_relate", "ol.custom_oid", "ol.uid", " and ol.status in(" + psoState + ")"}
  349. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  350. arr = []string{"ol.cost_price", "mall_ord", "mall_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + mallState + ")"}
  351. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  352. arr = []string{"ol.cost_price", "o2o_ord", "o2o_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + o2oState + ")"}
  353. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  354. arr = []string{"ol.actual_pay_amount", "o2o_pay_to_merchant", "o2o_ord_list_relate", "ol.pay_id", "ol.uid", " and ol.state >=1"}
  355. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  356. arr = []string{"ol.cost_price", "b2c_ord", "b2c_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + b2cState + ")"}
  357. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  358. if count > 0 {
  359. return true
  360. }
  361. return false
  362. }