Skip to content

Commit

Permalink
[Fleet] Use metering API in serverless (elastic#200063)
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet authored and CAWilson94 committed Dec 12, 2024
1 parent 53a5942 commit 1f8f756
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 9 deletions.
1 change: 1 addition & 0 deletions config/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ xpack.fleet.internal.disableILMPolicies: true
xpack.fleet.internal.activeAgentsSoftLimit: 25000
xpack.fleet.internal.onlyAllowAgentUpgradeToKnownVersions: true
xpack.fleet.internal.retrySetupOnBoot: true
xpack.fleet.internal.useMeteringApi: true

## Fine-tune the feature privileges.
xpack.features.overrides:
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export interface FleetConfigType {
disableBundledPackagesCache?: boolean;
};
internal?: {
useMeteringApi?: boolean;
disableILMPolicies: boolean;
fleetServerStandalone: boolean;
onlyAllowAgentUpgradeToKnownVersions: boolean;
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/fleet/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ export const config: PluginConfigDescriptor = {
}),

internal: schema.object({
useMeteringApi: schema.boolean({
defaultValue: false,
}),
disableILMPolicies: schema.boolean({
defaultValue: false,
}),
Expand Down
50 changes: 41 additions & 9 deletions x-pack/plugins/fleet/server/routes/data_streams/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { Dictionary } from 'lodash';
import { keyBy, keys, merge } from 'lodash';
import type { RequestHandler } from '@kbn/core/server';
import pMap from 'p-map';
Expand All @@ -13,9 +14,13 @@ import { KibanaSavedObjectType } from '../../../common/types';
import type { GetDataStreamsResponse } from '../../../common/types';
import { getPackageSavedObjects } from '../../services/epm/packages/get';
import { defaultFleetErrorHandler } from '../../errors';
import type { MeteringStats } from '../../services/data_streams';
import { dataStreamService } from '../../services/data_streams';

import { getDataStreamsQueryMetadata } from './get_data_streams_query_metadata';
import type { IndicesDataStreamsStatsDataStreamsStatsItem } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ByteSizeValue } from '@kbn/config-schema';
import { appContextService } from '../../services';

const MANAGED_BY = 'fleet';
const LEGACY_MANAGED_BY = 'ingest-manager';
Expand Down Expand Up @@ -51,10 +56,22 @@ export const getListHandler: RequestHandler = async (context, request, response)
};

try {
const useMeteringApi = appContextService.getConfig()?.internal?.useMeteringApi;

// Get matching data streams, their stats, and package SOs
const [dataStreamsInfo, dataStreamStats, packageSavedObjects] = await Promise.all([
const [
dataStreamsInfo,
dataStreamStatsOrUndefined,
dataStreamMeteringStatsorUndefined,
packageSavedObjects,
] = await Promise.all([
dataStreamService.getAllFleetDataStreams(esClient),
dataStreamService.getAllFleetDataStreamsStats(esClient),
useMeteringApi
? undefined
: dataStreamService.getAllFleetDataStreamsStats(elasticsearch.client.asSecondaryAuthUser),
useMeteringApi
? dataStreamService.getAllFleetMeteringStats(elasticsearch.client.asSecondaryAuthUser)
: undefined,
getPackageSavedObjects(savedObjects.client),
]);

Expand All @@ -67,13 +84,24 @@ export const getListHandler: RequestHandler = async (context, request, response)

const dataStreamsInfoByName = keyBy<ESDataStreamInfo>(filteredDataStreamsInfo, 'name');

const filteredDataStreamsStats = dataStreamStats.filter(
(dss) => !!dataStreamsInfoByName[dss.data_stream]
);
const dataStreamsStatsByName = keyBy(filteredDataStreamsStats, 'data_stream');
let dataStreamsStatsByName: Dictionary<IndicesDataStreamsStatsDataStreamsStatsItem> = {};
if (dataStreamStatsOrUndefined) {
const filteredDataStreamsStats = dataStreamStatsOrUndefined.filter(
(dss) => !!dataStreamsInfoByName[dss.data_stream]
);
dataStreamsStatsByName = keyBy(filteredDataStreamsStats, 'data_stream');
}
let dataStreamsMeteringStatsByName: Dictionary<MeteringStats> = {};
if (dataStreamMeteringStatsorUndefined) {
dataStreamsMeteringStatsByName = keyBy(dataStreamMeteringStatsorUndefined, 'name');
}

// Combine data stream info
const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName);
const dataStreams = merge(
dataStreamsInfoByName,
dataStreamsStatsByName,
dataStreamsMeteringStatsByName
);
const dataStreamNames = keys(dataStreams);

// Map package SOs
Expand Down Expand Up @@ -132,10 +160,14 @@ export const getListHandler: RequestHandler = async (context, request, response)
package: dataStream._meta?.package?.name || '',
package_version: '',
last_activity_ms: dataStream.maximum_timestamp, // overridden below if maxIngestedTimestamp agg returns a result
size_in_bytes: dataStream.store_size_bytes,
size_in_bytes: dataStream.store_size_bytes || dataStream.size_in_bytes,
// `store_size` should be available from ES due to ?human=true flag
// but fallback to bytes just in case
size_in_bytes_formatted: dataStream.store_size || `${dataStream.store_size_bytes}b`,
size_in_bytes_formatted:
dataStream.store_size ||
new ByteSizeValue(
dataStream.store_size_bytes || dataStream.size_in_bytes || 0
).toString(),
dashboards: [],
serviceDetails: null,
};
Expand Down
21 changes: 21 additions & 0 deletions x-pack/plugins/fleet/server/services/data_streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import type { ElasticsearchClient } from '@kbn/core/server';

const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*,synthetics-*-*,profiling-*';

export interface MeteringStatsResponse {
datastreams: MeteringStats[];
}
export interface MeteringStats {
name: string;
num_docs: number;
size_in_bytes: number;
}

class DataStreamService {
public async getAllFleetDataStreams(esClient: ElasticsearchClient) {
const { data_streams: dataStreamsInfo } = await esClient.indices.getDataStream({
Expand All @@ -19,6 +28,18 @@ class DataStreamService {
return dataStreamsInfo;
}

public async getAllFleetMeteringStats(esClient: ElasticsearchClient) {
const res = await esClient.transport.request<MeteringStatsResponse>({
path: `/_metering/stats`,
method: 'GET',
querystring: {
human: true,
},
});

return res.datastreams ?? [];
}

public async getAllFleetDataStreamsStats(esClient: ElasticsearchClient) {
const { data_streams: dataStreamStats } = await esClient.indices.dataStreamsStats({
name: DATA_STREAM_INDEX_PATTERN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export default function (ctx: FtrProviderContext) {
const svlCommonApi = ctx.getService('svlCommonApi');
const supertestWithoutAuth = ctx.getService('supertestWithoutAuth');
const svlUserManager = ctx.getService('svlUserManager');
const es = ctx.getService('es');
let roleAuthc: RoleCredentials;

describe('fleet', function () {
Expand Down Expand Up @@ -112,5 +113,130 @@ export default function (ctx: FtrProviderContext) {
});
expect(status).toBe(200);
});

describe('datastreams API', () => {
before(async () => {
await es.index({
refresh: 'wait_for',
index: 'logs-nginx.access-default',
document: {
agent: {
name: 'docker-fleet-agent',
id: 'ef5e274d-4b53-45e6-943a-a5bcf1a6f523',
ephemeral_id: '34369a4a-4f24-4a39-9758-85fc2429d7e2',
type: 'filebeat',
version: '8.5.0',
},
nginx: {
access: {
remote_ip_list: ['127.0.0.1'],
},
},
log: {
file: {
path: '/tmp/service_logs/access.log',
},
offset: 0,
},
elastic_agent: {
id: 'ef5e274d-4b53-45e6-943a-a5bcf1a6f523',
version: '8.5.0',
snapshot: false,
},
source: {
address: '127.0.0.1',
ip: '127.0.0.1',
},
url: {
path: '/server-status',
original: '/server-status',
},
tags: ['nginx-access'],
input: {
type: 'log',
},
'@timestamp': new Date().toISOString(),
_tmp: {},
ecs: {
version: '8.11.0',
},
related: {
ip: ['127.0.0.1'],
},
data_stream: {
namespace: 'default',
type: 'logs',
dataset: 'nginx.access',
},
host: {
hostname: 'docker-fleet-agent',
os: {
kernel: '5.15.49-linuxkit',
codename: 'focal',
name: 'Ubuntu',
family: 'debian',
type: 'linux',
version: '20.04.5 LTS (Focal Fossa)',
platform: 'ubuntu',
},
containerized: false,
ip: ['172.18.0.7'],
name: 'docker-fleet-agent',
id: '66392b0697b84641af8006d87aeb89f1',
mac: ['02-42-AC-12-00-07'],
architecture: 'x86_64',
},
http: {
request: {
method: 'GET',
},
response: {
status_code: 200,
body: {
bytes: 97,
},
},
version: '1.1',
},
event: {
agent_id_status: 'verified',
ingested: '2022-12-09T10:39:40Z',
created: '2022-12-09T10:39:38.896Z',
kind: 'event',
timezone: '+00:00',
category: ['web'],
type: ['access'],
dataset: 'nginx.access',
outcome: 'success',
},
user_agent: {
original: 'curl/7.64.0',
name: 'curl',
device: {
name: 'Other',
},
version: '7.64.0',
},
},
});
});

after(async () => {
await es.transport.request({
path: `/_data_stream/logs-nginx.access-default`,
method: 'delete',
});
});

it('it works', async () => {
const { body, status } = await supertestWithoutAuth
.get('/api/fleet/data_streams')
.set(svlCommonApi.getInternalRequestHeader())
.set(roleAuthc.apiKeyHeader);

expect(status).toBe(200);
expect(body.data_streams?.[0]?.index).toBe('logs-nginx.access-default');
});
});
});
}

0 comments on commit 1f8f756

Please sign in to comment.