Support remote write v2 by converting request by SungJin1212 · Pull Request #6330 · cortexproject/cortex · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-build-deploy.yml
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
* [FEATURE] Ingester/StoreGateway: Add `ResourceMonitor` module in Cortex, and add `ResourceBasedLimiter` in Ingesters and StoreGateways. #6674
* [FEATURE] Support Prometheus remote write 2.0. #6330
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
* [FEATURE] Ruler: Add support for group labels. #6665
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3140,6 +3140,11 @@ ha_tracker:
# CLI flag: -distributor.use-stream-push
[use_stream_push: <boolean> | default = false]

# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
# request.
# CLI flag: -distributor.remote-writev2-enabled
[remote_write2_enabled: <boolean> | default = false]

ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Currently experimental features are:
- Distributor:
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
- Accept Prometheus remote write 2.0 request (`-distributor.remote-writev2-enabled=true`)
- Tenant Deletion in Purger, for blocks storage.
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
- Blocks storage bucket index
Expand Down
115 changes: 115 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
Expand Down Expand Up @@ -423,3 +424,117 @@ func CreateBlock(

return id, nil
}

func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
tsMillis := TimeToMilliseconds(ts)

st := writev2.NewSymbolTable()
lb := labels.NewScratchBuilder(0)
lb.Add("__name__", name)
for _, lbl := range additionalLabels {
lb.Add(lbl.Name, lbl.Value)
}

var (
h *histogram.Histogram
fh *histogram.FloatHistogram
ph writev2.Histogram
)
if floatHistogram {
fh = tsdbutil.GenerateTestFloatHistogram(int64(i))
ph = writev2.FromFloatHistogram(tsMillis, fh)
} else {
h = tsdbutil.GenerateTestHistogram(int64(i))
ph = writev2.FromIntHistogram(tsMillis, h)
}

// Generate the series
series = append(series, writev2.TimeSeries{
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
Histograms: []writev2.Histogram{ph},
})

symbols = st.Symbols()

return
}

func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
tsMillis := TimeToMilliseconds(ts)
value := rand.Float64()

st := writev2.NewSymbolTable()
lb := labels.NewScratchBuilder(0)
lb.Add("__name__", name)

for _, label := range additionalLabels {
lb.Add(label.Name, label.Value)
}
series = append(series, writev2.TimeSeries{
// Generate the series
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
Samples: []writev2.Sample{
{Value: value, Timestamp: tsMillis},
},
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
},
})
symbols = st.Symbols()

// Generate the expected vector when querying it
metric := model.Metric{}
metric[labels.MetricName] = model.LabelValue(name)
for _, lbl := range additionalLabels {
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}

vector = append(vector, &model.Sample{
Metric: metric,
Value: model.SampleValue(value),
Timestamp: model.Time(tsMillis),
})

return
}

func GenerateV2SeriesWithSamples(
name string,
startTime time.Time,
scrapeInterval time.Duration,
startValue int,
numSamples int,
additionalLabels ...prompb.Label,
) (symbols []string, series writev2.TimeSeries) {
tsMillis := TimeToMilliseconds(startTime)
durMillis := scrapeInterval.Milliseconds()

st := writev2.NewSymbolTable()
lb := labels.NewScratchBuilder(0)
lb.Add("__name__", name)

for _, label := range additionalLabels {
lb.Add(label.Name, label.Value)
}

startTMillis := tsMillis
samples := make([]writev2.Sample, numSamples)
for i := 0; i < numSamples; i++ {
scrapeJitter := rand.Int63n(10) + 1 // add a jitter to simulate real-world scenarios, refer to: https://github.com/prometheus/prometheus/issues/13213
samples[i] = writev2.Sample{
Timestamp: startTMillis + scrapeJitter,
Value: float64(i + startValue),
}
startTMillis += durMillis
}

series = writev2.TimeSeries{
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
Samples: samples,
Metadata: writev2.Metadata{
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
},
}

return st.Symbols(), series
}
34 changes: 34 additions & 0 deletions integration/e2ecortex/client.go
Loading
Loading