es通过时间聚合查询一周中每天的数据平均值

场景回顾:设备上传的数据保存在es中,大屏模块要统计本周的数据折线图(一个设备三分总上传一次,所以拟定每天聚合求个平均值)

kibana查询请求

GET xxxx_2022-10/_search
{
  "size": 0,
  "query": {
    "bool":{
      "must": [
        {"term": {
          "deviceId": {
            "value": "xxx"
          }
        }},
        {
          "term":{
          "property":{
            "value":"temperature"
          }
        }
        }
      ],
      "filter": {
        "range": {
          "createTime": {
            "gte": 1664726400000,   // 2022-10-03 00:00:00 ————本周开始时间
            "lt": 1665331200000     // 2022-10-10 00:00:00 ————下周开始时间
          }
        }
      }
    }
  },
    "aggs": {
    "create_date":{
      "date_histogram": {
        "field": "createTime",
        "calendar_interval": "day",
        "time_zone": "Asia/Shanghai",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
         // 强制返回指定范围内的每一个桶,包括min和max,这时就会将max作为一个key
         // 所以我这里只设置的最小时间,没有设计结束时间
        "extended_bounds": {
          "min": "1664726400000"
        }
      },
        "aggs": {
          "dayNumValAvg": {
            "avg": {
              "field": "numberValue"
            }
          }
        }
    }
  }
}

存在的问题

返回的桶不在extended_bounds设置的范围内:

原本只需要 [10.03日0点,10.10日零点)中每一天的聚合,使用extended_bounds设定min为1664726400000(2022-10-03 00:00:00),但是返回的第一个却为” 2022-10-02 00:00:00

投机取巧的解法,offset设置为” +24h“,后来把数据挨个加了遍,这样求出来的数据没有问题……

使用脚本实现该需求

没有指定时区,可能存在偏差
点击展开代码

POST xxx_2022-10/_search?size=0
{
  "size": 0,
  "query": {
    "bool":{
      "must": [
        {"term": {
          "deviceId": {
            "value": "xxx"
          }
        }},
        {
          "term":{
          "property":{
            "value":"temperature"
          }
        }
        }
      ],
      "filter": {
        "range": {
          "createTime": {
            "gte": 1664726400000,
            "lt": 1665331200000
          }
        }
      }
    }
  },
  "aggs": {
    "dayOfWeek": {
        "terms": {
            "script": {
                "lang": "painless",
                "source": "doc['createTime'].value.dayOfWeekEnum.value"
            }
        },
        "aggs": {
          "dayNumValAvg": {
            "avg": {
              "field": "numberValue"
            }
          }
        }
    }
  }
}

对应Java实现

Long[] timeScope = {1664726400000L, 1665331200000L};
String deviceId = "xxxx";
String property = "temperature";
String index = "xxxx";

DateHistogramAggregationBuilder aggCreateTime = AggregationBuilders.dateHistogram("createTimesGroup")
    .field("createTime").timeZone(ZoneId.of("Asia/Shanghai")).calendarInterval(DateHistogramInterval.DAY)
    .offset("+24h").extendedBounds(new ExtendedBounds(timeScope[0], null));
aggCreateTime.subAggregation(AggregationBuilders.avg("aggNumVal").field("numberValue"));
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.matchQuery("deviceId", deviceId))
    .must(QueryBuilders.matchQuery("property", property));
boolQueryBuilder.filter(
    QueryBuilders.rangeQuery("createTime").gte(timeScope[0]).lt(timeScope[1]));
SearchSourceBuilder thisWeekSearch = new SearchSourceBuilder().size(0).query(boolQueryBuilder).aggregation(aggCreateTime);
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.source(thisWeekSearch);
SearchResponse response = null;
try {
    response = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
    e.printStackTrace();
}

// 这里是根据响应一步步拆出来的 如果有更好的方法还望不吝指教
int capacity = 7;
Double[] aggValues = new Double[capacity];
Aggregations aggregations = response.getAggregations();
ParsedDateHistogram dateHistogram = (ParsedDateHistogram) aggregations.asList().get(0);
List buckets = dateHistogram.getBuckets();
for (int i = 0; i < capacity; i++) {
    if (i < buckets.size()) {
        Histogram.Bucket bucket = buckets.get(i);
        ParsedSingleValueNumericMetricsAggregation aggValue = bucket.getAggregations().get("aggNumVal");
        aggValues[i] = aggValue.value();
        if(Double.isNaN(aggValues[i]) || Double.isInfinite(aggValues[i])){
            aggValues[i] = 0.0D;
        }
    } else {
        aggValues[i] = 0.0D;
    }
}
System.out.println(Arrays.toString(aggValues));

Original: https://www.cnblogs.com/daydreamer-fs/p/16810836.html
Author: fogey
Title: es通过时间聚合查询一周中每天的数据平均值

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/713546/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球