Skip to content

Commit

Permalink
#622 impl(add outlier to solr index)
Browse files Browse the repository at this point in the history
  • Loading branch information
qifeng-bai authored and djtfmartin committed Mar 3, 2022
1 parent 8366805 commit 35e734b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,11 @@ public static void main(String[] args) throws Exception {

public static void run(DistributionPipelineOptions options) throws Exception {

FileSystem fs =
FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
.getFs(options.getTargetPath());
String outputPath =
PathBuilder.buildDatasetAttemptPath(options, "interpreted/distribution", false);
if (options.getDatasetId() == null || "all".equalsIgnoreCase(options.getDatasetId())) {
outputPath =
PathBuilder.buildPath(options.getAllDatasetsInputPath(), "distribution").toString();
}
//Create output path
// default: {fsPath}/pipelines-outlier
// or {fsPath}/pipelines-outlier/{datasetId}
String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options);

// delete previous runs
FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);

ALAFsUtils.createDirectory(fs, outputPath);

log.info("Adding step 1: Collecting index records");
Pipeline p = Pipeline.create(options);
Expand All @@ -89,13 +80,7 @@ public static void run(DistributionPipelineOptions options) throws Exception {
distributionTransform.calculateOutlier())
.apply("Flatten records", Flatten.iterables());

// kvRecords.apply( "Write to
// file",AvroIO.write(ALADistributionRecord.class).to(outputPath+"/interpreted").withoutSharding().withSuffix(".avro"));
kvRecords
.apply("to String", distributionTransform.flatToString())
.apply(
"Write to text",
TextIO.write().to(outputPath + "/interpreted").withoutSharding().withSuffix(".text"));
kvRecords.apply( "Write to file",AvroIO.write(ALADistributionRecord.class).to(outputPath+"/outliers").withoutSharding().withSuffix(".avro"));

log.info("Running the pipeline");
PipelineResult result = p.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public interface SolrPipelineOptions extends IndexingPipelineOptions {
@Description("Path to clustering avro files")
@Default.String("/data/pipelines-clustering")
String getClusteringPath();

void setClusteringPath(String clusteringPath);

@Description("Path to outlier avro files")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOption
return PathBuilder.buildDatasetAttemptPath(options, "sampling", false);
}

/**
 * Builds the path for outlier records, rooted at the configured target path.
 *
 * <p>When a dataset id is set and is not "all" (case-insensitive), the dataset id is appended to
 * the target path; otherwise the target path itself is used.
 *
 * <p>NOTE: this method has side effects. Whatever already exists at the resulting path is passed
 * to {@code FsUtils.deleteIfExist}, and the directory is then created before the path is returned.
 *
 * @param options pipeline options supplying the target path, dataset id and HDFS/core site configs
 * @return the outlier path that has just been (re)created
 * @throws IOException declared for file system failures while resolving or preparing the path
 */
public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options) throws IOException {
  // Resolve the file system that owns the target path first, exactly as callers expect.
  FileSystem targetFs =
      FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
          .getFs(options.getTargetPath());

  // Base location: the target path itself.
  String basePath = PathBuilder.buildPath(options.getTargetPath()).toString();

  // A specific dataset gets its own sub-folder: {targetPath}/{datasetId}
  final String datasetId = options.getDatasetId();
  final boolean allDatasets = datasetId == null || "all".equalsIgnoreCase(datasetId);
  final String outlierPath =
      allDatasets ? basePath : PathBuilder.buildPath(basePath, datasetId).toString();

  // Remove the output of previous runs, then start from an empty directory.
  FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outlierPath);
  ALAFsUtils.createDirectory(targetFs, outlierPath);

  return outlierPath;
}


/**
* NOTE: It will delete the existing folder. Builds a path to outlier records.
* {fsPath}/pipelines-outlier/{datasetId} {fsPath}/pipelines-outlier/all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
"fields":[
{"name": "id", "type": "string", "doc": "Pipelines identifier"},
{"name" : "occurrenceID", "type" : ["null", "string"], "default" : null, "doc" : "Occurrence identifier"},
{"name" : "distanceOutOfEDL", "type" : [ "double"], "default" : -1, ",doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0 =out of EDL" },
{"name" : "distanceOutOfEDL", "type" : "double", "default" : -1, "doc" : "Distance to the expert distribution layers. -1: no EDL, 0: in EDL, >0: out of EDL" },
{"name": "decimalLatitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLatitude"},
{"name": "decimalLongitude","type":["null","double"],"default":null, "doc":"http://rs.tdwg.org/dwc/terms/decimalLongitude"},
{"name": "speciesID","type":"string", "doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
{"name": "speciesID","type":"string", "default": "","doc":"http://rs.tdwg.org/dwc/terms/taxonConceptID"},
{"name": "issues", "type": "IssueRecord", "default":{}}
]
}

0 comments on commit 35e734b

Please sign in to comment.