蛋蛋星球 后台端
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

384 lines
12 KiB

  1. package svc
  2. import (
  3. "applet/app/db"
  4. "applet/app/db/im/model"
  5. md "applet/app/md/im"
  6. "applet/app/pkg/pb"
  7. "applet/app/utils"
  8. "code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
  9. model2 "code.fnuoos.com/EggPlanet/egg_models.git/src/model"
  10. md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
  11. "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
  12. "fmt"
  13. "math"
  14. "strings"
  15. "time"
  16. )
  17. func BatchSendGroupMessage(req *md.BatchSendGroupMessageReq) error {
  18. now := time.Now()
  19. record := model2.ImGroupBatchSendMessageRecords{
  20. SendKind: req.SendKind,
  21. Kind: req.Kind,
  22. Content: req.Content,
  23. SendTime: now.Format("2006-01-02 15:04:05"),
  24. State: 1,
  25. SendCondition: req.SendCondition,
  26. NotCondition: req.NotCondition,
  27. CreateTime: now.Format("2006-01-02 15:04:05"),
  28. UpdateTime: now.Format("2006-01-02 15:04:05"),
  29. }
  30. recordsDb := implement.NewImGroupBatchSendMessageRecordsDb(db.Db)
  31. recordID, err := recordsDb.ImGroupBatchSendMessageRecordsInsert(&record)
  32. if err != nil {
  33. return err
  34. }
  35. go DealGroupMessage(recordID, req)
  36. return nil
  37. }
  38. func BatchSendUserMessage(req *md.BatchSendUserMessageReq) error {
  39. now := time.Now()
  40. record := model2.ImUserBatchSendMessageRecords{
  41. SendKind: req.SendKind,
  42. Kind: req.Kind,
  43. Content: req.Content,
  44. SendTime: now.Format("2006-01-02 15:04:05"),
  45. State: 1,
  46. SendCondition: req.SendCondition,
  47. NotCondition: req.NotCondition,
  48. CreateTime: now.Format("2006-01-02 15:04:05"),
  49. UpdateTime: now.Format("2006-01-02 15:04:05"),
  50. }
  51. recordsDb := implement.NewImUserBatchSendMessageRecordsDb(db.Db)
  52. recordID, err := recordsDb.ImUserBatchSendMessageRecordsInsert(&record)
  53. if err != nil {
  54. return err
  55. }
  56. go DealUserMessage(recordID, req)
  57. return nil
  58. }
  59. func DealGroupMessage(recordID int, req *md.BatchSendGroupMessageReq) {
  60. var groupIDs []string
  61. var err error
  62. recordsDb := implement.NewImGroupBatchSendMessageRecordsDb(db.Db)
  63. ch, err := rabbit.Cfg.Pool.GetChannel()
  64. if err != nil {
  65. BatchSendGroupMessageSetFailed(recordID)
  66. return
  67. }
  68. defer ch.Release()
  69. if req.SendKind == 2 {
  70. // 1.发送给所有群聊
  71. page := 1
  72. limit := 100
  73. for {
  74. var groupWithOwners []model.GroupWithOwner
  75. // 1.1 分批查询所有群聊ID 和 群主 ID
  76. err = db.DbIm.Table("group").
  77. Join("INNER", "group_user", "group.id = group_user.group_id").
  78. Where("group_user.member_type = 1").
  79. Limit(limit, (page-1)*limit).
  80. Find(&groupWithOwners)
  81. if err != nil {
  82. fmt.Println("DealGroupMessage_ERR:::::", err.Error())
  83. BatchSendGroupMessageSetFailed(recordID)
  84. return
  85. }
  86. for _, groupWithOwner := range groupWithOwners {
  87. data := md2.IMEggEnergyStructForBatchSendMessageData{
  88. ReceiverType: int(pb.ReceiverType_RT_GROUP),
  89. MessageType: req.Kind,
  90. SendId: groupWithOwner.OwnerID,
  91. ReceiveIMId: groupWithOwner.GroupID,
  92. Content: req.Content,
  93. }
  94. // 1.2 逐个推送到 mq
  95. ch.Publish(md2.IMEggEnergyExchange, utils.SerializeStr(data), md2.IMEggEnergyRoutKeyForBatchSendMessageData)
  96. // 1.3 记录推送的群ID
  97. groupIDs = append(groupIDs, utils.Int64ToStr(groupWithOwner.GroupID))
  98. }
  99. if len(groupWithOwners) < limit {
  100. break
  101. }
  102. page++
  103. }
  104. } else {
  105. var sendCondition, notCondition []string
  106. if req.SendCondition != "" {
  107. sendCondition = strings.Split(req.SendCondition, ";")
  108. }
  109. if req.NotCondition != "" {
  110. notCondition = strings.Split(req.NotCondition, ";")
  111. }
  112. // 1.1 去掉发送条件中存在的不发送条件中的内容
  113. if len(notCondition) != 0 && len(sendCondition) != 0 {
  114. sendCondition = utils.DiffArray(sendCondition, notCondition)
  115. }
  116. if len(sendCondition) != 0 {
  117. var groupUsers []model.GroupUser
  118. err = db.DbIm.Table("group_user").
  119. In("group_id", sendCondition).
  120. Where("member_type = 1").
  121. Find(&groupUsers)
  122. if err != nil {
  123. fmt.Println("DealGroupMessage_ERR:::::", err.Error())
  124. BatchSendGroupMessageSetFailed(recordID)
  125. return
  126. }
  127. for _, groupUser := range groupUsers {
  128. data := md2.IMEggEnergyStructForBatchSendMessageData{
  129. ReceiverType: int(pb.ReceiverType_RT_GROUP),
  130. MessageType: req.Kind,
  131. SendId: groupUser.UserId,
  132. ReceiveIMId: groupUser.GroupId,
  133. Content: req.Content,
  134. }
  135. // 1.2 逐个推送到 mq
  136. ch.Publish(md2.IMEggEnergyExchange, utils.SerializeStr(data), md2.IMEggEnergyRoutKeyForBatchSendMessageData)
  137. groupIDs = sendCondition
  138. }
  139. } else if len(notCondition) != 0 && len(sendCondition) == 0 {
  140. var groupUsers []model.GroupUser
  141. err = db.DbIm.Table("group_user").
  142. NotIn("group_id", sendCondition).
  143. Where("member_type = 1").
  144. Find(&groupUsers)
  145. if err != nil {
  146. fmt.Println("DealGroupMessage_ERR:::::", err.Error())
  147. BatchSendGroupMessageSetFailed(recordID)
  148. return
  149. }
  150. for _, groupUser := range groupUsers {
  151. data := md2.IMEggEnergyStructForBatchSendMessageData{
  152. ReceiverType: int(pb.ReceiverType_RT_GROUP),
  153. MessageType: req.Kind,
  154. SendId: groupUser.UserId,
  155. ReceiveIMId: groupUser.GroupId,
  156. Content: req.Content,
  157. }
  158. // 1.2 逐个推送到 mq
  159. ch.Publish(md2.IMEggEnergyExchange, utils.SerializeStr(data), md2.IMEggEnergyRoutKeyForBatchSendMessageData)
  160. // 1.3 记录推送的群ID
  161. groupIDs = append(groupIDs, utils.Int64ToStr(groupUser.GroupId))
  162. }
  163. }
  164. }
  165. m := model2.ImGroupBatchSendMessageRecords{}
  166. groupIDsStr := strings.Join(groupIDs, ";")
  167. m.GroupId = groupIDsStr
  168. _, err1 := recordsDb.ImGroupBatchSendMessageRecordsUpdate(recordID, &m, "group_id")
  169. if err1 != nil {
  170. fmt.Println("DealGroupMessage_ERR:::::", err1.Error())
  171. return
  172. }
  173. }
  174. func DealUserMessage(recordID int, req *md.BatchSendUserMessageReq) {
  175. var userIDs []string
  176. var userIMIDs []string
  177. var err error
  178. recordsDb := implement.NewImUserBatchSendMessageRecordsDb(db.Db)
  179. userDb := implement.NewUserDb(db.Db)
  180. ch, err := rabbit.Cfg.Pool.GetChannel()
  181. if err != nil {
  182. BatchSendGroupMessageSetFailed(recordID)
  183. return
  184. }
  185. defer ch.Release()
  186. if req.SendKind == 1 {
  187. // 1. 发送给所有用户
  188. page := 1
  189. limit := 100
  190. for {
  191. var tempUserIds []string
  192. var phoneIds []string
  193. users, _, err := userDb.UserFindAndCount(0, "", "", 0, page, limit)
  194. if err != nil {
  195. fmt.Println("DealUserMessage_ERR:::::", err.Error())
  196. BatchSendUserMessageSetFailed(recordID)
  197. return
  198. }
  199. for _, user := range *users {
  200. tempUserIds = append(tempUserIds, utils.Int64ToStr(user.Id))
  201. phoneIds = append(phoneIds, user.Phone)
  202. }
  203. userIDs = append(userIDs, tempUserIds...)
  204. var customerServiceWithUsers []model.CustomerServiceWithUser
  205. err = db.DbIm.Table("user").
  206. Join("INNER", "customer_service", "customer_service.id = user.customer_id").
  207. In("phone_number", phoneIds).
  208. Where("customer_service.state = 1").
  209. Find(&customerServiceWithUsers)
  210. if err != nil {
  211. fmt.Println("DealUserMessage_ERR:::::", err.Error())
  212. BatchSendUserMessageSetFailed(recordID)
  213. return
  214. }
  215. for _, customerServiceWithUser := range customerServiceWithUsers {
  216. data := md2.IMEggEnergyStructForBatchSendMessageData{
  217. ReceiverType: int(pb.ReceiverType_RT_USER),
  218. MessageType: req.Kind,
  219. SendId: customerServiceWithUser.CustomerServiceUID,
  220. ReceiveIMId: customerServiceWithUser.UserIMID,
  221. Content: req.Content,
  222. }
  223. // 1.2 逐个推送到 mq
  224. ch.Publish(md2.IMEggEnergyExchange, utils.SerializeStr(data), md2.IMEggEnergyRoutKeyForBatchSendMessageData)
  225. // 1.3 记录推送的用户IM ID
  226. userIMIDs = append(userIMIDs, utils.Int64ToStr(customerServiceWithUser.UserIMID))
  227. }
  228. if len(*users) < limit {
  229. break
  230. }
  231. page++
  232. }
  233. } else {
  234. // 2.发送给指定用户
  235. var sendCondition, notCondition []string
  236. if req.SendCondition != "" {
  237. sendCondition = strings.Split(req.SendCondition, ";")
  238. }
  239. if req.NotCondition != "" {
  240. notCondition = strings.Split(req.NotCondition, ";")
  241. }
  242. // 1.1 去掉发送条件中存在的不发送条件中的内容
  243. if len(notCondition) != 0 && len(sendCondition) != 0 {
  244. sendCondition = utils.DiffArray(sendCondition, notCondition)
  245. }
  246. if len(sendCondition) != 0 {
  247. limit := 100
  248. page := 1
  249. conditionLen := len(sendCondition)
  250. for {
  251. tempPhoneList := sendCondition[limit*(page-1) : int(math.Min(float64(limit*page), float64(conditionLen)))]
  252. var customerServiceWithUsers []model.CustomerServiceWithUser
  253. err = db.DbIm.Table("user").
  254. Join("INNER", "customer_service", "customer_service.id = user.customer_id").
  255. In("phone_number", tempPhoneList).
  256. Where("customer_service.state = 1").
  257. Find(&customerServiceWithUsers)
  258. if err != nil {
  259. fmt.Println("DealUserMessage_ERR:::::", err.Error())
  260. BatchSendUserMessageSetFailed(recordID)
  261. return
  262. }
  263. for _, customerServiceWithUser := range customerServiceWithUsers {
  264. data := md2.IMEggEnergyStructForBatchSendMessageData{
  265. ReceiverType: int(pb.ReceiverType_RT_USER),
  266. MessageType: req.Kind,
  267. SendId: customerServiceWithUser.CustomerServiceUID,
  268. ReceiveIMId: customerServiceWithUser.UserIMID,
  269. Content: req.Content,
  270. }
  271. // 1.2 逐个推送到 mq
  272. ch.Publish(md2.IMEggEnergyExchange, utils.SerializeStr(data), md2.IMEggEnergyRoutKeyForBatchSendMessageData)
  273. // 1.3 记录推送的群ID
  274. userIMIDs = append(userIMIDs, utils.Int64ToStr(customerServiceWithUser.UserIMID))
  275. }
  276. users, err := userDb.UserFindByParams(map[string]interface{}{
  277. "key": "phone",
  278. "value": tempPhoneList,
  279. })
  280. if err != nil {
  281. fmt.Println("DealUserMessage_ERR:::::", err.Error())
  282. BatchSendUserMessageSetFailed(recordID)
  283. return
  284. }
  285. for _, user := range users {
  286. userIDs = append(userIDs, utils.Int64ToStr(user.Id))
  287. }
  288. page++
  289. if limit*(page-1) > conditionLen {
  290. break
  291. }
  292. }
  293. } else if len(notCondition) != 0 && len(sendCondition) == 0 {
  294. page := 1
  295. limit := 100
  296. for {
  297. var tempUserIds []string
  298. var phoneIds []string
  299. temp := map[string]interface{}{
  300. "key": "phone",
  301. "value": notCondition,
  302. }
  303. users, _, err := userDb.UserFindNotInByParamsByPage(page, limit, temp)
  304. if err != nil {
  305. fmt.Println("DealUserMessage_ERR:::::", err.Error())
  306. BatchSendUserMessageSetFailed(recordID)
  307. return
  308. }
  309. for _, user := range users {
  310. tempUserIds = append(tempUserIds, utils.Int64ToStr(user.Id))
  311. phoneIds = append(phoneIds, user.Phone)
  312. }
  313. userIDs = append(userIDs, tempUserIds...)
  314. var customerServiceWithUsers []model.CustomerServiceWithUser
  315. err = db.DbIm.Table("user").
  316. Join("INNER", "customer_service", "customer_service.id = user.customer_id").
  317. In("phone_number", phoneIds).
  318. Where("customer_service.state = 1").
  319. Find(&customerServiceWithUsers)
  320. for _, customerServiceWithUser := range customerServiceWithUsers {
  321. data := md2.IMEggEnergyStructForBatchSendMessageData{
  322. ReceiverType: int(pb.ReceiverType_RT_USER),
  323. MessageType: req.Kind,
  324. SendId: customerServiceWithUser.CustomerServiceUID,
  325. ReceiveIMId: customerServiceWithUser.UserIMID,
  326. Content: req.Content,
  327. }
  328. // 1.2 逐个推送到 mq
  329. ch.Publish(md2.IMEggEnergyExchange, utils.SerializeStr(data), md2.IMEggEnergyRoutKeyForBatchSendMessageData)
  330. // 1.3 记录推送的用户IM ID
  331. userIMIDs = append(userIMIDs, utils.Int64ToStr(customerServiceWithUser.UserIMID))
  332. }
  333. if len(users) < limit {
  334. break
  335. }
  336. page++
  337. }
  338. }
  339. }
  340. m := model2.ImUserBatchSendMessageRecords{}
  341. uIDStr := strings.Join(userIDs, ";")
  342. m.Uid = uIDStr
  343. userIMIdStr := strings.Join(userIMIDs, ";")
  344. m.ImUid = userIMIdStr
  345. _, err1 := recordsDb.ImUserBatchSendMessageRecordsUpdate(recordID, &m, "im_uid", "uid")
  346. if err1 != nil {
  347. fmt.Println("DealUserMessage_ERR:::::", err1.Error())
  348. return
  349. }
  350. }
  351. func BatchSendUserMessageSetFailed(recordID int) {
  352. recordsDb := implement.NewImUserBatchSendMessageRecordsDb(db.Db)
  353. m := model2.ImUserBatchSendMessageRecords{
  354. State: 2,
  355. }
  356. _, err := recordsDb.ImUserBatchSendMessageRecordsUpdate(recordID, &m)
  357. if err != nil {
  358. fmt.Println("DealUserMessage_ERR:::::", err.Error())
  359. return
  360. }
  361. }
  362. func BatchSendGroupMessageSetFailed(recordID int) {
  363. recordsDb := implement.NewImGroupBatchSendMessageRecordsDb(db.Db)
  364. m := model2.ImGroupBatchSendMessageRecords{
  365. State: 2,
  366. }
  367. _, err := recordsDb.ImGroupBatchSendMessageRecordsUpdate(recordID, &m)
  368. if err != nil {
  369. fmt.Println("DealGroupMessage_ERR:::::", err.Error())
  370. return
  371. }
  372. }