-
Notifications
You must be signed in to change notification settings - Fork 8.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[APM] Improve logic to determine when to use the transaction.duration.summary field #171315
Changes from 32 commits
0de5219
6a53f6d
c7f7683
ec4177d
9a1d36f
83e1c55
0e329c9
0351b2f
618cddc
0416fe9
72782f3
bb2d2cd
a109347
72f76b0
ea01c51
cf8a8f7
28a82ee
3c89c0f
1710bf6
aa2e343
7a80eb7
196724c
c5addd4
f89d228
0709537
0ee3106
22d3e61
ed45132
6b3f0bf
33efa72
6d5eed5
f207b54
5897ee7
ecfa42c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
import { ApmFields, apm } from '@kbn/apm-synthtrace-client'; | ||
import { random } from 'lodash'; | ||
import { pipeline, Readable } from 'stream'; | ||
import semver from 'semver'; | ||
import { Scenario } from '../cli/scenario'; | ||
import { | ||
addObserverVersionTransform, | ||
deleteSummaryFieldTransform, | ||
} from '../lib/utils/transform_helpers'; | ||
import { withClient } from '../lib/utils/with_client'; | ||
|
||
const scenario: Scenario<ApmFields> = async ({ logger, versionOverride }) => { | ||
const version = versionOverride as string; | ||
const isLegacy = versionOverride && semver.lt(version, '8.7.0'); | ||
return { | ||
bootstrap: async ({ apmEsClient }) => { | ||
if (isLegacy) { | ||
apmEsClient.pipeline((base: Readable) => { | ||
const defaultPipeline = apmEsClient.getDefaultPipeline()( | ||
base | ||
) as unknown as NodeJS.ReadableStream; | ||
|
||
return pipeline( | ||
defaultPipeline, | ||
addObserverVersionTransform(version), | ||
deleteSummaryFieldTransform(), | ||
(err) => { | ||
if (err) { | ||
logger.error(err); | ||
} | ||
} | ||
); | ||
}); | ||
} | ||
}, | ||
generate: ({ range, clients: { apmEsClient } }) => { | ||
const successfulTimestamps = range.ratePerMinute(6); | ||
const instance = apm | ||
.service({ | ||
name: `java${isLegacy ? '-legacy' : ''}`, | ||
environment: 'production', | ||
agentName: 'java', | ||
}) | ||
.instance(`instance`); | ||
|
||
return withClient( | ||
apmEsClient, | ||
successfulTimestamps.generator((timestamp) => { | ||
const randomHigh = random(1000, 4000); | ||
const randomLow = random(100, randomHigh / 5); | ||
const duration = random(randomLow, randomHigh); | ||
return instance | ||
.transaction({ transactionName: 'GET /order/{id}' }) | ||
.timestamp(timestamp) | ||
.duration(duration) | ||
.success(); | ||
}) | ||
); | ||
}, | ||
}; | ||
}; | ||
|
||
export default scenario; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,8 +10,22 @@ import { ApmDocumentType } from '../../../common/document_type'; | |
import { RollupInterval } from '../../../common/rollup'; | ||
import { APMEventClient } from './create_es_client/create_apm_event_client'; | ||
import { getConfigForDocumentType } from './create_es_client/document_type'; | ||
import { TRANSACTION_DURATION_SUMMARY } from '../../../common/es_fields/apm'; | ||
import { TimeRangeMetadata } from '../../../common/time_range_metadata'; | ||
import { getDurationLegacyFilter } from './transactions'; | ||
|
||
const QUERY_INDEX = { | ||
BEFORE: 0, | ||
CURRENT: 1, | ||
DURATION_SUMMARY: 2, | ||
} as const; | ||
|
||
interface DocumentTypeData { | ||
documentType: ApmDocumentType; | ||
rollupInterval: RollupInterval; | ||
hasDocBefore: boolean; | ||
hasDocAfter: boolean; | ||
hasDurationSummary: boolean; | ||
} | ||
|
||
const getRequest = ({ | ||
documentType, | ||
|
@@ -64,170 +78,169 @@ export async function getDocumentSources({ | |
kuery: string; | ||
enableServiceTransactionMetrics: boolean; | ||
enableContinuousRollups: boolean; | ||
}) { | ||
const currentRange = rangeQuery(start, end); | ||
const diff = end - start; | ||
const kql = kqlQuery(kuery); | ||
const beforeRange = rangeQuery(start - diff, end - diff); | ||
|
||
const sourcesToCheck = [ | ||
}): Promise<TimeRangeMetadata['sources']> { | ||
const documentTypesToCheck = [ | ||
...(enableServiceTransactionMetrics | ||
? [ApmDocumentType.ServiceTransactionMetric as const] | ||
: []), | ||
ApmDocumentType.TransactionMetric as const, | ||
].flatMap((documentType) => { | ||
const docTypeConfig = getConfigForDocumentType(documentType); | ||
|
||
return ( | ||
enableContinuousRollups | ||
? docTypeConfig.rollupIntervals | ||
: [RollupInterval.OneMinute] | ||
).flatMap((rollupInterval) => { | ||
return { | ||
documentType, | ||
rollupInterval, | ||
meta: { | ||
checkSummaryFieldExists: false, | ||
}, | ||
before: getRequest({ | ||
documentType, | ||
rollupInterval, | ||
filters: [...kql, ...beforeRange], | ||
}), | ||
current: getRequest({ | ||
documentType, | ||
rollupInterval, | ||
filters: [...kql, ...currentRange], | ||
}), | ||
}; | ||
}); | ||
]; | ||
|
||
const documentTypesInfo = await getDocumentTypesInfo({ | ||
apmEventClient, | ||
start, | ||
end, | ||
kuery, | ||
enableContinuousRollups, | ||
documentTypesToCheck, | ||
}); | ||
|
||
const sourcesToCheckWithSummary = [ | ||
ApmDocumentType.TransactionMetric as const, | ||
].flatMap((documentType) => { | ||
const docTypeConfig = getConfigForDocumentType(documentType); | ||
|
||
return ( | ||
enableContinuousRollups | ||
? docTypeConfig.rollupIntervals | ||
: [RollupInterval.OneMinute] | ||
).flatMap((rollupInterval) => { | ||
const summaryExistsFilter = { | ||
bool: { | ||
filter: [ | ||
{ | ||
exists: { | ||
field: TRANSACTION_DURATION_SUMMARY, | ||
}, | ||
}, | ||
], | ||
}, | ||
}; | ||
const hasAnySourceDocBefore = documentTypesInfo.some( | ||
(source) => source.hasDocBefore | ||
); | ||
|
||
return { | ||
documentType, | ||
rollupInterval, | ||
meta: { | ||
checkSummaryFieldExists: true, | ||
}, | ||
before: getRequest({ | ||
documentType, | ||
rollupInterval, | ||
filters: [...kql, ...beforeRange, summaryExistsFilter], | ||
}), | ||
current: getRequest({ | ||
documentType, | ||
rollupInterval, | ||
filters: [...kql, ...currentRange, summaryExistsFilter], | ||
}), | ||
}; | ||
}); | ||
return [ | ||
...mapToSources(documentTypesInfo, hasAnySourceDocBefore), | ||
{ | ||
documentType: ApmDocumentType.TransactionEvent, | ||
rollupInterval: RollupInterval.None, | ||
hasDocs: true, | ||
hasDurationSummaryField: false, | ||
}, | ||
]; | ||
} | ||
|
||
const getDocumentTypesInfo = async ({ | ||
apmEventClient, | ||
start, | ||
end, | ||
kuery, | ||
enableContinuousRollups, | ||
documentTypesToCheck, | ||
}: { | ||
apmEventClient: APMEventClient; | ||
start: number; | ||
end: number; | ||
kuery: string; | ||
enableContinuousRollups: boolean; | ||
documentTypesToCheck: ApmDocumentType[]; | ||
}) => { | ||
const getRequests = getDocumentTypeRequestsFn({ | ||
enableContinuousRollups, | ||
start, | ||
end, | ||
kuery, | ||
}); | ||
|
||
const allSourcesToCheck = [...sourcesToCheck, ...sourcesToCheckWithSummary]; | ||
const sourceRequests = documentTypesToCheck.flatMap(getRequests); | ||
|
||
const allSearches = allSourcesToCheck.flatMap(({ before, current }) => [ | ||
before, | ||
current, | ||
]); | ||
const allSearches = sourceRequests | ||
.flatMap(({ before, current, durationSummaryCheck }) => [ | ||
before, | ||
current, | ||
durationSummaryCheck, | ||
]) | ||
.filter( | ||
(request): request is ReturnType<typeof getRequest> => | ||
request !== undefined | ||
); | ||
|
||
const allResponses = ( | ||
await apmEventClient.msearch('get_document_availability', ...allSearches) | ||
).responses; | ||
|
||
const checkedSources = allSourcesToCheck.map((source, index) => { | ||
const { documentType, rollupInterval } = source; | ||
const responseBefore = allResponses[index * 2]; | ||
const responseAfter = allResponses[index * 2 + 1]; | ||
|
||
const hasDocBefore = responseBefore.hits.total.value > 0; | ||
const hasDocAfter = responseAfter.hits.total.value > 0; | ||
return sourceRequests.map(({ documentType, rollupInterval, ...queries }) => { | ||
const numberOfQueries = Object.values(queries).filter(Boolean).length; | ||
// allResponses is sorted by the order of the requests in sourceRequests | ||
const docTypeResponses = allResponses.splice(0, numberOfQueries); | ||
|
||
return { | ||
documentType, | ||
rollupInterval, | ||
hasDocBefore, | ||
hasDocAfter, | ||
checkSummaryFieldExists: source.meta.checkSummaryFieldExists, | ||
hasDocBefore: docTypeResponses[QUERY_INDEX.BEFORE].hits.total.value > 0, | ||
hasDocAfter: docTypeResponses[QUERY_INDEX.CURRENT].hits.total.value > 0, | ||
hasDurationSummary: docTypeResponses[QUERY_INDEX.DURATION_SUMMARY] | ||
? docTypeResponses[QUERY_INDEX.DURATION_SUMMARY].hits.total.value === 0 | ||
: true, | ||
}; | ||
}); | ||
}; | ||
|
||
const hasAnySourceDocBefore = checkedSources.some( | ||
(source) => source.hasDocBefore | ||
); | ||
const getDocumentTypeRequestsFn = | ||
({ | ||
enableContinuousRollups, | ||
start, | ||
end, | ||
kuery, | ||
}: { | ||
enableContinuousRollups: boolean; | ||
start: number; | ||
end: number; | ||
kuery: string; | ||
}) => | ||
(documentType: ApmDocumentType) => { | ||
const currentRange = rangeQuery(start, end); | ||
const diff = end - start; | ||
const kql = kqlQuery(kuery); | ||
const beforeRange = rangeQuery(start - diff, end - diff); | ||
|
||
const rollupIntervals = enableContinuousRollups | ||
? getConfigForDocumentType(documentType).rollupIntervals | ||
: [RollupInterval.OneMinute]; | ||
|
||
return rollupIntervals.map((rollupInterval) => ({ | ||
documentType, | ||
rollupInterval, | ||
before: getRequest({ | ||
documentType, | ||
rollupInterval, | ||
filters: [...kql, ...beforeRange], | ||
}), | ||
current: getRequest({ | ||
documentType, | ||
rollupInterval, | ||
filters: [...kql, ...currentRange], | ||
}), | ||
...(documentType !== ApmDocumentType.ServiceTransactionMetric | ||
? { | ||
durationSummaryCheck: getRequest({ | ||
documentType, | ||
rollupInterval, | ||
filters: [...kql, ...currentRange, getDurationLegacyFilter()], | ||
}), | ||
} | ||
: {}), | ||
})); | ||
Comment on lines
+204
to
+213
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we not check both before/after here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need to - but I might be wrong. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you're right! Because the check is inversed (which is not possible with document sources), checking the current time range is enough. |
||
}; | ||
|
||
const sourcesWithHasDocs = checkedSources.map((checkedSource) => { | ||
const mapToSources = ( | ||
sources: DocumentTypeData[], | ||
hasAnySourceDocBefore: boolean | ||
) => { | ||
return sources.map((source) => { | ||
const { | ||
documentType, | ||
hasDocAfter, | ||
hasDocBefore, | ||
rollupInterval, | ||
checkSummaryFieldExists, | ||
} = checkedSource; | ||
hasDurationSummary, | ||
} = source; | ||
|
||
const hasDocBeforeOrAfter = hasDocBefore || hasDocAfter; | ||
|
||
// If there is any data before, we require that data is available before | ||
// this time range to mark this source as available. If we don't do that, | ||
// users that upgrade to a version that starts generating service tx metrics | ||
// will see a mostly empty screen for a while after upgrading. | ||
// If we only check before, users with a new deployment will use raw transaction | ||
// events. | ||
const hasDocs = hasAnySourceDocBefore ? hasDocBefore : hasDocBeforeOrAfter; | ||
|
||
return { | ||
documentType, | ||
rollupInterval, | ||
checkSummaryFieldExists, | ||
hasDocs, | ||
hasDurationSummaryField: hasDurationSummary, | ||
}; | ||
}); | ||
|
||
const sources: TimeRangeMetadata['sources'] = sourcesWithHasDocs | ||
.filter((source) => !source.checkSummaryFieldExists) | ||
.map((checkedSource) => { | ||
const { documentType, hasDocs, rollupInterval } = checkedSource; | ||
return { | ||
documentType, | ||
rollupInterval, | ||
hasDocs, | ||
hasDurationSummaryField: | ||
documentType === ApmDocumentType.ServiceTransactionMetric || | ||
Boolean( | ||
sourcesWithHasDocs.find((eSource) => { | ||
return ( | ||
eSource.documentType === documentType && | ||
eSource.rollupInterval === rollupInterval && | ||
eSource.checkSummaryFieldExists | ||
); | ||
})?.hasDocs | ||
), | ||
}; | ||
}); | ||
|
||
return sources.concat({ | ||
documentType: ApmDocumentType.TransactionEvent, | ||
rollupInterval: RollupInterval.None, | ||
hasDocs: true, | ||
hasDurationSummaryField: false, | ||
}); | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just one small nit here: maybe we should rename to
allHaveDurationSummary
?