Skip to content

Commit

Permalink
Addressing scope of tenant prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewjstanford committed Jan 24, 2024
1 parent ef23a6e commit 2e8c2fa
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 25 deletions.
25 changes: 14 additions & 11 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
return
}

tenantPrefix := p.cfg.Tenant.Prefix

if p.cfg.Tenant.PrefixPreferSource {
sourceTenantPrefix := string(ctx.Request.Header.Peek(p.cfg.Tenant.Header))
if sourceTenantPrefix != "" {
p.cfg.Tenant.Prefix = sourceTenantPrefix + "-"
tenantPrefix = sourceTenantPrefix + "-"
}
}

Expand All @@ -184,7 +186,7 @@ func (p *processor) handle(ctx *fh.RequestCtx) {
// If there's metadata - just accept the request and drop it
if len(wrReqIn.Metadata) > 0 {
if p.cfg.Metadata && p.cfg.Tenant.Default != "" {
r := p.send(clientIP, reqID, p.cfg.Tenant.Default, wrReqIn)
r := p.send(clientIP, reqID, tenantPrefix, p.cfg.Tenant.Default, wrReqIn)
if r.err != nil {
ctx.Error(err.Error(), fh.StatusInternalServerError)
p.Errorf("src=%s req_id=%s: unable to proxy metadata: %s", clientIP, reqID, r.err)
Expand All @@ -209,7 +211,7 @@ func (p *processor) handle(ctx *fh.RequestCtx) {

metricTenant := ""
var errs *me.Error
results := p.dispatch(clientIP, reqID, m)
results := p.dispatch(clientIP, reqID, tenantPrefix, m)

code, body := 0, []byte("Ok")

Expand Down Expand Up @@ -311,20 +313,20 @@ func (p *processor) marshal(wr *prompb.WriteRequest) (bufOut []byte, err error)
return snappy.Encode(nil, b), nil
}

func (p *processor) dispatch(clientIP net.Addr, reqID uuid.UUID, m map[string]*prompb.WriteRequest) (res []result) {
func (p *processor) dispatch(clientIP net.Addr, reqID uuid.UUID, tenantPrefix string, m map[string]*prompb.WriteRequest) (res []result) {
var wg sync.WaitGroup
res = make([]result, len(m))

i := 0
for tenant, wrReq := range m {
wg.Add(1)

go func(idx int, tenant string, wrReq *prompb.WriteRequest) {
go func(idx int, tenantPrefix string, tenant string, wrReq *prompb.WriteRequest) {
defer wg.Done()

r := p.send(clientIP, reqID, tenant, wrReq)
r := p.send(clientIP, reqID, tenantPrefix, tenant, wrReq)
res[idx] = r
}(i, tenant, wrReq)
}(i, tenantPrefix, tenant, wrReq)

i++
}
Expand Down Expand Up @@ -367,7 +369,7 @@ func (p *processor) processTimeseries(ts *prompb.TimeSeries) (tenant string, err
return
}

func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *prompb.WriteRequest) (r result) {
func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenantPrefix string, tenant string, wr *prompb.WriteRequest) (r result) {
start := time.Now()
r.tenant = tenant

Expand All @@ -385,6 +387,10 @@ func (p *processor) send(clientIP net.Addr, reqID uuid.UUID, tenant string, wr *
return
}

if tenantPrefix != "" {
tenant = tenantPrefix + tenant
}

p.fillRequestHeaders(clientIP, reqID, tenant, req)

if p.auth.egressHeader != nil {
Expand Down Expand Up @@ -415,9 +421,6 @@ func (p *processor) fillRequestHeaders(
req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
req.Header.Set("X-Cortex-Tenant-Client", clientIP.String())
req.Header.Set("X-Cortex-Tenant-ReqID", reqID.String())
if p.cfg.Tenant.Prefix != "" {
tenant = p.cfg.Tenant.Prefix + tenant
}
req.Header.Set(p.cfg.Tenant.Header, tenant)
}

Expand Down
14 changes: 0 additions & 14 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,20 +232,6 @@ func Test_request_headers(t *testing.T) {
assert.Equal(t, "my-tenant", string(req.Header.Peek("X-Scope-OrgID")))
}

func Test_request_headers_with_prefix(t *testing.T) {
cfg, err := getConfig(testConfigWithValues)
assert.Nil(t, err)

p := newProcessor(*cfg)

req := fh.AcquireRequest()
clientIP, _ := net.ResolveIPAddr("ip", "1.1.1.1")
reqID, _ := uuid.NewRandom()
p.fillRequestHeaders(clientIP, reqID, "my-tenant", req)

assert.Equal(t, "foobar-my-tenant", string(req.Header.Peek("X-Scope-OrgID")))
}

func Test_handle(t *testing.T) {
cfg, err := getConfig(testConfig)
assert.Nil(t, err)
Expand Down

0 comments on commit 2e8c2fa

Please sign in to comment.