Fix cleaner not discovering deleted users from global dir by harry671003 · Pull Request #5691 · cortexproject/cortex · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/compactor/compactor_test.go
3 changes: 3 additions & 0 deletions pkg/querier/blocks_finder_bucket_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestBucketScanBlocksFinder_InitialScanFailure(t *testing.T) {

// Mock the storage to simulate a failure when reading objects.
bucket.MockIter("", []string{"user-1"}, nil)
bucket.MockIter("__markers__", []string{}, nil)
bucket.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json"}, nil)
bucket.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
bucket.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
Expand Down Expand Up @@ -139,6 +140,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyTenants(t *t
// Mock the bucket to introduce a 1s sleep while iterating each tenant in the bucket.
bucket := &bucket.ClientMock{}
bucket.MockIter("", tenantIDs, nil)
bucket.MockIter("__markers__", []string{}, nil)
for _, tenantID := range tenantIDs {
bucket.MockIterWithCallback(tenantID+"/", []string{}, nil, func() {
time.Sleep(time.Second)
Expand Down Expand Up @@ -177,6 +179,7 @@ func TestBucketScanBlocksFinder_StopWhileRunningTheInitialScanOnManyBlocks(t *te
// Mock the bucket to introduce a 1s sleep while syncing each block in the bucket.
bucket := &bucket.ClientMock{}
bucket.MockIter("", []string{"user-1"}, nil)
bucket.MockIter("__markers__", []string{}, nil)
bucket.MockIter("user-1/", blockPaths, nil)
bucket.On("Exists", mock.Anything, mock.Anything).Return(false, nil).Run(func(args mock.Arguments) {
// We return the meta.json doesn't exist, but introduce a 1s delay for each call.
Expand Down
36 changes: 23 additions & 13 deletions pkg/storage/tsdb/users_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,48 @@ func NewUsersScanner(bucketClient objstore.Bucket, isOwned func(userID string) (
//
// If sharding is enabled, returned lists contains only the users owned by this instance.
func (s *UsersScanner) ScanUsers(ctx context.Context) (users, markedForDeletion []string, err error) {
scannedUsers := make(map[string]struct{})

// Scan users in the bucket.
err = s.bucketClient.Iter(ctx, "", func(entry string) error {
users = append(users, strings.TrimSuffix(entry, "/"))
userID := strings.TrimSuffix(entry, "/")
scannedUsers[userID] = struct{}{}
return nil
})
if err != nil {
return nil, nil, err
}

// Check users for being owned by instance, and split users into non-deleted and deleted.
// We do these checks after listing all users, to improve cacheability of Iter (result is only cached at the end of Iter call).
for ix := 0; ix < len(users); {
userID := users[ix]
// Scan users from the __markers__ directory.
err = s.bucketClient.Iter(ctx, util.GlobalMarkersDir, func(entry string) error {
// entry will be of the form __markers__/<user>/
parts := strings.Split(entry, objstore.DirDelim)
userID := parts[1]
scannedUsers[userID] = struct{}{}
return nil
})
if err != nil {
return nil, nil, err
}

// Check if it's owned by this instance.
owned, err := s.isOwned(userID)
if err != nil {
for userID := range scannedUsers {
// Filter out users not owned by this instance.
if owned, err := s.isOwned(userID); err != nil {
level.Warn(s.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err)
} else if !owned {
users = append(users[:ix], users[ix+1:]...)
continue
}

deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID)
if err != nil {
// Filter users marked for deletion
if deletionMarkExists, err := TenantDeletionMarkExists(ctx, s.bucketClient, userID); err != nil {
level.Warn(s.logger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err)
} else if deletionMarkExists {
users = append(users[:ix], users[ix+1:]...)
markedForDeletion = append(markedForDeletion, userID)
continue
}

ix++
// The remaining are the active users owned by this instance.
users = append(users, userID)
}

return users, markedForDeletion, nil
Expand Down
15 changes: 10 additions & 5 deletions pkg/storage/tsdb/users_scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,40 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

"github.com/cortexproject/cortex/pkg/storage/bucket"
)

func TestUsersScanner_ScanUsers_ShouldReturnedOwnedUsersOnly(t *testing.T) {
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{"user-1", "user-2", "user-3", "user-4"}, nil)
bucketClient.MockIter("", []string{"user-1/", "user-2/", "user-3/", "user-4/"}, nil)
bucketClient.MockIter("__markers__", []string{"__markers__/user-5/", "__markers__/user-6/", "__markers__/user-7/"}, nil)
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-3"), true, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), true, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-3"), false, nil)
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-7"), false, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-7"), true, nil)

isOwned := func(userID string) (bool, error) {
return userID == "user-1" || userID == "user-3", nil
return userID == "user-1" || userID == "user-3" || userID == "user-7", nil
}

s := NewUsersScanner(bucketClient, isOwned, log.NewNopLogger())
actual, deleted, err := s.ScanUsers(context.Background())
require.NoError(t, err)
assert.Equal(t, []string{"user-1"}, actual)
assert.Equal(t, []string{"user-3"}, deleted)

slices.Sort(deleted)
assert.Equal(t, []string{"user-3", "user-7"}, deleted)
}

func TestUsersScanner_ScanUsers_ShouldReturnUsersForWhichOwnerCheckOrTenantDeletionCheckFailed(t *testing.T) {
expected := []string{"user-1", "user-2"}

bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", expected, nil)
bucketClient.MockIter("__markers__", []string{}, nil)
bucketClient.MockExists(GetGlobalDeletionMarkPath("user-1"), false, nil)
bucketClient.MockExists(GetLocalDeletionMarkPath("user-1"), false, nil)

Expand Down
16 changes: 12 additions & 4 deletions tools/thanosconvert/thanosconvert_test.go