diff --git a/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.test.ts b/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.test.ts index 51f1e06f278ad..4bae40076e581 100644 --- a/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.test.ts +++ b/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.test.ts @@ -13,6 +13,8 @@ import { encode } from 'cbor-x'; import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks'; import { ContentStream, ContentStreamEncoding, ContentStreamParameters } from './content_stream'; import type { GetResponse } from '@elastic/elasticsearch/lib/api/types'; +import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { FileDocument } from '../../../../file_client/file_metadata_client/adapters/es_index'; describe('ContentStream', () => { let client: ReturnType; @@ -30,8 +32,9 @@ describe('ContentStream', () => { encoding: 'base64' as ContentStreamEncoding, size: 1, } as ContentStreamParameters, + indexIsAlias = false, } = {}) => { - return new ContentStream(client, id, index, logger, params); + return new ContentStream(client, id, index, logger, params, indexIsAlias); }; beforeEach(() => { @@ -43,124 +46,193 @@ describe('ContentStream', () => { }); describe('read', () => { - beforeEach(() => { - stream = getContentStream({ params: { size: 1 } }); - }); + describe('with `indexIsAlias` set to `true`', () => { + let searchResponse: estypes.SearchResponse>; + + beforeEach(() => { + searchResponse = { + took: 3, + timed_out: false, + _shards: { + total: 2, + successful: 2, + skipped: 0, + failed: 0, + }, + hits: { + total: { + value: 1, + relation: 'eq', + }, + max_score: 0, + hits: [ + { + _index: 'foo', + _id: '123', + _score: 1.0, + }, + ], + }, + }; + + client.search.mockResolvedValue(searchResponse); + }); - it('should perform a search using index and the document id', async () => { - await new Promise((resolve) => stream.once('data', resolve)); + it('should use es.search() to find chunk index', async () => { + stream = getContentStream({ params: { size: 1 }, indexIsAlias: true }); + const data = await new Promise((resolve) => stream.once('data', resolve)); + + expect(client.search).toHaveBeenCalledWith({ + body: { + _source: false, + query: { + term: { + _id: 'something.0', + }, + }, + size: 1, + }, + index: 'somewhere', + }); + expect(data).toEqual(Buffer.from('some content')); + }); - expect(client.get).toHaveBeenCalledTimes(1); + it('should throw if chunk is not found', async () => { + searchResponse.hits.hits = []; + stream = getContentStream({ params: { size: 1 }, indexIsAlias: true }); - const [[request]] = client.get.mock.calls; - expect(request).toHaveProperty('index', 'somewhere'); - expect(request).toHaveProperty('id', 'something.0'); - }); + const readPromise = new Promise((resolve, reject) => { + stream.once('data', resolve); + stream.once('error', reject); + }); - it('should read the document contents', async () => { - const data = await new Promise((resolve) => stream.once('data', resolve)); - expect(data).toEqual(Buffer.from('some content')); + await expect(readPromise).rejects.toHaveProperty( + 'message', + 'Unable to determine index for file chunk id [something.0] in index (alias) [somewhere]' + ); + }); }); - it('should be an empty stream on empty response', async () => { - client.get.mockResponseOnce(toReadable()); - const onData = jest.fn(); + describe('with `indexIsAlias` set to `false`', () => { + beforeEach(() => { + stream = getContentStream({ params: { size: 1 } }); + }); - stream.on('data', onData); - await new Promise((resolve) => stream.once('end', resolve)); + it('should perform a search using index and the document id', async () => { + await new Promise((resolve) => stream.once('data', resolve)); - expect(onData).not.toHaveBeenCalled(); - }); + expect(client.get).toHaveBeenCalledTimes(1); - it('should emit an error event', async () => { - client.get.mockRejectedValueOnce('some error'); + const [[request]] = client.get.mock.calls; + expect(request).toHaveProperty('index', 'somewhere'); + expect(request).toHaveProperty('id', 'something.0'); + }); - stream.read(); - const error = await new Promise((resolve) => stream.once('error', resolve)); + it('should read the document contents', async () => { + const data = await new Promise((resolve) => stream.once('data', resolve)); + expect(data).toEqual(Buffer.from('some content')); + }); - expect(error).toBe('some error'); - }); + it('should be an empty stream on empty response', async () => { + client.get.mockResponseOnce(toReadable()); + const onData = jest.fn(); - it('should decode base64 encoded content', async () => { - client.get.mockResponseOnce( - toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content'))) - ); - const data = await new Promise((resolve) => stream.once('data', resolve)); + stream.on('data', onData); + await new Promise((resolve) => stream.once('end', resolve)); - expect(data).toEqual(Buffer.from('encoded content')); - }); + expect(onData).not.toHaveBeenCalled(); + }); + + it('should emit an error event', async () => { + client.get.mockRejectedValueOnce('some error'); - it('should compound content from multiple chunks', async () => { - const [one, two, three] = ['12', '34', '56'].map(Buffer.from); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one))); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two))); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three))); + stream.read(); + const error = await new Promise((resolve) => stream.once('error', resolve)); - stream = getContentStream({ - params: { size: 6 }, + expect(error).toBe('some error'); }); - let data = ''; - for await (const chunk of stream) { - data += chunk; - } + it('should decode base64 encoded content', async () => { + client.get.mockResponseOnce( + toReadable(set({ found: true }, '_source.data', Buffer.from('encoded content'))) + ); + const data = await new Promise((resolve) => stream.once('data', resolve)); - expect(data).toEqual('123456'); - expect(client.get).toHaveBeenCalledTimes(3); + expect(data).toEqual(Buffer.from('encoded content')); + }); - const [[request1], [request2], [request3]] = client.get.mock.calls; + it('should compound content from multiple chunks', async () => { + const [one, two, three] = ['12', '34', '56'].map(Buffer.from); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one))); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two))); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three))); - expect(request1).toHaveProperty('index', 'somewhere'); - expect(request1).toHaveProperty('id', 'something.0'); - expect(request2).toHaveProperty('index', 'somewhere'); - expect(request2).toHaveProperty('id', 'something.1'); - expect(request3).toHaveProperty('index', 'somewhere'); - expect(request3).toHaveProperty('id', 'something.2'); - }); + stream = getContentStream({ + params: { size: 6 }, + }); - it('should stop reading on empty chunk', async () => { - const [one, two, three] = ['12', '34', ''].map(Buffer.from); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one))); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two))); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three))); - stream = getContentStream({ params: { size: 12 } }); - let data = ''; - for await (const chunk of stream) { - data += chunk; - } - - expect(data).toEqual('1234'); - expect(client.get).toHaveBeenCalledTimes(3); - }); + let data = ''; + for await (const chunk of stream) { + data += chunk; + } - it('should read while chunks are present when there is no size', async () => { - const [one, two] = ['12', '34'].map(Buffer.from); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one))); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two))); - client.get.mockResponseOnce(toReadable({ found: true })); - stream = getContentStream({ params: { size: undefined } }); - let data = ''; - for await (const chunk of stream) { - data += chunk; - } - - expect(data).toEqual('1234'); - expect(client.get).toHaveBeenCalledTimes(3); - }); + expect(data).toEqual('123456'); + expect(client.get).toHaveBeenCalledTimes(3); - it('should decode every chunk separately', async () => { - const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one))); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two))); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three))); - client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four))); - stream = getContentStream({ params: { size: 12 } }); - let data = ''; - for await (const chunk of stream) { - data += chunk; - } - - expect(data).toEqual('123456'); + const [[request1], [request2], [request3]] = client.get.mock.calls; + + expect(request1).toHaveProperty('index', 'somewhere'); + expect(request1).toHaveProperty('id', 'something.0'); + expect(request2).toHaveProperty('index', 'somewhere'); + expect(request2).toHaveProperty('id', 'something.1'); + expect(request3).toHaveProperty('index', 'somewhere'); + expect(request3).toHaveProperty('id', 'something.2'); + }); + + it('should stop reading on empty chunk', async () => { + const [one, two, three] = ['12', '34', ''].map(Buffer.from); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one))); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two))); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three))); + stream = getContentStream({ params: { size: 12 } }); + let data = ''; + for await (const chunk of stream) { + data += chunk; + } + + expect(data).toEqual('1234'); + expect(client.get).toHaveBeenCalledTimes(3); + }); + + it('should read while chunks are present when there is no size', async () => { + const [one, two] = ['12', '34'].map(Buffer.from); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one))); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two))); + client.get.mockResponseOnce(toReadable({ found: true })); + stream = getContentStream({ params: { size: undefined } }); + let data = ''; + for await (const chunk of stream) { + data += chunk; + } + + expect(data).toEqual('1234'); + expect(client.get).toHaveBeenCalledTimes(3); + }); + + it('should decode every chunk separately', async () => { + const [one, two, three, four] = ['12', '34', '56', ''].map(Buffer.from); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', one))); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', two))); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', three))); + client.get.mockResponseOnce(toReadable(set({ found: true }, '_source.data', four))); + stream = getContentStream({ params: { size: 12 } }); + let data = ''; + for await (const chunk of stream) { + data += chunk; + } + + expect(data).toEqual('123456'); + }); }); }); diff --git a/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.ts b/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.ts index 99e29383f020f..98aebda3c7735 100644 --- a/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.ts +++ b/src/plugins/files/server/blob_storage_service/adapters/es/content_stream/content_stream.ts @@ -101,12 +101,15 @@ export class ContentStream extends Duplex { }, }); - const docIndex = chunkDocMeta.hits.hits[0]._index; + const docIndex = chunkDocMeta.hits.hits?.[0]?._index; if (!docIndex) { - throw new Error( + const err = new Error( `Unable to determine index for file chunk id [${id}] in index (alias) [${this.index}]` ); + + this.logger.error(err); + throw err; } return docIndex; diff --git a/src/plugins/files/server/file_client/create_es_file_client.test.ts b/src/plugins/files/server/file_client/create_es_file_client.test.ts new file mode 100644 index 0000000000000..68589b334c8ea --- /dev/null +++ b/src/plugins/files/server/file_client/create_es_file_client.test.ts @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { + type ElasticsearchClientMock, + elasticsearchServiceMock, + loggingSystemMock, +} from '@kbn/core/server/mocks'; +import { MockedLogger } from '@kbn/logging-mocks'; +import { createEsFileClient } from './create_es_file_client'; +import { FileClient } from './types'; +import { ElasticsearchBlobStorageClient } from '../blob_storage_service'; +import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { FileDocument } from './file_metadata_client/adapters/es_index'; + +describe('When initializing file client via createESFileClient()', () => { + let esClient: ElasticsearchClientMock; + let logger: MockedLogger; + + beforeEach(() => { + ElasticsearchBlobStorageClient.configureConcurrentUpload(Infinity); + esClient = elasticsearchServiceMock.createElasticsearchClient(); + logger = loggingSystemMock.createLogger(); + }); + + describe('and `indexIsAlias` argument is used', () => { + let fileClient: FileClient; + let searchResponse: estypes.SearchResponse>; + + beforeEach(() => { + searchResponse = { + took: 3, + timed_out: false, + _shards: { + total: 2, + successful: 2, + skipped: 0, + failed: 0, + }, + hits: { + total: { + value: 1, + relation: 'eq', + }, + max_score: 0, + hits: [ + { + _index: 'foo', + _id: '123', + _score: 1.0, + _source: { + file: { + name: 'foo.txt', + Status: 'READY', + created: '2023-03-27T20:45:31.490Z', + Updated: '2023-03-27T20:45:31.490Z', + FileKind: '', + }, + }, + }, + ], + }, + }; + + esClient.search.mockResolvedValue(searchResponse); + fileClient = createEsFileClient({ + logger, + metadataIndex: 'file-meta', + blobStorageIndex: 'file-data', + elasticsearchClient: esClient, + indexIsAlias: true, + }); + }); + + it('should use es.search() to retrieve file metadata', async () => { + await fileClient.get({ id: '123' }); + expect(esClient.search).toHaveBeenCalledWith({ + body: { + query: { + term: { + _id: '123', + }, + }, + size: 1, + }, + index: 'file-meta', + }); + }); + + it('should throw an error if file is not found', async () => { + (searchResponse.hits.total as estypes.SearchTotalHits).value = 0; + searchResponse.hits.hits = []; + await expect(fileClient.get({ id: '123 ' })).rejects.toHaveProperty( + 'message', + 'File not found' + ); + }); + }); +}); diff --git a/src/plugins/files/server/file_client/file_metadata_client/adapters/es_index.ts b/src/plugins/files/server/file_client/file_metadata_client/adapters/es_index.ts index 37f3988c0cc75..d139c7cf6330f 100644 --- a/src/plugins/files/server/file_client/file_metadata_client/adapters/es_index.ts +++ b/src/plugins/files/server/file_client/file_metadata_client/adapters/es_index.ts @@ -12,7 +12,6 @@ import { Logger } from '@kbn/core/server'; import { toElasticsearchQuery } from '@kbn/es-query'; import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { MappingProperty, SearchTotalHits } from '@elastic/elasticsearch/lib/api/types'; -import { fetchDoc } from '../../utils'; import type { FilesMetrics, FileMetadata, Pagination } from '../../../../common'; import type { FindFileArgs } from '../../../file_service'; import type { @@ -36,7 +35,7 @@ const fileMappings: MappingProperty = { }, }; -interface FileDocument { +export interface FileDocument { file: FileMetadata; } @@ -82,11 +81,36 @@ export class EsIndexFilesMetadataClient implements FileMetadataClie } async get({ id }: GetArg): Promise> { - const { _source: doc } = - (await fetchDoc>(this.esClient, this.index, id, this.indexIsAlias)) ?? {}; + const { esClient, index, indexIsAlias } = this; + let doc: FileDocument | undefined; + + if (indexIsAlias) { + doc = ( + await esClient.search>({ + index, + body: { + size: 1, + query: { + term: { + _id: id, + }, + }, + }, + }) + ).hits.hits?.[0]?._source; + } else { + doc = ( + await esClient.get>({ + index, + id, + }) + )._source; + } if (!doc) { - this.logger.error(`File with id "${id}" not found`); + this.logger.error( + `File with id "${id}" not found in index ${indexIsAlias ? 'alias ' : ''}"${index}"` + ); throw new Error('File not found'); } diff --git a/src/plugins/files/server/file_client/utils.ts b/src/plugins/files/server/file_client/utils.ts index 9c7d07312f635..88e0901680f0c 100644 --- a/src/plugins/files/server/file_client/utils.ts +++ b/src/plugins/files/server/file_client/utils.ts @@ -6,8 +6,6 @@ * Side Public License, v 1. */ -import { GetResponse } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; -import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { FileMetadata } from '../../common'; export function createDefaultFileAttributes(): Pick< @@ -21,31 +19,3 @@ export function createDefaultFileAttributes(): Pick< Updated: dateString, }; } - -export const fetchDoc = async ( - esClient: ElasticsearchClient, - index: string, - docId: string, - indexIsAlias: boolean = false -): Promise | undefined> => { - if (indexIsAlias) { - const fileDocSearchResult = await esClient.search({ - index, - body: { - size: 1, - query: { - term: { - _id: docId, - }, - }, - }, - }); - - return fileDocSearchResult.hits.hits[0] as GetResponse; - } - - return esClient.get({ - index, - id: docId, - }); -}; diff --git a/src/plugins/files/tsconfig.json b/src/plugins/files/tsconfig.json index 8a14fd5ef06bc..ffeed3f22d46a 100644 --- a/src/plugins/files/tsconfig.json +++ b/src/plugins/files/tsconfig.json @@ -28,6 +28,7 @@ "@kbn/core-logging-server-mocks", "@kbn/ecs", "@kbn/safer-lodash-set", + "@kbn/logging-mocks", ], "exclude": [ "target/**/*",