Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Fix file upload with no ingest pipeline #193744

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ interface Config {
mappingsString: string;
pipelineString: string;
pipelineId: string | null;
createPipeline: boolean;
}

export async function importData(props: Props, config: Config, setState: (state: unknown) => void) {
Expand All @@ -41,6 +42,7 @@ export async function importData(props: Props, config: Config, setState: (state:
mappingsString,
pipelineString,
pipelineId,
createPipeline,
} = config;
const { format } = results;

Expand Down Expand Up @@ -86,7 +88,7 @@ export async function importData(props: Props, config: Config, setState: (state:

let settings = {};
let mappings = {};
let pipeline = {};
let pipeline;

try {
settings = JSON.parse(indexSettingsString);
Expand All @@ -109,7 +111,9 @@ export async function importData(props: Props, config: Config, setState: (state:
}

try {
pipeline = JSON.parse(pipelineString);
if (createPipeline) {
pipeline = JSON.parse(pipelineString) as IngestPipeline;
}
} catch (error) {
success = false;
const parseError = i18n.translate('xpack.dataVisualizer.file.importView.parsePipelineError', {
Expand Down Expand Up @@ -143,12 +147,7 @@ export async function importData(props: Props, config: Config, setState: (state:
return;
}

const initializeImportResp = await importer.initializeImport(
index,
settings,
mappings,
pipeline as IngestPipeline
);
const initializeImportResp = await importer.initializeImport(index, settings, mappings, pipeline);

const timeFieldName = importer.getTimeField();
setState({ timeFieldName });
Expand All @@ -158,14 +157,20 @@ export async function importData(props: Props, config: Config, setState: (state:
indexCreatedStatus: getSuccess(indexCreated),
});

const pipelineCreated = initializeImportResp.pipelineId !== undefined;
if (indexCreated) {
setState({
ingestPipelineCreatedStatus: pipelineCreated ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED,
pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '',
});
if (createPipeline) {
const pipelineCreated = initializeImportResp.pipelineId !== undefined;
if (indexCreated) {
setState({
ingestPipelineCreatedStatus: pipelineCreated
? IMPORT_STATUS.COMPLETE
: IMPORT_STATUS.FAILED,
pipelineId: pipelineCreated ? initializeImportResp.pipelineId : '',
});
}
success = indexCreated && pipelineCreated;
} else {
success = indexCreated;
}
success = indexCreated && pipelineCreated;

if (success === false) {
errors.push(initializeImportResp.error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ export class ImportView extends Component {
pipelineId,
} = this.state;

const createPipeline = pipelineString !== '';
this.setState({
createPipeline,
});

importData(
{ data, results, dataViewsContract, fileUpload },
{
Expand All @@ -119,6 +124,7 @@ export class ImportView extends Component {
mappingsString,
pipelineString,
pipelineId,
createPipeline,
},
(state) => this.setState(state)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,12 @@ export class AbstractGeoFileImporter extends Importer implements GeoFileImporter
data: chunks[i],
settings: {},
mappings: {},
ingestPipeline: {
id: pipelineId,
},
ingestPipeline:
pipelineId !== undefined
? {
id: pipelineId,
}
: undefined,
});

if (!this._isActive) {
Expand Down
54 changes: 29 additions & 25 deletions x-pack/plugins/file_upload/public/importer/importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ import { i18n } from '@kbn/i18n';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import { getHttp } from '../kibana_services';
import { MB } from '../../common/constants';
import type { ImportDoc, ImportFailure, ImportResponse, IngestPipeline } from '../../common/types';
import type {
ImportDoc,
ImportFailure,
ImportResponse,
IngestPipeline,
IngestPipelineWrapper,
} from '../../common/types';
import { CreateDocsResponse, IImporter, ImportResults } from './types';

const CHUNK_SIZE = 5000;
Expand Down Expand Up @@ -79,26 +85,25 @@ export abstract class Importer implements IImporter {
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
pipeline: IngestPipeline
pipeline: IngestPipeline | undefined
) {
updatePipelineTimezone(pipeline);

if (pipelineContainsSpecialProcessors(pipeline)) {
// pipeline contains processors which we know are slow
// so reduce the chunk size significantly to avoid timeouts
this._chunkSize = REDUCED_CHUNK_SIZE;
let ingestPipeline: IngestPipelineWrapper | undefined;
if (pipeline !== undefined) {
updatePipelineTimezone(pipeline);

if (pipelineContainsSpecialProcessors(pipeline)) {
// pipeline contains processors which we know are slow
// so reduce the chunk size significantly to avoid timeouts
this._chunkSize = REDUCED_CHUNK_SIZE;
}
// if no pipeline has been supplied,
// send an empty object
ingestPipeline = {
id: `${index}-pipeline`,
pipeline,
};
}

// if no pipeline has been supplied,
// send an empty object
const ingestPipeline =
pipeline !== undefined
? {
id: `${index}-pipeline`,
pipeline,
}
: {};

this._index = index;
this._pipeline = pipeline;

Expand Down Expand Up @@ -139,9 +144,11 @@ export abstract class Importer implements IImporter {

const chunks = createDocumentChunks(this._docArray, this._chunkSize);

const ingestPipeline = {
id: pipelineId,
};
const ingestPipeline: IngestPipelineWrapper | undefined = pipelineId
? {
id: pipelineId,
}
: undefined;

let success = true;
const failures: ImportFailure[] = [];
Expand Down Expand Up @@ -345,10 +352,7 @@ export function callImportRoute({
data: ImportDoc[];
settings: IndicesIndexSettings;
mappings: MappingTypeMapping;
ingestPipeline: {
id?: string;
pipeline?: IngestPipeline;
};
ingestPipeline: IngestPipelineWrapper | undefined;
}) {
const query = id !== undefined ? { id } : {};
const body = JSON.stringify({
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/file_upload/public/importer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export interface IImporter {
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
pipeline: IngestPipeline
pipeline: IngestPipeline | undefined
): Promise<ImportResponse>;
import(
id: string,
Expand Down
8 changes: 4 additions & 4 deletions x-pack/plugins/file_upload/server/import_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
index: string,
settings: IndicesIndexSettings,
mappings: MappingTypeMapping,
ingestPipeline: IngestPipelineWrapper,
ingestPipeline: IngestPipelineWrapper | undefined,
data: InputData
): Promise<ImportResponse> {
let createdIndex;
let createdPipelineId;
const docCount = data.length;

try {
const { id: pipelineId, pipeline } = ingestPipeline;
const pipelineId = ingestPipeline?.id;
const pipeline = ingestPipeline?.pipeline;

if (id === undefined) {
// first chunk of data, create the index and id to return
Expand All @@ -48,7 +49,6 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
createdPipelineId = pipelineId;
} else {
createdIndex = index;
createdPipelineId = pipelineId;
}

let failures: ImportFailure[] = [];
Expand Down Expand Up @@ -109,7 +109,7 @@ export function importDataProvider({ asCurrentUser }: IScopedClusterClient) {
await asCurrentUser.indices.create({ index, body }, { maxRetries: 0 });
}

async function indexData(index: string, pipelineId: string, data: InputData) {
async function indexData(index: string, pipelineId: string | undefined, data: InputData) {
try {
const body = [];
for (let i = 0; i < data.length; i++) {
Expand Down
10 changes: 6 additions & 4 deletions x-pack/plugins/file_upload/server/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ export const importFileBodySchema = schema.object({
/** Mappings */
mappings: schema.any(),
/** Ingest pipeline definition */
ingestPipeline: schema.object({
id: schema.maybe(schema.string()),
pipeline: schema.maybe(schema.any()),
}),
ingestPipeline: schema.maybe(
schema.object({
id: schema.maybe(schema.string()),
pipeline: schema.maybe(schema.any()),
})
),
});

export const runtimeMappingsSchema = schema.object(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,42 @@ export default function ({ getService }: FtrProviderContext) {
ingestedDocCount: 3,
},
},
{
suiteSuffix: 'with a file which does not generate a ingest pipeline',
filePath: require.resolve('./files_to_import/flights_small.json'),
indexName: 'user-import_4',
createIndexPattern: false,
fieldTypeFilters: [ML_JOB_FIELD_TYPES.KEYWORD],
fieldNameFilters: ['timestamp'],
expected: {
results: {
title: 'flights_small.json',
highlightedText: false,
},
metricFields: [],
nonMetricFields: [
{
fieldName: 'Carrier',
type: ML_JOB_FIELD_TYPES.KEYWORD,
docCountFormatted: '20 (100%)',
exampleCount: 4,
},
{
fieldName: 'timestamp',
type: ML_JOB_FIELD_TYPES.KEYWORD,
docCountFormatted: '20 (100%)',
exampleCount: 11,
},
],
visibleMetricFieldsCount: 0,
totalMetricFieldsCount: 0,
populatedFieldsCount: 3,
totalFieldsCount: 25,
fieldTypeFiltersResultCount: 16,
fieldNameFiltersResultCount: 1,
ingestedDocCount: 20,
},
},
];

const testDataListNegative = [
Expand Down
Loading