Browse Source

update

three
DengBiao 1 year ago
parent
commit
8a25ec1226
2 changed files with 11 additions and 4 deletions
  1. +2
    -2
      app/db/db_master_area_visits_flows.go
  2. +9
    -2
      consume/zhios_user_visit_ip_address_consume.go

+ 2
- 2
app/db/db_master_area_visits_flows.go View File

@@ -29,8 +29,8 @@ func MasterAreaVisitsFlowsInsert(Db *xorm.Engine, MasterAreaVisitsFlows *model.M
return MasterAreaVisitsFlows.Id, nil
}

// BatchAddMasterAreaVisitsFlowss 批量新增数据
func BatchAddMasterAreaVisitsFlowss(Db *xorm.Engine, MasterAreaVisitsFlowsData []*model.MasterAreaVisitsFlows) (int64, error) {
// BatchAddMasterAreaVisitsFlows 批量新增数据
func BatchAddMasterAreaVisitsFlows(Db *xorm.Engine, MasterAreaVisitsFlowsData []*model.MasterAreaVisitsFlows) (int64, error) {
affected, err := Db.Insert(MasterAreaVisitsFlowsData)
if err != nil {
return 0, err


+ 9
- 2
consume/zhios_user_visit_ip_address_consume.go View File

@@ -16,6 +16,8 @@ import (
"xorm.io/xorm"
)

var data []*model.MasterAreaVisitsFlows

func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
@@ -27,7 +29,7 @@ func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) {
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
ch.Qos(10)
delivery := ch.Consume(queue.Name)

var res amqp.Delivery
@@ -40,6 +42,10 @@ func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) {
err = handleUserVisitIpAddress(res.Body)
//_ = res.Reject(false)
_ = res.Ack(true)
if len(data) >= 100 {
db.BatchAddMasterAreaVisitsFlows(db.DataDb, data)
data = []*model.MasterAreaVisitsFlows{}
}
} else {
panic(errors.New("error getting message"))
}
@@ -78,7 +84,7 @@ func handleUserVisitIpAddress(msg []byte) error {
if cityName == "" {
cityName = msgStruct.City
}
db.MasterAreaVisitsFlowsInsert(db.DataDb, &model.MasterAreaVisitsFlows{
data = append(data, &model.MasterAreaVisitsFlows{
Ip: msgStruct.Ip,
MasterId: utils.StrToInt(msgStruct.MasterId),
Date: today,
@@ -90,6 +96,7 @@ func handleUserVisitIpAddress(msg []byte) error {
CreateAt: now,
UpdateAt: now,
})
//db.MasterAreaVisitsFlowsInsert(db.DataDb, )
}
//4、加入到缓存map中
cache.HSet(cacheKey, msgStruct.Ip, msgStruct.MasterId)


Loading…
Cancel
Save