golang 的 rabbitmq 消费项目
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

375 lines
12 KiB

  1. package consume
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/model"
  5. "applet/app/utils"
  6. "applet/app/utils/logx"
  7. "applet/consume/md"
  8. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "github.com/streadway/amqp"
  13. "time"
  14. "xorm.io/xorm"
  15. )
  16. func ZhiosAcquisitionCondition(queue md.MqQueue) {
  17. fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
  18. ch, err := rabbit.Cfg.Pool.GetChannel()
  19. if err != nil {
  20. logx.Error(err)
  21. return
  22. }
  23. defer ch.Release()
  24. //1、将自己绑定到交换机上
  25. ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
  26. //2、取出数据进行消费
  27. ch.Qos(100)
  28. delivery := ch.Consume(queue.Name, false)
  29. var res amqp.Delivery
  30. var ok bool
  31. for {
  32. res, ok = <-delivery
  33. if ok == true {
  34. //fmt.Println(string(res.Body))
  35. fmt.Println(">>>>>>>>>>>>>>>>CanalOrderConsume<<<<<<<<<<<<<<<<<<<<<<<<<")
  36. err = handleZhiosAcquisition(res.Body)
  37. //_ = res.Reject(false)
  38. fmt.Println(err)
  39. if err == nil {
  40. _ = res.Ack(true)
  41. } else {
  42. var canalMsg *md.ZhiosAcquisition
  43. var tmpString string
  44. err := json.Unmarshal(res.Body, &tmpString)
  45. if err == nil {
  46. fmt.Println(tmpString)
  47. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  48. if err == nil {
  49. ch.Publish(queue.ExchangeName, utils.SerializeStr(canalMsg), queue.RoutKey)
  50. }
  51. }
  52. }
  53. } else {
  54. panic(errors.New("error getting message"))
  55. }
  56. }
  57. fmt.Println("get msg done")
  58. }
  59. func handleZhiosAcquisition(msg []byte) error {
  60. //1、解析canal采集至mq中queue的数据结构体
  61. var canalMsg *md.ZhiosAcquisition
  62. fmt.Println(string(msg))
  63. var tmpString string
  64. err := json.Unmarshal(msg, &tmpString)
  65. if err != nil {
  66. fmt.Println(err.Error())
  67. return err
  68. }
  69. fmt.Println(tmpString)
  70. err = json.Unmarshal([]byte(tmpString), &canalMsg)
  71. if err != nil {
  72. return err
  73. }
  74. mid := canalMsg.Mid
  75. eg := db.DBs[mid]
  76. userInfo, _ := db.UserFindByID(eg, canalMsg.Uid)
  77. if userInfo == nil {
  78. return nil
  79. }
  80. userProfile, _ := db.UserProfileFindByID(eg, canalMsg.Uid)
  81. cfg := db.GetAcquisitionCfg(eg, canalMsg.Id, userInfo.CreateAt)
  82. if cfg == nil {
  83. return nil
  84. }
  85. nextUserProfile, _ := db.UserProfileFindByID(eg, userProfile.ParentUid)
  86. var user = &md.User{Info: userInfo, Profile: userProfile}
  87. bools, str := checkAllCompleteTmp(eg, user, cfg)
  88. isFull := 0
  89. fullTime := 0
  90. toRewardTime := 0
  91. if bools {
  92. isFull = 1
  93. fullTime = int(time.Now().Unix())
  94. toRewardTime = int(time.Now().Unix()) + utils.StrToInt(cfg.RewardAccountDay)*86400
  95. }
  96. //写入奖励记录
  97. InvitedReward := cfg.RewardRule.InvitedReward
  98. if cfg.RewardRule.RewardType == "1" {
  99. InvitedReward = Rands(cfg.RewardRule.InvitedReward, cfg.RewardRule.InvitedRewardMax)
  100. }
  101. if utils.StrToFloat64(InvitedReward) > 0 {
  102. ownRewardLog, ownhas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  103. Uid: userProfile.Uid,
  104. ToUid: 0,
  105. })
  106. if !ownhas {
  107. ownRewardLog = &model.NewAcquisitionRewardLog{
  108. Uid: user.Profile.Uid,
  109. ToUid: user.Profile.Uid,
  110. Title: user.Info.Nickname,
  111. Source: 0,
  112. SourceText: "注册奖励",
  113. Money: InvitedReward,
  114. CreatedAt: int(time.Now().Unix()),
  115. State: 0,
  116. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  117. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  118. InviteTime: int(userInfo.CreateAt.Unix()),
  119. }
  120. db.InsertNewRewardLog(eg, ownRewardLog)
  121. }
  122. ownRewardLog.CompleteCon = str
  123. ownRewardLog.IsFull = isFull
  124. if ownRewardLog.FullTime == 0 {
  125. ownRewardLog.FullTime = fullTime
  126. }
  127. if ownRewardLog.ToRewardTime == 0 {
  128. ownRewardLog.ToRewardTime = toRewardTime
  129. }
  130. eg.Where("id=?", ownRewardLog.Id).Update(ownRewardLog)
  131. }
  132. //直推
  133. DirectSuccess := cfg.RewardRule.DirectSuccess
  134. if cfg.RewardRule.RewardType == "1" {
  135. DirectSuccess = Rands(cfg.RewardRule.DirectSuccess, cfg.RewardRule.DirectSuccessMax)
  136. }
  137. if utils.StrToFloat64(DirectSuccess) > 0 {
  138. if userProfile.ParentUid > 0 {
  139. //写入奖励记录
  140. extendRewardLog, extendHas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  141. Uid: userProfile.ParentUid,
  142. ToUid: userProfile.Uid,
  143. })
  144. if !extendHas {
  145. extendRewardLog = &model.NewAcquisitionRewardLog{
  146. Uid: user.Profile.ParentUid,
  147. ToUid: user.Profile.Uid,
  148. Title: user.Info.Nickname,
  149. Source: 1,
  150. SourceText: "直推好友",
  151. Money: DirectSuccess,
  152. CreatedAt: int(time.Now().Unix()),
  153. State: 0,
  154. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  155. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  156. InviteTime: int(userInfo.CreateAt.Unix()),
  157. }
  158. db.InsertNewRewardLog(eg, extendRewardLog)
  159. }
  160. extendRewardLog.CompleteCon = str
  161. extendRewardLog.IsFull = isFull
  162. if extendRewardLog.FullTime == 0 {
  163. extendRewardLog.FullTime = fullTime
  164. }
  165. if extendRewardLog.ToRewardTime == 0 {
  166. extendRewardLog.ToRewardTime = toRewardTime
  167. }
  168. eg.Where("id=?", extendRewardLog.Id).Update(extendRewardLog)
  169. }
  170. }
  171. //间推
  172. IndirectSuccess := cfg.RewardRule.IndirectSuccess
  173. if cfg.RewardRule.RewardType == "1" {
  174. IndirectSuccess = Rands(cfg.RewardRule.IndirectSuccess, cfg.RewardRule.IndirectSuccessMax)
  175. }
  176. if utils.StrToFloat64(IndirectSuccess) > 0 {
  177. if nextUserProfile != nil && nextUserProfile.ParentUid > 0 {
  178. IndirectRewardLog, IndirectHas, _ := db.GetNewAcquisitionRewardLog(eg, &model.NewAcquisitionRewardLog{
  179. Uid: nextUserProfile.ParentUid,
  180. ToUid: userProfile.Uid,
  181. })
  182. if !IndirectHas {
  183. IndirectRewardLog = &model.NewAcquisitionRewardLog{
  184. Uid: nextUserProfile.ParentUid,
  185. ToUid: user.Profile.Uid,
  186. Title: user.Info.Nickname,
  187. Source: 2,
  188. SourceText: "间推好友",
  189. Money: IndirectSuccess,
  190. CreatedAt: int(time.Now().Unix()),
  191. State: 0,
  192. CoinId: utils.StrToInt(cfg.RewardRule.RewardCoinId),
  193. RewardType: utils.StrToInt(cfg.RewardRule.RewardType),
  194. InviteTime: int(userInfo.CreateAt.Unix()),
  195. }
  196. db.InsertNewRewardLog(eg, IndirectRewardLog)
  197. }
  198. IndirectRewardLog.CompleteCon = str
  199. IndirectRewardLog.IsFull = isFull
  200. if IndirectRewardLog.FullTime == 0 {
  201. IndirectRewardLog.FullTime = fullTime
  202. }
  203. if IndirectRewardLog.ToRewardTime == 0 {
  204. IndirectRewardLog.ToRewardTime = toRewardTime
  205. }
  206. eg.Where("id=?", IndirectRewardLog.Id).Update(IndirectRewardLog)
  207. }
  208. }
  209. return nil
  210. }
  211. func Rands(minVal, maxVal string) string {
  212. min := int(utils.StrToFloat64(minVal) * 100)
  213. max := int(utils.StrToFloat64(maxVal) * 100)
  214. return utils.Float64ToStrByPrec(float64(utils.RandIntRand(min, max))/100, 3)
  215. }
  216. //判断是否符合条件
  217. func checkAllCompleteTmp(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) (bool, string) {
  218. res := true
  219. str := ""
  220. if acqCfg.SuccessConditions.Register.Open == "1" {
  221. res = res && AcqRegisterTmp(user, acqCfg)
  222. if res {
  223. str += ",Register"
  224. }
  225. }
  226. if acqCfg.SuccessConditions.TaobaoAuthorization.Open == "1" {
  227. res = res && AcqTaoBaoAuthTmp(user, acqCfg)
  228. if res {
  229. str += ",TaobaoAuthorization"
  230. }
  231. }
  232. if acqCfg.SuccessConditions.FirstOrder.Open == "1" {
  233. res = res && AcqFirstOrder(eg, user, acqCfg)
  234. if res {
  235. str += ",FirstOrder"
  236. }
  237. }
  238. if acqCfg.SuccessConditions.SelfOrder.Open == "1" {
  239. res = res && AcqSelfOrder(eg, user, acqCfg)
  240. if res {
  241. str += ",SelfOrder"
  242. }
  243. }
  244. if acqCfg.SuccessConditions.OrderPay.Open == "1" {
  245. res = res && AcqOrderPay(eg, user, acqCfg)
  246. if res {
  247. str += ",OrderPay"
  248. }
  249. }
  250. if len(str) > 0 {
  251. str = str[1:]
  252. }
  253. return res, str
  254. }
  255. func AcqRegisterTmp(user *md.User, acqCfg *md.AcquisitionCfg) bool {
  256. var startTime = utils.TimeStdParseUnix(acqCfg.StartTime)
  257. var endTime = utils.TimeStdParseUnix(acqCfg.EndTime)
  258. if startTime == 0 || endTime == 0 {
  259. return false
  260. }
  261. //时间不在活动范围之内返回false
  262. if user.Info.CreateAt.Unix() < startTime {
  263. return false
  264. }
  265. if user.Info.CreateAt.Unix() > endTime {
  266. return false
  267. }
  268. return true
  269. }
  270. func AcqTaoBaoAuthTmp(user *md.User, acqCfg *md.AcquisitionCfg) bool {
  271. if user.Profile.AccTaobaoAuthTime > 0 {
  272. return true
  273. }
  274. return false
  275. }
  276. func AcqFirstOrder(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  277. endTime := int(user.Info.CreateAt.Unix()) + utils.StrToInt(acqCfg.SuccessConditions.FirstOrder.Day)*86400
  278. return commAmount(eg, utils.IntToStr(user.Info.Uid), endTime, 0, acqCfg)
  279. }
  280. func AcqSelfOrder(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  281. return commAmount(eg, utils.IntToStr(user.Info.Uid), 0, 2, acqCfg)
  282. }
  283. func AcqOrderPay(eg *xorm.Engine, user *md.User, acqCfg *md.AcquisitionCfg) bool {
  284. return commAmount(eg, utils.IntToStr(user.Info.Uid), 0, 1, acqCfg)
  285. }
  286. func sqlSelect(eg *xorm.Engine, uid string, endTime, types int, acqCfg *md.AcquisitionCfg, arr []string) int {
  287. sql := `SELECT COUNT(*) as count FROM %s ol
  288. LEFT JOIN %s olr on olr.oid=%s and olr.uid=%s
  289. WHERE ol.uid=? %s %s
  290. `
  291. str := ""
  292. if endTime > 0 {
  293. str += " AND olr.create_at<=" + utils.IntToStr(endTime)
  294. }
  295. if types == 2 {
  296. str += " AND olr.amount>=" + acqCfg.SuccessConditions.SelfOrder.Money
  297. }
  298. if types == 1 {
  299. str += " AND %s>=" + acqCfg.SuccessConditions.OrderPay.Money
  300. str = fmt.Sprintf(str, arr[0])
  301. }
  302. sqlOrd := fmt.Sprintf(sql, arr[1], arr[2], arr[3], arr[4], str, arr[5])
  303. ordResult, err := db.QueryNativeString(eg, sqlOrd, uid)
  304. fmt.Println(sqlOrd)
  305. fmt.Println(err)
  306. count := 0
  307. for _, v := range ordResult {
  308. count = utils.StrToInt(v["count"])
  309. }
  310. return count
  311. }
  312. func commAmount(eg *xorm.Engine, uid string, endTime, types int, acqCfg *md.AcquisitionCfg) bool {
  313. state := "0,1,2,3,5"
  314. psoState := "'订单付款','订单结算'"
  315. mallState := "1,2,3"
  316. o2oState := "1,2,3,4"
  317. b2cState := "1,2,3,4"
  318. if acqCfg.SuccessConditions.FirstOrder.Status == "1" {
  319. state = "1,2,3,5"
  320. mallState = "2,3"
  321. o2oState = "2,3,4"
  322. b2cState = "2,3,4"
  323. }
  324. if acqCfg.SuccessConditions.FirstOrder.Status == "2" {
  325. state = "2,3,5"
  326. mallState = "2,3"
  327. o2oState = "2,3,4"
  328. b2cState = "2,3,4"
  329. }
  330. if acqCfg.SuccessConditions.FirstOrder.Status == "3" {
  331. state = "3,5"
  332. mallState = "3"
  333. o2oState = "3,4"
  334. b2cState = "3,4"
  335. psoState = "'订单结算'"
  336. }
  337. arr := []string{"ol.paid_price", "ord_list", "ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + state + ")"}
  338. count := sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  339. arr = []string{"ol.paid_price", "privilege_card_ord", "ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state=1"}
  340. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  341. arr = []string{"ol.commission", "duoyou_ord_list", "ord_list_relate", "ol.oid", "ol.uid", " and ol.id>0"}
  342. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  343. arr = []string{"ol.amount", "recharge_order", "ord_list_relate", "ol.oid", "ol.uid", " and ol.status<>'已退款'"}
  344. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  345. arr = []string{"ol.amount", "playlet_sale_order", "ord_list_relate", "ol.custom_oid", "ol.uid", " and ol.status in(" + psoState + ")"}
  346. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  347. arr = []string{"ol.cost_price", "mall_ord", "mall_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + mallState + ")"}
  348. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  349. arr = []string{"ol.cost_price", "o2o_ord", "o2o_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + o2oState + ")"}
  350. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  351. arr = []string{"ol.actual_pay_amount", "o2o_pay_to_merchant", "o2o_ord_list_relate", "ol.pay_id", "ol.uid", " and ol.state >=1"}
  352. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  353. arr = []string{"ol.cost_price", "b2c_ord", "b2c_ord_list_relate", "ol.ord_id", "ol.uid", " and ol.state in(" + b2cState + ")"}
  354. count += sqlSelect(eg, uid, endTime, types, acqCfg, arr)
  355. if count > 0 {
  356. return true
  357. }
  358. return false
  359. }