Skip to content

Commit

Permalink
[ML] Fix file upload with no ingest pipeline (elastic#193744)
Browse files Browse the repository at this point in the history
With some datasets, the find structure API will not generate an ingest
pipeline. A recent
[change](elastic#186956) to how we catch
and display errors during file upload means an upload with no pipeline
now produces an error which aborts the upload.
Previously all pipeline creation errors were ignored and hidden from the
user.

This PR changes the file upload endpoint to allow it to receive
no ingest pipeline and also changes the UI to not display the pipeline
creation step during upload.

This file can be used to test the fix.
https://github.com/elastic/eland/blob/main/tests/flights.json.gz

(cherry picked from commit ee1a147)
  • Loading branch information
jgowdyelastic committed Sep 25, 2024
1 parent 24bec01 commit 1291765
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ interface Config {
mappingsString: string;
pipelineString: string;
pipelineId: string | null;
createPipeline: boolean;
}

export async function importData(props: Props, config: Config, setState: (state: unknown) => void) {
Expand All @@ -41,6 +42,7 @@ export async function importData(props: Props, config: Config, setState: (state:
mappingsString,
pipelineString,
pipelineId,
createPipeline,
} = config;
const { format } = results;

Expand Down Expand Up @@ -86,7 +88,7 @@ export async function importData(props: Props, config: Config, setState: (state:

let settings = {};
let mappings = {};
let pipeline = {};
let pipeline;

try {
settings = JSON.parse(indexSettingsString);
Expand All @@ -109,7 +111,9 @@ export async function importData(props: Props, config: Config, setState: (state:
}

try {
pipeline = JSON.parse(pipelineString);
if (createPipeline) {
pipeline = JSON.parse(pipelineString) as IngestPipeline;
}
} catch (error) {
success = false;
const parseError = i18n.translate('xpack.dataVisualizer.file.importView.parsePipelineError', {
Expand Down Expand Up @@ -143,12 +147,7 @@ export async function importData(props: Props, config: Config, setState: (state:
return;
}

const initializeImportResp = await importer.initializeImport(
index,
settings,
mappings,
pipeline as IngestPipeline
);
const initializeImportResp = await importer.initializeImport(index, settings, mappings, pipeline);

const timeFieldName = importer.getTimeField();
setState({ timeFieldName });
Expand All @@ -158,14 +157,20 @@ export async function importData(props: Props, config: Config, setState: (state:
indexCreatedStatus: getSuccess(indexCreated),
});

const pipelineCreated = initializeImportResp.pipelineId !== undefined;
if (indexCreated) {
setState({
ingestPipelineCreatedStatus: pipelineCreated ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED,
pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '',
});
if (createPipeline) {
const pipelineCreated = initializeImportResp.pipelineId !== undefined;
if (indexCreated) {
setState({
ingestPipelineCreatedStatus: pipelineCreated
? IMPORT_STATUS.COMPLETE
: IMPORT_STATUS.FAILED,
pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '',
});
}
success = indexCreated && pipelineCreated;
} else {
success = indexCreated;
}
success = indexCreated && pipelineCreated;

if (success === false) {
errors.push(initializeImportResp.error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ export class ImportView extends Component {
pipelineId,
} = this.state;

const createPipeline = pipelineString !== '';
this.setState({
createPipeline,
});

importData(
{ data, results, dataViewsContract, fileUpload },
{
Expand All @@ -119,6 +124,7 @@ export class ImportView extends Component {
mappingsString,
pipelineString,
pipelineId,
createPipeline,
},
(state) => this.setState(state)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
data: chunks[i],
settings: {},
mappings: {},
ingestPipeline: {
id: pipelineId,
},
ingestPipeline:
pipelineId !== undefined
? {
id: pipelineId,
}
: undefined,
});

if (!this._isActive) {
Expand Down
54 changes: 29 additions & 25 deletions x-pack/plugins/file_upload/public/importer/importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ import { i18n } from '@kbn/i18n';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import { getHttp } from '../kibana_services';
import { MB } from '../../common/constants';
import type { ImportDoc, ImportFailure, ImportResponse, IngestPipeline } from '../../common/types';
import type {
ImportDoc,
ImportFailure,
ImportResponse,
IngestPipeline,
IngestPipelineWrapper,
} from '../../common/types';
import { CreateDocsResponse, IImporter, ImportResults } from './types';

const CHUNK_SIZE = 5000;
Expand Down Expand Up @@ -79,26 +85,25 @@ export abstract class Importer implements IImporter {
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
pipeline: IngestPipeline
pipeline: IngestPipeline | undefined
) {
updatePipelineTimezone(pipeline);

if (pipelineContainsSpecialProcessors(pipeline)) {
// pipeline contains processors which we know are slow
// so reduce the chunk size significantly to avoid timeouts
this._chunkSize = REDUCED_CHUNK_SIZE;
let ingestPipeline: IngestPipelineWrapper | undefined;
if (pipeline !== undefined) {
updatePipelineTimezone(pipeline);

if (pipelineContainsSpecialProcessors(pipeline)) {
// pipeline contains processors which we know are slow
// so reduce the chunk size significantly to avoid timeouts
this._chunkSize = REDUCED_CHUNK_SIZE;
}
// if no pipeline has been supplied,
// send an empty object
ingestPipeline = {
id: `${index}-pipeline`,
pipeline,
};
}

// if no pipeline has been supplied,
// send an empty object
const ingestPipeline =
pipeline !== undefined
? {
id: `${index}-pipeline`,
pipeline,
}
: {};

this._index = index;
this._pipeline = pipeline;

Expand Down Expand Up @@ -139,9 +144,11 @@ export abstract class Importer implements IImporter {

const chunks = createDocumentChunks(this._docArray, this._chunkSize);

const ingestPipeline = {
id: pipelineId,
};
const ingestPipeline: IngestPipelineWrapper | undefined = pipelineId
? {
id: pipelineId,
}
: undefined;

let success = true;
const failures: ImportFailure[] = [];
Expand Down Expand Up @@ -345,10 +352,7 @@ export function callImportRoute({
data: ImportDoc[];
settings: IndicesIndexSettings;
mappings: MappingTypeMapping;
ingestPipeline: {
id?: string;
pipeline?: IngestPipeline;
};
ingestPipeline: IngestPipelineWrapper | undefined;
}) {
const query = id !== undefined ? { id } : {};
const body = JSON.stringify({
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/file_upload/public/importer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export interface IImporter {
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
pipeline: IngestPipeline
pipeline: IngestPipeline | undefined
): Promise<ImportResponse>;
import(
id: string,
Expand Down
8 changes: 4 additions & 4 deletions x-pack/plugins/file_upload/server/import_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
ingestPipeline: IngestPipelineWrapper,
ingestPipeline: IngestPipelineWrapper | undefined,
data: InputData
): Promise<ImportResponse> {
let createdIndex;
let createdPipelineId;
const docCount = data.length;

try {
const { id: pipelineId, pipeline } = ingestPipeline;
const pipelineId = ingestPipeline?.id;
const pipeline = ingestPipeline?.pipeline;

if (id === undefined) {
// first chunk of data, create the index and id to return
Expand All @@ -48,7 +49,6 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
createdPipelineId = pipelineId;
} else {
createdIndex = index;
createdPipelineId = pipelineId;
}

let failures: ImportFailure[] = [];
Expand Down Expand Up @@ -109,7 +109,7 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
await asCurrentUser.indices.create({ index, body }, { maxRetries: 0 });
}

async function indexData(index: string, pipelineId: string, data: InputData) {
async function indexData(index: string, pipelineId: string | undefined, data: InputData) {
try {
const body = [];
for (let i = 0; i < data.length; i++) {
Expand Down
10 changes: 6 additions & 4 deletions x-pack/plugins/file_upload/server/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ export const importFileBodySchema = schema.object({
/** Mappings */
mappings: schema.any(),
/** Ingest pipeline definition */
ingestPipeline: schema.object({
id: schema.maybe(schema.string()),
pipeline: schema.maybe(schema.any()),
}),
ingestPipeline: schema.maybe(
schema.object({
id: schema.maybe(schema.string()),
pipeline: schema.maybe(schema.any()),
})
),
});

export const runtimeMappingsSchema = schema.object(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,42 @@ export default function ({ getService }: FtrProviderContext) {
ingestedDocCount: 3,
},
},
{
suiteSuffix: 'with a file which does not generate a ingest pipeline',
filePath: require.resolve('./files_to_import/flights_small.json'),
indexName: 'user-import_4',
createIndexPattern: false,
fieldTypeFilters: [ML_JOB_FIELD_TYPES.KEYWORD],
fieldNameFilters: ['timestamp'],
expected: {
results: {
title: 'flights_small.json',
highlightedText: false,
},
metricFields: [],
nonMetricFields: [
{
fieldName: 'Carrier',
type: ML_JOB_FIELD_TYPES.KEYWORD,
docCountFormatted: '20 (100%)',
exampleCount: 4,
},
{
fieldName: 'timestamp',
type: ML_JOB_FIELD_TYPES.KEYWORD,
docCountFormatted: '20 (100%)',
exampleCount: 11,
},
],
visibleMetricFieldsCount: 0,
totalMetricFieldsCount: 0,
populatedFieldsCount: 3,
totalFieldsCount: 25,
fieldTypeFiltersResultCount: 16,
fieldNameFiltersResultCount: 1,
ingestedDocCount: 20,
},
},
];

const testDataListNegative = [
Expand Down
Loading

0 comments on commit 1291765

Please sign in to comment.