Cleanup meta sync files in compactor for un-owned users. by pstibrany · Pull Request #3851 · cortexproject/cortex · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
73 changes: 69 additions & 4 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"flag"
"fmt"
"hash/fnv"
"io/ioutil"
"math/rand"
"os"
"path"
"path/filepath"
"strings"
"time"

Expand Down Expand Up @@ -480,6 +483,8 @@ func (c *Compactor) compactUsers(ctx context.Context) {
users[i], users[j] = users[j], users[i]
})

// Keep track of users owned by this shard, so that we can delete the local files for all other users.
ownedUsers := map[string]struct{}{}
for _, userID := range users {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if ctx.Err() != nil {
Expand All @@ -498,6 +503,8 @@ func (c *Compactor) compactUsers(ctx context.Context) {
continue
}

ownedUsers[userID] = struct{}{}

if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil {
c.compactionRunSkippedTenants.Inc()
level.Warn(c.logger).Log("msg", "unable to check if user is marked for deletion", "user", userID, "err", err)
Expand All @@ -520,6 +527,33 @@ func (c *Compactor) compactUsers(ctx context.Context) {
level.Info(c.logger).Log("msg", "successfully compacted user blocks", "user", userID)
}

// Delete local files for unowned tenants, if there are any. This cleans up
// leftover local files for tenants that belong to different compactors now,
// or have been deleted completely.
for userID := range c.listTenantsWithMetaSyncDirectories() {
if _, owned := ownedUsers[userID]; owned {
continue
}

dir := c.metaSyncDirForUser(userID)
s, err := os.Stat(dir)
if err != nil {
if !os.IsNotExist(err) {
level.Warn(c.logger).Log("msg", "failed to stat local directory with user data", "dir", dir, "err", err)
}
continue
}

if s.IsDir() {
err := os.RemoveAll(dir)
if err == nil {
level.Info(c.logger).Log("msg", "deleted directory for user not owned by this shard", "dir", dir)
} else {
level.Warn(c.logger).Log("msg", "failed to delete directory for user not owned by this shard", "dir", dir, "err", err)
}
}
}

succeeded = true
}

Expand Down Expand Up @@ -567,10 +601,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
ulogger,
c.compactorCfg.MetaSyncConcurrency,
bucket,
// The fetcher stores cached metas in the "meta-syncer/" sub directory,
// but we prefix it with "compactor-meta-" in order to guarantee no clashing with
// the directory used by the Thanos Syncer, whatever is the user ID.
path.Join(c.compactorCfg.DataDir, "compactor-meta-"+userID),
c.metaSyncDirForUser(userID),
reg,
// List of filters to apply (order matters).
[]block.MetadataFilter{
Expand Down Expand Up @@ -700,3 +731,37 @@ func isAllowedUser(enabledUsers, disabledUsers map[string]struct{}, userID strin

return true
}

const compactorMetaPrefix = "compactor-meta-"

// metaSyncDirForUser returns directory to store cached meta files.
// The fetcher stores cached metas in the "meta-syncer/" sub directory,
// but we prefix it with "compactor-meta-" in order to guarantee no clashing with
// the directory used by the Thanos Syncer, whatever is the user ID.
func (c *Compactor) metaSyncDirForUser(userID string) string {
return filepath.Join(c.compactorCfg.DataDir, compactorMetaPrefix+userID)
}

// This function returns tenants with meta sync directories found on local disk. On error, it returns nil map.
func (c *Compactor) listTenantsWithMetaSyncDirectories() map[string]struct{} {
result := map[string]struct{}{}

files, err := ioutil.ReadDir(c.compactorCfg.DataDir)
if err != nil {
return nil
}

for _, f := range files {
if !f.IsDir() {
continue
}

if !strings.HasPrefix(f.Name(), compactorMetaPrefix) {
continue
}

result[f.Name()[len(compactorMetaPrefix):]] = struct{}{}
}

return result
}
116 changes: 97 additions & 19 deletions pkg/compactor/compactor_test.go