[8.x] [ML] Fix file upload with no ingest pipeline (#193744) (#194019)
# Backport

This will backport the following commits from `main` to `8.x`:
- [[ML] Fix file upload with no ingest pipeline (#193744)](https://github.com/elastic/kibana/pull/193744)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"James
Gowdy","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-09-25T14:30:30Z","message":"[ML]
Fix file upload with no ingest pipeline (#193744)\n\nWith some datasets
the find structure api will not generate an ingest\r\npipeline. A
recent\r\n[change](#186956) to how
we catch\r\nand display errors during file upload means an upload with
no pipeline\r\nnow produces an error which aborts the
upload.\r\nPreviously all pipeline creation errors were ignored and
hidden from the\r\nuser.\r\n\r\nThis PR changes changes the file upload
endpoint to allow it to receive\r\nno ingest pipeline and also changes
the UI to not display the pipeline\r\ncreation step during
upload.\r\n\r\nThis file can be used to test the
fix.\r\nhttps://github.com/elastic/eland/blob/main/tests/flights.json.gz","sha":"ee1a147baca52dca5703663d35b66e7c44f3b676","branchLabelMapping":{"^v9.0.0$":"main","^v8.16.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:fix",":ml","Feature:File
and Index Data Viz","Feature:File
Upload","v9.0.0","v8.16.0"],"title":"[ML] Fix file upload with no ingest
pipeline","number":193744,"url":"https://github.com/elastic/kibana/pull/193744","mergeCommit":{"message":"[ML]
Fix file upload with no ingest pipeline (#193744)\n\nWith some datasets
the find structure api will not generate an ingest\r\npipeline. A
recent\r\n[change](#186956) to how
we catch\r\nand display errors during file upload means an upload with
no pipeline\r\nnow produces an error which aborts the
upload.\r\nPreviously all pipeline creation errors were ignored and
hidden from the\r\nuser.\r\n\r\nThis PR changes changes the file upload
endpoint to allow it to receive\r\nno ingest pipeline and also changes
the UI to not display the pipeline\r\ncreation step during
upload.\r\n\r\nThis file can be used to test the
fix.\r\nhttps://github.com/elastic/eland/blob/main/tests/flights.json.gz","sha":"ee1a147baca52dca5703663d35b66e7c44f3b676"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/193744","number":193744,"mergeCommit":{"message":"[ML]
Fix file upload with no ingest pipeline (#193744)\n\nWith some datasets
the find structure api will not generate an ingest\r\npipeline. A
recent\r\n[change](#186956) to how
we catch\r\nand display errors during file upload means an upload with
no pipeline\r\nnow produces an error which aborts the
upload.\r\nPreviously all pipeline creation errors were ignored and
hidden from the\r\nuser.\r\n\r\nThis PR changes changes the file upload
endpoint to allow it to receive\r\nno ingest pipeline and also changes
the UI to not display the pipeline\r\ncreation step during
upload.\r\n\r\nThis file can be used to test the
fix.\r\nhttps://github.com/elastic/eland/blob/main/tests/flights.json.gz","sha":"ee1a147baca52dca5703663d35b66e7c44f3b676"}},{"branch":"8.x","label":"v8.16.0","branchLabelMappingKey":"^v8.16.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->
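To make the new flow concrete, here is a minimal, self-contained TypeScript sketch of the behaviour this PR enables. The types and the `buildPipeline`/`wrapPipeline` helpers are hypothetical simplifications for illustration, not the plugin's actual code:

```ts
// Simplified stand-ins for the plugin's real types.
interface IngestPipeline {
  description?: string;
  processors: object[];
}

interface IngestPipelineWrapper {
  id: string;
  pipeline?: IngestPipeline;
}

// The UI decides up front whether a pipeline step exists at all:
// the find structure API may legitimately produce no pipeline.
function buildPipeline(pipelineString: string): IngestPipeline | undefined {
  const createPipeline = pipelineString !== '';
  return createPipeline ? (JSON.parse(pipelineString) as IngestPipeline) : undefined;
}

// Only wrap a pipeline when one exists; when it does not, the import
// request simply omits the ingestPipeline field instead of sending {}.
function wrapPipeline(
  index: string,
  pipeline: IngestPipeline | undefined
): IngestPipelineWrapper | undefined {
  return pipeline !== undefined ? { id: `${index}-pipeline`, pipeline } : undefined;
}
```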

Co-authored-by: James Gowdy <[email protected]>
kibanamachine and jgowdyelastic authored Sep 25, 2024
1 parent 44e1ca2 commit 603f51f
Showing 9 changed files with 128 additions and 52 deletions.
```diff
@@ -29,6 +29,7 @@ interface Config {
   mappingsString: string;
   pipelineString: string;
   pipelineId: string | null;
+  createPipeline: boolean;
 }

 export async function importData(props: Props, config: Config, setState: (state: unknown) => void) {
@@ -41,6 +42,7 @@ export async function importData(props: Props, config: Config, setState: (state: unknown) => void) {
     mappingsString,
     pipelineString,
     pipelineId,
+    createPipeline,
   } = config;
   const { format } = results;
@@ -86,7 +88,7 @@ export async function importData(props: Props, config: Config, setState: (state: unknown) => void) {
   let settings = {};
   let mappings = {};
-  let pipeline = {};
+  let pipeline;

   try {
     settings = JSON.parse(indexSettingsString);
@@ -109,7 +111,9 @@ export async function importData(props: Props, config: Config, setState: (state: unknown) => void) {
   }

   try {
-    pipeline = JSON.parse(pipelineString);
+    if (createPipeline) {
+      pipeline = JSON.parse(pipelineString) as IngestPipeline;
+    }
   } catch (error) {
     success = false;
     const parseError = i18n.translate('xpack.dataVisualizer.file.importView.parsePipelineError', {
@@ -143,12 +147,7 @@ export async function importData(props: Props, config: Config, setState: (state: unknown) => void) {
     return;
   }

-  const initializeImportResp = await importer.initializeImport(
-    index,
-    settings,
-    mappings,
-    pipeline as IngestPipeline
-  );
+  const initializeImportResp = await importer.initializeImport(index, settings, mappings, pipeline);

   const timeFieldName = importer.getTimeField();
   setState({ timeFieldName });
@@ -158,14 +157,20 @@ export async function importData(props: Props, config: Config, setState: (state: unknown) => void) {
     indexCreatedStatus: getSuccess(indexCreated),
   });

-  const pipelineCreated = initializeImportResp.pipelineId !== undefined;
-  if (indexCreated) {
-    setState({
-      ingestPipelineCreatedStatus: pipelineCreated ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED,
-      pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '',
-    });
+  if (createPipeline) {
+    const pipelineCreated = initializeImportResp.pipelineId !== undefined;
+    if (indexCreated) {
+      setState({
+        ingestPipelineCreatedStatus: pipelineCreated
+          ? IMPORT_STATUS.COMPLETE
+          : IMPORT_STATUS.FAILED,
+        pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '',
+      });
+    }
+    success = indexCreated && pipelineCreated;
+  } else {
+    success = indexCreated;
   }
-  success = indexCreated && pipelineCreated;

   if (success === false) {
     errors.push(initializeImportResp.error);
```
```diff
@@ -109,6 +109,11 @@ export class ImportView extends Component {
       pipelineId,
     } = this.state;

+    const createPipeline = pipelineString !== '';
+    this.setState({
+      createPipeline,
+    });
+
     importData(
       { data, results, dataViewsContract, fileUpload },
       {
@@ -119,6 +124,7 @@ export class ImportView extends Component {
         mappingsString,
         pipelineString,
         pipelineId,
+        createPipeline,
       },
       (state) => this.setState(state)
     );
```
```diff
@@ -196,9 +196,12 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter {
         data: chunks[i],
         settings: {},
         mappings: {},
-        ingestPipeline: {
-          id: pipelineId,
-        },
+        ingestPipeline:
+          pipelineId !== undefined
+            ? {
+                id: pipelineId,
+              }
+            : undefined,
       });

       if (!this._isActive) {
```
54 changes: 29 additions & 25 deletions x-pack/plugins/file_upload/public/importer/importer.ts
```diff
@@ -15,7 +15,13 @@ import { i18n } from '@kbn/i18n';
 import { isPopulatedObject } from '@kbn/ml-is-populated-object';
 import { getHttp } from '../kibana_services';
 import { MB } from '../../common/constants';
-import type { ImportDoc, ImportFailure, ImportResponse, IngestPipeline } from '../../common/types';
+import type {
+  ImportDoc,
+  ImportFailure,
+  ImportResponse,
+  IngestPipeline,
+  IngestPipelineWrapper,
+} from '../../common/types';
 import { CreateDocsResponse, IImporter, ImportResults } from './types';

 const CHUNK_SIZE = 5000;
@@ -79,26 +85,25 @@ export abstract class Importer implements IImporter {
     index: string,
     settings: IndicesIndexSettings,
     mappings: MappingTypeMapping,
-    pipeline: IngestPipeline
+    pipeline: IngestPipeline | undefined
   ) {
-    updatePipelineTimezone(pipeline);
-
-    if (pipelineContainsSpecialProcessors(pipeline)) {
-      // pipeline contains processors which we know are slow
-      // so reduce the chunk size significantly to avoid timeouts
-      this._chunkSize = REDUCED_CHUNK_SIZE;
+    let ingestPipeline: IngestPipelineWrapper | undefined;
+    if (pipeline !== undefined) {
+      updatePipelineTimezone(pipeline);
+
+      if (pipelineContainsSpecialProcessors(pipeline)) {
+        // pipeline contains processors which we know are slow
+        // so reduce the chunk size significantly to avoid timeouts
+        this._chunkSize = REDUCED_CHUNK_SIZE;
+      }
+      // wrap the pipeline with a default id so the endpoint creates it;
+      // if no pipeline has been supplied, ingestPipeline stays undefined
+      ingestPipeline = {
+        id: `${index}-pipeline`,
+        pipeline,
+      };
     }

-    // if no pipeline has been supplied,
-    // send an empty object
-    const ingestPipeline =
-      pipeline !== undefined
-        ? {
-            id: `${index}-pipeline`,
-            pipeline,
-          }
-        : {};
-
     this._index = index;
     this._pipeline = pipeline;
```
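A hedged usage sketch of the widened `initializeImport` signature above; `importer`, `index`, `settings`, and `mappings` are assumed to be in scope, and the pipeline shown is an arbitrary example, not one produced by the find structure API:

```ts
// A dataset whose structure analysis produced a pipeline: pass it through.
const withPipeline = await importer.initializeImport(index, settings, mappings, {
  description: 'ingest pipeline created by file upload',
  processors: [{ date: { field: 'timestamp', formats: ['ISO8601'] } }],
});

// A dataset with no generated pipeline: undefined now flows end to end,
// instead of a placeholder empty object.
const withoutPipeline = await importer.initializeImport(index, settings, mappings, undefined);
```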
```diff
@@ -139,9 +144,11 @@ export abstract class Importer implements IImporter {
     const chunks = createDocumentChunks(this._docArray, this._chunkSize);

-    const ingestPipeline = {
-      id: pipelineId,
-    };
+    const ingestPipeline: IngestPipelineWrapper | undefined = pipelineId
+      ? {
+          id: pipelineId,
+        }
+      : undefined;

     let success = true;
     const failures: ImportFailure[] = [];
@@ -345,10 +352,7 @@ export function callImportRoute({
   data: ImportDoc[];
   settings: IndicesIndexSettings;
   mappings: MappingTypeMapping;
-  ingestPipeline: {
-    id?: string;
-    pipeline?: IngestPipeline;
-  };
+  ingestPipeline: IngestPipelineWrapper | undefined;
 }) {
   const query = id !== undefined ? { id } : {};
   const body = JSON.stringify({
```
2 changes: 1 addition & 1 deletion x-pack/plugins/file_upload/public/importer/types.ts
```diff
@@ -43,7 +43,7 @@ export interface IImporter {
     index: string,
     settings: IndicesIndexSettings,
     mappings: MappingTypeMapping,
-    pipeline: IngestPipeline
+    pipeline: IngestPipeline | undefined
   ): Promise<ImportResponse>;
   import(
     id: string,
```
8 changes: 4 additions & 4 deletions x-pack/plugins/file_upload/server/import_data.ts
```diff
@@ -21,15 +21,16 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
     index: string,
     settings: IndicesIndexSettings,
     mappings: MappingTypeMapping,
-    ingestPipeline: IngestPipelineWrapper,
+    ingestPipeline: IngestPipelineWrapper | undefined,
     data: InputData
   ): Promise<ImportResponse> {
     let createdIndex;
     let createdPipelineId;
     const docCount = data.length;

     try {
-      const { id: pipelineId, pipeline } = ingestPipeline;
+      const pipelineId = ingestPipeline?.id;
+      const pipeline = ingestPipeline?.pipeline;

       if (id === undefined) {
         // first chunk of data, create the index and id to return
@@ -48,7 +49,6 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
         createdPipelineId = pipelineId;
       } else {
         createdIndex = index;
-        createdPipelineId = pipelineId;
       }

       let failures: ImportFailure[] = [];
@@ -109,7 +109,7 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
     await asCurrentUser.indices.create({ index, body }, { maxRetries: 0 });
   }

-  async function indexData(index: string, pipelineId: string, data: InputData) {
+  async function indexData(index: string, pipelineId: string | undefined, data: InputData) {
     try {
       const body = [];
       for (let i = 0; i < data.length; i++) {
```
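On the server side, the widened `pipelineId: string | undefined` parameter can be forwarded straight to the bulk request. A rough sketch with the Elasticsearch JS client, assuming (as the client documents) that undefined parameters are simply omitted from the request; the node URL and the `indexDocs` helper are illustrative only, not the plugin's actual code:

```ts
import { Client } from '@elastic/elasticsearch';

const client = new Client({ node: 'http://localhost:9200' }); // assumption: local dev cluster

async function indexDocs(index: string, pipelineId: string | undefined, docs: object[]) {
  // When pipelineId is undefined, no pipeline parameter is sent and the
  // documents are indexed without any ingest processing.
  await client.bulk({
    index,
    pipeline: pipelineId,
    operations: docs.flatMap((doc) => [{ index: {} }, doc]),
  });
}
```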
10 changes: 6 additions & 4 deletions x-pack/plugins/file_upload/server/schemas.ts
```diff
@@ -37,10 +37,12 @@ export const importFileBodySchema = schema.object({
   /** Mappings */
   mappings: schema.any(),
   /** Ingest pipeline definition */
-  ingestPipeline: schema.object({
-    id: schema.maybe(schema.string()),
-    pipeline: schema.maybe(schema.any()),
-  }),
+  ingestPipeline: schema.maybe(
+    schema.object({
+      id: schema.maybe(schema.string()),
+      pipeline: schema.maybe(schema.any()),
+    })
+  ),
 });

 export const runtimeMappingsSchema = schema.object(
```
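The effect of wrapping the object in `schema.maybe` can be seen in isolation with `@kbn/config-schema`; this is a minimal sketch of just the changed fragment, not the full `importFileBodySchema`:

```ts
import { schema } from '@kbn/config-schema';

const bodySchema = schema.object({
  ingestPipeline: schema.maybe(
    schema.object({
      id: schema.maybe(schema.string()),
      pipeline: schema.maybe(schema.any()),
    })
  ),
});

bodySchema.validate({}); // now valid: request bodies may omit the pipeline entirely
bodySchema.validate({ ingestPipeline: { id: 'my-index-pipeline' } }); // still valid
```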
```diff
@@ -180,6 +180,42 @@ export default function ({ getService }: FtrProviderContext) {
         ingestedDocCount: 3,
       },
     },
+    {
+      suiteSuffix: 'with a file which does not generate an ingest pipeline',
+      filePath: require.resolve('./files_to_import/flights_small.json'),
+      indexName: 'user-import_4',
+      createIndexPattern: false,
+      fieldTypeFilters: [ML_JOB_FIELD_TYPES.KEYWORD],
+      fieldNameFilters: ['timestamp'],
+      expected: {
+        results: {
+          title: 'flights_small.json',
+          highlightedText: false,
+        },
+        metricFields: [],
+        nonMetricFields: [
+          {
+            fieldName: 'Carrier',
+            type: ML_JOB_FIELD_TYPES.KEYWORD,
+            docCountFormatted: '20 (100%)',
+            exampleCount: 4,
+          },
+          {
+            fieldName: 'timestamp',
+            type: ML_JOB_FIELD_TYPES.KEYWORD,
+            docCountFormatted: '20 (100%)',
+            exampleCount: 11,
+          },
+        ],
+        visibleMetricFieldsCount: 0,
+        totalMetricFieldsCount: 0,
+        populatedFieldsCount: 3,
+        totalFieldsCount: 25,
+        fieldTypeFiltersResultCount: 16,
+        fieldNameFiltersResultCount: 1,
+        ingestedDocCount: 20,
+      },
+    },
   ];

   const testDataListNegative = [
```