Fix #1438 - optionally gather fields after ingest exception #1439

Open
wants to merge 32 commits into integration from task/resilient-error-handling
Changes from 5 commits
Commits
32 commits
45df65f
Fix #1438 - optionally gather fields after ingest exception
Feb 24, 2022
8638428
Fix #1438: remove getFields method after researching extensions
Feb 28, 2022
5c7359d
Fix #1438: consolidate FieldHarvester methods and adjust tests
Feb 28, 2022
539b3a2
Fix #1438: preserve original exception instead of wrapping it
Feb 28, 2022
9f09371
Fix #1438: adjust Mapper test to reflect supplemental fields addition…
Feb 28, 2022
a4015f9
Fix #1438: initial PR feedback
Mar 14, 2022
e17f5b3
Fix #1438: change interface as per PR feedback
Mar 14, 2022
d0389ab
Branch was auto-updated on change of target.
github-actions[bot] Apr 22, 2022
70d7d1d
Branch was auto-updated on change of target.
github-actions[bot] Apr 22, 2022
342c896
Branch was auto-updated on change of target.
github-actions[bot] Apr 22, 2022
bf987e3
Branch was auto-updated on change of target.
github-actions[bot] Apr 28, 2022
64cd680
Branch was auto-updated on change of target.
github-actions[bot] Apr 28, 2022
1e1fe86
Branch was auto-updated on change of target.
datawave-bot-builder Apr 28, 2022
ea08a5b
Branch was auto-updated on change of target.
datawave-bot-builder May 2, 2022
09931d7
Branch was auto-updated on change of target.
datawave-bot-builder May 3, 2022
d426b49
Branch was auto-updated on change of target.
datawave-bot-builder May 3, 2022
cabe5f3
Branch was auto-updated on change of target.
datawave-bot-builder May 3, 2022
d02de12
Branch was auto-updated on change of target.
datawave-bot-builder May 10, 2022
99d7027
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
6be4d92
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
edff00d
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
3c76aa1
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
c37c3b0
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
3a4f54b
Branch was auto-updated on change of target.
datawave-bot-builder May 12, 2022
94bac43
Updating for current integration
mineralntl Nov 13, 2024
88b6595
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 13, 2024
dca9a10
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 18, 2024
7c6561d
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 20, 2024
044ccb1
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 20, 2024
54bd847
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 22, 2024
e38ae66
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 25, 2024
068ab7a
Merge branch 'integration' into task/resilient-error-handling
mineralntl Nov 27, 2024
@@ -0,0 +1,271 @@
package datawave.ingest.mapreduce;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import datawave.data.normalizer.DateNormalizer;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.NormalizedFieldAndValue;
import datawave.ingest.data.config.ingest.CompositeIngest;
import datawave.ingest.data.config.ingest.FilterIngest;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.data.config.ingest.VirtualIngest;
import datawave.ingest.time.Now;
import datawave.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;

import java.util.Date;
import java.util.Map;

/**
* Encapsulates the logic for extracting fields from a record, making use of a provided IngestHelperInterface. Generates virtual fields, composite fields, and
* supplemental fields (like LOAD_DATE, ORIG_FILE, and RAW_FILE). Some logic for handling errors is also included here: extracting salvageable fields if any
* exception occurs, and detecting if there were field errors (indicating a normalization failure).
*/
public class FieldHarvester {
private static final Logger log = Logger.getLogger(FieldHarvester.class);

private static Now now = Now.getInstance();

public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE";
public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE";
public static final String RAW_FILE_FIELDNAME = "RAW_FILE";
public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename";
public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename";
public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename";

private boolean createSequenceFileName;
private boolean trimSequenceFileName;
private boolean createRawFileName;
private final DateNormalizer dateNormalizer = new DateNormalizer();

private static final String SRC_FILE_DEL = "|";
private Exception originalException;

public FieldHarvester(Configuration configuration) {
this.createSequenceFileName = configuration.getBoolean(LOAD_SEQUENCE_FILE_NAME, true);
this.trimSequenceFileName = configuration.getBoolean(TRIM_SEQUENCE_FILE_NAME, true);
this.createRawFileName = configuration.getBoolean(LOAD_RAW_FILE_NAME, true);
}

/**
* Updates "fields" with extracted, derived, and automatically generated fields. Will capture any exception thrown while extracting fields, attempt to
* salvage a subset of fields if the helper supports it, still add the supplemental fields, and rethrow the original exception once extraction completes.
*
* @param fields
* the Multimap to modify with extracted and generated fields
* @param ingestHelper
* interface to use for field extraction
* @param value
* the record from which the fields will be extracted
* @param offset
* record offset within the source file
* @param splitStart
* the splitStart for the record
* @throws Exception
*            if any step of field extraction failed; the original exception is rethrown after field gathering completes
*/
public void extractFields(Multimap<String,NormalizedContentInterface> fields, IngestHelperInterface ingestHelper, RawRecordContainer value, long offset,
String splitStart) throws Exception {
// reset exception-in-extraction tracking
this.originalException = null;

// "candidateFields" holds the fields that will eventually be added to "fields"
Multimap<String,NormalizedContentInterface> candidateFields;

try {
// parse the record into its candidate field names and values using the IngestHelperInterface.
candidateFields = ingestHelper.getEventFields(value);
} catch (Exception exception) {
// delay throwing the exception to attempt salvaging
this.originalException = exception;
candidateFields = attemptToSalvageFields(value, ingestHelper);
}

try {
// try adding supplemental fields to candidateFields, whether or not they were salvaged
addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields);
} catch (Exception exception) {
if (null == this.originalException) {
this.originalException = exception;
} else {
// preserve original exception and log the latest exception
log.error(exception);
}
}

// add candidateFields to fields, even if there was an error
// identify if any individual fields contain an error
addFieldsAndDetectFieldErrors(fields, candidateFields);

if (null != this.originalException) {
log.error("Rethrowing original exception after completing field extraction.");
throw originalException;
}
}

@VisibleForTesting
boolean hasError() {
return null != this.originalException;
}

@VisibleForTesting
Exception getOriginalException() {
return this.originalException;
}

/**
* If IngestHelper implements FieldSalvager, get the salvageable fields from value. Otherwise, return an empty Multimap.
*/
private Multimap<String,NormalizedContentInterface> attemptToSalvageFields(RawRecordContainer value, IngestHelperInterface ingestHelper) {
// If this helper is able, attempt to salvage a subset of the fields
if (null != ingestHelper && ingestHelper instanceof FieldSalvager) {
FieldSalvager salvager = (FieldSalvager) ingestHelper;
try {
Multimap<String,NormalizedContentInterface> salvagedFields = salvager.getSalvageableEventFields(value);
if (null != salvagedFields) {
return salvagedFields;
}
} catch (Exception salvagerException) {
// Do not overwrite the original exception
if (null == this.originalException) {
this.originalException = new IllegalStateException("Unexpected state: originalException should be non-null while salvaging fields",
                salvagerException);
} else {
// allow original exception (this.exception) to be thrown by caller
log.error("Even salvager threw an exception", salvagerException);
}
}
}
return HashMultimap.create();
}

private void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper,
Multimap<String,NormalizedContentInterface> fields) {
addVirtualFields(ingestHelper, fields);
addCompositeFields(ingestHelper, fields);
addLoadDateField(fields);
addFileNameFields(value, offset, splitStart, fields);
applyFieldFilters(ingestHelper, fields);
}

private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also get the virtual fields, if applicable.
if (null != newFields && ingestHelper instanceof VirtualIngest) {
VirtualIngest vHelper = (VirtualIngest) ingestHelper;
Multimap<String,NormalizedContentInterface> virtualFields = vHelper.getVirtualFields(newFields);
for (Map.Entry<String,NormalizedContentInterface> v : virtualFields.entries())
newFields.put(v.getKey(), v.getValue());
}
}

private void addCompositeFields(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also get the composite fields, if applicable
if (null != newFields && ingestHelper instanceof CompositeIngest) {
CompositeIngest vHelper = (CompositeIngest) ingestHelper;
Multimap<String,NormalizedContentInterface> compositeFields = vHelper.getCompositeFields(newFields);
for (String fieldName : compositeFields.keySet()) {
// if this is an overloaded composite field, we are replacing the existing field data
if (vHelper.isOverloadedCompositeField(fieldName)) {
newFields.removeAll(fieldName);
}
newFields.putAll(fieldName, compositeFields.get(fieldName));
}
}
}

private void addLoadDateField(Multimap<String,NormalizedContentInterface> newFields) {
// Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes
long loadDate = now.get();
NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate));
// set an indexed field value for use by the date index data type handler
loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate)));
newFields.put(LOAD_DATE_FIELDNAME, loadDateValue);
}

private void addRawFileField(RawRecordContainer value, Multimap<String,NormalizedContentInterface> newFields, String seqFileName) {
if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) {
newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName()));
}
}

private void addOrigFileField(Multimap<String,NormalizedContentInterface> newFields, long offset, String splitStart, String seqFileName) {
if (null != seqFileName) {
StringBuilder seqFile = new StringBuilder(seqFileName);

seqFile.append(SRC_FILE_DEL).append(offset);

if (null != splitStart) {
seqFile.append(SRC_FILE_DEL).append(splitStart);
}

newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString()));
}
}

private String getSeqFileName() {
String seqFileName;
seqFileName = NDC.peek();

if (trimSequenceFileName) {
seqFileName = StringUtils.substringAfterLast(seqFileName, "/");
}
return seqFileName;
}

private void addFileNameFields(RawRecordContainer value, long offset, String splitStart, Multimap<String,NormalizedContentInterface> newFields) {
String seqFileName = null;

if (createSequenceFileName) {
seqFileName = getSeqFileName();

// place the sequence filename into the event
addOrigFileField(newFields, offset, splitStart, seqFileName);
}

addRawFileField(value, newFields, seqFileName);
}

private void applyFieldFilters(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also if this helper needs to filter the fields before returning, apply now
if (ingestHelper instanceof FilterIngest) {
FilterIngest fHelper = (FilterIngest) ingestHelper;
fHelper.filter(newFields);
}
}

/**
* Adds candidateFields to fields, inspecting each candidate field for errors along the way. Sets the harvester's original exception if any field errors
* were found and no earlier exception was already recorded.
*/
private void addFieldsAndDetectFieldErrors(Multimap<String,NormalizedContentInterface> fields, Multimap<String,NormalizedContentInterface> candidateFields) {
if (null == candidateFields) {
return;
}
Throwable fieldError = null;
for (Map.Entry<String,NormalizedContentInterface> entry : candidateFields.entries()) {
// noinspection ThrowableResultOfMethodCallIgnored
if (null != entry.getValue().getError()) {
fieldError = entry.getValue().getError();
}
fields.put(entry.getKey(), entry.getValue());
}
if (null != fieldError) {
if (null == this.originalException) {
this.originalException = new FieldNormalizationError("Failed getting all fields", fieldError);
} else {
// preserve the original exception and log the field error
log.error(fieldError);
}
}
}

public static class FieldNormalizationError extends Exception {

private static final long serialVersionUID = 1L;

public FieldNormalizationError(String message, Throwable cause) {
super(message, cause);
}
}
}
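
For orientation, here is a rough sketch of how a caller (for example, EventMapper) might use the new class; the wrapper class, method name, and the decision to simply rethrow are illustrative assumptions, not code from this PR:

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.mapreduce.FieldHarvester;
import org.apache.hadoop.conf.Configuration;

public class FieldHarvesterUsageSketch {

    /**
     * Gathers fields for one record; the caller supplies the helper and record from its own context.
     */
    public Multimap<String,NormalizedContentInterface> harvest(Configuration conf, IngestHelperInterface ingestHelper, RawRecordContainer value,
                    long offset, String splitStart) throws Exception {
        // These properties default to true; they are set here only to surface the knobs this PR reads.
        conf.setBoolean(FieldHarvester.LOAD_SEQUENCE_FILE_NAME, true);  // emit ORIG_FILE
        conf.setBoolean(FieldHarvester.TRIM_SEQUENCE_FILE_NAME, true);  // keep only the trailing file name
        conf.setBoolean(FieldHarvester.LOAD_RAW_FILE_NAME, true);       // emit RAW_FILE when it differs

        FieldHarvester fieldHarvester = new FieldHarvester(conf);
        Multimap<String,NormalizedContentInterface> fields = HashMultimap.create();
        try {
            fieldHarvester.extractFields(fields, ingestHelper, value, offset, splitStart);
        } catch (Exception e) {
            // Even when extraction fails, "fields" already holds any salvaged and supplemental
            // values (LOAD_DATE, ORIG_FILE, RAW_FILE), so an error handler could record them
            // in the error tables before the failure is reported.
            throw e;
        }
        return fields;
    }
}

The behavior being exercised is the ordering this PR introduces: the fields multimap is populated before the original exception is rethrown, so downstream error handling sees the gathered values.
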
@@ -0,0 +1,24 @@
package datawave.ingest.mapreduce;

import com.google.common.collect.Multimap;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;

/**
* This optional interface is intended to complement the IngestHelperInterface interface's handling of errors that occur within ingest jobs.
*
* One use case is when IngestHelperInterface's getEventFields throws an exception. The getEventFields method will not return a Multimap of field values
* (because it instead threw an exception). Prior to FieldSalvager, this meant that the error tables would not have information on any of the
* RawRecordContainer's field values.
*
* FieldSalvager implementations can attempt to provide a subset of the field values, so that the error tables can have more helpful information about the
* failed record, perhaps aiding troubleshooting efforts. An implementation could return only those fields whose values are relatively well-structured and
* predictably formatted, and therefore very unlikely to cause exceptions while processing.
*/
public interface FieldSalvager {
/**
* @param rawRecordContainer
*            the record whose fields could not be fully extracted by getEventFields
* @return a Multimap containing a subset of the record's field values; possibly empty but never null
*/
Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer rawRecordContainer);
}
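
As a rough illustration of the opt-in, a helper that also implements FieldSalvager might look like the sketch below; the class name and the choice to salvage only the raw file name are assumptions for illustration (a real implementation would typically add this interface to the datatype's existing IngestHelperInterface implementation and pick its most predictable fields):

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.NormalizedFieldAndValue;
import datawave.ingest.mapreduce.FieldSalvager;

public class SalvagingHelperSketch implements FieldSalvager {

    @Override
    public Multimap<String,NormalizedContentInterface> getSalvageableEventFields(RawRecordContainer rawRecordContainer) {
        Multimap<String,NormalizedContentInterface> salvaged = HashMultimap.create();
        // Pull only values that are predictably formatted and unlikely to throw while parsing.
        String rawFileName = rawRecordContainer.getRawFileName();
        if (rawFileName != null && !rawFileName.isEmpty()) {
            // "FILE_NAME" is an illustrative field name, not one defined by this PR.
            salvaged.put("FILE_NAME", new NormalizedFieldAndValue("FILE_NAME", rawFileName));
        }
        return salvaged; // possibly empty, but never null, per the contract above
    }
}

FieldHarvester only consults this method after getEventFields has already thrown, so an implementation should avoid repeating the parsing step that is likely to have failed.
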
@@ -20,7 +20,7 @@
import datawave.ingest.data.config.GroupedNormalizedContentInterface;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.mapreduce.EventMapper;
import datawave.ingest.mapreduce.FieldHarvester;
import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler;
import datawave.ingest.mapreduce.handler.edge.define.EdgeDataBundle;
import datawave.ingest.mapreduce.handler.edge.define.EdgeDefinition;
@@ -500,7 +500,7 @@ public long process(KEYIN key, RawRecordContainer event, Multimap<String,Normali
}

// Get the load date of the event from the fields map
Collection<NormalizedContentInterface> loadDates = fields.get(EventMapper.LOAD_DATE_FIELDNAME);
Collection<NormalizedContentInterface> loadDates = fields.get(FieldHarvester.LOAD_DATE_FIELDNAME);
if (!loadDates.isEmpty()) {
NormalizedContentInterface nci = loadDates.iterator().next();
Date date = new Date(Long.parseLong(nci.getEventFieldValue()));
@@ -19,6 +19,7 @@
import datawave.ingest.config.RawRecordContainerImpl;
import datawave.ingest.data.config.MarkingsHelper;

import datawave.util.TypeRegistryTestSetup;
import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
import org.junit.Test;
@@ -53,8 +54,7 @@ public void setUp() throws Exception {
conf.set("samplecsv" + TypeRegistry.INGEST_HELPER, TestCSVIngestHelper.class.getName());
conf.set("samplecsv.reader.class", TestCSVReader.class.getName());
conf.set("samplecsv" + MarkingsHelper.DEFAULT_MARKING, "PUBLIC|PRIVATE");
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
TypeRegistryTestSetup.resetTypeRegistry(conf);
dataType = TypeRegistry.getType("samplecsv");
}

@@ -1,8 +1,7 @@
package datawave.ingest.data.config;

import datawave.ingest.data.TypeRegistry;

import datawave.policy.IngestPolicyEnforcer;
import datawave.util.TypeRegistryTestSetup;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
@@ -25,8 +24,7 @@ public void setup() {
@Test(expected = IllegalArgumentException.class)
public void testInvalidConfig() {
DataTypeHelperImpl helper = new DataTypeHelperImpl();
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
TypeRegistryTestSetup.resetTypeRegistry(conf);
helper.setup(conf);
}

@@ -36,8 +34,7 @@ public void testValidConfig() throws Exception {
Assert.assertNotNull(configStream);
conf.addResource(configStream);
Assert.assertThat(conf.get("data.name"), is("fake"));
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
TypeRegistryTestSetup.resetTypeRegistry(conf);
DataTypeHelperImpl helper = new DataTypeHelperImpl();
helper.setup(conf);

@@ -54,8 +51,7 @@ public void testDowncaseFields() throws Exception {
Assert.assertNotNull(configStream);
conf.addResource(configStream);
conf.set("fake" + DataTypeHelper.Properties.DOWNCASE_FIELDS, "one,two,three,FOUR");
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
TypeRegistryTestSetup.resetTypeRegistry(conf);
DataTypeHelperImpl helper = new DataTypeHelperImpl();
helper.setup(conf);

@@ -21,7 +21,7 @@ public static class NonGroupedInstance implements NormalizedContentInterface {
private Map<String,String> _markings;
private Throwable _error;

protected NonGroupedInstance() {
public NonGroupedInstance() {
_fieldName = "TestNonGroupedInstance";

_indexedFieldName = "TestIndexedField";
@@ -141,7 +141,7 @@ public static class GroupedInstance implements GroupedNormalizedContentInterface
private String _group;
private String _subGroup;

protected GroupedInstance() {
public GroupedInstance() {
_fieldName = "TestNonGroupedInstance";

_indexedFieldName = "TestIndexedField";