Browse Source

init task caching top100

master
shenjiachi 1 day ago
parent
commit
184bc34623
5 changed files with 132 additions and 1 deletions
  1. +107
    -0
      app/svc/egg_energy/svc_egg_energy.go
  2. +1
    -1
      app/svc/egg_energy/svc_egg_energy_test.go
  3. +1
    -0
      app/task/init.go
  4. +1
    -0
      app/task/md/cron_key.go
  5. +22
    -0
      app/task/task_auto_caching_top100.go

+ 107
- 0
app/svc/egg_energy/svc_egg_energy.go View File

@@ -325,6 +325,113 @@ func StatisticsUserEggEcpmRange(esIndexName string) (resp []map[string]string, e
return resp, nil
}

func StatisticsUserEggEcpmRange1(esIndexName string) (resp []map[string]string, err error) {
resp = []map[string]string{}
var result *elastic.SearchResult
// 创建百分位数聚合
percentBoolQuery := elastic.NewRangeQuery("ecpm").Gt(0)
percentilesAgg := elastic.NewPercentilesAggregation().Field("ecpm").Percentiles([]float64{5, 90}...)
// 执行查询
response, err := es.EsClient.Search().
Index(esIndexName).
Query(percentBoolQuery).
Aggregation("percentiles_agg", percentilesAgg).
Size(0).
Do(context.Background())
if err != nil {
// 处理错误
return resp, err
}
// 获取百分位数结果
percentiles, found := response.Aggregations.Percentiles("percentiles_agg")
if !found {
return resp, err
}
// 获取第10和第90百分位数
p5 := percentiles.Values["5.0"]
p90 := percentiles.Values["90.0"]
boolQuery := elastic.NewRangeQuery("ecpm").Gte(p5).Lte(p90)

// 1、 创建 extended_stats 高级统计
aggs := elastic.NewExtendedStatsAggregation().Field("ecpm")
result, err = es.EsClient.Search().
Index(esIndexName).
TrackTotalHits(true).
Query(boolQuery). // 设置查询条件
Aggregation("result", aggs). // 设置聚合条件,并为聚合条件设置一个名字
Size(0). // 设置分页参数 - 每页大小,设置为0,代表不返回搜索结果,仅返回聚合分析结果
Pretty(true). // 返回可读的json格式
Do(context.Background())
if err != nil {
return resp, err
}

// 使用Cardinality函数和前面定义的聚合条件名称,查询结果
agg, found := result.Aggregations.ExtendedStats("result")
if !found {
// 打印结果,注意:这里使用的是取值运算符
return resp, errors.New("未聚合出数据")
}

if agg.StdDeviation != nil && *agg.StdDeviation != 0 {
//2、histogram 直方图聚合
stdDeviation := *agg.StdDeviation //标准平方差
min := *agg.Min //最小值
max := *agg.Max //最大值
avg := *agg.Avg //平均数
totalCount := agg.Count //总数
discreteCoefficient := stdDeviation / avg //离散系数
if stdDeviation < avg {
//TODO::如果标准差相对较小,而平均值较大,离散系数可能会导致较大的间隔,直接使用标准差或其他更合适的值作为间隔。
discreteCoefficient = stdDeviation
}
newAggs := elastic.NewHistogramAggregation().Field("ecpm").Interval(discreteCoefficient).MinDocCount(1).ExtendedBounds(min, max)
searchResult, err := es.EsClient.Search().
Index(esIndexName).
TrackTotalHits(true).
Query(boolQuery). // 设置查询条件
Aggregation("result", newAggs). // 设置聚合条件,并为聚合条件设置一个名字
Size(0). // 设置分页参数 - 每页大小,设置为0,代表不返回搜索结果,仅返回聚合分析结果
Pretty(true). // 返回可读的json格式
Do(context.Background())
if err != nil {
return resp, err
}
// 使用Terms函数和前面定义的聚合条件名称,查询结果
newAgg, found := searchResult.Aggregations.Histogram("result")
if !found {
return resp, errors.New("未聚合出数据")
}

// 3、组装数据
var keys []string
var values []int64
for _, bucket := range newAgg.Buckets {
// 每一个桶都有一个key值,其实就是分组的值,可以理解为SQL的group by值
bucketValue := bucket.Key
keys = append(keys, utils.Float64ToStr(bucketValue))
values = append(values, bucket.DocCount)
}

for k, v := range keys {
value, _ := Operate(values[k], totalCount)
if k+1 == len(keys) {
resp = append(resp, map[string]string{
"key": v + " ~ " + utils.Float64ToStr(max) + " 元",
"value": utils.Float64ToStrPrec10(value),
})
} else {
resp = append(resp, map[string]string{
"key": v + " ~ " + keys[k+1] + " 元",
"value": utils.Float64ToStrPrec10(value),
})
}
}
}
fmt.Println(resp)
return resp, nil
}

// StatisticsUserEggInviteUserNumsRange 统计用户"拉新人数"范围
func StatisticsUserEggInviteUserNumsRange(esIndexName string) (resp []map[string]string, err error) {
resp = []map[string]string{}


+ 1
- 1
app/svc/egg_energy/svc_egg_energy_test.go View File

@@ -11,7 +11,7 @@ func TestStatisticsUserEggEcpmRange(t *testing.T) {
es.Init("http://120.55.48.175:9200", "elastic", "fnuo123")
var tempResp md.StatisticsEggPointResp
var err error
tempResp.StatisticsUserEggEcpmRange, err = StatisticsUserEggEcpmRange("egg_energy_user_egg_score_202451")
tempResp.StatisticsUserEggEcpmRange, err = StatisticsUserEggEcpmRange1("egg_energy_user_egg_score_202451")
if err != nil {
fmt.Println(err)
}


+ 1
- 0
app/task/init.go View File

@@ -101,4 +101,5 @@ func initTasks() {
jobs[taskMd.CornEggEnergyAutoUpdateUserAccess] = taskEggEnergyAutoUpdateUserAccess // es蛋蛋分记录-自动更新访问次数
jobs[taskMd.CornEggEnergyAutoDeleteTableAdvertisingCallback] = taskAutoDeleteTableAdvertisingCallback // 定时清除七天以前的广告回调数据
jobs[taskMd.CornEggEnergyCoinFlow] = taskEggEnergyCoinFlow // 定时重置用户流水聚合信息
jobs[taskMd.CornEggEnergyAutoCachingTop100] = taskAutoCachingTop100 // 英雄榜缓存前一百数据
}

+ 1
- 0
app/task/md/cron_key.go View File

@@ -15,4 +15,5 @@ const (
CornEggEnergyAutoUpdateUserAccess = "cron_egg_energy_auto_update_user_access" // 同步用户访问次数到es
CornEggEnergyAutoDeleteTableAdvertisingCallback = "cron_egg_energy_auto_delete_table_advertising_callback" // 同步用户访问次数到es
CornEggEnergyCoinFlow = "cron_egg_energy_coin_flow" // 定时重置用户流水聚合信息
CornEggEnergyAutoCachingTop100 = "cron_egg_energy_auto_caching_top100" // 英雄榜缓存前一百数据
)

+ 22
- 0
app/task/task_auto_caching_top100.go View File

@@ -0,0 +1,22 @@
package task

import (
"applet/app/task/svc"
"math/rand"
"time"
"xorm.io/xorm"
)

func taskAutoCachingTop100(eg *xorm.Engine) {
for {
if len(ch) > workerNum {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
} else {
goto START
}
}
START:
ch <- 1
svc.AutoCachingTop100(eg)
<-ch
}

Loading…
Cancel
Save