|
|
@@ -59,6 +59,7 @@ func handleEggCanalInviteUserNumsConsume(msg []byte) error { |
|
|
|
var canalMsg *md.CanalUserRelateMessage[md.CanalUserRelate] |
|
|
|
err := json.Unmarshal(msg, &canalMsg) |
|
|
|
if err != nil { |
|
|
|
fmt.Println("EggCanalInviteUserNumsConsumeFaliedUnMarshal_ERR:::::", err.Error()) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
@@ -70,19 +71,22 @@ func handleEggCanalInviteUserNumsConsume(msg []byte) error { |
|
|
|
if canalMsg.Type == md2.CanalMsgInsertSqlType { |
|
|
|
for _, item := range canalMsg.Data { |
|
|
|
parentUid := item.ParentUid |
|
|
|
id := fmt.Sprintf("%d%d-%d", year, week, parentUid) |
|
|
|
id := fmt.Sprintf("%d%d_%s", year, week, parentUid) |
|
|
|
|
|
|
|
// 新增拉新人数 |
|
|
|
script := elastic.NewScript("ctx._source.invite_user_nums += params.inc").Param("inc", 1) |
|
|
|
_, err = es.EsClient.Update(). |
|
|
|
service, err := es.EsClient.Update(). |
|
|
|
Index(index). |
|
|
|
Id(id). |
|
|
|
Script(script). |
|
|
|
Do(context.Background()) |
|
|
|
if err != nil { |
|
|
|
fmt.Println("EggCanalInviteUserNumsConsumeFailedUpdateInviteUserNums_ERR:::::", err.Error()) |
|
|
|
return err |
|
|
|
} |
|
|
|
fmt.Println(service) |
|
|
|
} |
|
|
|
} |
|
|
|
fmt.Println("EggCanalInviteUserNumsConsume_Succeed::::") |
|
|
|
return nil |
|
|
|
} |