dengbiao 2 weeks ago
parent
commit
3f09da139f
11 changed files with 331 additions and 2 deletions
  1. +1
    -1
      app/svc/article/svc_cate.go
  2. +107
    -0
      app/svc/egg_energy/svc_egg_energy.go
  3. +1
    -1
      app/svc/egg_energy/svc_egg_energy_test.go
  4. +4
    -0
      app/svc/user_real_name/svc_list.go
  5. +2
    -0
      app/task/init.go
  6. +2
    -0
      app/task/md/cron_key.go
  7. +78
    -0
      app/task/svc/svc_auto_caching_top100.go
  8. +26
    -0
      app/task/svc/svc_auto_caching_top100_test.go
  9. +66
    -0
      app/task/svc/svc_egg_energy_virtual_coin_flow.go
  10. +22
    -0
      app/task/task_auto_caching_top100.go
  11. +22
    -0
      app/task/task_egg_energy_virtual_coin_flow.go

+ 1
- 1
app/svc/article/svc_cate.go View File

@@ -69,7 +69,7 @@ func CateSave(c *gin.Context) {
data.Sort = utils.StrToInt(req.Sort)
data.Pid = utils.StrToInt(req.Pid)
data.IsShow = utils.StrToInt(req.IsShow)
db.Db.Where("id=?", data.Id).Update(data)
db.Db.Where("id=?", data.Id).AllCols().Update(data)
e.OutSuc(c, "success", nil)
return
}


+ 107
- 0
app/svc/egg_energy/svc_egg_energy.go View File

@@ -325,6 +325,113 @@ func StatisticsUserEggEcpmRange(esIndexName string) (resp []map[string]string, e
return resp, nil
}

func StatisticsUserEggEcpmRange1(esIndexName string) (resp []map[string]string, err error) {
resp = []map[string]string{}
var result *elastic.SearchResult
// 创建百分位数聚合
percentBoolQuery := elastic.NewRangeQuery("ecpm").Gt(0)
percentilesAgg := elastic.NewPercentilesAggregation().Field("ecpm").Percentiles([]float64{5, 90}...)
// 执行查询
response, err := es.EsClient.Search().
Index(esIndexName).
Query(percentBoolQuery).
Aggregation("percentiles_agg", percentilesAgg).
Size(0).
Do(context.Background())
if err != nil {
// 处理错误
return resp, err
}
// 获取百分位数结果
percentiles, found := response.Aggregations.Percentiles("percentiles_agg")
if !found {
return resp, err
}
// 获取第10和第90百分位数
p5 := percentiles.Values["5.0"]
p90 := percentiles.Values["90.0"]
boolQuery := elastic.NewRangeQuery("ecpm").Gte(p5).Lte(p90)

// 1、 创建 extended_stats 高级统计
aggs := elastic.NewExtendedStatsAggregation().Field("ecpm")
result, err = es.EsClient.Search().
Index(esIndexName).
TrackTotalHits(true).
Query(boolQuery). // 设置查询条件
Aggregation("result", aggs). // 设置聚合条件,并为聚合条件设置一个名字
Size(0). // 设置分页参数 - 每页大小,设置为0,代表不返回搜索结果,仅返回聚合分析结果
Pretty(true). // 返回可读的json格式
Do(context.Background())
if err != nil {
return resp, err
}

// 使用Cardinality函数和前面定义的聚合条件名称,查询结果
agg, found := result.Aggregations.ExtendedStats("result")
if !found {
// 打印结果,注意:这里使用的是取值运算符
return resp, errors.New("未聚合出数据")
}

if agg.StdDeviation != nil && *agg.StdDeviation != 0 {
//2、histogram 直方图聚合
stdDeviation := *agg.StdDeviation //标准平方差
min := *agg.Min //最小值
max := *agg.Max //最大值
avg := *agg.Avg //平均数
totalCount := agg.Count //总数
discreteCoefficient := stdDeviation / avg //离散系数
if stdDeviation < avg {
//TODO::如果标准差相对较小,而平均值较大,离散系数可能会导致较大的间隔,直接使用标准差或其他更合适的值作为间隔。
discreteCoefficient = stdDeviation
}
newAggs := elastic.NewHistogramAggregation().Field("ecpm").Interval(discreteCoefficient).MinDocCount(1).ExtendedBounds(min, max)
searchResult, err := es.EsClient.Search().
Index(esIndexName).
TrackTotalHits(true).
Query(boolQuery). // 设置查询条件
Aggregation("result", newAggs). // 设置聚合条件,并为聚合条件设置一个名字
Size(0). // 设置分页参数 - 每页大小,设置为0,代表不返回搜索结果,仅返回聚合分析结果
Pretty(true). // 返回可读的json格式
Do(context.Background())
if err != nil {
return resp, err
}
// 使用Terms函数和前面定义的聚合条件名称,查询结果
newAgg, found := searchResult.Aggregations.Histogram("result")
if !found {
return resp, errors.New("未聚合出数据")
}

// 3、组装数据
var keys []string
var values []int64
for _, bucket := range newAgg.Buckets {
// 每一个桶都有一个key值,其实就是分组的值,可以理解为SQL的group by值
bucketValue := bucket.Key
keys = append(keys, utils.Float64ToStr(bucketValue))
values = append(values, bucket.DocCount)
}

for k, v := range keys {
value, _ := Operate(values[k], totalCount)
if k+1 == len(keys) {
resp = append(resp, map[string]string{
"key": v + " ~ " + utils.Float64ToStr(max) + " 元",
"value": utils.Float64ToStrPrec10(value),
})
} else {
resp = append(resp, map[string]string{
"key": v + " ~ " + keys[k+1] + " 元",
"value": utils.Float64ToStrPrec10(value),
})
}
}
}
fmt.Println(resp)
return resp, nil
}

// StatisticsUserEggInviteUserNumsRange 统计用户"拉新人数"范围
func StatisticsUserEggInviteUserNumsRange(esIndexName string) (resp []map[string]string, err error) {
resp = []map[string]string{}


+ 1
- 1
app/svc/egg_energy/svc_egg_energy_test.go View File

@@ -11,7 +11,7 @@ func TestStatisticsUserEggEcpmRange(t *testing.T) {
es.Init("http://120.55.48.175:9200", "elastic", "fnuo123")
var tempResp md.StatisticsEggPointResp
var err error
tempResp.StatisticsUserEggEcpmRange, err = StatisticsUserEggEcpmRange("egg_energy_user_egg_score_202451")
tempResp.StatisticsUserEggEcpmRange, err = StatisticsUserEggEcpmRange1("egg_energy_user_egg_score_202451")
if err != nil {
fmt.Println(err)
}


+ 4
- 0
app/svc/user_real_name/svc_list.go View File

@@ -6,6 +6,7 @@ import (
"applet/app/md"
"applet/app/utils"
"code.fnuoos.com/EggPlanet/egg_models.git/src/implement"
"code.fnuoos.com/EggPlanet/egg_models.git/src/model"
"github.com/gin-gonic/gin"
)

@@ -78,6 +79,9 @@ func Save(c *gin.Context) {
}
data.State = utils.StrToInt(req.State)
db.Db.Where("id=?", data.Id).Cols("state").Update(data)
if data.State == 1 {
db.Db.Where("id=?", data.Uid).Cols("is_real_name").Update(&model.User{IsRealName: 1})
}
e.OutSuc(c, "success", nil)
return
}

+ 2
- 0
app/task/init.go View File

@@ -100,4 +100,6 @@ func initTasks() {
jobs[taskMd.CornEggEnergyAutoCachingEggPointStatistics] = taskAutoCachingEggPointStatistics // 蛋蛋分统计落地页-缓存
jobs[taskMd.CornEggEnergyAutoUpdateUserAccess] = taskEggEnergyAutoUpdateUserAccess // es蛋蛋分记录-自动更新访问次数
jobs[taskMd.CornEggEnergyAutoDeleteTableAdvertisingCallback] = taskAutoDeleteTableAdvertisingCallback // 定时清除七天以前的广告回调数据
jobs[taskMd.CornEggEnergyCoinFlow] = taskEggEnergyCoinFlow // 定时重置用户流水聚合信息
jobs[taskMd.CornEggEnergyAutoCachingTop100] = taskAutoCachingTop100 // 英雄榜缓存前一百数据
}

+ 2
- 0
app/task/md/cron_key.go View File

@@ -14,4 +14,6 @@ const (
CornEggEnergyAutoCachingEggPointStatistics = "cron_egg_energy_auto_caching_egg_point_statistics" // 缓存蛋蛋分统计落地页
CornEggEnergyAutoUpdateUserAccess = "cron_egg_energy_auto_update_user_access" // 同步用户访问次数到es
CornEggEnergyAutoDeleteTableAdvertisingCallback = "cron_egg_energy_auto_delete_table_advertising_callback" // 同步用户访问次数到es
CornEggEnergyCoinFlow = "cron_egg_energy_coin_flow" // 定时重置用户流水聚合信息
CornEggEnergyAutoCachingTop100 = "cron_egg_energy_auto_caching_top100" // 英雄榜缓存前一百数据
)

+ 78
- 0
app/task/svc/svc_auto_caching_top100.go View File

@@ -0,0 +1,78 @@
package svc

import (
"applet/app/db"
"applet/app/utils"
"applet/app/utils/cache"
"fmt"
"xorm.io/xorm"
)

const AutoCachingTop100LockKey = "egg_energy_auto_caching_top100_lock_key"
const HeroListRedisRankKey = "EggEnergy:HomePage:HeroList:Rank:%d" // 1.kind

func AutoCachingTop100(engine *xorm.Engine) {
fmt.Println("egg_energy_auto_caching_top100...")
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
return
}
}()
fmt.Println("----------------------------AutoCachingTop100_Begin-------------------------------")

getString, _ := cache.GetString(AutoCachingTop100LockKey)
if getString != "" {
fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "上一次缓存前一百用户未执行完")
return
}
cache.SetEx(AutoCachingTop100LockKey, "running", 10*60) // 10 min
defer cache.Del(AutoCachingTop100LockKey)

for i := 1; i <= 3; i++ {
sql := "SELECT user_virtual_coin_flow_aggregation.*" +
"FROM `user_virtual_coin_flow_aggregation` " +
"WHERE 1=1"
switch i {
case 1:
sql += " ORDER BY user_virtual_coin_flow_aggregation.today_data"
case 2:
sql += " ORDER BY user_virtual_coin_flow_aggregation.this_week_data"
case 3:
sql += " ORDER BY user_virtual_coin_flow_aggregation.this_month_data"
}
sql += " Desc LIMIT %d, %d;"
sql = fmt.Sprintf(sql, 0, 100)
res, err := db.QueryNativeString(engine, sql)
if err != nil {
fmt.Println("AutoCachingTop100_Err ===================>" + err.Error())
return
}
rankCacheKey := fmt.Sprintf(HeroListRedisRankKey, i)
for _, item := range res {
switch i {
case 1:
_, err = cache.ZAdd(rankCacheKey, utils.StrToFloat64(item["today_data"]), item["uid"])
if err != nil {
fmt.Println("AutoCachingTop100_FailedToCacheData_Err ==================>" + err.Error())
return
}
case 2:
_, err := cache.ZAdd(rankCacheKey, utils.StrToFloat64(item["this_week_data"]), item["uid"])
if err != nil {
fmt.Println("AutoCachingTop100_FailedToCacheData_Err ==================>" + err.Error())
return
}
case 3:
_, err := cache.ZAdd(rankCacheKey, utils.StrToFloat64(item["this_month_data"]), item["uid"])
if err != nil {
fmt.Println("AutoCachingTop100_FailedToCacheData_Err ==================>" + err.Error())
return
}
}
}

fmt.Println("----------------------------AutoCachingTop100_End-------------------------------")
}

}

+ 26
- 0
app/task/svc/svc_auto_caching_top100_test.go View File

@@ -0,0 +1,26 @@
package svc

import (
"applet/app/cfg"
"applet/app/db"
"applet/app/utils/cache"
"testing"
)

func TestAutoCachingTop100(t *testing.T) {
dbcfg := cfg.DBCfg{
Host: "119.23.182.117:3306",
Name: "egg",
User: "root",
Psw: "Fnuo123com@",
ShowLog: true,
MaxLifetime: 30,
MaxOpenConns: 100,
MaxIdleConns: 100,
Path: "tmp/%s.log",
}
db.InitDB(&dbcfg)
cache.NewRedis("127.0.0.1:6379")

AutoCachingTop100(db.Db)
}

+ 66
- 0
app/task/svc/svc_egg_energy_virtual_coin_flow.go View File

@@ -0,0 +1,66 @@
package svc

import (
"applet/app/db"
"applet/app/utils/cache"
"fmt"
"time"
"xorm.io/xorm"
)

func EggEnergyCoinFlow(engine *xorm.Engine) {
fmt.Println(">>>>>>>>>>>>>>>>>>EggEnergyCoinFlow-----<<<<<<<<<<<<<<<<<<<")
var now = time.Now()
year, month, day := now.Date()
today := time.Date(year, month, day, 0, 0, 0, 0, time.Local)
thisWeek := today.AddDate(0, 0, -int(today.Weekday())+1).Format("2006-01-02")
thisMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Format("2006-01-02")

//判断今天是否清空
getString1, _ := cache.GetString("user_virtual_coin_flow_aggregation_today_data")
if getString1 != today.Format("2006-01-02") {
//go func() {
sql := "UPDATE `user_virtual_coin_flow_aggregation` SET today_data= '0.00'"
_, err := db.ExecuteOriginalSql(engine.Where(""), sql)
if err != nil {
fmt.Println("err:>>>>>>>>", err.Error())
return
}
fmt.Println("success:update:day>>>>>>>>")
cache.Set("user_virtual_coin_flow_aggregation_today_data", today.Format("2006-01-02"))
//}()
}

//判断本周是否清空
getString2, _ := cache.GetString("user_virtual_coin_flow_aggregation_this_week_data")
if getString2 != today.Format("2006-01-02") && thisWeek == today.Format("2006-01-02") {
//go func() {
sql := "UPDATE `user_virtual_coin_flow_aggregation` SET this_week_data= '0.00'"
_, err := db.ExecuteOriginalSql(engine.Where(""), sql)
if err != nil {
fmt.Println("err:>>>>>>>>", err.Error())
return
}
fmt.Println("success:update:week>>>>>>>>")
cache.Set("user_virtual_coin_flow_aggregation_this_week_data", today.Format("2006-01-02"))
//}()
}

//判断本月是否清空
getString3, _ := cache.GetString("user_virtual_coin_flow_aggregation_this_month_data")
if getString3 != today.Format("2006-01-02") && thisMonth == today.Format("2006-01-02") {
//go func() {
sql := "UPDATE `user_virtual_coin_flow_aggregation` SET this_month_data= '0.00'"
_, err := db.ExecuteOriginalSql(engine.Where(""), sql)
if err != nil {
fmt.Println("err:>>>>>>>>", err.Error())
return
}
fmt.Println("success:update:month>>>>>>>>")
cache.Set("user_virtual_coin_flow_aggregation_this_month_data", today.Format("2006-01-02"))
//}()
}

fmt.Println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>EggEnergyCoinFlow----end<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<")
return
}

+ 22
- 0
app/task/task_auto_caching_top100.go View File

@@ -0,0 +1,22 @@
package task

import (
"applet/app/task/svc"
"math/rand"
"time"
"xorm.io/xorm"
)

func taskAutoCachingTop100(eg *xorm.Engine) {
for {
if len(ch) > workerNum {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
} else {
goto START
}
}
START:
ch <- 1
svc.AutoCachingTop100(eg)
<-ch
}

+ 22
- 0
app/task/task_egg_energy_virtual_coin_flow.go View File

@@ -0,0 +1,22 @@
package task

import (
"applet/app/task/svc"
"math/rand"
"time"
"xorm.io/xorm"
)

func taskEggEnergyCoinFlow(eg *xorm.Engine) {
for {
if len(ch) > workerNum {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
} else {
goto START
}
}
START:
ch <- 1
svc.EggEnergyCoinFlow(eg)
<-ch
}

Loading…
Cancel
Save