Browse Source

feat: update es record when user comment

withdrawal
shenjiachi 3 weeks ago
parent
commit
15424499c5
5 changed files with 128 additions and 41 deletions
  1. +19
    -40
      consume/egg_canal_violate_nums_consume.go
  2. +97
    -0
      consume/egg_comment_consume.go
  3. +1
    -0
      consume/init.go
  4. +10
    -0
      consume/md/consume_key.go
  5. +1
    -1
      go.mod

+ 19
- 40
consume/egg_canal_violate_nums_consume.go View File

@@ -12,11 +12,9 @@ import (
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es"
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/olivere/elastic/v7"
"github.com/streadway/amqp"
"strings"
"time"
@@ -72,8 +70,8 @@ func handleEggCanalViolateNumsConsume(msg []byte) error {
yearStr := utils2.IntToStr(year)
weekStr := utils2.IntToStr(week)
index := es2.GetAppointIndexFromAlias(yearStr, weekStr)
// 2. 监听插入信息
if canalMsg.Type == md2.CanalMsgInsertSqlType {
// 2. 监听插入和删除信息
if canalMsg.Type == md2.CanalMsgInsertSqlType || canalMsg.Type == md2.CanalMsgDeleteSqlType {
tagDb := implement.NewUserTagDb(db.Db)
for _, item := range canalMsg.Data {
tag, err1 := tagDb.UserTagGetOneByParams(map[string]interface{}{
@@ -90,13 +88,22 @@ func handleEggCanalViolateNumsConsume(msg []byte) error {
uid := item.Uid
id := fmt.Sprintf("%d%d_%s", year, week, uid)

// 2.2. 增加违规次数记录
script := elastic.NewScript("ctx._source.violate_nums += params.inc").Param("inc", 1)
_, err = es.EsClient.Update().
Index(index).
Id(id).
Script(script).
Do(context.Background())
// 2.2 计算违规次数
sql := "SELECT COUNT(*) AS total " +
"From user_tag_records " +
"left join user_tag on user_tag_records.tag_id = user_tag.id " +
"WHERE user_tag_records.uid = %s " +
"AND user_tag.is_punish = 1"
sql = fmt.Sprintf(sql, uid)
res, err1 := db.QueryNativeString(db.Db, sql)
if err1 != nil {
return err1
}

// 2.3 更新违规记录
updateMap := make(map[string]interface{})
updateMap["violate_nums"] = res[0]["total"]
updateDocRet, err := es.UpdateDoc(index, id, updateMap)
if err != nil {
if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") {
// 蛋蛋分数据还不存在,创建蛋蛋分数据
@@ -109,37 +116,9 @@ func handleEggCanalViolateNumsConsume(msg []byte) error {
}
return err
}
fmt.Println("updateDocRet ========>", updateDocRet)
}
}
// 3. 监听删除信息
if canalMsg.Type == md2.CanalMsgDeleteSqlType {
tagDb := implement.NewUserTagDb(db.Db)
for _, item := range canalMsg.Data {
tag, err1 := tagDb.UserTagGetOneByParams(map[string]interface{}{
"key": "id",
"value": item.TagId,
})
if err1 != nil {
return err1
}
// 3.1 判断是否为处罚标签
if tag.IsPunish == 0 {
continue
}
uid := item.Uid
id := fmt.Sprintf("%d%d_%s", year, week, uid)

// 3.2 减少违规次数记录
script := elastic.NewScript("ctx._source.violate_nums -= params.dec").Param("dec", 1)
_, err = es.EsClient.Update().
Index(index).
Id(id).
Script(script).
Do(context.Background())
if err != nil {
return err
}
}
}
return nil
}

+ 97
- 0
consume/egg_comment_consume.go View File

@@ -0,0 +1,97 @@
package consume

import (
"applet/app/cfg"
utils2 "applet/app/utils"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/EggPlanet/egg_system_rules.git"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy"
"code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/enum"
md2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/rule/egg_energy/md"
es2 "code.fnuoos.com/EggPlanet/egg_system_rules.git/utils/es"
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/olivere/elastic/v7"
"github.com/streadway/amqp"
"strings"
"time"
)

func EggCommentDataConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>EggCommentDataConsume>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name, false)

egg_system_rules.Init(cfg.RedisAddr)
var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
err = handleEggCommentDataConsume(res.Body)
if err != nil {
fmt.Println("EggCommentDataConsume:::::", err.Error())
utils2.FilePutContents("EggCommentDataConsume_ERR", utils2.SerializeStr(map[string]interface{}{
"body": res.Body,
"err": err.Error(),
}))
}
//_ = res.Reject(false)
err = res.Ack(true)
fmt.Println("err ::: ", err)
} else {
panic(errors.New("error getting message"))
}
}
}

func handleEggCommentDataConsume(msgData []byte) error {
time.Sleep(time.Duration(100) * time.Millisecond) //休眠100毫秒
// 1.解析mq中queue的数据结构体
var msg *md2.EggSendFriendCircleData
err := json.Unmarshal(msgData, &msg)
if err != nil {
return err
}

year, week := time.Now().ISOWeek()
yearStr := utils2.IntToStr(year)
weekStr := utils2.IntToStr(week)
index := es2.GetAppointIndexFromAlias(yearStr, weekStr)
id := fmt.Sprintf("%d%d_%d", year, week, msg.Uid)
script := elastic.NewScript("ctx._source.forum_comments_nums += params.inc").Param("inc", 1)
updateDoc, err := es.EsClient.Update().
Index(index).
Id(id).
Script(script).
Do(context.Background())
if err != nil {
if strings.Contains(err.Error(), "elastic: Error 404 (Not Found)") {
// 蛋蛋分数据还不存在,创建蛋蛋分数据
now := time.Now().Format("2006-01-02 15:04:05")
err1 := egg_energy.CreateEsScoreAndAssignValuesDoc(index, id, msg.Uid, enum.ForumCommentsNums, "1", now)
if err1 != nil {
return err1
}
return nil
}
fmt.Println("EggCommentDataConsumeUpdateDoc_ERR::::", err.Error())
return err
}
fmt.Println("updateDoc==========>", updateDoc)
return nil
}

+ 1
- 0
consume/init.go View File

@@ -38,6 +38,7 @@ func initConsumes() {
jobs[consumeMd.EggRecordActiveDataFunName] = EggRecordActiveDataConsume // 用户签到后更新活跃数据
jobs[consumeMd.EggEnergyAutoScoreDataFunName] = EggEnergyAutoScoreConsume // 自动打分
jobs[consumeMd.EggSendFriendCircleDataFunName] = EggSendFriendCircleDataConsume // 用户发送朋友圈后更新es
jobs[consumeMd.EggCommentDataFunName] = EggCommentDataConsume // 用户评论后更新es
jobs[consumeMd.EggCanalEnergyExchangeAccountFunName] = EggCanalEnergyExchangeAccountConsume // 蛋蛋能量兑换为余额的时候更新es

jobs[consumeMd.VideoRewardFunName] = VideoRewardConsume // 短视频奖励


+ 10
- 0
consume/md/consume_key.go View File

@@ -299,6 +299,15 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "EggSendFriendCircleDataConsume",
},
{
ExchangeName: "egg.app",
Name: "egg_comment_queue",
Type: DirectQueueType,
IsPersistent: false,
RoutKey: "egg_comment",
BindKey: "",
ConsumeFunName: "EggCommentDataConsume",
},
{
ExchangeName: "egg.canal.topic",
Name: "egg_canal_energy_exchange_account_queue",
@@ -342,4 +351,5 @@ const (
EggCanalUserVirtualCoinFlowAggregationConsumeFunName = "EggCanalUserVirtualCoinFlowAggregationConsume"
EggSendFriendCircleDataFunName = "EggSendFriendCircleDataConsume"
EggCanalEnergyExchangeAccountFunName = "EggCanalEnergyExchangeAccountConsume"
EggCommentDataFunName = "EggCommentDataConsume"
)

+ 1
- 1
go.mod View File

@@ -8,7 +8,7 @@ go 1.19

require (
code.fnuoos.com/EggPlanet/egg_models.git v0.2.1-0.20241224090637-89a57f7fbb1e
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241227073118-a441564375e2
code.fnuoos.com/EggPlanet/egg_system_rules.git v0.0.4-0.20241227092843-802cf07ae61c
code.fnuoos.com/go_rely_warehouse/zyos_go_es.git v1.0.1-0.20241118083738-0f22da9ba0be
code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git v0.0.5
github.com/boombuler/barcode v1.0.1


Loading…
Cancel
Save