Add ooo native histogram ingestion by SungJin1212 · Pull Request #6626 · cortexproject/cortex · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
7 changes: 7 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3526,6 +3526,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-exemplars
[max_exemplars: <int> | default = 0]

# [Experimental] Enable out-of-order native histogram ingestion. It only takes
# effect when -blocks-storage.tsdb.enable-native-histograms=true and
# -ingester.out-of-order-time-window > 0. If it is changed through the runtime
# config, the new value is applied only after a restart.
# CLI flag: -ingester.enable-ooo-native-histograms
[enable_ooo_native_histograms: <boolean> | default = false]

# Maximum number of chunks that can be fetched in a single query from ingesters
# and long-term storage. This limit is enforced in the querier, ruler and
# store-gateway. 0 to disable.
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Currently experimental features are:
- `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
- `-ingester.out-of-order-time-window` (duration) CLI flag
- `out_of_order_time_window` (duration) field in runtime config file
- `enable_ooo_native_histograms` (bool) field in runtime config file
- Store Gateway Zone Stable Shuffle Sharding
- `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag
- `zone_stable_shuffle_sharding` (boolean) field in config file
Expand Down
79 changes: 79 additions & 0 deletions integration/native_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/require"
Expand All @@ -19,6 +20,84 @@ import (
"github.com/cortexproject/cortex/integration/e2ecortex"
)

// TestOOONativeHistogramIngestion pushes three native histogram samples for the
// same series to a single-binary Cortex that has out-of-order native histogram
// ingestion enabled with a 5m out-of-order time window:
//
//   - a sample at "now", expected to be accepted (HTTP 200);
//   - a sample 3m in the past, expected to be accepted (HTTP 200);
//   - a sample 10m in the past, expected to be rejected (HTTP 400).
//
// It then checks the ingester metrics that count appended, out-of-order and
// failed native histogram samples.
func TestOOONativeHistogramIngestion(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start dependencies.
	consul := e2edb.NewConsulWithName("consul")
	require.NoError(t, s.StartAndWaitReady(consul))

	baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())

	flags := mergeFlags(baseFlags, map[string]string{
		// ooo setting
		"-ingester.enable-ooo-native-histograms":        "true",
		"-blocks-storage.tsdb.enable-native-histograms": "true",
		"-ingester.out-of-order-time-window":            "5m",
		// alert manager
		"-alertmanager.web.external-url": "http://localhost/alertmanager",
		// consul
		"-ring.store":      "consul",
		"-consul.hostname": consul.NetworkHTTPEndpoint(),
	})

	// Derive every timestamp from a single base instant so the offsets between
	// the three samples are exact rather than dependent on separate clock reads.
	nowTs := time.Now()
	oooTs := nowTs.Add(-3 * time.Minute)     // offset smaller than the 5m window configured above
	tooOldTs := nowTs.Add(-10 * time.Minute) // offset larger than the 5m window configured above

	// make alert manager config dir
	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(minio))

	cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
	require.NoError(t, s.StartAndWaitReady(cortex))

	// Wait until Cortex replicas have updated the ring state.
	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

	seriesName := "series"
	histogramIdx := rand.Uint32()

	// Make Cortex client
	c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	// Push now ts
	seriesNow := e2e.GenerateHistogramSeries(seriesName, nowTs, histogramIdx, false, prompb.Label{Name: "job", Value: "test"})
	res, err := c.Push(seriesNow)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Push ooo ts
	seriesOOOTs := e2e.GenerateHistogramSeries(seriesName, oooTs, histogramIdx, false, prompb.Label{Name: "job", Value: "test"})
	res, err = c.Push(seriesOOOTs)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Push too old ts
	seriesTooOOOTs := e2e.GenerateHistogramSeries(seriesName, tooOldTs, histogramIdx, false, prompb.Label{Name: "job", Value: "test"})
	res, err = c.Push(seriesTooOOOTs)
	require.NoError(t, err)
	require.Equal(t, 400, res.StatusCode)

	// check metrics
	//
	// The label matchers must be handed to the metric waiter itself. Passing
	// them as trailing arguments of require.NoError only turns them into the
	// failure message, leaving the sum unfiltered. Both matchers go into one
	// WithLabelMatchers call so neither can override the other.
	histogramMatchers := e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "type", "histogram"),
		labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"),
	)
	require.NoError(t, cortex.WaitSumMetricsWithOptions(
		e2e.Equals(2),
		[]string{"cortex_ingester_tsdb_head_samples_appended_total"},
		histogramMatchers,
	))
	require.NoError(t, cortex.WaitSumMetricsWithOptions(
		e2e.Equals(1),
		[]string{"cortex_ingester_tsdb_head_out_of_order_samples_appended_total"},
		histogramMatchers,
	))

	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_ingested_native_histograms_total"))
	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_ingested_native_histograms_failures_total"))
}

func TestNativeHistogramIngestionAndQuery(t *testing.T) {
const blockRangePeriod = 5 * time.Second

Expand Down
4 changes: 4 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
case errors.Is(cause, histogram.ErrHistogramCountMismatch):
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrOOONativeHistogramsDisabled):
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

default:
rollback = true
}
Expand Down Expand Up @@ -2421,6 +2424,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown,
OutOfOrderTimeWindow: time.Duration(oooTimeWindow).Milliseconds(),
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
EnableOOONativeHistograms: i.limits.EnableOOONativeHistograms(userID),

@yeya24 yeya24 Mar 5, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add hot reload because a user can enable or disable ooo native histogram during runtime.

But the EnableOOONativeHistograms config is only specified once, when creating the TSDB. So I wonder if it makes sense to make EnableOOONativeHistograms a runtime config, as nothing can be changed during runtime.

@SungJin1212 SungJin1212 Mar 6, 2025

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the reloader is necessary to change those configs at runtime.
I intended to configure EnableOOONativeHistograms as per tenant, just like OutOfOrderTimeWindow.

@yeya24 yeya24 Mar 6, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of order time window is a hot reloadable config.

I guess it is probably the same reason why we didn't make enable native histogram a runtime config.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we go with the global config for now?

EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks.
EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms,
BlockChunkQuerierFunc: i.blockChunkQuerierFunc(userID),
Expand Down
182 changes: 182 additions & 0 deletions pkg/ingester/ingester_test.go
Loading