|
@@ -11,10 +11,12 @@ import ( |
|
|
"encoding/json" |
|
|
"encoding/json" |
|
|
"errors" |
|
|
"errors" |
|
|
"fmt" |
|
|
"fmt" |
|
|
|
|
|
"github.com/cc14514/go-geoip2" |
|
|
|
|
|
geoip2db "github.com/cc14514/go-geoip2-db" |
|
|
"github.com/streadway/amqp" |
|
|
"github.com/streadway/amqp" |
|
|
|
|
|
"net" |
|
|
"strings" |
|
|
"strings" |
|
|
"time" |
|
|
"time" |
|
|
"xorm.io/xorm" |
|
|
|
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
var data []*model.MasterAreaVisitsFlows |
|
|
var data []*model.MasterAreaVisitsFlows |
|
@@ -26,6 +28,9 @@ func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) { |
|
|
logx.Error(err) |
|
|
logx.Error(err) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
geoIp2db, _ := geoip2db.NewGeoipDbByStatik() |
|
|
|
|
|
defer geoIp2db.Close() |
|
|
|
|
|
|
|
|
defer ch.Release() |
|
|
defer ch.Release() |
|
|
//1、将自己绑定到交换机上 |
|
|
//1、将自己绑定到交换机上 |
|
|
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) |
|
|
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) |
|
@@ -40,7 +45,7 @@ func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) { |
|
|
if ok == true { |
|
|
if ok == true { |
|
|
//fmt.Println(string(res.Body)) |
|
|
//fmt.Println(string(res.Body)) |
|
|
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") |
|
|
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") |
|
|
err = handleUserVisitIpAddress(res.Body) |
|
|
|
|
|
|
|
|
err = handleUserVisitIpAddress(res.Body, geoIp2db) |
|
|
//_ = res.Reject(false) |
|
|
//_ = res.Reject(false) |
|
|
_ = res.Ack(true) |
|
|
_ = res.Ack(true) |
|
|
if len(data) >= 100 { |
|
|
if len(data) >= 100 { |
|
@@ -54,7 +59,7 @@ func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) { |
|
|
fmt.Println("get msg done") |
|
|
fmt.Println("get msg done") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func handleUserVisitIpAddress(msg []byte) error { |
|
|
|
|
|
|
|
|
func handleUserVisitIpAddress(msg []byte, geoIp2db *geoip2.DBReader) error { |
|
|
now := time.Now() |
|
|
now := time.Now() |
|
|
today := now.Format("2006-01-02") |
|
|
today := now.Format("2006-01-02") |
|
|
var tmpString string |
|
|
var tmpString string |
|
@@ -89,7 +94,7 @@ func handleUserVisitIpAddress(msg []byte) error { |
|
|
//if get == "" { |
|
|
//if get == "" { |
|
|
if true { |
|
|
if true { |
|
|
//2、分析ip归属地 |
|
|
//2、分析ip归属地 |
|
|
countryName, provinceId, cityId, provinceName, cityName := getIpAddress(db.DBs[msgStruct.MasterId], msgStruct) |
|
|
|
|
|
|
|
|
countryName, provinceId, cityId, provinceName, cityName := getIpAddress(geoIp2db, msgStruct) |
|
|
if provinceId != "" || cityId != "" || countryName != "" || provinceName != "" || cityName != "" { |
|
|
if provinceId != "" || cityId != "" || countryName != "" || provinceName != "" || cityName != "" { |
|
|
//3、插入`master_area_visits_flows` 表 |
|
|
//3、插入`master_area_visits_flows` 表 |
|
|
if provinceName == "" { |
|
|
if provinceName == "" { |
|
@@ -121,7 +126,14 @@ func handleUserVisitIpAddress(msg []byte) error { |
|
|
return nil |
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func getIpAddress(Db *xorm.Engine, message md.ZhiOsUserVisitIpAddressMessage) (countryName, provinceId, cityId, provinceName, cityName string) { |
|
|
|
|
|
|
|
|
func getIpAddress(geoIp2db *geoip2.DBReader, message md.ZhiOsUserVisitIpAddressMessage) (countryName, provinceId, cityId, provinceName, cityName string) { |
|
|
|
|
|
record, _ := geoIp2db.City(net.ParseIP(message.Ip)) |
|
|
|
|
|
if record.Country.Names != nil && record.Subdivisions != nil && record.City.Names != nil { |
|
|
|
|
|
message.Country = record.Country.Names["zh-CN"] |
|
|
|
|
|
message.Province = record.Subdivisions[0].Names["zh-CN"] |
|
|
|
|
|
message.City = record.City.Names["zh-CN"] |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
countryName = message.Country |
|
|
countryName = message.Country |
|
|
if countryName == "中国" && message.Province != "" { |
|
|
if countryName == "中国" && message.Province != "" { |
|
|
if message.Province == "闽" { |
|
|
if message.Province == "闽" { |
|
|