Add max tenant config to tenant federation by SungJin1212 · Pull Request #6493 · cortexproject/cortex · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ tenant_federation:
# CLI flag: -tenant-federation.max-concurrent
[max_concurrent: <int> | default = 16]

# A maximum number of tenants to query at once. 0 means no limit.
# CLI flag: -tenant-federation.max-tenant
[max_tenant: <int> | default = 0]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]

Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
// Wrap roundtripper into Tripperware.
roundTripper = t.QueryFrontendTripperware(roundTripper)

handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
handler := transport.NewHandler(t.Cfg.Frontend.Handler, t.Cfg.TenantFederation, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
t.API.RegisterQueryFrontendHandler(handler)

if frontendV1 != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -279,7 +280,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(transport.NewHandler(config.Handler, rt, logger, nil)))
).Wrap(transport.NewHandler(config.Handler, tenantfederation.Config{}, rt, logger, nil)))

httpServer := http.Server{
Handler: r,
Expand Down
28 changes: 21 additions & 7 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc/status"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -33,6 +34,8 @@ const (
// StatusClientClosedRequest is the status code for when a client request cancellation of a http request
StatusClientClosedRequest = 499
ServiceTimingHeaderName = "Server-Timing"

errTooManyTenants = "too many tenants, max: %d, actual: %d"
)

var (
Expand Down Expand Up @@ -84,9 +87,10 @@ func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
// but all other logic is inside the RoundTripper.
type Handler struct {
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper
cfg HandlerConfig
tenantFederationCfg tenantfederation.Config
log log.Logger
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
Expand All @@ -101,11 +105,12 @@ type Handler struct {
}

// NewHandler creates a new frontend handler.
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
h := &Handler{
cfg: cfg,
log: log,
roundTripper: roundTripper,
cfg: cfg,
tenantFederationCfg: tenantFederationCfg,
log: log,
roundTripper: roundTripper,
}

if cfg.QueryStatsEnabled {
Expand Down Expand Up @@ -185,6 +190,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}

if f.tenantFederationCfg.Enabled {
maxTenant := f.tenantFederationCfg.MaxTenant
if maxTenant > 0 && len(tenantIDs) > maxTenant {
http.Error(w, fmt.Errorf(errTooManyTenants, maxTenant, len(tenantIDs)).Error(), http.StatusBadRequest)
return
}
}

userID := tenant.JoinTenantIDs(tenantIDs)

// Initialise the stats in the context and make sure it's propagated
Expand Down
106 changes: 104 additions & 2 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/codes"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
util_api "github.com/cortexproject/cortex/pkg/util/api"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)
Expand Down Expand Up @@ -178,6 +181,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
})
userID := "12345"
tenantFederationCfg := tenantfederation.Config{}
for _, tt := range []struct {
name string
cfg HandlerConfig
Expand Down Expand Up @@ -379,7 +383,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
handler := NewHandler(tt.cfg, tt.roundTripperFunc, log.NewNopLogger(), reg)
handler := NewHandler(tt.cfg, tenantFederationCfg, tt.roundTripperFunc, log.NewNopLogger(), reg)

ctx := user.InjectOrgID(context.Background(), userID)
req := httptest.NewRequest("GET", "/", nil)
Expand Down Expand Up @@ -413,7 +417,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
func TestReportQueryStatsFormat(t *testing.T) {
outputBuf := bytes.NewBuffer(nil)
logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf))
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, http.DefaultTransport, logger, nil)
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, logger, nil)
userID := "fake"
req, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil)
resp := &http.Response{ContentLength: 1000}
Expand Down Expand Up @@ -506,3 +510,101 @@ func TestReportQueryStatsFormat(t *testing.T) {
})
}
}

func Test_TenantFederation_MaxTenant(t *testing.T) {
// set a multi tenant resolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())

roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("{}")),
}, nil
})

tests := []struct {
name string
cfg tenantfederation.Config
orgId string
expectedStatusCode int
expectedErrMsg string
}{
{
name: "one tenant",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 0,
},
orgId: "org1",
expectedStatusCode: http.StatusOK,
},
{
name: "less than max tenant",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 3,
},
orgId: "org1|org2",
expectedStatusCode: http.StatusOK,
},
{
name: "equal to max tenant",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 2,
},
orgId: "org1|org2",
expectedStatusCode: http.StatusOK,
},
{
name: "exceeds max tenant",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 2,
},
orgId: "org1|org2|org3",
expectedStatusCode: http.StatusBadRequest,
expectedErrMsg: "too many tenants, max: 2, actual: 3",
},
{
name: "no org Id",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 0,
},
orgId: "",
expectedStatusCode: http.StatusUnauthorized,
expectedErrMsg: "no org id",
},
{
name: "no limit",
cfg: tenantfederation.Config{
Enabled: true,
MaxTenant: 0,
},
orgId: "org1|org2|org3",
expectedStatusCode: http.StatusOK,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
handler := NewHandler(HandlerConfig{}, test.cfg, roundTripper, log.NewNopLogger(), nil)
handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler)

req := httptest.NewRequest("GET", "http://fake", nil)
req.Header.Set("X-Scope-OrgId", test.orgId)
resp := httptest.NewRecorder()

handlerWithAuth.ServeHTTP(resp, req)

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, test.expectedStatusCode, resp.Code)

if test.expectedErrMsg != "" {
require.Contains(t, string(body), test.expectedErrMsg)
}
})
}
}
5 changes: 4 additions & 1 deletion pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -264,14 +265,16 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a

// Default HTTP handler config.
handlerCfg := transport.HandlerConfig{}
tenantFederationCfg := tenantfederation.Config{}

flagext.DefaultValues(&handlerCfg)

rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
r := mux.NewRouter()
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil)))
).Wrap(transport.NewHandler(handlerCfg, tenantFederationCfg, rt, logger, nil)))

httpServer := http.Server{
Handler: r,
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/tenantfederation/tenant_federation.go