Add cleaner logic to clean partition compaction blocks and related files by alexqyle · Pull Request #6507 · cortexproject/cortex · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions pkg/compactor/blocks_cleaner.go
96 changes: 90 additions & 6 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
)
Expand Down Expand Up @@ -86,8 +87,9 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

// Clean User with no error
cleaner.bucketClient = bkt
Expand Down Expand Up @@ -193,8 +195,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -354,8 +357,9 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -418,8 +422,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck

Expand Down Expand Up @@ -476,8 +481,9 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
require.NoError(t, err)
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
Expand Down Expand Up @@ -617,8 +623,9 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
Expand Down Expand Up @@ -811,6 +818,83 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
}
}

func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)

ts := func(hours int) int64 {
return time.Now().Add(time.Duration(hours)*time.Hour).Unix() * 1000
}

userID := "user-1"
partitionedGroupID := uint32(123)
partitionCount := 1
startTime := ts(-10)
endTime := ts(-8)
block1 := createTSDBBlock(t, bucketClient, userID, startTime, endTime, nil)

cfg := BlocksCleanerConfig{
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
ShardingStrategy: util.ShardingStrategyShuffle,
CompactionStrategy: util.CompactionStrategyPartitioning,
}

ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewPedanticRegistry()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: blocksMarkedForDeletionName,
Help: blocksMarkedForDeletionHelp,
}, append(commonLabels, reasonLabelName))
dummyGaugeVec := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"test"})

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, 60*time.Second, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion, dummyGaugeVec)

userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)

partitionedGroupInfo := PartitionedGroupInfo{
PartitionedGroupID: partitionedGroupID,
PartitionCount: partitionCount,
Partitions: []Partition{
{
PartitionID: 0,
Blocks: []ulid.ULID{block1},
},
},
RangeStart: startTime,
RangeEnd: endTime,
CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
Version: PartitionedGroupInfoVersion1,
}
_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
require.NoError(t, err)

visitMarker := &partitionVisitMarker{
PartitionedGroupID: partitionedGroupID,
PartitionID: 0,
Status: Completed,
VisitTime: time.Now().Add(-2 * time.Minute).Unix(),
}
visitMarkerManager := NewVisitMarkerManager(userBucket, logger, "dummy-cleaner", visitMarker)
err = visitMarkerManager.updateVisitMarker(ctx)
require.NoError(t, err)

cleaner.cleanPartitionedGroupInfo(ctx, userBucket, logger, userID)

partitionedGroupFileExists, err := userBucket.Exists(ctx, GetPartitionedGroupFile(partitionedGroupID))
require.NoError(t, err)
require.False(t, partitionedGroupFileExists)

block1DeletionMarkerExists, err := userBucket.Exists(ctx, path.Join(block1.String(), metadata.DeletionMarkFilename))
require.NoError(t, err)
require.True(t, block1DeletionMarkerExists)

}

type mockConfigProvider struct {
userRetentionPeriods map[string]time.Duration
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/compactor/compactor.go
Loading