package svc import ( "applet/app/db" "applet/app/db/im/model" md "applet/app/md/im" "applet/app/pkg/pb" "applet/app/utils" "code.fnuoos.com/EggPlanet/egg_models.git/src/implement" model2 "code.fnuoos.com/EggPlanet/egg_models.git/src/model" md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md" "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" "fmt" "math" "strings" "time" ) func BatchSendGroupMessage(req *md.BatchSendGroupMessageReq) error { now := time.Now() record := model2.ImGroupBatchSendMessageRecords{ SendKind: req.SendKind, Kind: req.Kind, Content: req.Content, SendTime: now.Format("2006-01-02 15:04:05"), State: 1, SendCondition: req.SendCondition, NotCondition: req.NotCondition, CreateTime: now.Format("2006-01-02 15:04:05"), UpdateTime: now.Format("2006-01-02 15:04:05"), } recordsDb := implement.NewImGroupBatchSendMessageRecordsDb(db.Db) recordID, err := recordsDb.ImGroupBatchSendMessageRecordsInsert(&record) if err != nil { return err } go DealGroupMessage(recordID, req) return nil } func BatchSendUserMessage(req *md.BatchSendUserMessageReq) error { now := time.Now() record := model2.ImUserBatchSendMessageRecords{ SendKind: req.SendKind, Kind: req.Kind, Content: req.Content, SendTime: now.Format("2006-01-02 15:04:05"), State: 1, SendCondition: req.SendCondition, NotCondition: req.NotCondition, CreateTime: now.Format("2006-01-02 15:04:05"), UpdateTime: now.Format("2006-01-02 15:04:05"), } recordsDb := implement.NewImUserBatchSendMessageRecordsDb(db.Db) recordID, err := recordsDb.ImUserBatchSendMessageRecordsInsert(&record) if err != nil { return err } go DealUserMessage(recordID, req) return nil } func DealGroupMessage(recordID int, req *md.BatchSendGroupMessageReq) { var groupIDs []string var err error recordsDb := implement.NewImGroupBatchSendMessageRecordsDb(db.Db) ch, err := rabbit.Cfg.Pool.GetChannel() if err != nil { BatchSendGroupMessageSetFailed(recordID) return } defer ch.Release() if req.SendKind == 2 { // 1.发送给所有群聊 page := 1 limit := 100 for { var groupWithOwners []model.GroupWithOwner // 1.1 分批查询所有群聊ID 和 群主 ID err = db.DbIm.Table("group"). Join("INNER", "group_user", "group.id = group_user.group_id"). Where("group_user.member_type = 1"). Limit(limit, (page-1)*limit). Find(&groupWithOwners) if err != nil { fmt.Println("DealGroupMessage_ERR:::::", err.Error()) BatchSendGroupMessageSetFailed(recordID) return } for _, groupWithOwner := range groupWithOwners { data := md2.IMEggEnergyStructForBatchSendMessageData{ ReceiverType: int(pb.ReceiverType_RT_GROUP), MessageType: req.Kind, SendId: groupWithOwner.OwnerID, ReceiveIMId: groupWithOwner.GroupID, Content: req.Content, } // 1.2 逐个推送到 mq ch.Publish(md2.IMEggEnergyExchange, data, md2.IMEggEnergyRoutKeyForBatchSendMessageData) // 1.3 记录推送的群ID groupIDs = append(groupIDs, utils.Int64ToStr(groupWithOwner.GroupID)) } if len(groupWithOwners) < limit { break } page++ } } else { var sendCondition, notCondition []string if req.SendCondition != "" { sendCondition = strings.Split(req.SendCondition, ";") } if req.NotCondition != "" { notCondition = strings.Split(req.NotCondition, ";") } // 1.1 去掉发送条件中存在的不发送条件中的内容 if len(notCondition) != 0 && len(sendCondition) != 0 { sendCondition = utils.DiffArray(sendCondition, notCondition) } if len(sendCondition) != 0 { var groupUsers []model.GroupUser err = db.DbIm.Table("group_user"). In("group_id", sendCondition). Where("member_type = 1"). Find(&groupUsers) if err != nil { fmt.Println("DealGroupMessage_ERR:::::", err.Error()) BatchSendGroupMessageSetFailed(recordID) return } for _, groupUser := range groupUsers { data := md2.IMEggEnergyStructForBatchSendMessageData{ ReceiverType: int(pb.ReceiverType_RT_GROUP), MessageType: req.Kind, SendId: groupUser.UserId, ReceiveIMId: groupUser.GroupId, Content: req.Content, } // 1.2 逐个推送到 mq ch.Publish(md2.IMEggEnergyExchange, data, md2.IMEggEnergyRoutKeyForBatchSendMessageData) groupIDs = sendCondition } } else if len(notCondition) != 0 && len(sendCondition) == 0 { var groupUsers []model.GroupUser err = db.DbIm.Table("group_user"). NotIn("group_id", sendCondition). Where("member_type = 1"). Find(&groupUsers) if err != nil { fmt.Println("DealGroupMessage_ERR:::::", err.Error()) BatchSendGroupMessageSetFailed(recordID) return } for _, groupUser := range groupUsers { data := md2.IMEggEnergyStructForBatchSendMessageData{ ReceiverType: int(pb.ReceiverType_RT_GROUP), MessageType: req.Kind, SendId: groupUser.UserId, ReceiveIMId: groupUser.GroupId, Content: req.Content, } // 1.2 逐个推送到 mq ch.Publish(md2.IMEggEnergyExchange, data, md2.IMEggEnergyRoutKeyForBatchSendMessageData) // 1.3 记录推送的群ID groupIDs = append(groupIDs, utils.Int64ToStr(groupUser.GroupId)) } } } m := model2.ImGroupBatchSendMessageRecords{} groupIDsStr := strings.Join(groupIDs, ";") m.GroupId = groupIDsStr _, err1 := recordsDb.ImGroupBatchSendMessageRecordsUpdate(recordID, &m, "group_id") if err1 != nil { fmt.Println("DealGroupMessage_ERR:::::", err1.Error()) return } } func DealUserMessage(recordID int, req *md.BatchSendUserMessageReq) { var userIDs []string var userIMIDs []string var err error recordsDb := implement.NewImUserBatchSendMessageRecordsDb(db.Db) userDb := implement.NewUserDb(db.Db) ch, err := rabbit.Cfg.Pool.GetChannel() if err != nil { BatchSendGroupMessageSetFailed(recordID) return } defer ch.Release() if req.SendKind == 1 { // 1. 发送给所有用户 page := 1 limit := 100 for { var tempUserIds []string var phoneIds []string users, _, err := userDb.UserFindAndCount(0, "", "", 0, page, limit) if err != nil { fmt.Println("DealUserMessage_ERR:::::", err.Error()) BatchSendUserMessageSetFailed(recordID) return } for _, user := range *users { tempUserIds = append(tempUserIds, utils.Int64ToStr(user.Id)) phoneIds = append(phoneIds, user.Phone) } userIDs = append(userIDs, tempUserIds...) var customerServiceWithUsers []model.CustomerServiceWithUser err = db.DbIm.Table("user"). Join("INNER", "customer_service", "customer_service.id = user.customer_id"). In("phone", phoneIds). Where("customer_service.state = 1"). Find(&customerServiceWithUsers) if err != nil { fmt.Println("DealUserMessage_ERR:::::", err.Error()) BatchSendUserMessageSetFailed(recordID) return } for _, customerServiceWithUser := range customerServiceWithUsers { data := md2.IMEggEnergyStructForBatchSendMessageData{ ReceiverType: int(pb.ReceiverType_RT_USER), MessageType: req.Kind, SendId: customerServiceWithUser.CustomerServiceUID, ReceiveIMId: customerServiceWithUser.UserIMID, Content: req.Content, } // 1.2 逐个推送到 mq ch.Publish(md2.IMEggEnergyExchange, data, md2.IMEggEnergyRoutKeyForBatchSendMessageData) // 1.3 记录推送的用户IM ID userIMIDs = append(userIMIDs, utils.Int64ToStr(customerServiceWithUser.UserIMID)) } if len(*users) < limit { break } page++ } } else { // 2.发送给指定用户 var sendCondition, notCondition []string if req.SendCondition != "" { sendCondition = strings.Split(req.SendCondition, ";") } if req.NotCondition != "" { notCondition = strings.Split(req.NotCondition, ";") } // 1.1 去掉发送条件中存在的不发送条件中的内容 if len(notCondition) != 0 && len(sendCondition) != 0 { sendCondition = utils.DiffArray(sendCondition, notCondition) } if len(sendCondition) != 0 { limit := 100 page := 1 conditionLen := len(sendCondition) for { tempPhoneList := sendCondition[limit*(page-1) : int(math.Min(float64(limit*page), float64(conditionLen)))] var customerServiceWithUsers []model.CustomerServiceWithUser err = db.DbIm.Table("user"). Join("INNER", "customer_service", "customer_service.id = user.customer_id"). In("phone", tempPhoneList). Where("customer_service.state = 1"). Find(&customerServiceWithUsers) if err != nil { fmt.Println("DealUserMessage_ERR:::::", err.Error()) BatchSendUserMessageSetFailed(recordID) return } for _, customerServiceWithUser := range customerServiceWithUsers { data := md2.IMEggEnergyStructForBatchSendMessageData{ ReceiverType: int(pb.ReceiverType_RT_USER), MessageType: req.Kind, SendId: customerServiceWithUser.CustomerServiceUID, ReceiveIMId: customerServiceWithUser.UserIMID, Content: req.Content, } // 1.2 逐个推送到 mq ch.Publish(md2.IMEggEnergyExchange, data, md2.IMEggEnergyRoutKeyForBatchSendMessageData) // 1.3 记录推送的群ID userIMIDs = append(userIMIDs, utils.Int64ToStr(customerServiceWithUser.UserIMID)) } users, err := userDb.UserFindByParams(map[string]interface{}{ "key": "phone", "value": tempPhoneList, }) if err != nil { fmt.Println("DealUserMessage_ERR:::::", err.Error()) BatchSendUserMessageSetFailed(recordID) return } for _, user := range users { userIDs = append(userIDs, utils.Int64ToStr(user.Id)) } page++ if limit*(page-1) > conditionLen { break } } } else if len(notCondition) != 0 && len(sendCondition) == 0 { page := 1 limit := 100 for { var tempUserIds []string var phoneIds []string temp := map[string]interface{}{ "key": "phone", "value": notCondition, } users, _, err := userDb.UserFindNotInByParamsByPage(page, limit, temp) if err != nil { fmt.Println("DealUserMessage_ERR:::::", err.Error()) BatchSendUserMessageSetFailed(recordID) return } for _, user := range users { tempUserIds = append(tempUserIds, utils.Int64ToStr(user.Id)) phoneIds = append(phoneIds, user.Phone) } userIDs = append(userIDs, tempUserIds...) var customerServiceWithUsers []model.CustomerServiceWithUser err = db.DbIm.Table("user"). Join("INNER", "customer_service", "customer_service.id = user.customer_id"). In("phone", phoneIds). Where("customer_service.state = 1"). Find(&customerServiceWithUsers) for _, customerServiceWithUser := range customerServiceWithUsers { data := md2.IMEggEnergyStructForBatchSendMessageData{ ReceiverType: int(pb.ReceiverType_RT_USER), MessageType: req.Kind, SendId: customerServiceWithUser.CustomerServiceUID, ReceiveIMId: customerServiceWithUser.UserIMID, Content: req.Content, } // 1.2 逐个推送到 mq ch.Publish(md2.IMEggEnergyExchange, data, md2.IMEggEnergyRoutKeyForBatchSendMessageData) // 1.3 记录推送的用户IM ID userIMIDs = append(userIMIDs, utils.Int64ToStr(customerServiceWithUser.UserIMID)) } if len(users) < limit { break } page++ } } } m := model2.ImUserBatchSendMessageRecords{} uIDStr := strings.Join(userIDs, ";") m.Uid = uIDStr userIMIdStr := strings.Join(userIMIDs, ";") m.ImUid = userIMIdStr _, err1 := recordsDb.ImUserBatchSendMessageRecordsUpdate(recordID, &m, "im_uid", "uid") if err1 != nil { fmt.Println("DealUserMessage_ERR:::::", err1.Error()) return } } func BatchSendUserMessageSetFailed(recordID int) { recordsDb := implement.NewImUserBatchSendMessageRecordsDb(db.Db) m := model2.ImUserBatchSendMessageRecords{ State: 2, } _, err := recordsDb.ImUserBatchSendMessageRecordsUpdate(recordID, &m) if err != nil { fmt.Println("DealUserMessage_ERR:::::", err.Error()) return } } func BatchSendGroupMessageSetFailed(recordID int) { recordsDb := implement.NewImGroupBatchSendMessageRecordsDb(db.Db) m := model2.ImGroupBatchSendMessageRecords{ State: 2, } _, err := recordsDb.ImGroupBatchSendMessageRecordsUpdate(recordID, &m) if err != nil { fmt.Println("DealGroupMessage_ERR:::::", err.Error()) return } }