diff --git a/config/serverless.yml b/config/serverless.yml index 7eca5cae871c3..27580c3ce3da2 100644 --- a/config/serverless.yml +++ b/config/serverless.yml @@ -7,6 +7,7 @@ xpack.fleet.internal.disableILMPolicies: true xpack.fleet.internal.activeAgentsSoftLimit: 25000 xpack.fleet.internal.onlyAllowAgentUpgradeToKnownVersions: true xpack.fleet.internal.retrySetupOnBoot: true +xpack.fleet.internal.useMeteringApi: true ## Fine-tune the feature privileges. xpack.features.overrides: diff --git a/x-pack/plugins/fleet/common/types/index.ts b/x-pack/plugins/fleet/common/types/index.ts index 647a8b917d0c0..f77ea38dc7b5f 100644 --- a/x-pack/plugins/fleet/common/types/index.ts +++ b/x-pack/plugins/fleet/common/types/index.ts @@ -65,6 +65,7 @@ export interface FleetConfigType { disableBundledPackagesCache?: boolean; }; internal?: { + useMeteringApi?: boolean; disableILMPolicies: boolean; fleetServerStandalone: boolean; onlyAllowAgentUpgradeToKnownVersions: boolean; diff --git a/x-pack/plugins/fleet/server/config.ts b/x-pack/plugins/fleet/server/config.ts index 746498221de55..8035f389db488 100644 --- a/x-pack/plugins/fleet/server/config.ts +++ b/x-pack/plugins/fleet/server/config.ts @@ -203,6 +203,9 @@ export const config: PluginConfigDescriptor = { internal: schema.maybe( schema.object({ + useMeteringApi: schema.boolean({ + defaultValue: false, + }), disableILMPolicies: schema.boolean({ defaultValue: false, }), diff --git a/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts b/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts index f9021f344c69b..7cbc9d9274032 100644 --- a/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts +++ b/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts @@ -4,6 +4,7 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ +import type { Dictionary } from 'lodash'; import { keyBy, keys, merge } from 'lodash'; import type { RequestHandler } from '@kbn/core/server'; import pMap from 'p-map'; @@ -13,9 +14,13 @@ import { KibanaSavedObjectType } from '../../../common/types'; import type { GetDataStreamsResponse } from '../../../common/types'; import { getPackageSavedObjects } from '../../services/epm/packages/get'; import { defaultFleetErrorHandler } from '../../errors'; +import type { MeteringStats } from '../../services/data_streams'; import { dataStreamService } from '../../services/data_streams'; import { getDataStreamsQueryMetadata } from './get_data_streams_query_metadata'; +import type { IndicesDataStreamsStatsDataStreamsStatsItem } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { ByteSizeValue } from '@kbn/config-schema'; +import { appContextService } from '../../services'; const MANAGED_BY = 'fleet'; const LEGACY_MANAGED_BY = 'ingest-manager'; @@ -51,10 +56,22 @@ export const getListHandler: RequestHandler = async (context, request, response) }; try { + const useMeteringApi = appContextService.getConfig()?.internal?.useMeteringApi; + // Get matching data streams, their stats, and package SOs - const [dataStreamsInfo, dataStreamStats, packageSavedObjects] = await Promise.all([ + const [ + dataStreamsInfo, + dataStreamStatsOrUndefined, + dataStreamMeteringStatsorUndefined, + packageSavedObjects, + ] = await Promise.all([ dataStreamService.getAllFleetDataStreams(esClient), - dataStreamService.getAllFleetDataStreamsStats(esClient), + useMeteringApi + ? undefined + : dataStreamService.getAllFleetDataStreamsStats(elasticsearch.client.asSecondaryAuthUser), + useMeteringApi + ? dataStreamService.getAllFleetMeteringStats(elasticsearch.client.asSecondaryAuthUser) + : undefined, getPackageSavedObjects(savedObjects.client), ]); @@ -67,13 +84,24 @@ export const getListHandler: RequestHandler = async (context, request, response) const dataStreamsInfoByName = keyBy(filteredDataStreamsInfo, 'name'); - const filteredDataStreamsStats = dataStreamStats.filter( - (dss) => !!dataStreamsInfoByName[dss.data_stream] - ); - const dataStreamsStatsByName = keyBy(filteredDataStreamsStats, 'data_stream'); + let dataStreamsStatsByName: Dictionary = {}; + if (dataStreamStatsOrUndefined) { + const filteredDataStreamsStats = dataStreamStatsOrUndefined.filter( + (dss) => !!dataStreamsInfoByName[dss.data_stream] + ); + dataStreamsStatsByName = keyBy(filteredDataStreamsStats, 'data_stream'); + } + let dataStreamsMeteringStatsByName: Dictionary = {}; + if (dataStreamMeteringStatsorUndefined) { + dataStreamsMeteringStatsByName = keyBy(dataStreamMeteringStatsorUndefined, 'name'); + } // Combine data stream info - const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName); + const dataStreams = merge( + dataStreamsInfoByName, + dataStreamsStatsByName, + dataStreamsMeteringStatsByName + ); const dataStreamNames = keys(dataStreams); // Map package SOs @@ -132,10 +160,14 @@ export const getListHandler: RequestHandler = async (context, request, response) package: dataStream._meta?.package?.name || '', package_version: '', last_activity_ms: dataStream.maximum_timestamp, // overridden below if maxIngestedTimestamp agg returns a result - size_in_bytes: dataStream.store_size_bytes, + size_in_bytes: dataStream.store_size_bytes || dataStream.size_in_bytes, // `store_size` should be available from ES due to ?human=true flag // but fallback to bytes just in case - size_in_bytes_formatted: dataStream.store_size || `${dataStream.store_size_bytes}b`, + size_in_bytes_formatted: + dataStream.store_size || + new ByteSizeValue( + dataStream.store_size_bytes || dataStream.size_in_bytes || 0 + ).toString(), dashboards: [], serviceDetails: null, }; diff --git a/x-pack/plugins/fleet/server/services/data_streams.ts b/x-pack/plugins/fleet/server/services/data_streams.ts index 6dd60a4e0be1e..68076c3309ab2 100644 --- a/x-pack/plugins/fleet/server/services/data_streams.ts +++ b/x-pack/plugins/fleet/server/services/data_streams.ts @@ -10,6 +10,15 @@ import type { ElasticsearchClient } from '@kbn/core/server'; const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*,synthetics-*-*,profiling-*'; +export interface MeteringStatsResponse { + datastreams: MeteringStats[]; +} +export interface MeteringStats { + name: string; + num_docs: number; + size_in_bytes: number; +} + class DataStreamService { public async getAllFleetDataStreams(esClient: ElasticsearchClient) { const { data_streams: dataStreamsInfo } = await esClient.indices.getDataStream({ @@ -19,6 +28,18 @@ class DataStreamService { return dataStreamsInfo; } + public async getAllFleetMeteringStats(esClient: ElasticsearchClient) { + const res = await esClient.transport.request({ + path: `/_metering/stats`, + method: 'GET', + querystring: { + human: true, + }, + }); + + return res.datastreams ?? []; + } + public async getAllFleetDataStreamsStats(esClient: ElasticsearchClient) { const { data_streams: dataStreamStats } = await esClient.indices.dataStreamsStats({ name: DATA_STREAM_INDEX_PATTERN, diff --git a/x-pack/test_serverless/api_integration/test_suites/security/fleet/fleet.ts b/x-pack/test_serverless/api_integration/test_suites/security/fleet/fleet.ts index 45db1f2e530dc..d812e43dfa62a 100644 --- a/x-pack/test_serverless/api_integration/test_suites/security/fleet/fleet.ts +++ b/x-pack/test_serverless/api_integration/test_suites/security/fleet/fleet.ts @@ -17,6 +17,7 @@ export default function (ctx: FtrProviderContext) { const svlCommonApi = ctx.getService('svlCommonApi'); const supertestWithoutAuth = ctx.getService('supertestWithoutAuth'); const svlUserManager = ctx.getService('svlUserManager'); + const es = ctx.getService('es'); let roleAuthc: RoleCredentials; describe('fleet', function () { @@ -112,5 +113,130 @@ export default function (ctx: FtrProviderContext) { }); expect(status).toBe(200); }); + + describe('datastreams API', () => { + before(async () => { + await es.index({ + refresh: 'wait_for', + index: 'logs-nginx.access-default', + document: { + agent: { + name: 'docker-fleet-agent', + id: 'ef5e274d-4b53-45e6-943a-a5bcf1a6f523', + ephemeral_id: '34369a4a-4f24-4a39-9758-85fc2429d7e2', + type: 'filebeat', + version: '8.5.0', + }, + nginx: { + access: { + remote_ip_list: ['127.0.0.1'], + }, + }, + log: { + file: { + path: '/tmp/service_logs/access.log', + }, + offset: 0, + }, + elastic_agent: { + id: 'ef5e274d-4b53-45e6-943a-a5bcf1a6f523', + version: '8.5.0', + snapshot: false, + }, + source: { + address: '127.0.0.1', + ip: '127.0.0.1', + }, + url: { + path: '/server-status', + original: '/server-status', + }, + tags: ['nginx-access'], + input: { + type: 'log', + }, + '@timestamp': new Date().toISOString(), + _tmp: {}, + ecs: { + version: '8.11.0', + }, + related: { + ip: ['127.0.0.1'], + }, + data_stream: { + namespace: 'default', + type: 'logs', + dataset: 'nginx.access', + }, + host: { + hostname: 'docker-fleet-agent', + os: { + kernel: '5.15.49-linuxkit', + codename: 'focal', + name: 'Ubuntu', + family: 'debian', + type: 'linux', + version: '20.04.5 LTS (Focal Fossa)', + platform: 'ubuntu', + }, + containerized: false, + ip: ['172.18.0.7'], + name: 'docker-fleet-agent', + id: '66392b0697b84641af8006d87aeb89f1', + mac: ['02-42-AC-12-00-07'], + architecture: 'x86_64', + }, + http: { + request: { + method: 'GET', + }, + response: { + status_code: 200, + body: { + bytes: 97, + }, + }, + version: '1.1', + }, + event: { + agent_id_status: 'verified', + ingested: '2022-12-09T10:39:40Z', + created: '2022-12-09T10:39:38.896Z', + kind: 'event', + timezone: '+00:00', + category: ['web'], + type: ['access'], + dataset: 'nginx.access', + outcome: 'success', + }, + user_agent: { + original: 'curl/7.64.0', + name: 'curl', + device: { + name: 'Other', + }, + version: '7.64.0', + }, + }, + }); + }); + + after(async () => { + await es.transport.request({ + path: `/_data_stream/logs-nginx.access-default`, + method: 'delete', + }); + }); + + it('it works', async () => { + const { body, status } = await supertestWithoutAuth + .get('/api/fleet/data_streams') + .set(svlCommonApi.getInternalRequestHeader()) + .set(roleAuthc.apiKeyHeader); + + expect(status).toBe(200); + expect(body.data_streams?.[0]?.index).toBe('logs-nginx.access-default'); + }); + }); }); }