Skip to content

Commit

Permalink
#622 impl(add outlier to solr index)
Browse files Browse the repository at this point in the history
  • Loading branch information
qifeng-bai authored and djtfmartin committed Jan 31, 2022
1 parent 586ce12 commit a4f7600
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,11 @@ public static void main(String[] args) throws Exception {

public static void run(DistributionPipelineOptions options) throws Exception {

FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(options.getTargetPath());
String outputPath =
PathBuilder.buildDatasetAttemptPath(options, "interpreted/distribution", false);
if (options.getDatasetId() == null || "all".equalsIgnoreCase(options.getDatasetId())) {
outputPath =
PathBuilder.buildPath(options.getAllDatasetsInputPath(), "distribution").toString();
}
//Create output path
// default: {fsPath}/pipelines-outlier
// or {fsPath}/pipelines-outlier/{datasetId}
String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options);

// delete previous runs
FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);

ALAFsUtils.createDirectory(fs, outputPath);

log.info("Adding step 1: Collecting index records");
Pipeline p = Pipeline.create(options);
Expand All @@ -89,13 +80,7 @@ public static void run(DistributionPipelineOptions options) throws Exception {
distributionTransform.calculateOutlier())
.apply("Flatten records", Flatten.iterables());

// kvRecords.apply( "Write to
// file",AvroIO.write(ALADistributionRecord.class).to(outputPath+"/interpreted").withoutSharding().withSuffix(".avro"));
kvRecords
.apply("to String", distributionTransform.flatToString())
.apply(
"Write to text",
TextIO.write().to(outputPath + "/interpreted").withoutSharding().withSuffix(".text"));
kvRecords.apply( "Write to file",AvroIO.write(ALADistributionRecord.class).to(outputPath+"/outliers").withoutSharding().withSuffix(".avro"));

log.info("Running the pipeline");
PipelineResult result = p.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class IndexRecordToSolrPipeline {
JackKnifeOutlierRecord.newBuilder().setId(EMPTY).setItems(new ArrayList<>()).build();
static final Relationships nullClustering =
Relationships.newBuilder().setId(EMPTY).setRelationships(new ArrayList<>()).build();
// Sentinel used as the default value in left outer joins when an index record has no
// matching outlier record (id == EMPTY; distanceOutOfEDL keeps its schema default).
static final ALADistributionRecord nullOutlier =
ALADistributionRecord.newBuilder().setId(EMPTY).build();

public static void main(String[] args) throws Exception {
VersionInfo.print();
Expand Down Expand Up @@ -96,6 +98,13 @@ public static void run(SolrPipelineOptions options) {
log.info("Skipping adding clustering to the index");
}

if (options.getIncludeOutlier()) {
log.info("Adding outlier to the index");
indexRecordsCollection = addOutlierInfo(options, pipeline, indexRecordsCollection);
} else {
log.info("Skipping adding outlier to the index");
}

PCollection<KV<String, IndexRecord>> recordsWithoutCoordinates = null;

if (options.getIncludeSampling()) {
Expand Down Expand Up @@ -183,6 +192,8 @@ public KV<String, IndexRecord> apply(KV<String, IndexRecord> input) {
writeToSolr(options, indexRecordsCollection, conn, schemaFields, dynamicFieldPrefixes);
}



log.info("Starting pipeline");
pipeline.run(options).waitUntilFinish();

Expand Down Expand Up @@ -273,6 +284,26 @@ private static PCollection<KV<String, IndexRecord>> addClusteringInfo(
return indexRecordJoinClustering.apply(ParDo.of(addClusteringInfo()));
}


/**
 * Joins expert-distribution outlier records onto the index records.
 *
 * @param options pipeline options (used to locate the outlier avro files)
 * @param pipeline the Beam pipeline to attach the read to
 * @param indexRecords index records keyed by record id
 * @return index records keyed by id, enriched with outlier distance where available
 */
private static PCollection<KV<String, IndexRecord>> addOutlierInfo(
    SolrPipelineOptions options,
    Pipeline pipeline,
    PCollection<KV<String, IndexRecord>> indexRecords) {

  // Load outlier records, keyed on ID
  PCollection<KV<String, ALADistributionRecord>> outlierRecords =
      loadOutlierRecords(options, pipeline);

  // Left outer join so index records without an outlier match are retained,
  // paired with the nullOutlier sentinel.
  PCollection<KV<String, KV<IndexRecord, ALADistributionRecord>>> indexRecordJoinOutlier =
      Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier);

  // Add outlier (expert distribution distance) information to the joined records.
  return indexRecordJoinOutlier.apply(ParDo.of(addOutlierInfo()));
}



private static void writeToSolr(
SolrPipelineOptions options,
PCollection<KV<String, IndexRecord>> kvIndexRecords,
Expand Down Expand Up @@ -526,6 +557,28 @@ public void processElement(ProcessContext c) {
};
}

private static DoFn<KV<String, KV<IndexRecord, ALADistributionRecord>>, KV<String, IndexRecord>>
addOutlierInfo() {

return new DoFn<KV<String, KV<IndexRecord, ALADistributionRecord>>, KV<String, IndexRecord>>() {
@ProcessElement
public void processElement(ProcessContext c) {

KV<String, KV<IndexRecord, ALADistributionRecord>> e = c.element();

IndexRecord indexRecord = e.getValue().getKey();
String id = indexRecord.getId();

ALADistributionRecord outlierRecord = e.getValue().getValue();

indexRecord.getDoubles().put(DISTANCE_TO_EDL, outlierRecord.getDistanceOutOfEDL());

c.output(KV.of(id, indexRecord));
}
};
}


/** Load index records from AVRO. */
private static PCollection<KV<String, IndexRecord>> loadIndexRecords(
SolrPipelineOptions options, Pipeline p) {
Expand Down Expand Up @@ -592,4 +645,23 @@ public KV<String, Relationships> apply(Relationships input) {
}
}));
}

/** Loads ALADistributionRecord avro files and keys each record by its id. */
private static PCollection<KV<String, ALADistributionRecord>> loadOutlierRecords(
    SolrPipelineOptions options, Pipeline p) {
  // NOTE(review): this glob reads {allDatasetsInputPath}/distribution/distribution*.avro,
  // while the distribution pipeline in this change appears to write "outliers.avro" under the
  // outlier target path — confirm the two locations/filenames actually line up.
  String avroGlob =
      PathBuilder.buildPath(
              options.getAllDatasetsInputPath() + "/distribution/",
              "distribution*" + AVRO_EXTENSION)
          .toString();
  log.info("Loading outlier from {}", avroGlob);

  PCollection<ALADistributionRecord> records =
      p.apply(AvroIO.read(ALADistributionRecord.class).from(avroGlob));

  SimpleFunction<ALADistributionRecord, KV<String, ALADistributionRecord>> keyById =
      new SimpleFunction<ALADistributionRecord, KV<String, ALADistributionRecord>>() {
        @Override
        public KV<String, ALADistributionRecord> apply(ALADistributionRecord record) {
          return KV.of(record.getId(), record);
        }
      };

  return records.apply(MapElements.via(keyById));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ public interface SolrPipelineOptions extends IndexingPipelineOptions {

void setIncludeClustering(Boolean includeClustering);

/** Whether to join expert-distribution outlier distances into the index (default: false). */
@Description("Include distance to expert distribution layers")
@Default.Boolean(false)
Boolean getIncludeOutlier();

void setIncludeOutlier(Boolean includeOutlier);

/** Path to the clustering avro files (default: /data/pipelines-clustering). */
@Description("Path to clustering avro files")
@Default.String("/data/pipelines-clustering")
String getClusteringPath();

void setClusteringPath(String clusteringPath);

@Description("Number of partitions to use")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public interface IndexFields {
String DATE_PRECISION = "datePrecision";
String DECADE = "decade";
String DEFAULT_VALUES_USED = "defaultValuesUsed";
String DISTANCE_TO_EDL = "distanceToEDL";
String DUPLICATE_JUSTIFICATION = "duplicateJustification";
String DUPLICATE_STATUS = "duplicateStatus";
String DUPLICATE_TYPE = "duplicateType";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOption
return PathBuilder.buildDatasetAttemptPath(options, "sampling", false);
}

/**
 * Builds (and freshly creates) the output directory for outlier records.
 *
 * <p>The base path is the configured target path; when a specific dataset is requested
 * (datasetId set and not "all"), a {datasetId} sub-directory is used instead. Any previous
 * run at that location is deleted before the directory is recreated.
 *
 * <p>NOTE(review): the original comments describe the default as "{fsPath}/pipelines-outlier",
 * but the code uses getTargetPath() directly — presumably the target path is configured to
 * point at pipelines-outlier; confirm against the deployment configuration.
 *
 * @param options options supplying filesystem config, target path and dataset id
 * @return the prepared output path
 * @throws IOException if the directory cannot be created
 */
public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options)
    throws IOException {
  FileSystem fs =
      FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
          .getFs(options.getTargetPath());

  String datasetId = options.getDatasetId();
  boolean allDatasets = datasetId == null || "all".equalsIgnoreCase(datasetId);

  String basePath = PathBuilder.buildPath(options.getTargetPath()).toString();
  String outputPath =
      allDatasets ? basePath : PathBuilder.buildPath(basePath, datasetId).toString();

  // Remove any previous run before recreating an empty directory.
  FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
  ALAFsUtils.createDirectory(fs, outputPath);

  return outputPath;
}


/**
* Removes a directory with content if the folder exists
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
"fields":[
{"name": "id", "type": "string", "doc": "Pipelines identifier"},
{"name" : "occurrenceID", "type" : ["null", "string"], "default" : null, "doc" : "Occurrence identifier"},
{"name" : "distanceOutOfEDL", "type" : [ "double"], "default" : -1, ",doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0 =out of EDL" },
{"name" : "distanceOutOfEDL", "type" : "double", "default" : -1, "doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0: out of EDL" },
{"name": "decimalLatitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLatitude"},
{"name": "decimalLongitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLongitude"},
{"name": "speciesID","type":"string", "doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
{"name": "speciesID","type":"string", "default": "","doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
{"name": "issues", "type": "IssueRecord", "default":{}}
]
}

0 comments on commit a4f7600

Please sign in to comment.