From 8a25ec1226cf859ad9c30aabf6f9ad61d30af2a2 Mon Sep 17 00:00:00 2001 From: DengBiao <2319963317@qq.com> Date: Tue, 7 Mar 2023 18:12:33 +0800 Subject: [PATCH] update --- app/db/db_master_area_visits_flows.go | 4 ++-- consume/zhios_user_visit_ip_address_consume.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/app/db/db_master_area_visits_flows.go b/app/db/db_master_area_visits_flows.go index 902a9a1..849207f 100644 --- a/app/db/db_master_area_visits_flows.go +++ b/app/db/db_master_area_visits_flows.go @@ -29,8 +29,8 @@ func MasterAreaVisitsFlowsInsert(Db *xorm.Engine, MasterAreaVisitsFlows *model.M return MasterAreaVisitsFlows.Id, nil } -// BatchAddMasterAreaVisitsFlowss 批量新增数据 -func BatchAddMasterAreaVisitsFlowss(Db *xorm.Engine, MasterAreaVisitsFlowsData []*model.MasterAreaVisitsFlows) (int64, error) { +// BatchAddMasterAreaVisitsFlows 批量新增数据 +func BatchAddMasterAreaVisitsFlows(Db *xorm.Engine, MasterAreaVisitsFlowsData []*model.MasterAreaVisitsFlows) (int64, error) { affected, err := Db.Insert(MasterAreaVisitsFlowsData) if err != nil { return 0, err diff --git a/consume/zhios_user_visit_ip_address_consume.go b/consume/zhios_user_visit_ip_address_consume.go index f76742a..6f57d23 100644 --- a/consume/zhios_user_visit_ip_address_consume.go +++ b/consume/zhios_user_visit_ip_address_consume.go @@ -16,6 +16,8 @@ import ( "xorm.io/xorm" ) +var data []*model.MasterAreaVisitsFlows + func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) { fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") ch, err := rabbit.Cfg.Pool.GetChannel() @@ -27,7 +29,7 @@ func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) { //1、将自己绑定到交换机上 ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) //2、取出数据进行消费 - ch.Qos(1) + ch.Qos(10) delivery := ch.Consume(queue.Name) var res amqp.Delivery @@ -40,6 +42,10 @@ func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) { err = handleUserVisitIpAddress(res.Body) //_ = res.Reject(false) _ = res.Ack(true) + if len(data) >= 100 { + db.BatchAddMasterAreaVisitsFlows(db.DataDb, data) + data = []*model.MasterAreaVisitsFlows{} + } } else { panic(errors.New("error getting message")) } @@ -78,7 +84,7 @@ func handleUserVisitIpAddress(msg []byte) error { if cityName == "" { cityName = msgStruct.City } - db.MasterAreaVisitsFlowsInsert(db.DataDb, &model.MasterAreaVisitsFlows{ + data = append(data, &model.MasterAreaVisitsFlows{ Ip: msgStruct.Ip, MasterId: utils.StrToInt(msgStruct.MasterId), Date: today, @@ -90,6 +96,7 @@ func handleUserVisitIpAddress(msg []byte) error { CreateAt: now, UpdateAt: now, }) + //db.MasterAreaVisitsFlowsInsert(db.DataDb, ) } //4、加入到缓存map中 cache.HSet(cacheKey, msgStruct.Ip, msgStruct.MasterId)