From 184bc34623f2be8cf350a9aa307045c1e693e207 Mon Sep 17 00:00:00 2001 From: shenjiachi Date: Wed, 25 Dec 2024 10:25:26 +0800 Subject: [PATCH] init task caching top100 --- app/svc/egg_energy/svc_egg_energy.go | 107 ++++++++++++++++++++++ app/svc/egg_energy/svc_egg_energy_test.go | 2 +- app/task/init.go | 1 + app/task/md/cron_key.go | 1 + app/task/task_auto_caching_top100.go | 22 +++++ 5 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 app/task/task_auto_caching_top100.go diff --git a/app/svc/egg_energy/svc_egg_energy.go b/app/svc/egg_energy/svc_egg_energy.go index b88260f..d62586c 100644 --- a/app/svc/egg_energy/svc_egg_energy.go +++ b/app/svc/egg_energy/svc_egg_energy.go @@ -325,6 +325,113 @@ func StatisticsUserEggEcpmRange(esIndexName string) (resp []map[string]string, e return resp, nil } +func StatisticsUserEggEcpmRange1(esIndexName string) (resp []map[string]string, err error) { + resp = []map[string]string{} + var result *elastic.SearchResult + // 创建百分位数聚合 + percentBoolQuery := elastic.NewRangeQuery("ecpm").Gt(0) + percentilesAgg := elastic.NewPercentilesAggregation().Field("ecpm").Percentiles([]float64{5, 90}...) + // 执行查询 + response, err := es.EsClient.Search(). + Index(esIndexName). + Query(percentBoolQuery). + Aggregation("percentiles_agg", percentilesAgg). + Size(0). + Do(context.Background()) + if err != nil { + // 处理错误 + return resp, err + } + // 获取百分位数结果 + percentiles, found := response.Aggregations.Percentiles("percentiles_agg") + if !found { + return resp, err + } + // 获取第10和第90百分位数 + p5 := percentiles.Values["5.0"] + p90 := percentiles.Values["90.0"] + boolQuery := elastic.NewRangeQuery("ecpm").Gte(p5).Lte(p90) + + // 1、 创建 extended_stats 高级统计 + aggs := elastic.NewExtendedStatsAggregation().Field("ecpm") + result, err = es.EsClient.Search(). + Index(esIndexName). + TrackTotalHits(true). + Query(boolQuery). // 设置查询条件 + Aggregation("result", aggs). // 设置聚合条件,并为聚合条件设置一个名字 + Size(0). // 设置分页参数 - 每页大小,设置为0,代表不返回搜索结果,仅返回聚合分析结果 + Pretty(true). // 返回可读的json格式 + Do(context.Background()) + if err != nil { + return resp, err + } + + // 使用Cardinality函数和前面定义的聚合条件名称,查询结果 + agg, found := result.Aggregations.ExtendedStats("result") + if !found { + // 打印结果,注意:这里使用的是取值运算符 + return resp, errors.New("未聚合出数据") + } + + if agg.StdDeviation != nil && *agg.StdDeviation != 0 { + //2、histogram 直方图聚合 + stdDeviation := *agg.StdDeviation //标准平方差 + min := *agg.Min //最小值 + max := *agg.Max //最大值 + avg := *agg.Avg //平均数 + totalCount := agg.Count //总数 + discreteCoefficient := stdDeviation / avg //离散系数 + if stdDeviation < avg { + //TODO::如果标准差相对较小,而平均值较大,离散系数可能会导致较大的间隔,直接使用标准差或其他更合适的值作为间隔。 + discreteCoefficient = stdDeviation + } + newAggs := elastic.NewHistogramAggregation().Field("ecpm").Interval(discreteCoefficient).MinDocCount(1).ExtendedBounds(min, max) + searchResult, err := es.EsClient.Search(). + Index(esIndexName). + TrackTotalHits(true). + Query(boolQuery). // 设置查询条件 + Aggregation("result", newAggs). // 设置聚合条件,并为聚合条件设置一个名字 + Size(0). // 设置分页参数 - 每页大小,设置为0,代表不返回搜索结果,仅返回聚合分析结果 + Pretty(true). // 返回可读的json格式 + Do(context.Background()) + if err != nil { + return resp, err + } + // 使用Terms函数和前面定义的聚合条件名称,查询结果 + newAgg, found := searchResult.Aggregations.Histogram("result") + if !found { + return resp, errors.New("未聚合出数据") + } + + // 3、组装数据 + var keys []string + var values []int64 + for _, bucket := range newAgg.Buckets { + // 每一个桶都有一个key值,其实就是分组的值,可以理解为SQL的group by值 + bucketValue := bucket.Key + keys = append(keys, utils.Float64ToStr(bucketValue)) + values = append(values, bucket.DocCount) + } + + for k, v := range keys { + value, _ := Operate(values[k], totalCount) + if k+1 == len(keys) { + resp = append(resp, map[string]string{ + "key": v + " ~ " + utils.Float64ToStr(max) + " 元", + "value": utils.Float64ToStrPrec10(value), + }) + } else { + resp = append(resp, map[string]string{ + "key": v + " ~ " + keys[k+1] + " 元", + "value": utils.Float64ToStrPrec10(value), + }) + } + } + } + fmt.Println(resp) + return resp, nil +} + // StatisticsUserEggInviteUserNumsRange 统计用户"拉新人数"范围 func StatisticsUserEggInviteUserNumsRange(esIndexName string) (resp []map[string]string, err error) { resp = []map[string]string{} diff --git a/app/svc/egg_energy/svc_egg_energy_test.go b/app/svc/egg_energy/svc_egg_energy_test.go index f5bde40..d6917e7 100644 --- a/app/svc/egg_energy/svc_egg_energy_test.go +++ b/app/svc/egg_energy/svc_egg_energy_test.go @@ -11,7 +11,7 @@ func TestStatisticsUserEggEcpmRange(t *testing.T) { es.Init("http://120.55.48.175:9200", "elastic", "fnuo123") var tempResp md.StatisticsEggPointResp var err error - tempResp.StatisticsUserEggEcpmRange, err = StatisticsUserEggEcpmRange("egg_energy_user_egg_score_202451") + tempResp.StatisticsUserEggEcpmRange, err = StatisticsUserEggEcpmRange1("egg_energy_user_egg_score_202451") if err != nil { fmt.Println(err) } diff --git a/app/task/init.go b/app/task/init.go index 7bacc70..f4b6fea 100644 --- a/app/task/init.go +++ b/app/task/init.go @@ -101,4 +101,5 @@ func initTasks() { jobs[taskMd.CornEggEnergyAutoUpdateUserAccess] = taskEggEnergyAutoUpdateUserAccess // es蛋蛋分记录-自动更新访问次数 jobs[taskMd.CornEggEnergyAutoDeleteTableAdvertisingCallback] = taskAutoDeleteTableAdvertisingCallback // 定时清除七天以前的广告回调数据 jobs[taskMd.CornEggEnergyCoinFlow] = taskEggEnergyCoinFlow // 定时重置用户流水聚合信息 + jobs[taskMd.CornEggEnergyAutoCachingTop100] = taskAutoCachingTop100 // 英雄榜缓存前一百数据 } diff --git a/app/task/md/cron_key.go b/app/task/md/cron_key.go index eabbe93..900335c 100644 --- a/app/task/md/cron_key.go +++ b/app/task/md/cron_key.go @@ -15,4 +15,5 @@ const ( CornEggEnergyAutoUpdateUserAccess = "cron_egg_energy_auto_update_user_access" // 同步用户访问次数到es CornEggEnergyAutoDeleteTableAdvertisingCallback = "cron_egg_energy_auto_delete_table_advertising_callback" // 同步用户访问次数到es CornEggEnergyCoinFlow = "cron_egg_energy_coin_flow" // 定时重置用户流水聚合信息 + CornEggEnergyAutoCachingTop100 = "cron_egg_energy_auto_caching_top100" // 英雄榜缓存前一百数据 ) diff --git a/app/task/task_auto_caching_top100.go b/app/task/task_auto_caching_top100.go new file mode 100644 index 0000000..8b39fe0 --- /dev/null +++ b/app/task/task_auto_caching_top100.go @@ -0,0 +1,22 @@ +package task + +import ( + "applet/app/task/svc" + "math/rand" + "time" + "xorm.io/xorm" +) + +func taskAutoCachingTop100(eg *xorm.Engine) { + for { + if len(ch) > workerNum { + time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000))) + } else { + goto START + } + } +START: + ch <- 1 + svc.AutoCachingTop100(eg) + <-ch +}