🚿 Streaming Aggregation

This document explains how to set up streaming aggregation pipelines in Levitate.

Why Streaming Aggregation?

In a regular scenario, data is written to the store as it arrives. When a query runs, the querier fetches the data, partitions it by time and by groupers (if specified), and applies a mathematical operator to produce the results. This happens every time the query is run.

Regular Query flow

Example query: sum by (type) (metric{}[3m])

Default Query evaluation flow

Scanning massive amounts of data is slow and resource-intensive. It slows down queries and can overload the entire system, which can only handle a limited number of parallel queries.

Pre-Aggregated Data

There are a few ways to speed up queries and reduce this load.

  • Save and reuse the results of repeated queries, and fetch only the missing parts. This is called Delta Caching and is already enabled for all Levitate clusters.
  • Keep increasing the resources available for computation and aggregation. Not recommended; otherwise you wouldn't be here. 😊
  • Use materialized-view-like constructs that rely on pre-aggregation.

When you enable streaming aggregates in Levitate, the most expensive queries are computed ahead of time, and the resulting metric is saved in storage so that it can be used like any other time series.

Here's the query we asked earlier: sum by (type) (metric{}[3m]). If it is pre-aggregated, the query flow is as follows.

Flow for Pre-Aggregated queries
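With the aggregate already materialized, the querier reads the saved series directly instead of scanning and re-aggregating raw samples. As a sketch, assuming the aggregate was stored under an illustrative name such as metric_sum_by_type:3m, the query reduces to a plain selector:

metric_sum_by_type:3m{}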

Tumbling Window Aggregation

We use Tumbling Windows to represent the stream's consistent, non-overlapping time intervals. First, we group all metrics into buckets of n minutes, which we specify using the [n] range in the aggregation, and then group them by metric name + labels (or by properties in the case of events).

For instance, samples with timestamp values in [0:00:00, 0:01:00) fall in the first period. Samples with timestamp values in [0:01:00, 0:02:00) fall in the second period. And so on.

Tumbling windows in action

Streaming Aggregation Workflow

Streaming aggregation follows a GitOps-based flow: you add definitions for aggregated queries, and they are shipped to Levitate via an automated CI/CD workflow. The Last9 team will provide the GitHub repository with predefined CI/CD workflows.

The step-by-step process to add a new aggregation is as follows:

Define the PromQL for the aggregation

First, use the Explore tab in Embedded Grafana to identify the target metric that you want to aggregate.

We break the metric with near-infinite cardinality down into a query specific to our goal. In this example, we sum the metric by stack.

sum by (stack) (http_requests_duration_seconds_count{service="pushnotifs"}[1m])

The time series produced by the above PromQL has much lower cardinality, so saving its result instead of the original data helps.

Adding the aggregation in the pipeline

Let's now save this as a streaming aggregate by adding it to the YAML file of the Levitate cluster in which you want to apply the streaming aggregation.

- promql: 'sum by (stack, le) (http_requests_duration_seconds_bucket{service="pushnotifs"}[2m])'
  as: pushnotifs_http_requests_duration:2m

Levitate will emit the data as a new metric named by the as field. In this example, the new metric will be called pushnotifs_http_requests_duration:2m.

Connecting the dots

Create a Pull Request to the GitHub repository with the updated rules.

Pull Request with new streaming aggregate

Wait for the tests to pass to ensure that the streaming aggregations do not contain any syntax errors.

Merge the Pull Request so the CI/CD workflow comes into action and the pipeline gets activated on Levitate.

The new metric will then be available in the Levitate cluster for which it was added in the GitHub repository.

🚧

Please note that streaming aggregations are limited to 3,000,000 samples per hour. If a streaming aggregate generates more data than that, it will be skipped.

The new metrics can be queried in the main lake.
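For example, the metric emitted by the earlier rule can be selected like any other series (the stack value here is illustrative):

pushnotifs_http_requests_duration:2m{stack="production"}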

Achieving Complex Pipelines like Histograms

Effectively, a Histogram requires three metrics:

  • <metric_name>_bucket
  • <metric_name>_sum
  • <metric_name>_count

Histogram functions will work only if all three series are streamed in exact alignment.

Here's the previous example, changed to accommodate a Histogram.
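A sketch of what the rules could look like, with one rule per histogram series. The output metric names are illustrative, and sum2 is used because all three histogram series are counters (see the note below):

- promql: 'sum2 by (stack, le) (http_requests_duration_seconds_bucket{service="pushnotifs"}[2m])'
  as: pushnotifs_http_requests_duration_seconds_bucket:2m
- promql: 'sum2 by (stack) (http_requests_duration_seconds_sum{service="pushnotifs"}[2m])'
  as: pushnotifs_http_requests_duration_seconds_sum:2m
- promql: 'sum2 by (stack) (http_requests_duration_seconds_count{service="pushnotifs"}[2m])'
  as: pushnotifs_http_requests_duration_seconds_count:2m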

📘

Instead of sum, please use the sum2 operator when dealing with counters. This aggregator is aware of counters and of the fluctuations that can happen during a counter reset.

We can then execute histogram_quantile on the streamed metric.
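For instance, assuming the illustrative bucket series above, a p99 per stack could be computed as:

histogram_quantile(0.99, sum by (le, stack) (pushnotifs_http_requests_duration_seconds_bucket:2m{}))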

📘

A similar technique can also be used to roll up the data. As in some of the examples above, we chose to sum over [2m] instead of [1m].
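As a sketch, the earlier rule could be rolled up over a coarser window (the window choice and output name are illustrative):

- promql: 'sum2 by (stack) (http_requests_duration_seconds_count{service="pushnotifs"}[10m])'
  as: pushnotifs_http_requests_count:10m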

Supported Functions

The following aggregation functions are available in Streaming Aggregation.

Function Name | Description
------------- | -----------
sum | The total of the samples; to be used for metric types other than counters.
count | A count of the number of samples.
max | The maximum value of the samples.
sum2 | Sum, but for counters, with reset awareness.
increase | The increase in counter value.
min | The minimum value of the samples.
avg | The average value of the samples.
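As a sketch, a rule using increase might look like the following (the output name is illustrative):

- promql: 'increase(http_requests_duration_seconds_count{service="pushnotifs"}[5m])'
  as: pushnotifs_http_requests_increase:5m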

Troubleshooting

Please get in touch with us on Discord or via email if you have any questions.