InfluxDB 2.x: Task: Aggregation

From Wiki-WebPerfect
Revision as of 09:52, 3 February 2022 by Admin (talk | contribs)


Sometimes you need to aggregate a large amount of data, for example for a Grafana graph panel, but the query is very slow because of the sheer number of points (in my scenario, over 6 million points to aggregate).
In that case it is more efficient to create an InfluxDB task that aggregates the data and writes the results back to a bucket, instead of calculating the result each time the query runs.

Example: Sum of all storage

What I want

  • Create a graph panel that shows the sum of all used storage we have, over time.
  • Because the storage is shared, every node in the same cluster collects the same storage information -> that's why I have to deduplicate the data (unique()).
  • Because I write the aggregated data back to the same bucket, I rename the _measurement of the aggregated data (adding the suffix "_agg-sum") to be able to distinguish it from the source data.
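The deduplication step is the key idea: every node reports the same shared volumes, so summing the raw data would count each volume once per node. A minimal Python sketch of the drop-host / unique-per-label / sum logic (the hosts, labels, and values are hypothetical sample data, not from the source):

```python
# Hypothetical sample: three Hyper-V nodes each report the same two
# shared volumes, so every volume appears three times per interval.
rows = [
    {"host": "node1", "FileSystemLabel": "CSV01", "SizeUsed": 500},
    {"host": "node2", "FileSystemLabel": "CSV01", "SizeUsed": 500},
    {"host": "node3", "FileSystemLabel": "CSV01", "SizeUsed": 500},
    {"host": "node1", "FileSystemLabel": "CSV02", "SizeUsed": 300},
    {"host": "node2", "FileSystemLabel": "CSV02", "SizeUsed": 300},
    {"host": "node3", "FileSystemLabel": "CSV02", "SizeUsed": 300},
]

# Equivalent of drop(columns: ["host"]) + unique(column: "FileSystemLabel"):
# keep only the first row seen per storage label.
seen = {}
for r in rows:
    seen.setdefault(r["FileSystemLabel"], r["SizeUsed"])

# Equivalent of sum(column: "_value")
total = sum(seen.values())
print(total)  # 800 -- not 2400, which a naive sum over all rows would give
```

Without the unique() step the result would be inflated by the number of nodes in the cluster.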


How can I achieve this?

  • Because I also want the historical data, I must first aggregate it over a bigger time window (by running the query manually once).
  • The sampling frequency in my bucket is one point per hour, so I aggregate the data with a one-hour window as well.


Some explanation about my data

  • telegraf_90d = the source and destination bucket, with a retention of 90 days (bucket).
  • cluster_csv = the original measurement of the source data (_measurement).
  • cluster_csv_agg-sum = the new measurement for the aggregated data (_measurement).
  • host = the node that collects the storage information, in our case the Hyper-V node (tag).
  • FileSystemLabel = the name of the storage volume (tag).


Manual query to aggregate the historical data

//define variables
bucket = "telegraf_90d"
window = 1h

customsum = (tables=<-, column="_value") =>
  tables
    |> drop(columns: ["host"])            // every node reports the same values
    |> unique(column: "FileSystemLabel")  // keep one row per storage volume
    |> drop(columns: ["FileSystemLabel"])
    |> sum(column: column)                // Flux requires the named parameter


from(bucket: bucket)
  |> range(start: -90d, stop: v.timeRangeStop)
  |> filter(fn: (r) => 
    r._measurement == "cluster_csv" and
    r._field  == "SizeUsed" and 
    exists r.FileSystemLabel
  )
  |> aggregateWindow(every: window, fn: customsum, createEmpty: true)
  |> fill(column: "_value", usePrevious: true)
  |> toInt()
  |> set(key: "_measurement", value: "cluster_csv_agg-sum")
  |> to(
    bucket: "telegraf_90d"
  )
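A note on the aggregateWindow/fill combination above: with createEmpty: true, windows that contain no points are emitted as null rows, and fill(usePrevious: true) then carries the last known value forward. A minimal Python sketch of that carry-forward behavior (the values are made up for illustration):

```python
# One value per window; None stands for an empty window
# produced by aggregateWindow(..., createEmpty: true).
values = [100, None, None, 120, None]

# fill(column: "_value", usePrevious: true): replace each null
# with the most recent non-null value before it.
filled, last = [], None
for v in values:
    if v is not None:
        last = v
    filled.append(last)

print(filled)  # [100, 100, 100, 120, 120]
```

This keeps the graph continuous even when a collection interval is missed.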


InfluxDB Task (runs on a schedule)

option task = {name: "task_telegraf_90d-cluster_csv_agg-sum", every: 1h}

customsum = (tables=<-, column="_value") =>
	(tables
		|> drop(columns: ["host"])
		|> unique(column: "FileSystemLabel")
		|> drop(columns: ["FileSystemLabel"])
		|> sum(column: column))
data = from(bucket: "telegraf_90d")
	|> range(start: -duration(v: int(v: task.every) * 2))
	|> filter(fn: (r) =>
		(r._measurement == "cluster_csv" and r._field == "SizeUsed" and exists r.FileSystemLabel))

data
	|> aggregateWindow(every: 1h, fn: customsum, createEmpty: true)
	|> fill(column: "_value", usePrevious: true)
	|> toInt()
	|> set(key: "_measurement", value: "cluster_csv_agg-sum")
	|> to(bucket: "telegraf_90d")
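The task's range start, -duration(v: int(v: task.every) * 2), makes each run look back twice the task interval. This way every run re-aggregates both the current window and the previous one, so late-arriving points are picked up and the rewritten rows simply overwrite the earlier result (InfluxDB upserts points with the same series and timestamp). A small Python sketch of that lookback arithmetic:

```python
from datetime import timedelta

# task.every from the option block
every = timedelta(hours=1)

# -duration(v: int(v: task.every) * 2): range start relative to now
lookback = every * 2

print(lookback)  # 2:00:00 -> each run re-covers the previous window too
```

A lookback of exactly one interval would risk losing points that arrive just after a window closes.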