diff --git a/app/cfg/cfg_app.go b/app/cfg/cfg_app.go index 871fea0..9296bba 100644 --- a/app/cfg/cfg_app.go +++ b/app/cfg/cfg_app.go @@ -14,6 +14,7 @@ type Config struct { RabbitMqAddr string `yaml:"rabbitMq_addr"` RabbitMqAddrTest string `yaml:"rabbitMq_addr_test"` DB DBCfg `yaml:"db"` + DataDB DBCfg `yaml:"data_db"` MQ MQCfg `yaml:"mq"` ES ESCfg `yaml:"es"` Log LogCfg `yaml:"log"` diff --git a/app/cfg/init_cfg.go b/app/cfg/init_cfg.go index accedaf..e96d0c2 100644 --- a/app/cfg/init_cfg.go +++ b/app/cfg/init_cfg.go @@ -15,6 +15,7 @@ var ( SrvAddr string RedisAddr string DB *DBCfg + DataDB *DBCfg MQ *MQCfg ES *ESCfg Log *LogCfg @@ -53,6 +54,7 @@ func InitCfg() { Local = conf.Local CurlDebug = conf.CurlDebug DB = &conf.DB + DataDB = &conf.DataDB Log = &conf.Log ArkID = &conf.ArkID RedisAddr = conf.RedisAddr diff --git a/app/cfg/init_task.go b/app/cfg/init_task.go index d54079e..6ed15e2 100644 --- a/app/cfg/init_task.go +++ b/app/cfg/init_task.go @@ -27,6 +27,7 @@ func InitTaskCfg() { Prd = conf.Prd Debug = conf.Debug DB = &conf.DB + DataDB = &conf.DataDB Log = &conf.Log Admin = &conf.Admin RedisAddr = conf.RedisAddr diff --git a/app/db/data_db.go b/app/db/data_db.go new file mode 100644 index 0000000..6514694 --- /dev/null +++ b/app/db/data_db.go @@ -0,0 +1,50 @@ +package db + +import ( + "fmt" + "os" + "time" + + _ "github.com/go-sql-driver/mysql" //必须导入mysql驱动,否则会panic + "xorm.io/xorm" + "xorm.io/xorm/log" + + "applet/app/cfg" +) + +var DataDb *xorm.Engine + +//根据DB配置文件初始化数据库 +func InitDataDB(c *cfg.DBCfg) error { + var ( + err error + f *os.File + ) + //创建Orm引擎 + if DataDb, err = xorm.NewEngine("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", c.User, c.Psw, c.Host, c.Name)); err != nil { + return err + } + DataDb.SetConnMaxLifetime(c.MaxLifetime * time.Second) //设置最长连接时间 + DataDb.SetMaxOpenConns(c.MaxOpenConns) //设置最大打开连接数 + DataDb.SetMaxIdleConns(c.MaxIdleConns) //设置连接池的空闲数大小 + if err = DataDb.Ping(); err != nil { //尝试ping数据库 + return err + } + if c.ShowLog { //根据配置文件设置日志 + DataDb.ShowSQL(true) //设置是否打印sql + DataDb.Logger().SetLevel(0) //设置日志等级 + //修改日志文件存放路径文件名是%s.log + path := fmt.Sprintf(c.Path, c.Name) + f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0777) + if err != nil { + os.RemoveAll(c.Path) + if f, err = os.OpenFile(c.Path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0777); err != nil { + return err + } + } + logger := log.NewSimpleLogger(f) + logger.ShowSQL(true) + DataDb.SetLogger(logger) + } + return nil +} diff --git a/app/db/db_city.go b/app/db/db_city.go index 20ee2f1..c7c0778 100644 --- a/app/db/db_city.go +++ b/app/db/db_city.go @@ -28,6 +28,17 @@ func CityGetOne(key string) (*model.City, error) { return &City, nil } +func CityGetOneByName(name string) (*model.City, error) { + var City model.City + if has, err := Db.Where("name LIKE ?", "%"+name+"%").Get(&City); err != nil || has == false { + if has == false { + return &City, nil + } + return nil, logx.Error(err) + } + return &City, nil +} + //单条记录获取DB func CityGetWithDb(HKey string) string { cacheKey := md.OfficialCityCacheKey diff --git a/app/db/db_master_area_visits_flows.go b/app/db/db_master_area_visits_flows.go new file mode 100644 index 0000000..902a9a1 --- /dev/null +++ b/app/db/db_master_area_visits_flows.go @@ -0,0 +1,117 @@ +package db + +import ( + "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/logx" + "errors" + "fmt" + "reflect" + "xorm.io/xorm" +) + +// BatchSelectMasterAreaVisitsFlows 批量查询数据 TODO::和下面的方法重复了,建议采用下面的 `MasterAreaVisitsFlowsFindByParams` 方法 +func BatchSelectMasterAreaVisitsFlows(Db *xorm.Engine, params map[string]interface{}) (*[]model.MasterAreaVisitsFlows, error) { + var MasterAreaVisitsFlowsData []model.MasterAreaVisitsFlows + if err := Db.In(utils.AnyToString(params["key"]), params["value"]). + Find(&MasterAreaVisitsFlowsData); err != nil { + return nil, logx.Warn(err) + } + return &MasterAreaVisitsFlowsData, nil +} + +// MasterAreaVisitsFlowsInsert 插入单条数据 +func MasterAreaVisitsFlowsInsert(Db *xorm.Engine, MasterAreaVisitsFlows *model.MasterAreaVisitsFlows) (int, error) { + _, err := Db.InsertOne(MasterAreaVisitsFlows) + if err != nil { + return 0, err + } + return MasterAreaVisitsFlows.Id, nil +} + +// BatchAddMasterAreaVisitsFlowss 批量新增数据 +func BatchAddMasterAreaVisitsFlowss(Db *xorm.Engine, MasterAreaVisitsFlowsData []*model.MasterAreaVisitsFlows) (int64, error) { + affected, err := Db.Insert(MasterAreaVisitsFlowsData) + if err != nil { + return 0, err + } + return affected, nil +} + +func GetMasterAreaVisitsFlowsCount(Db *xorm.Engine) int { + var MasterAreaVisitsFlows model.MasterAreaVisitsFlows + session := Db.Where("") + count, err := session.Count(&MasterAreaVisitsFlows) + if err != nil { + return 0 + } + return int(count) +} + +// MasterAreaVisitsFlowsDelete 删除记录 +func MasterAreaVisitsFlowsDelete(Db *xorm.Engine, id interface{}) (int64, error) { + if reflect.TypeOf(id).Kind() == reflect.Slice { + return Db.In("id", id).Delete(model.MasterAreaVisitsFlows{}) + } else { + return Db.Where("id = ?", id).Delete(model.MasterAreaVisitsFlows{}) + } +} + +// MasterAreaVisitsFlowsUpdate 更新记录 +func MasterAreaVisitsFlowsUpdate(Db *xorm.Engine, id interface{}, MasterAreaVisitsFlows *model.MasterAreaVisitsFlows, forceColums ...string) (int64, error) { + var ( + affected int64 + err error + ) + if forceColums != nil { + affected, err = Db.Where("id=?", id).Cols(forceColums...).Update(MasterAreaVisitsFlows) + } else { + affected, err = Db.Where("id=?", id).Update(MasterAreaVisitsFlows) + } + if err != nil { + return 0, err + } + return affected, nil +} + +// MasterAreaVisitsFlowsGetOneByParams 通过传入的参数查询数据(单条) +func MasterAreaVisitsFlowsGetOneByParams(Db *xorm.Engine, params map[string]interface{}) (*model.MasterAreaVisitsFlows, error) { + var m model.MasterAreaVisitsFlows + var query = fmt.Sprintf("%s =?", params["key"]) + if has, err := Db.Where(query, params["value"]).Get(&m); err != nil || has == false { + return nil, logx.Error(err) + } + return &m, nil +} + +// MasterAreaVisitsFlowsFindByParams 通过传入的参数查询数据(多条) +func MasterAreaVisitsFlowsFindByParams(Db *xorm.Engine, params map[string]interface{}) (*[]model.MasterAreaVisitsFlows, error) { + var m []model.MasterAreaVisitsFlows + if params["value"] == nil { + return nil, errors.New("参数有误") + } + if params["key"] == nil { + //查询全部数据 + err := Db.Find(&m) + if err != nil { + return nil, logx.Error(err) + } + return &m, nil + } else { + if reflect.TypeOf(params["value"]).Kind() == reflect.Slice { + //指定In查询 + if err := Db.In(utils.AnyToString(params["key"]), params["value"]).Find(&m); err != nil { + return nil, logx.Warn(err) + } + return &m, nil + } else { + var query = fmt.Sprintf("%s =?", params["key"]) + err := Db.Where(query, params["value"]).Find(&m) + if err != nil { + return nil, logx.Error(err) + } + return &m, nil + } + + } +} diff --git a/app/db/db_province.go b/app/db/db_province.go index 47af262..5af4319 100644 --- a/app/db/db_province.go +++ b/app/db/db_province.go @@ -28,6 +28,18 @@ func ProvinceGetOne(key string) (*model.Province, error) { return &province, nil } +func ProvinceGetOneByName(name string) (*model.Province, error) { + var province model.Province + + if has, err := Db.Where("name LIKE ?", "%"+name+"%").Get(&province); err != nil || has == false { + if has == false { + return &province, nil + } + return nil, logx.Error(err) + } + return &province, nil +} + //单条记录获取DB func ProvinceGetWithDb(HKey string) string { cacheKey := md.OfficialProvinceCacheKey diff --git a/app/db/model/master_area_visits_flows.go b/app/db/model/master_area_visits_flows.go new file mode 100644 index 0000000..69575df --- /dev/null +++ b/app/db/model/master_area_visits_flows.go @@ -0,0 +1,17 @@ +package model + +import "time" + +type MasterAreaVisitsFlows struct { + Id int `json:"id"` + Ip string `json:"ip"` + MasterId int `json:"master_id"` + Date string `json:"date"` + CountryName string `json:"country_name"` + ProvinceName string `json:"province_name"` + ProvinceId string `json:"province_id"` + CityName string `json:"city_name"` + CityId string `json:"city_id"` + CreateAt time.Time `json:"create_at" xorm:"not null default CURRENT_TIMESTAMP comment('创建时间') TIMESTAMP"` + UpdateAt time.Time `json:"update_at" xorm:"not null default CURRENT_TIMESTAMP comment('创建时间') TIMESTAMP"` +} diff --git a/cmd/task/main.go b/cmd/task/main.go index ec3c112..727009b 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -25,6 +25,11 @@ func init() { if err := db.InitDB(&baseDb); err != nil { panic(err) } + dataDb := *cfg.DataDB + dataDb.Path = fmt.Sprintf(cfg.DataDB.Path, cfg.DataDB.Name) + if err := db.InitDataDB(&dataDb); err != nil { + panic(err) + } utils.CurlDebug = true //cfg.InitMemCache() } diff --git a/consume/init.go b/consume/init.go index ab53d8a..66301c3 100644 --- a/consume/init.go +++ b/consume/init.go @@ -19,6 +19,7 @@ func Init() { func initConsumes() { jobs[consumeMd.CanalOrderConsumeFunName] = CanalOrderConsume jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume + jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume } func Run() { diff --git a/consume/md/consume_key.go b/consume/md/consume_key.go index 0566600..092d9b3 100644 --- a/consume/md/consume_key.go +++ b/consume/md/consume_key.go @@ -29,9 +29,19 @@ var RabbitMqQueueKeyList = []*MqQueue{ BindKey: "", ConsumeFunName: "CanalGuideOrderConsume", }, + { + ExchangeName: "zhios.app.user.visit.ip.address.exchange", + Name: "zhios_user_visit_ip_address_queue", + Type: FanOutQueueType, + IsPersistent: false, + RoutKey: "queue_one", + BindKey: "", + ConsumeFunName: "ZhiOsUserVisitIpAddressConsume", + }, } const ( - CanalOrderConsumeFunName = "CanalOrderConsume" - CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume" + CanalOrderConsumeFunName = "CanalOrderConsume" + CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume" + ZhiOsUserVisitIpAddressConsumeFunName = "ZhiOsUserVisitIpAddressConsume" ) diff --git a/consume/md/md_user_visit_ip_address_consume.go b/consume/md/md_user_visit_ip_address_consume.go new file mode 100644 index 0000000..dd1aeaf --- /dev/null +++ b/consume/md/md_user_visit_ip_address_consume.go @@ -0,0 +1,12 @@ +package md + +type ZhiOsUserVisitIpAddressMessage struct { + Country string `json:"country"` + Province string `json:"province"` + City string `json:"city"` + Ip string `json:"ip"` + MasterId string `json:"master_id"` +} + +const ZhiOsUserVisitIpAddressHashMapCacheKey = "zhiOs_user_visit_ip_address_hash_map_cache:%s:%s" //访问ip缓存hashMap键zhiOs_user_visit_ip_address_hash_map_cache:masterId:date +const ZhiOsUserVisitIpAddressHashMapCacheTime = 60 * 60 * 24 //1天 diff --git a/consume/md/mq.go b/consume/md/mq.go index f86cb5c..5b94ebf 100644 --- a/consume/md/mq.go +++ b/consume/md/mq.go @@ -7,6 +7,7 @@ const ( BroadQueueType = "BroadQueue" DirectQueueType = "DirectQueue" TopicQueueType = "TopicQueue" + FanOutQueueType = "FanOutQueue" ) type MsgClient struct { diff --git a/consume/zhios_user_visit_ip_address_consume.go b/consume/zhios_user_visit_ip_address_consume.go new file mode 100644 index 0000000..1bc73d4 --- /dev/null +++ b/consume/zhios_user_visit_ip_address_consume.go @@ -0,0 +1,123 @@ +package consume + +import ( + "applet/app/db" + "applet/app/db/model" + "applet/app/utils" + "applet/app/utils/cache" + "applet/app/utils/logx" + "applet/consume/md" + "code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit" + "encoding/json" + "errors" + "fmt" + "github.com/streadway/amqp" + "time" + "xorm.io/xorm" +) + +func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) { + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>") + ch, err := rabbit.Cfg.Pool.GetChannel() + if err != nil { + logx.Error(err) + return + } + defer ch.Release() + //1、将自己绑定到交换机上 + ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey) + //2、取出数据进行消费 + ch.Qos(1) + delivery := ch.Consume(queue.Name) + + var res amqp.Delivery + var ok bool + for { + res, ok = <-delivery + if ok == true { + //fmt.Println(string(res.Body)) + fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>") + err = handleUserVisitIpAddress(res.Body) + //_ = res.Reject(false) + _ = res.Ack(true) + } else { + panic(errors.New("error getting message")) + } + } + fmt.Println("get msg done") +} + +func handleUserVisitIpAddress(msg []byte) error { + now := time.Now() + today := now.Format("2006-01-02") + var tmpString string + err := json.Unmarshal(msg, &tmpString) + if err != nil { + fmt.Println(err.Error()) + return err + } + var msgStruct md.ZhiOsUserVisitIpAddressMessage + err = json.Unmarshal([]byte(tmpString), &msgStruct) + if err != nil { + fmt.Println(err.Error()) + return err + } + + //1、判断ip是否已统计 + cacheKey := fmt.Sprintf(md.ZhiOsUserVisitIpAddressHashMapCacheKey, msgStruct.MasterId, today) + get, _ := cache.HGetString(cacheKey, msgStruct.Ip) + if get == "" { + //2、分析ip归属地 + countryName, provinceId, cityId, provinceName, cityName := getIpAddress(db.DBs[msgStruct.MasterId], msgStruct) + if provinceId != "" || cityId != "" || countryName != "" || provinceName != "" || cityName != "" { + //3、插入`master_area_visits_flows` 表 + if provinceName == "" { + provinceName = msgStruct.Province + } + if cityName == "" { + cityName = msgStruct.City + } + db.MasterAreaVisitsFlowsInsert(db.DataDb, &model.MasterAreaVisitsFlows{ + Ip: msgStruct.Ip, + MasterId: utils.StrToInt(msgStruct.MasterId), + Date: today, + CountryName: countryName, + ProvinceName: provinceName, + ProvinceId: provinceId, + CityName: cityName, + CityId: cityId, + CreateAt: now, + UpdateAt: now, + }) + } + //4、加入到缓存map中 + cache.HSet(cacheKey, msgStruct.Ip, msgStruct.MasterId) + cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime) + } + return nil +} + +func getIpAddress(Db *xorm.Engine, message md.ZhiOsUserVisitIpAddressMessage) (countryName, provinceId, cityId, provinceName, cityName string) { + countryName = message.Country + if countryName == "中国" && message.Province != "" { + if message.Province == "闽" { + message.Province = "福建" + } + province, err := db.ProvinceGetOneByName(message.Province) + if err != nil { + return "", "", "", "", "" + } + provinceId = province.Id + provinceName = province.Name + + if message.City != "" { + city, err := db.CityGetOneByName(message.City) + if err != nil { + return "", "", "", "", "" + } + cityId = city.Id + cityName = city.Name + } + } + return +} diff --git a/etc/cfg.yml b/etc/cfg.yml index e4c7f20..87ac86f 100644 --- a/etc/cfg.yml +++ b/etc/cfg.yml @@ -39,6 +39,18 @@ db: max_idle_conns: 100 path: 'tmp/%s.log' +# 连接 big_data_screen 数据库 +data_db: + host: '119.23.182.117:3306' + name: 'big_data_screen' + user: 'root' + psw: 'Fnuo123com@' + show_log: true + max_lifetime: 30 + max_open_conns: 100 + max_idle_conns: 100 + path: 'tmp/%s.log' + # 日志 log: app_name: 'applet' diff --git a/main.go b/main.go index d02a917..721f30f 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,9 @@ func init() { if err := db.InitDB(cfg.DB); err != nil { //主数据库初始化 panic(err) } + if err := db.InitDataDB(cfg.DataDB); err != nil { //数据大屏本身库初始化 + panic(err) + } channel := make(chan int, 0) //开辟管道,缓冲为 go db.InitDBs(channel) <-channel