[APM] Fix max bucket error in Dependencies pages (elastic#173083)
fixes: elastic#161239
## Summary

This PR changes how dependency data is fetched. To mitigate the risk of the max
bucket error while keeping compatibility with the dependencies-related pages that
consume the `get_connection_stats` query, the composite aggregation is now
paginated in smaller batches of 1k items per page.
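
For background (an illustration of the mechanism only, not the exact query shipped in `get_connection_stats`): an Elasticsearch composite aggregation is paginated by passing the previous page's `after_key` back as `after`. The batch size and `after` value below are placeholders.

```ts
// Illustrative request body: page through a composite aggregation in
// batches instead of asking for one 10k-bucket page.
const pagedConnectionStats = {
  size: 0,
  aggs: {
    connections: {
      composite: {
        size: 1000, // batch size per request (placeholder)
        sources: [
          { serviceName: { terms: { field: 'service.name' } } },
          { dependencyName: { terms: { field: 'span.destination.service.resource' } } },
        ],
        // Omit `after` on the first request; on subsequent requests pass
        // `response.aggregations.connections.after_key` from the previous page.
        after: { serviceName: 'opbeans-java-0', dependencyName: 'elasticsearch/42' },
      },
    },
  },
};
```

The caller keeps requesting pages until a page comes back with fewer buckets than the requested size.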

**10k dependencies**
<img width="826" alt="image"
src="https://github.com/elastic/kibana/assets/2767137/fb277dd9-0a09-48ef-9e3e-fc1d5a4445e8">


**500 dependencies per service**
<img width="821" alt="image"
src="https://github.com/elastic/kibana/assets/2767137/cf8be999-2fa6-44ee-8838-c255fdb9896d">

**Notes:**
- Fetching 1k items on each iteration might make the page slower.
- The max bucket error can still happen depending on the date range or how services are instrumented.


### How to test
1. Run synthtrace
```
node scripts/synthtrace service_many_dependencies.ts --from=now-15m --to=now --clean
```
2. Navigate to the Dependencies Inventory page
3. Navigate to a service overview and then to the Dependencies tab

---------

Co-authored-by: Kibana Machine <[email protected]>
crespocarlos and kibanamachine authored Jan 5, 2024
1 parent 42272f9 commit 403a8d0
Showing 4 changed files with 265 additions and 61 deletions.
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { ApmFields, Instance } from '@kbn/apm-synthtrace-client';
import { service } from '@kbn/apm-synthtrace-client/src/lib/apm/service';
import { Scenario } from '../cli/scenario';
import { RunOptions } from '../cli/utils/parse_run_cli_flags';
import { getSynthtraceEnvironment } from '../lib/utils/get_synthtrace_environment';
import { withClient } from '../lib/utils/with_client';

const ENVIRONMENT = getSynthtraceEnvironment(__filename);
const MAX_DEPENDENCIES = 10000;
const MAX_DEPENDENCIES_PER_SERVICE = 500;
const MAX_SERVICES = 20;

const scenario: Scenario<ApmFields> = async (runOptions: RunOptions) => {
  return {
    generate: ({ range, clients: { apmEsClient } }) => {
      const javaInstances = Array.from({ length: MAX_SERVICES }).map((_, index) =>
        service(`opbeans-java-${index}`, ENVIRONMENT, 'java').instance(`java-instance-${index}`)
      );

      const instanceDependencies = (instance: Instance, startIndex: number) => {
        const rate = range.ratePerMinute(60);

        return rate.generator((timestamp, index) => {
          const currentIndex = index % MAX_DEPENDENCIES_PER_SERVICE;
          const destination = (startIndex + currentIndex) % MAX_DEPENDENCIES;

          const span = instance
            .transaction({ transactionName: 'GET /java' })
            .timestamp(timestamp)
            .duration(400)
            .success()
            .children(
              instance
                .span({
                  spanName: 'GET apm-*/_search',
                  spanType: 'db',
                  spanSubtype: 'elasticsearch',
                })
                .destination(`elasticsearch/${destination}`)
                .timestamp(timestamp)
                .duration(200)
                .success()
            );

          return span;
        });
      };

      return withClient(
        apmEsClient,
        javaInstances.map((instance, index) =>
          instanceDependencies(instance, (index * MAX_DEPENDENCIES_PER_SERVICE) % MAX_DEPENDENCIES)
        )
      );
    },
  };
};

export default scenario;
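
Not part of the diff, just a worked example of the modular indexing above: each service cycles through its own block of 500 destinations (service `i` starts at `i * 500`), so the 20 services together cover all 10,000 distinct `elasticsearch/*` destinations.

```ts
// Hypothetical standalone check of the destination layout used by the scenario.
const MAX_DEPENDENCIES = 10000;
const MAX_DEPENDENCIES_PER_SERVICE = 500;

const destinationFor = (serviceIndex: number, spanIndex: number) =>
  ((serviceIndex * MAX_DEPENDENCIES_PER_SERVICE) % MAX_DEPENDENCIES +
    (spanIndex % MAX_DEPENDENCIES_PER_SERVICE)) %
  MAX_DEPENDENCIES;

console.log(destinationFor(0, 0)); // 0
console.log(destinationFor(0, 501)); // 1 (wraps within the service's 500-slot block)
console.log(destinationFor(3, 2)); // 1502
console.log(destinationFor(19, 499)); // 9999 (the last of the 10k destinations)
```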
@@ -4,9 +4,10 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { synthtrace } from '../../synthtrace';
import { opbeans } from '../fixtures/synthtrace/opbeans';
import { checkA11y } from '../support/commands';
import { synthtrace } from '../../../synthtrace';
import { opbeans } from '../../fixtures/synthtrace/opbeans';
import { checkA11y } from '../../support/commands';
import { generateManyDependencies } from './generate_many_dependencies';

const start = '2021-10-10T00:00:00.000Z';
const end = '2021-10-10T00:15:00.000Z';
@@ -120,3 +121,46 @@ describe('Dependencies', () => {
});
});
});

describe('Dependencies with high volume of data', () => {
  before(() => {
    synthtrace.index(
      generateManyDependencies({
        from: new Date(start).getTime(),
        to: new Date(end).getTime(),
      })
    );
  });

  after(() => {
    synthtrace.clean();
  });

  beforeEach(() => {
    cy.loginAsViewerUser();
  });

  it('shows dependencies inventory page', () => {
    cy.visitKibana(
      `/app/apm/dependencies/inventory?${new URLSearchParams({
        ...timeRange,
        kuery: 'elasticsearch*',
      })}`
    );

    cy.getByTestSubj('dependenciesTable');
    cy.contains('nav', 'Page 1 of 60');
  });

  it('shows service dependencies', () => {
    cy.visitKibana(
      `/app/apm/services/synth-java-0/dependencies?${new URLSearchParams({
        ...timeRange,
      })}`
    );

    cy.getByTestSubj('serviceDependenciesBreakdownChart').get('canvas');
    cy.getByTestSubj('dependenciesTable');
    cy.contains('nav', 'Page 1 of 100');
  });
});
@@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { apm, Instance, timerange } from '@kbn/apm-synthtrace-client';

const MAX_DEPENDENCIES = 10000;
const MAX_DEPENDENCIES_PER_SERVICE = 500;
const MAX_SERVICES = 20;

export function generateManyDependencies({
  from,
  to,
}: {
  from: number;
  to: number;
}) {
  const instances = Array.from({ length: MAX_SERVICES }).map((_, index) =>
    apm
      .service({
        name: `synth-java-${index}`,
        environment: 'production',
        agentName: 'java',
      })
      .instance(`java-instance-${index}`)
  );

  const instanceDependencies = (instance: Instance, startIndex: number) => {
    return Array.from(
      timerange(new Date(from), new Date(to))
        .interval('1m')
        .rate(60)
        .generator((timestamp, index) => {
          const currentIndex = index % MAX_DEPENDENCIES_PER_SERVICE;
          const destination = (startIndex + currentIndex) % MAX_DEPENDENCIES;

          const span = instance
            .transaction({ transactionName: 'GET /java' })
            .timestamp(timestamp)
            .duration(400)
            .success()
            .children(
              instance
                .span({
                  spanName: 'GET apm-*/_search',
                  spanType: 'db',
                  spanSubtype: 'elasticsearch',
                })
                .destination(`elasticsearch/${destination}`)
                .timestamp(timestamp)
                .duration(200)
                .success()
            );

          return span;
        })
    );
  };

  return instances.flatMap((instance, index) =>
    instanceDependencies(
      instance,
      (index * MAX_DEPENDENCIES_PER_SERVICE) % MAX_DEPENDENCIES
    )
  );
}
@@ -33,6 +33,7 @@ import { excludeRumExitSpansQuery } from '../exclude_rum_exit_spans_query';
import { APMEventClient } from '../../helpers/create_es_client/create_apm_event_client';
import { getDocumentTypeFilterForServiceDestinationStatistics } from '../../helpers/spans/get_is_using_service_destination_metrics';

const MAX_ITEMS = 1500;
export const getStats = async ({
apmEventClient,
start,
@@ -54,7 +55,85 @@ export const getStats = async ({
offset,
});

const response = await apmEventClient.search('get_connection_stats', {
const response = await getConnectionStats({
apmEventClient,
startWithOffset,
endWithOffset,
filter,
numBuckets,
});

return (
response.aggregations?.connections.buckets.map((bucket) => {
const sample = bucket.sample.top[0].metrics;
const serviceName = bucket.key.serviceName as string;
const dependencyName = bucket.key.dependencyName as string;

return {
from: {
id: objectHash({ serviceName }),
serviceName,
environment: (sample[SERVICE_ENVIRONMENT] ||
ENVIRONMENT_NOT_DEFINED.value) as string,
agentName: sample[AGENT_NAME] as AgentName,
type: NodeType.service as const,
},
to: {
id: objectHash({ dependencyName }),
dependencyName,
spanType: sample[SPAN_TYPE] as string,
spanSubtype: (sample[SPAN_SUBTYPE] || '') as string,
type: NodeType.dependency as const,
},
value: {
count: sum(
bucket.timeseries.buckets.map(
(dateBucket) => dateBucket.count.value ?? 0
)
),
latency_sum: sum(
bucket.timeseries.buckets.map(
(dateBucket) => dateBucket.latency_sum.value ?? 0
)
),
error_count: sum(
bucket.timeseries.buckets.flatMap(
(dateBucket) =>
dateBucket[EVENT_OUTCOME].buckets.find(
(outcomeBucket) => outcomeBucket.key === EventOutcome.failure
)?.count.value ?? 0
)
),
},
timeseries: bucket.timeseries.buckets.map((dateBucket) => ({
x: dateBucket.key + offsetInMs,
count: dateBucket.count.value ?? 0,
latency_sum: dateBucket.latency_sum.value ?? 0,
error_count:
dateBucket[EVENT_OUTCOME].buckets.find(
(outcomeBucket) => outcomeBucket.key === EventOutcome.failure
)?.count.value ?? 0,
})),
};
}) ?? []
);
};

async function getConnectionStats({
apmEventClient,
startWithOffset,
endWithOffset,
filter,
numBuckets,
}: {
apmEventClient: APMEventClient;
startWithOffset: number;
endWithOffset: number;
filter: QueryDslQueryContainer[];
numBuckets: number;
after?: { serviceName: string | number; dependencyName: string | number };
}) {
return apmEventClient.search('get_connection_stats', {
apm: {
sources: [
{
Expand All @@ -79,7 +158,7 @@ export const getStats = async ({
aggs: {
connections: {
composite: {
size: 10000,
size: MAX_ITEMS,
sources: asMutableArray([
{
serviceName: {
@@ -174,59 +253,4 @@ export const getStats = async ({
},
},
});

}
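
The loop that drives the pagination sits in a collapsed hunk above. Below is a minimal sketch of how such a driver could look, assuming `getConnectionStats` forwards its `after` argument to the composite aggregation and the typed response exposes `after_key`; the names and shapes here are assumptions, not the exact implementation.

```ts
// Hypothetical sketch: fetch MAX_ITEMS-sized pages until the composite
// aggregation is exhausted, then hand the merged buckets to the mapping above.
async function fetchAllConnectionStatsPages(params: {
  apmEventClient: APMEventClient;
  startWithOffset: number;
  endWithOffset: number;
  filter: QueryDslQueryContainer[];
  numBuckets: number;
}) {
  const allBuckets = [];
  let after: { serviceName: string | number; dependencyName: string | number } | undefined;

  while (true) {
    const response = await getConnectionStats({ ...params, after });
    const connections = response.aggregations?.connections;
    const buckets = connections?.buckets ?? [];
    allBuckets.push(...buckets);

    // Stop when the page is not full or no after_key is returned.
    if (buckets.length < MAX_ITEMS || !connections?.after_key) {
      break;
    }
    after = connections.after_key as typeof after;
  }

  return allBuckets;
}
```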
