DengBiao преди 1 година
родител
ревизия
1c39113f86
променени са 16 файла, в които са добавени 380 реда и са изтрити 2 реда
  1. +1
    -0
      app/cfg/cfg_app.go
  2. +2
    -0
      app/cfg/init_cfg.go
  3. +1
    -0
      app/cfg/init_task.go
  4. +50
    -0
      app/db/data_db.go
  5. +11
    -0
      app/db/db_city.go
  6. +117
    -0
      app/db/db_master_area_visits_flows.go
  7. +12
    -0
      app/db/db_province.go
  8. +17
    -0
      app/db/model/master_area_visits_flows.go
  9. +5
    -0
      cmd/task/main.go
  10. +1
    -0
      consume/init.go
  11. +12
    -2
      consume/md/consume_key.go
  12. +12
    -0
      consume/md/md_user_visit_ip_address_consume.go
  13. +1
    -0
      consume/md/mq.go
  14. +123
    -0
      consume/zhios_user_visit_ip_address_consume.go
  15. +12
    -0
      etc/cfg.yml
  16. +3
    -0
      main.go

+ 1
- 0
app/cfg/cfg_app.go Целия файл

@@ -14,6 +14,7 @@ type Config struct {
RabbitMqAddr string `yaml:"rabbitMq_addr"`
RabbitMqAddrTest string `yaml:"rabbitMq_addr_test"`
DB DBCfg `yaml:"db"`
DataDB DBCfg `yaml:"data_db"`
MQ MQCfg `yaml:"mq"`
ES ESCfg `yaml:"es"`
Log LogCfg `yaml:"log"`


+ 2
- 0
app/cfg/init_cfg.go Целия файл

@@ -15,6 +15,7 @@ var (
SrvAddr string
RedisAddr string
DB *DBCfg
DataDB *DBCfg
MQ *MQCfg
ES *ESCfg
Log *LogCfg
@@ -53,6 +54,7 @@ func InitCfg() {
Local = conf.Local
CurlDebug = conf.CurlDebug
DB = &conf.DB
DataDB = &conf.DataDB
Log = &conf.Log
ArkID = &conf.ArkID
RedisAddr = conf.RedisAddr


+ 1
- 0
app/cfg/init_task.go Целия файл

@@ -27,6 +27,7 @@ func InitTaskCfg() {
Prd = conf.Prd
Debug = conf.Debug
DB = &conf.DB
DataDB = &conf.DataDB
Log = &conf.Log
Admin = &conf.Admin
RedisAddr = conf.RedisAddr


+ 50
- 0
app/db/data_db.go Целия файл

@@ -0,0 +1,50 @@
package db

import (
"fmt"
"os"
"time"

_ "github.com/go-sql-driver/mysql" //必须导入mysql驱动,否则会panic
"xorm.io/xorm"
"xorm.io/xorm/log"

"applet/app/cfg"
)

var DataDb *xorm.Engine

//根据DB配置文件初始化数据库
func InitDataDB(c *cfg.DBCfg) error {
var (
err error
f *os.File
)
//创建Orm引擎
if DataDb, err = xorm.NewEngine("mysql", fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4", c.User, c.Psw, c.Host, c.Name)); err != nil {
return err
}
DataDb.SetConnMaxLifetime(c.MaxLifetime * time.Second) //设置最长连接时间
DataDb.SetMaxOpenConns(c.MaxOpenConns) //设置最大打开连接数
DataDb.SetMaxIdleConns(c.MaxIdleConns) //设置连接池的空闲数大小
if err = DataDb.Ping(); err != nil { //尝试ping数据库
return err
}
if c.ShowLog { //根据配置文件设置日志
DataDb.ShowSQL(true) //设置是否打印sql
DataDb.Logger().SetLevel(0) //设置日志等级
//修改日志文件存放路径文件名是%s.log
path := fmt.Sprintf(c.Path, c.Name)
f, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0777)
if err != nil {
os.RemoveAll(c.Path)
if f, err = os.OpenFile(c.Path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0777); err != nil {
return err
}
}
logger := log.NewSimpleLogger(f)
logger.ShowSQL(true)
DataDb.SetLogger(logger)
}
return nil
}

+ 11
- 0
app/db/db_city.go Целия файл

@@ -28,6 +28,17 @@ func CityGetOne(key string) (*model.City, error) {
return &City, nil
}

func CityGetOneByName(name string) (*model.City, error) {
var City model.City
if has, err := Db.Where("name LIKE ?", "%"+name+"%").Get(&City); err != nil || has == false {
if has == false {
return &City, nil
}
return nil, logx.Error(err)
}
return &City, nil
}

//单条记录获取DB
func CityGetWithDb(HKey string) string {
cacheKey := md.OfficialCityCacheKey


+ 117
- 0
app/db/db_master_area_visits_flows.go Целия файл

@@ -0,0 +1,117 @@
package db

import (
"applet/app/db/model"
"applet/app/utils"
"applet/app/utils/logx"
"errors"
"fmt"
"reflect"
"xorm.io/xorm"
)

// BatchSelectMasterAreaVisitsFlows 批量查询数据 TODO::和下面的方法重复了,建议采用下面的 `MasterAreaVisitsFlowsFindByParams` 方法
func BatchSelectMasterAreaVisitsFlows(Db *xorm.Engine, params map[string]interface{}) (*[]model.MasterAreaVisitsFlows, error) {
var MasterAreaVisitsFlowsData []model.MasterAreaVisitsFlows
if err := Db.In(utils.AnyToString(params["key"]), params["value"]).
Find(&MasterAreaVisitsFlowsData); err != nil {
return nil, logx.Warn(err)
}
return &MasterAreaVisitsFlowsData, nil
}

// MasterAreaVisitsFlowsInsert 插入单条数据
func MasterAreaVisitsFlowsInsert(Db *xorm.Engine, MasterAreaVisitsFlows *model.MasterAreaVisitsFlows) (int, error) {
_, err := Db.InsertOne(MasterAreaVisitsFlows)
if err != nil {
return 0, err
}
return MasterAreaVisitsFlows.Id, nil
}

// BatchAddMasterAreaVisitsFlowss 批量新增数据
func BatchAddMasterAreaVisitsFlowss(Db *xorm.Engine, MasterAreaVisitsFlowsData []*model.MasterAreaVisitsFlows) (int64, error) {
affected, err := Db.Insert(MasterAreaVisitsFlowsData)
if err != nil {
return 0, err
}
return affected, nil
}

func GetMasterAreaVisitsFlowsCount(Db *xorm.Engine) int {
var MasterAreaVisitsFlows model.MasterAreaVisitsFlows
session := Db.Where("")
count, err := session.Count(&MasterAreaVisitsFlows)
if err != nil {
return 0
}
return int(count)
}

// MasterAreaVisitsFlowsDelete 删除记录
func MasterAreaVisitsFlowsDelete(Db *xorm.Engine, id interface{}) (int64, error) {
if reflect.TypeOf(id).Kind() == reflect.Slice {
return Db.In("id", id).Delete(model.MasterAreaVisitsFlows{})
} else {
return Db.Where("id = ?", id).Delete(model.MasterAreaVisitsFlows{})
}
}

// MasterAreaVisitsFlowsUpdate 更新记录
func MasterAreaVisitsFlowsUpdate(Db *xorm.Engine, id interface{}, MasterAreaVisitsFlows *model.MasterAreaVisitsFlows, forceColums ...string) (int64, error) {
var (
affected int64
err error
)
if forceColums != nil {
affected, err = Db.Where("id=?", id).Cols(forceColums...).Update(MasterAreaVisitsFlows)
} else {
affected, err = Db.Where("id=?", id).Update(MasterAreaVisitsFlows)
}
if err != nil {
return 0, err
}
return affected, nil
}

// MasterAreaVisitsFlowsGetOneByParams 通过传入的参数查询数据(单条)
func MasterAreaVisitsFlowsGetOneByParams(Db *xorm.Engine, params map[string]interface{}) (*model.MasterAreaVisitsFlows, error) {
var m model.MasterAreaVisitsFlows
var query = fmt.Sprintf("%s =?", params["key"])
if has, err := Db.Where(query, params["value"]).Get(&m); err != nil || has == false {
return nil, logx.Error(err)
}
return &m, nil
}

// MasterAreaVisitsFlowsFindByParams 通过传入的参数查询数据(多条)
func MasterAreaVisitsFlowsFindByParams(Db *xorm.Engine, params map[string]interface{}) (*[]model.MasterAreaVisitsFlows, error) {
var m []model.MasterAreaVisitsFlows
if params["value"] == nil {
return nil, errors.New("参数有误")
}
if params["key"] == nil {
//查询全部数据
err := Db.Find(&m)
if err != nil {
return nil, logx.Error(err)
}
return &m, nil
} else {
if reflect.TypeOf(params["value"]).Kind() == reflect.Slice {
//指定In查询
if err := Db.In(utils.AnyToString(params["key"]), params["value"]).Find(&m); err != nil {
return nil, logx.Warn(err)
}
return &m, nil
} else {
var query = fmt.Sprintf("%s =?", params["key"])
err := Db.Where(query, params["value"]).Find(&m)
if err != nil {
return nil, logx.Error(err)
}
return &m, nil
}

}
}

+ 12
- 0
app/db/db_province.go Целия файл

@@ -28,6 +28,18 @@ func ProvinceGetOne(key string) (*model.Province, error) {
return &province, nil
}

func ProvinceGetOneByName(name string) (*model.Province, error) {
var province model.Province

if has, err := Db.Where("name LIKE ?", "%"+name+"%").Get(&province); err != nil || has == false {
if has == false {
return &province, nil
}
return nil, logx.Error(err)
}
return &province, nil
}

//单条记录获取DB
func ProvinceGetWithDb(HKey string) string {
cacheKey := md.OfficialProvinceCacheKey


+ 17
- 0
app/db/model/master_area_visits_flows.go Целия файл

@@ -0,0 +1,17 @@
package model

import "time"

type MasterAreaVisitsFlows struct {
Id int `json:"id"`
Ip string `json:"ip"`
MasterId int `json:"master_id"`
Date string `json:"date"`
CountryName string `json:"country_name"`
ProvinceName string `json:"province_name"`
ProvinceId string `json:"province_id"`
CityName string `json:"city_name"`
CityId string `json:"city_id"`
CreateAt time.Time `json:"create_at" xorm:"not null default CURRENT_TIMESTAMP comment('创建时间') TIMESTAMP"`
UpdateAt time.Time `json:"update_at" xorm:"not null default CURRENT_TIMESTAMP comment('创建时间') TIMESTAMP"`
}

+ 5
- 0
cmd/task/main.go Целия файл

@@ -25,6 +25,11 @@ func init() {
if err := db.InitDB(&baseDb); err != nil {
panic(err)
}
dataDb := *cfg.DataDB
dataDb.Path = fmt.Sprintf(cfg.DataDB.Path, cfg.DataDB.Name)
if err := db.InitDataDB(&dataDb); err != nil {
panic(err)
}
utils.CurlDebug = true
//cfg.InitMemCache()
}


+ 1
- 0
consume/init.go Целия файл

@@ -19,6 +19,7 @@ func Init() {
func initConsumes() {
jobs[consumeMd.CanalOrderConsumeFunName] = CanalOrderConsume
jobs[consumeMd.CanalGuideOrderConsumeFunName] = CanalGuideOrderConsume
jobs[consumeMd.ZhiOsUserVisitIpAddressConsumeFunName] = ZhiOsUserVisitIpAddressConsume
}

func Run() {


+ 12
- 2
consume/md/consume_key.go Целия файл

@@ -29,9 +29,19 @@ var RabbitMqQueueKeyList = []*MqQueue{
BindKey: "",
ConsumeFunName: "CanalGuideOrderConsume",
},
{
ExchangeName: "zhios.app.user.visit.ip.address.exchange",
Name: "zhios_user_visit_ip_address_queue",
Type: FanOutQueueType,
IsPersistent: false,
RoutKey: "queue_one",
BindKey: "",
ConsumeFunName: "ZhiOsUserVisitIpAddressConsume",
},
}

const (
CanalOrderConsumeFunName = "CanalOrderConsume"
CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume"
CanalOrderConsumeFunName = "CanalOrderConsume"
CanalGuideOrderConsumeFunName = "CanalGuideOrderConsume"
ZhiOsUserVisitIpAddressConsumeFunName = "ZhiOsUserVisitIpAddressConsume"
)

+ 12
- 0
consume/md/md_user_visit_ip_address_consume.go Целия файл

@@ -0,0 +1,12 @@
package md

type ZhiOsUserVisitIpAddressMessage struct {
Country string `json:"country"`
Province string `json:"province"`
City string `json:"city"`
Ip string `json:"ip"`
MasterId string `json:"master_id"`
}

const ZhiOsUserVisitIpAddressHashMapCacheKey = "zhiOs_user_visit_ip_address_hash_map_cache:%s:%s" //访问ip缓存hashMap键zhiOs_user_visit_ip_address_hash_map_cache:masterId:date
const ZhiOsUserVisitIpAddressHashMapCacheTime = 60 * 60 * 24 //1天

+ 1
- 0
consume/md/mq.go Целия файл

@@ -7,6 +7,7 @@ const (
BroadQueueType = "BroadQueue"
DirectQueueType = "DirectQueue"
TopicQueueType = "TopicQueue"
FanOutQueueType = "FanOutQueue"
)

type MsgClient struct {


+ 123
- 0
consume/zhios_user_visit_ip_address_consume.go Целия файл

@@ -0,0 +1,123 @@
package consume

import (
"applet/app/db"
"applet/app/db/model"
"applet/app/utils"
"applet/app/utils/cache"
"applet/app/utils/logx"
"applet/consume/md"
"code.fnuoos.com/go_rely_warehouse/zyos_go_mq.git/rabbit"
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"time"
"xorm.io/xorm"
)

func ZhiOsUserVisitIpAddressConsume(queue md.MqQueue) {
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>")
ch, err := rabbit.Cfg.Pool.GetChannel()
if err != nil {
logx.Error(err)
return
}
defer ch.Release()
//1、将自己绑定到交换机上
ch.Bind(queue.Name, queue.ExchangeName, queue.RoutKey)
//2、取出数据进行消费
ch.Qos(1)
delivery := ch.Consume(queue.Name)

var res amqp.Delivery
var ok bool
for {
res, ok = <-delivery
if ok == true {
//fmt.Println(string(res.Body))
fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
err = handleUserVisitIpAddress(res.Body)
//_ = res.Reject(false)
_ = res.Ack(true)
} else {
panic(errors.New("error getting message"))
}
}
fmt.Println("get msg done")
}

func handleUserVisitIpAddress(msg []byte) error {
now := time.Now()
today := now.Format("2006-01-02")
var tmpString string
err := json.Unmarshal(msg, &tmpString)
if err != nil {
fmt.Println(err.Error())
return err
}
var msgStruct md.ZhiOsUserVisitIpAddressMessage
err = json.Unmarshal([]byte(tmpString), &msgStruct)
if err != nil {
fmt.Println(err.Error())
return err
}

//1、判断ip是否已统计
cacheKey := fmt.Sprintf(md.ZhiOsUserVisitIpAddressHashMapCacheKey, msgStruct.MasterId, today)
get, _ := cache.HGetString(cacheKey, msgStruct.Ip)
if get == "" {
//2、分析ip归属地
countryName, provinceId, cityId, provinceName, cityName := getIpAddress(db.DBs[msgStruct.MasterId], msgStruct)
if provinceId != "" || cityId != "" || countryName != "" || provinceName != "" || cityName != "" {
//3、插入`master_area_visits_flows` 表
if provinceName == "" {
provinceName = msgStruct.Province
}
if cityName == "" {
cityName = msgStruct.City
}
db.MasterAreaVisitsFlowsInsert(db.DataDb, &model.MasterAreaVisitsFlows{
Ip: msgStruct.Ip,
MasterId: utils.StrToInt(msgStruct.MasterId),
Date: today,
CountryName: countryName,
ProvinceName: provinceName,
ProvinceId: provinceId,
CityName: cityName,
CityId: cityId,
CreateAt: now,
UpdateAt: now,
})
}
//4、加入到缓存map中
cache.HSet(cacheKey, msgStruct.Ip, msgStruct.MasterId)
cache.Expire(cacheKey, md.ZhiOsUserVisitIpAddressHashMapCacheTime)
}
return nil
}

func getIpAddress(Db *xorm.Engine, message md.ZhiOsUserVisitIpAddressMessage) (countryName, provinceId, cityId, provinceName, cityName string) {
countryName = message.Country
if countryName == "中国" && message.Province != "" {
if message.Province == "闽" {
message.Province = "福建"
}
province, err := db.ProvinceGetOneByName(message.Province)
if err != nil {
return "", "", "", "", ""
}
provinceId = province.Id
provinceName = province.Name

if message.City != "" {
city, err := db.CityGetOneByName(message.City)
if err != nil {
return "", "", "", "", ""
}
cityId = city.Id
cityName = city.Name
}
}
return
}

+ 12
- 0
etc/cfg.yml Целия файл

@@ -39,6 +39,18 @@ db:
max_idle_conns: 100
path: 'tmp/%s.log'

# 连接 big_data_screen 数据库
data_db:
host: '119.23.182.117:3306'
name: 'big_data_screen'
user: 'root'
psw: 'Fnuo123com@'
show_log: true
max_lifetime: 30
max_open_conns: 100
max_idle_conns: 100
path: 'tmp/%s.log'

# 日志
log:
app_name: 'applet'


+ 3
- 0
main.go Целия файл

@@ -23,6 +23,9 @@ func init() {
if err := db.InitDB(cfg.DB); err != nil { //主数据库初始化
panic(err)
}
if err := db.InitDataDB(cfg.DataDB); err != nil { //数据大屏本身库初始化
panic(err)
}
channel := make(chan int, 0) //开辟管道,缓冲为
go db.InitDBs(channel)
<-channel


Зареждане…
Отказ
Запис