Add percentage based sharding to rulers by wilguo · Pull Request #6680 · cortexproject/cortex · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
5 changes: 3 additions & 2 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3635,9 +3635,10 @@ query_rejection:

# The default tenant's shard size when the shuffle-sharding strategy is used by
# ruler. When this setting is specified in the per-tenant overrides, a value of
# 0 disables shuffle sharding for the tenant.
# 0 disables shuffle sharding for the tenant. If the value is greater than 0 and
# less than 1, the shard size is that fraction of the total number of rulers.
# CLI flag: -ruler.tenant-shard-size
[ruler_tenant_shard_size: <int> | default = 0]
[ruler_tenant_shard_size: <float> | default = 0]
Comment thread
wilguo marked this conversation as resolved.

# Maximum number of rules per rule group per-tenant. 0 to disable.
# CLI flag: -ruler.max-rules-per-rule-group
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender {
// RulesLimits defines limits used by Ruler.
type RulesLimits interface {
MaxQueryLength(userID string) time.Duration
RulerTenantShardSize(userID string) int
RulerTenantShardSize(userID string) float64
RulerMaxRuleGroupsPerTenant(userID string) int
RulerMaxRulesPerRuleGroup(userID string) int
RulerQueryOffset(userID string) time.Duration
Expand Down
11 changes: 9 additions & 2 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
userRings := map[string]ring.ReadRing{}
for _, u := range users {
if shardSize := r.limits.RulerTenantShardSize(u); shardSize > 0 {
subRing := r.ring.ShuffleShard(u, shardSize)
subRing := r.ring.ShuffleShard(u, r.getShardSizeForUser(u))

// Include the user only if it belongs to this ruler shard.
if subRing.HasInstance(r.lifecycler.GetInstanceID()) {
Expand Down Expand Up @@ -1325,11 +1325,18 @@ func (r *Ruler) ruleGroupListToGroupStateDesc(userID string, backupGroups rulesp
return groupDescs, nil
}

// getShardSizeForUser returns the shuffle-shard size to use for userID.
//
// The tenant's configured shard size and the current number of instances in
// the ruler ring are handed to util.DynamicShardSize, which yields the
// integer shard size. The result is then floored at the ring's replication
// factor.
func (r *Ruler) getShardSizeForUser(userID string) int {
	configured := r.limits.RulerTenantShardSize(userID)
	instances := r.ring.InstancesCount()
	shardSize := util.DynamicShardSize(configured, instances)

	// Never hand out a shard smaller than the replication factor.
	if rf := r.cfg.Ring.ReplicationFactor; shardSize < rf {
		return rf
	}
	return shardSize
}

func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest RulesRequest) (*RulesResponse, error) {
ring := ring.ReadRing(r.ring)

if shardSize := r.limits.RulerTenantShardSize(userID); shardSize > 0 && r.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
ring = r.ring.ShuffleShard(userID, shardSize)
ring = r.ring.ShuffleShard(userID, r.getShardSizeForUser(userID))
}

rulers, failedZones, err := GetReplicationSetForListRule(ring, &r.cfg.Ring)
Expand Down
155 changes: 151 additions & 4 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func defaultRulerConfig(t testing.TB) Config {

type ruleLimits struct {
mtx sync.RWMutex
tenantShard int
tenantShard float64
maxRulesPerRuleGroup int
maxRuleGroups int
disabledRuleGroups validation.DisabledRuleGroups
Expand All @@ -102,7 +102,7 @@ func (r *ruleLimits) setRulerExternalLabels(lset labels.Labels) {
r.mtx.Unlock()
}

func (r *ruleLimits) RulerTenantShardSize(_ string) int {
func (r *ruleLimits) RulerTenantShardSize(_ string) float64 {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.tenantShard
Expand Down Expand Up @@ -630,7 +630,7 @@ func TestGetRules(t *testing.T) {
type testCase struct {
sharding bool
shardingStrategy string
shuffleShardSize int
shuffleShardSize float64
rulesRequest RulesRequest
expectedCount map[string]int
expectedClientCallCount int
Expand Down Expand Up @@ -1887,7 +1887,7 @@ func TestSharding(t *testing.T) {
sharding bool
shardingStrategy string
replicationFactor int
shuffleShardSize int
shuffleShardSize float64
setupRing func(*ring.Desc)
enabledUsers []string
disabledUsers []string
Expand Down Expand Up @@ -3104,3 +3104,150 @@ func TestRuler_QueryOffset(t *testing.T) {
gotOffset = rg.GetGroup().QueryOffset
require.Equal(t, time.Minute*2, *gotOffset)
}

// TestGetShardSizeForUser exercises Ruler.getShardSizeForUser against a ring
// of tc.rulerInstanceCount rulers. It covers absolute tenant shard sizes
// (values >= 1), fractional ones (values < 1, expected to scale with the
// number of rulers), and the case where the replication factor is larger than
// the computed size.
func TestGetShardSizeForUser(t *testing.T) {
tests := []struct {
name string
userID string
replicationFactor int
rulerInstanceCount int
tenantShardSize float64
expectedShardSize int
}{
{
name: "User with fixed shard size with 10 ruler instances",
userID: "user1",
rulerInstanceCount: 10,
replicationFactor: 1,
tenantShardSize: 2,
expectedShardSize: 2,
},
{
name: "User with fixed shard size with 50 ruler instances",
userID: "user1",
rulerInstanceCount: 50,
replicationFactor: 1,
tenantShardSize: 30,
expectedShardSize: 30,
},
{
// 0.6 of 10 rulers is expected to yield 6.
name: "User with percentage shard size with 10 ruler instances",
userID: "user1",
rulerInstanceCount: 10,
replicationFactor: 1,
tenantShardSize: 0.6,
expectedShardSize: 6,
},
{
// 0.25 of 80 rulers is expected to yield 20.
name: "User with percentage shard size with 80 ruler instances",
userID: "user1",
rulerInstanceCount: 80,
replicationFactor: 1,
tenantShardSize: 0.25,
expectedShardSize: 20,
},
{
// 0.1 of 10 rulers would be below the replication factor of 3, so 3 is
// expected instead.
name: "Ensure shard size is at least replication factor",
userID: "user1",
rulerInstanceCount: 10,
replicationFactor: 3,
tenantShardSize: 0.1,
expectedShardSize: 3,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {

// NOTE(review): rulerStateMap is filled in below but never read in this
// test — candidate for removal.
rulerStateMap := make(map[string]ring.InstanceState)
rulerAZEvenSpread := make(map[string]string)
rulerIDs := make([]string, tc.rulerInstanceCount)

// Name the rulers "ruler1".."rulerN" and assign zones round-robin over
// "a", "b" and "c".
for i := 0; i < tc.rulerInstanceCount; i++ {
rulerID := fmt.Sprintf("ruler%d", i+1)
rulerIDs[i] = rulerID
rulerStateMap[rulerID] = ring.ACTIVE
rulerAZEvenSpread[rulerID] = string(rune('a' + i%3))
}

// All rulers in this subtest share one in-memory KV store.
kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) })
// Nothing in this test adds entries to allRulesByUser or allTokensByRuler.
allRulesByUser := map[string]rulespb.RuleGroupList{}
allTokensByRuler := map[string][]uint32{}
rulerAddrMap := map[string]*Ruler{}

// createRuler builds a ruler with shuffle sharding enabled, overrides its
// limits with the tenant shard size under test, registers it in
// rulerAddrMap and starts its ring service (when present).
createRuler := func(id string) *Ruler {
store := newMockRuleStore(allRulesByUser, nil)
cfg := defaultRulerConfig(t)

cfg.ShardingStrategy = util.ShardingStrategyShuffle
cfg.EnableSharding = true
cfg.EnableHAEvaluation = false
cfg.EvaluationInterval = 5 * time.Minute

cfg.Ring = RingConfig{
InstanceID: id,
InstanceAddr: id,
KVStore: kv.Config{
Mock: kvStore,
},
ReplicationFactor: tc.replicationFactor,
ZoneAwarenessEnabled: true,
InstanceZone: rulerAZEvenSpread[id],
}

r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap)
r.limits = &ruleLimits{tenantShard: tc.tenantShardSize}
rulerAddrMap[id] = r
if r.ring != nil {
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring))
t.Cleanup(r.ring.StopAsync)
}
return r
}

// testRuler ends up pointing at the last ruler created; that is the
// instance queried at the end of the subtest.
var testRuler *Ruler
// Create rulers and ensure they join the ring
for _, rID := range rulerIDs {
r := createRuler(rID)
testRuler = r
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.lifecycler))
}

// NOTE(review): allTokensByRuler is empty here, so the loop inside this CAS
// callback adds no instances; presumably the ring entries asserted below
// come from the lifecyclers started above — confirm.
err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
d, _ := in.(*ring.Desc)
if d == nil {
d = ring.NewDesc()
}
for rID, tokens := range allTokensByRuler {
d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, ring.ACTIVE, time.Now())
}
return d, true, nil
})
require.NoError(t, err)
// Wait a bit to make sure ruler's ring is updated.
time.Sleep(100 * time.Millisecond)

// Check the ring state
ringDesc, err := kvStore.Get(context.Background(), ringKey)
require.NoError(t, err)
require.NotNil(t, ringDesc)
desc := ringDesc.(*ring.Desc)
// Every ruler must be registered in the ring before the shard size is
// computed.
require.Equal(t, tc.rulerInstanceCount, len(desc.Ingesters))

// forEachRuler applies f to every ruler registered in rulerAddrMap.
forEachRuler := func(f func(rID string, r *Ruler)) {
for rID, r := range rulerAddrMap {
f(rID, r)
}
}

// Sync Rules
forEachRuler(func(_ string, r *Ruler) {
r.syncRules(context.Background(), rulerSyncReasonInitial)
})

// The actual assertion: the computed shard size for the tenant.
result := testRuler.getShardSizeForUser(tc.userID)
assert.Equal(t, tc.expectedShardSize, result)
})
}
}
6 changes: 3 additions & 3 deletions pkg/util/validation/limits.go