@@ -79,6 +79,7 @@ func handleEggCanalPersonAddActivityValueConsume(msg []byte) error { | |||||
// 2. 监听插入信息 | // 2. 监听插入信息 | ||||
if canalMsg.Type == md2.CanalMsgInsertSqlType { | if canalMsg.Type == md2.CanalMsgInsertSqlType { | ||||
for _, item := range canalMsg.Data { | for _, item := range canalMsg.Data { | ||||
fmt.Println("item======>", item) | |||||
uid := item.Uid | uid := item.Uid | ||||
id := fmt.Sprintf("%d%d_%s", year, week, uid) | id := fmt.Sprintf("%d%d_%s", year, week, uid) | ||||
@@ -87,6 +88,7 @@ func handleEggCanalPersonAddActivityValueConsume(msg []byte) error { | |||||
} | } | ||||
// 3. 增加个人活跃积分 | // 3. 增加个人活跃积分 | ||||
fmt.Println("amount==========>", item.Amount) | |||||
script := elastic.NewScript("ctx._source.person_add_activity_value += params.inc").Param("inc", utils2.StrToInt(item.Amount)) | script := elastic.NewScript("ctx._source.person_add_activity_value += params.inc").Param("inc", utils2.StrToInt(item.Amount)) | ||||
updateDoc, err := es.EsClient.Update(). | updateDoc, err := es.EsClient.Update(). | ||||
Index(index). | Index(index). | ||||
@@ -0,0 +1,51 @@ | |||||
package consume | |||||
import ( | |||||
"applet/app/cfg" | |||||
"applet/app/db" | |||||
"applet/consume/md" | |||||
"code.fnuoos.com/go_rely_warehouse/zyos_go_es.git/es" | |||||
"testing" | |||||
) | |||||
func TestHandleEggCanalPersonAddActivityValueConsume(t *testing.T) { | |||||
dbcfg := cfg.DBCfg{ | |||||
Host: "119.23.182.117:3306", | |||||
Name: "egg", | |||||
User: "root", | |||||
Psw: "Fnuo123com@", | |||||
ShowLog: true, | |||||
MaxLifetime: 30, | |||||
MaxOpenConns: 100, | |||||
MaxIdleConns: 100, | |||||
Path: "tmp/%s.log", | |||||
} | |||||
db.InitDB(&dbcfg) | |||||
es.Init("http://123.57.140.192:9200/", "elastic", "fnuo123") | |||||
data := md.CanalUserVirtualCoinFlow{ | |||||
Id: "", | |||||
Uid: "36", | |||||
CoinId: "3", | |||||
Direction: "1", | |||||
Title: "", | |||||
Amount: "10", | |||||
BeforeAmount: "", | |||||
AfterAmount: "", | |||||
SysFee: "", | |||||
TransferType: "", | |||||
} | |||||
m := md.CanalUserVirtualCoinFlowMessage[md.CanalUserVirtualCoinFlow]{ | |||||
Data: []md.CanalUserVirtualCoinFlow{data}, | |||||
Database: "", | |||||
ES: 0, | |||||
ID: 0, | |||||
IsDdl: false, | |||||
Old: []md.CanalUserVirtualCoinFlow{}, | |||||
PkNames: []string{}, | |||||
Table: "", | |||||
TS: 0, | |||||
Type: "INSERT", | |||||
} | |||||
HandleEggCanalPersonAddActivityValueConsume(&m) | |||||
} |