#622 refactor
qifeng-bai committed Nov 26, 2021
1 parent 09d3408 commit 28cbab8
Showing 7 changed files with 60 additions and 64 deletions.
File 1/7: DistributionOutlierPipeline

@@ -1,6 +1,6 @@
 package au.org.ala.pipelines.beam;
 
-import au.org.ala.pipelines.options.DistributionPipelineOptions;
+import au.org.ala.pipelines.options.DistributionOutlierPipelineOptions;
 import au.org.ala.pipelines.transforms.DistributionOutlierTransform;
 import au.org.ala.pipelines.util.VersionInfo;
 import au.org.ala.utils.ALAFsUtils;
@@ -37,10 +37,10 @@ public class DistributionOutlierPipeline {
   public static void main(String[] args) throws Exception {
     VersionInfo.print();
     CombinedYamlConfiguration conf = new CombinedYamlConfiguration(args);
-    String[] combinedArgs = conf.toArgs("general", "distribution");
+    String[] combinedArgs = conf.toArgs("general", "outlier");
 
-    DistributionPipelineOptions options =
-        PipelinesOptionsFactory.create(DistributionPipelineOptions.class, combinedArgs);
+    DistributionOutlierPipelineOptions options =
+        PipelinesOptionsFactory.create(DistributionOutlierPipelineOptions.class, combinedArgs);
     MDC.put("datasetId", options.getDatasetId());
     MDC.put("attempt", options.getAttempt().toString());
     MDC.put("step", "DISTRIBUTION");
@@ -49,14 +49,13 @@ public static void main(String[] args) throws Exception {
     System.exit(0);
   }
 
-  public static void run(DistributionPipelineOptions options) throws Exception {
+  public static void run(DistributionOutlierPipelineOptions options) throws Exception {
 
-    //Create output path
+    // Create output path
     // default: {fsPath}/pipelines-outlier
     // or {fsPath}/pipelines-outlier/{datasetId}
     String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options);
 
-
     log.info("Adding step 1: Collecting index records");
     Pipeline p = Pipeline.create(options);
 
@@ -75,7 +74,12 @@ public static void run(DistributionPipelineOptions options) throws Exception {
             distributionTransform.calculateOutlier())
         .apply("Flatten records", Flatten.iterables());
 
-    kvRecords.apply( "Write to file",AvroIO.write(DistributionOutlierRecord.class).to(outputPath+"/outliers").withoutSharding().withSuffix(".avro"));
+    kvRecords.apply(
+        "Write to file",
+        AvroIO.write(DistributionOutlierRecord.class)
+            .to(outputPath + "/outliers")
+            .withoutSharding()
+            .withSuffix(".avro"));
 
     log.info("Running the pipeline");
     PipelineResult result = p.run();
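Note: the reformatted write step above is standard Apache Beam AvroIO usage. A minimal, self-contained sketch of the same pattern follows; it assumes Beam 2.x (where AvroIO lives in org.apache.beam.sdk.io), and the record ID and output path are illustrative, not the pipeline's real wiring.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.gbif.pipelines.io.avro.DistributionOutlierRecord;

public class WriteOutliersSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Builder usage mirrors the nullOutlier constant in IndexRecordToSolrPipeline.
    DistributionOutlierRecord record =
        DistributionOutlierRecord.newBuilder().setId("occ-1").build();

    p.apply(
            "Create records",
            Create.of(record).withCoder(AvroCoder.of(DistributionOutlierRecord.class)))
        .apply(
            "Write to file",
            AvroIO.write(DistributionOutlierRecord.class)
                .to("/tmp/pipelines-outlier/outliers") // file name prefix (illustrative)
                .withoutSharding()                     // exactly one output file
                .withSuffix(".avro"));                 // -> .../outliers.avro

    p.run().waitUntilFinish();
  }
}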
File 2/7: IndexRecordToSolrPipeline

@@ -50,7 +50,7 @@ public class IndexRecordToSolrPipeline {
   static final Relationships nullClustering =
       Relationships.newBuilder().setId(EMPTY).setRelationships(new ArrayList<>()).build();
   static final DistributionOutlierRecord nullOutlier =
-          DistributionOutlierRecord.newBuilder().setId(EMPTY).build();
+      DistributionOutlierRecord.newBuilder().setId(EMPTY).build();
 
   public static void main(String[] args) throws Exception {
     VersionInfo.print();
@@ -192,8 +192,6 @@ public KV<String, IndexRecord> apply(KV<String, IndexRecord> input) {
       writeToSolr(options, indexRecordsCollection, conn, schemaFields, dynamicFieldPrefixes);
     }
 
-
-
     log.info("Starting pipeline");
     pipeline.run(options).waitUntilFinish();
 
@@ -284,26 +282,21 @@ private static PCollection<KV<String, IndexRecord>> addClusteringInfo(
     return indexRecordJoinClustering.apply(ParDo.of(addClusteringInfo()));
   }
 
-
   private static PCollection<KV<String, IndexRecord>> addOutlierInfo(
-          SolrPipelineOptions options,
-          Pipeline pipeline,
-          PCollection<KV<String, IndexRecord>> indexRecords) {
+      SolrPipelineOptions options,
+      Pipeline pipeline,
+      PCollection<KV<String, IndexRecord>> indexRecords) {
 
     // Load outlier records, keyed on ID
     PCollection<KV<String, DistributionOutlierRecord>> outlierRecords =
-            loadOutlierRecords(options, pipeline);
-    PCollection<KV<String, KV<IndexRecord, DistributionOutlierRecord>>>
-        indexRecordJoinOurlier =
-            Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier);
-
+        loadOutlierRecords(options, pipeline);
+    PCollection<KV<String, KV<IndexRecord, DistributionOutlierRecord>>> indexRecordJoinOurlier =
+        Join.leftOuterJoin(indexRecords, outlierRecords, nullOutlier);
 
     // Add Jackknife information
     return indexRecordJoinOurlier.apply(ParDo.of(addOutlierInfo()));
   }
 
-
-
   private static void writeToSolr(
       SolrPipelineOptions options,
       PCollection<KV<String, IndexRecord>> kvIndexRecords,
@@ -557,10 +550,12 @@ public void processElement(ProcessContext c) {
     };
   }
 
-  private static DoFn<KV<String, KV<IndexRecord, DistributionOutlierRecord>>, KV<String, IndexRecord>>
-      addOutlierInfo() {
+  private static DoFn<
+          KV<String, KV<IndexRecord, DistributionOutlierRecord>>, KV<String, IndexRecord>>
+      addOutlierInfo() {
 
-    return new DoFn<KV<String, KV<IndexRecord, DistributionOutlierRecord>>, KV<String, IndexRecord>>() {
+    return new DoFn<
+        KV<String, KV<IndexRecord, DistributionOutlierRecord>>, KV<String, IndexRecord>>() {
       @ProcessElement
       public void processElement(ProcessContext c) {
 
@@ -578,7 +573,6 @@ public void processElement(ProcessContext c) {
     };
   }
 
-
   /** Load index records from AVRO. */
   private static PCollection<KV<String, IndexRecord>> loadIndexRecords(
       SolrPipelineOptions options, Pipeline p) {
@@ -647,21 +641,20 @@ public KV<String, Relationships> apply(Relationships input) {
   }
 
   private static PCollection<KV<String, DistributionOutlierRecord>> loadOutlierRecords(
-          SolrPipelineOptions options, Pipeline p) {
-    String path =
-        PathBuilder.buildPath(
-                options.getAllDatasetsInputPath() + "/distribution/", "distribution*" + AVRO_EXTENSION)
-            .toString();
+      SolrPipelineOptions options, Pipeline p) {
+    String path = PathBuilder.buildPath(options.getOutlierPath(), "*" + AVRO_EXTENSION).toString();
     log.info("Loading outlier from {}", path);
 
     return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(path))
-        .apply(
-            MapElements.via(
-                new SimpleFunction<DistributionOutlierRecord, KV<String, DistributionOutlierRecord>>() {
-                  @Override
-                  public KV<String, DistributionOutlierRecord> apply(DistributionOutlierRecord input) {
-                    return KV.of(input.getId(), input);
-                  }
-                }));
+        .apply(
+            MapElements.via(
+                new SimpleFunction<
+                    DistributionOutlierRecord, KV<String, DistributionOutlierRecord>>() {
+                  @Override
+                  public KV<String, DistributionOutlierRecord> apply(
+                      DistributionOutlierRecord input) {
+                    return KV.of(input.getId(), input);
+                  }
+                }));
   }
 }
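Note: addOutlierInfo relies on Beam's join-library extension, where a left outer join keeps every left element and substitutes a caller-supplied default (here the nullOutlier sentinel) when no right-side match exists. A hedged sketch with simplified value types:

import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class LeftOuterJoinSketch {
  // String values stand in for IndexRecord and DistributionOutlierRecord.
  // Every index record survives the join; IDs with no outlier record are
  // paired with the "NO_OUTLIER" sentinel instead of being dropped.
  static PCollection<KV<String, KV<String, String>>> joinWithOutliers(
      PCollection<KV<String, String>> indexRecords,     // keyed on record ID
      PCollection<KV<String, String>> outlierRecords) { // keyed on record ID
    return Join.leftOuterJoin(indexRecords, outlierRecords, "NO_OUTLIER");
  }
}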
File 3/7: DistributionOutlierPipelineOptions

@@ -6,23 +6,11 @@
 /**
  * Options for pipelines that run against spatial service for Expert distribution layer calculation.
  */
-public interface DistributionPipelineOptions extends AllDatasetsPipelinesOptions {
+public interface DistributionOutlierPipelineOptions extends AllDatasetsPipelinesOptions {
 
   @Description("Base URL for Spatial service")
   @Default.String("https://spatial.ala.org.au/ws/")
   String getBaseUrl();
 
   void setBaseUrl(String baseUrl);
-
-  @Description("Default batch size")
-  @Default.Integer(25000)
-  Integer getBatchSize();
-
-  void setBatchSize(Integer batchSize);
-
-  @Description("Keep download sampling CSVs")
-  @Default.Integer(1000)
-  Integer getBatchStatusSleepTime();
-
-  void setBatchStatusSleepTime(Integer batchStatusSleepTime);
 }
File 4/7: SolrPipelineOptions

@@ -68,8 +68,15 @@ public interface SolrPipelineOptions extends IndexingPipelineOptions {
   @Description("Path to clustering avro files")
   @Default.String("/data/pipelines-clustering")
   String getClusteringPath();
 
   void setClusteringPath(String clusteringPath);
+
+  @Description("Path to outlier avro files")
+  @Default.String("/data/pipelines-outlier")
+  String getOutlierPath();
+
+  void setOutlierPath(String outlierPath);
+
   @Description("Number of partitions to use")
   @Default.Integer(1)
   Integer getNumOfPartitions();
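Note: the new getOutlierPath/setOutlierPath pair follows Beam's PipelineOptions convention: Beam generates the implementation behind the interface, and a command-line flag named after the getter (--outlierPath=...) overrides the @Default value. A small sketch of the same pattern, with an illustrative interface name:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public interface OutlierPathOptions extends PipelineOptions {
  @Description("Path to outlier avro files")
  @Default.String("/data/pipelines-outlier")
  String getOutlierPath();

  void setOutlierPath(String outlierPath);
}

// Usage: with no flag the default applies; --outlierPath=/data/custom overrides it.
class OutlierPathOptionsDemo {
  public static void main(String[] args) {
    OutlierPathOptions opts =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(OutlierPathOptions.class);
    System.out.println(opts.getOutlierPath());
  }
}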
File 5/7: DistributionOutlierTransform

@@ -14,7 +14,8 @@
 import org.gbif.pipelines.io.avro.*;
 import org.gbif.pipelines.transforms.Transform;
 
-public class DistributionOutlierTransform extends Transform<IndexRecord, DistributionOutlierRecord> {
+public class DistributionOutlierTransform
+    extends Transform<IndexRecord, DistributionOutlierRecord> {
 
   private String spatialUrl;
 
@@ -56,9 +57,11 @@ public MapElements<IndexRecord, KV<String, IndexRecord>> toKv() {
   public MapElements<KV<String, Iterable<IndexRecord>>, Iterable<DistributionOutlierRecord>>
       calculateOutlier() {
     return MapElements.via(
-        (new SimpleFunction<KV<String, Iterable<IndexRecord>>, Iterable<DistributionOutlierRecord>>() {
+        (new SimpleFunction<
+            KV<String, Iterable<IndexRecord>>, Iterable<DistributionOutlierRecord>>() {
           @Override
-          public Iterable<DistributionOutlierRecord> apply(KV<String, Iterable<IndexRecord>> input) {
+          public Iterable<DistributionOutlierRecord> apply(
+              KV<String, Iterable<IndexRecord>> input) {
             String lsid = input.getKey();
             Iterable<IndexRecord> records = input.getValue();
             Iterator<IndexRecord> iter = records.iterator();
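Note: calculateOutlier above is built on MapElements.via(SimpleFunction), Beam's typed per-element mapping. A stripped-down sketch of that shape, counting records per key instead of calling the spatial service:

import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class PerKeySketch {
  // Same shape as calculateOutlier(): one grouped input per taxon key (LSID),
  // mapped to a derived result. Counting stands in for the outlier test.
  static MapElements<KV<String, Iterable<String>>, Long> countPerKey() {
    return MapElements.via(
        new SimpleFunction<KV<String, Iterable<String>>, Long>() {
          @Override
          public Long apply(KV<String, Iterable<String>> input) {
            long n = 0;
            for (String ignored : input.getValue()) {
              n++;
            }
            return n;
          }
        });
  }
}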
File 6/7: ALAFsUtils

@@ -72,18 +72,18 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOption
   }
 
   /** Build a path to outlier records. */
-  public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options) throws IOException {
+  public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options)
+      throws IOException {
     // default: {fsPath}/pipelines-outlier
     FileSystem fs =
-            FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
-                    .getFs(options.getTargetPath());
+        FileSystemFactory.getInstance(options.getHdfsSiteConfig(), options.getCoreSiteConfig())
+            .getFs(options.getTargetPath());
 
-    String outputPath = PathBuilder.buildPath(options.getTargetPath()).toString();
+    String outputPath = PathBuilder.buildPath(options.getTargetPath()).toString();
 
     // {fsPath}/pipelines-outlier/{datasetId}
     if (options.getDatasetId() != null && !"all".equalsIgnoreCase(options.getDatasetId())) {
-      outputPath =
-          PathBuilder.buildPath(outputPath, options.getDatasetId()).toString();
+      outputPath = PathBuilder.buildPath(outputPath, options.getDatasetId()).toString();
     }
     // delete previous runs
     FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
@@ -92,7 +92,6 @@ public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions
     return outputPath;
   }
 
-
  /**
   * Removes a directory with content if the folder exists
   *
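Note: the branch in buildPathOutlierUsingTargetPath keeps one output folder for all-datasets runs and a per-dataset subfolder otherwise, then deletes any previous run before returning. The path logic alone, sketched with plain string handling in place of PathBuilder and with illustrative method and dataset names:

public class OutlierPathSketch {
  // Mirrors the branch above: datasetId == null or "all" keeps the base path
  // ({fsPath}/pipelines-outlier); anything else gets its own subfolder.
  static String resolveOutlierPath(String basePath, String datasetId) {
    if (datasetId != null && !"all".equalsIgnoreCase(datasetId)) {
      return basePath + "/" + datasetId; // {fsPath}/pipelines-outlier/{datasetId}
    }
    return basePath;
  }

  public static void main(String[] args) {
    System.out.println(resolveOutlierPath("/data/pipelines-outlier", "dr359"));
    // -> /data/pipelines-outlier/dr359
    System.out.println(resolveOutlierPath("/data/pipelines-outlier", "all"));
    // -> /data/pipelines-outlier
  }
}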
File 7/7: DistributionOutlierTest

@@ -1,7 +1,9 @@
-package au.org.ala.distribution;
+package au.org.ala.outlier;
 
 import static org.junit.Assert.*;
 
+import au.org.ala.distribution.DistributionLayer;
+import au.org.ala.distribution.DistributionServiceImpl;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -11,7 +13,7 @@
 /**
  * Tests ported from
  * https://github.com/AtlasOfLivingAustralia/biocache-store/blob/master/src/test/scala/au/org/ala/biocache/DistanceRangeParserTest.scala
  */
-public class DistributionTest {
+public class DistributionOutlierTest {
   String spatial_url = "http://devt.ala.org.au:8080/ws/";
   // String spatial_url = "https://spatial.ala.org.au/ws/";
