Skip to content

Commit

Permalink
#622 fix(script and write to /all and /datasetId)
Browse files Browse the repository at this point in the history
  • Loading branch information
qifeng-bai authored and djtfmartin committed Feb 3, 2022
1 parent b03d6cd commit 321cbf8
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 30 deletions.
4 changes: 4 additions & 0 deletions livingatlas/configs/la-pipelines-local-hdfs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ sample-avro:
index:
inputPath: hdfs://localhost:8020/pipelines-data
zkHost: localhost:9983
outlier:
inputPath: hdfs://localhost:8020/pipelines-data
targetPath: hdfs://localhost:8020/pipelines-outlier

10 changes: 10 additions & 0 deletions livingatlas/configs/la-pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ sampling:
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
runner: SparkRunner

# Calculate distance to the expert distribution layers
# class: au.org.ala.pipelines.beam.DistributionOutlierPipeline
outlier:
appName: Expert distribution outliers for {datasetId}
baseUrl: https://spatial.ala.org.au/ws/
targetPath: &outlierTargetPath '{fsPath}/pipelines-outlier'
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
runner: SparkRunner

# class: au.org.ala.pipelines.beam.ALAInterpretedToSensitivePipeline
sensitive:
appName: Sensitive Data Processing for {datasetId}
Expand All @@ -192,6 +201,7 @@ solr:
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
jackKnifePath: "{fsPath}/pipelines-jackknife"
clusteringPath: "{fsPath}/pipelines-clustering"
outlierPath: *outlierTargetPath
solrCollection: biocache
includeSampling: false
includeJackKnife: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
*
* <p>Example: java au.org.ala.pipelines.beam.DistributionOutlierPipeline
* --config=/data/la-pipelines/config/la-pipelines.yaml --fsPath=/data
*
* <p>Running with the shaded jar: java
* -Dlog4j.configuration=file://../pipelines/src/main/resources/log4j-colorized.properties
* -Dlog4j.configurationFile=file://../pipelines/src/main/resources/log4j-colorized.properties -cp
* ../pipelines/target/pipelines-2.10.0-SNAPSHOT-shaded.jar
* au.org.ala.pipelines.beam.DistributionOutlierPipeline
* --config=/data/la-pipelines/config/la-pipelines.yaml,la-pipelines-local.yaml --fsPath=/data
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
Expand Down Expand Up @@ -69,13 +76,21 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except
distributionTransform.calculateOutlier())
.apply("Flatten records", Flatten.iterables());

log.info("Adding step 3: writing to outliers");

kvRecords.apply(
"Write to file",
AvroIO.write(DistributionOutlierRecord.class)
.to(outputPath + "/outliers")
.to(outputPath + "/outlier")
.withoutSharding()
.withSuffix(".avro"));

// kvRecords
// .apply("to String", distributionTransform.flatToString())
// .apply(
// "Write to text",
// TextIO.write().to(outputPath+"/outlier").withoutSharding().withSuffix(".txt"));

log.info("Running the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,15 @@ public KV<String, Relationships> apply(Relationships input) {

private static PCollection<KV<String, DistributionOutlierRecord>> loadOutlierRecords(
SolrPipelineOptions options, Pipeline p) {
String path = PathBuilder.buildPath(options.getOutlierPath(), "*" + AVRO_EXTENSION).toString();

String dataResourceFolder = options.getDatasetId();
if (dataResourceFolder == null || "all".equalsIgnoreCase(dataResourceFolder)) {
dataResourceFolder = "all";
}

String path =
PathBuilder.buildPath(options.getOutlierPath(), dataResourceFolder, "*" + AVRO_EXTENSION)
.toString();
log.info("Loading outlier from {}", path);

return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(path))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOption
return PathBuilder.buildDatasetAttemptPath(options, "sampling", false);
}

/** Build a path to outlier records. */
/**
* Build a path to outlier records: {fsPath}/pipelines-outlier/{datasetId} when a specific
* datasetId is given, or {fsPath}/pipelines-outlier/all when the datasetId is null or "all".
*/
public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options)
throws IOException {
// default: {fsPath}/pipelines-outlier
Expand All @@ -84,11 +87,13 @@ public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions
// {fsPath}/pipelines-outlier/{datasetId}
if (options.getDatasetId() != null && !"all".equalsIgnoreCase(options.getDatasetId())) {
outputPath = PathBuilder.buildPath(outputPath, options.getDatasetId()).toString();
} else {
// {fsPath}/pipelines-outlier/all
outputPath = PathBuilder.buildPath(outputPath, "all").toString();
}
// delete previous runs
FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath);
ALAFsUtils.createDirectory(fs, outputPath);

return outputPath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,26 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Test;

public class DistributionOutlierTest {
String spatial_url = "http://devt.ala.org.au:8080/ws/";
// String spatial_url = "https://spatial.ala.org.au/ws/";

public void getLayer() {
DistributionServiceImpl impl = DistributionServiceImpl.init(spatial_url);
try {
List<DistributionLayer> layers =
impl.findLayersByLsid(
"urn:lsid:biodiversity.org.au:afd.taxon:4f3a5260-4f39-4393-a644-4d05b1c45f9b");
assertSame(1, layers.size());
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
// String spatial_url = "http://devt.ala.org.au:8080/ws/";
String spatial_url = "https://spatial-test.ala.org.au/ws/";
String lsidGreyNurseShark =
"urn:lsid:biodiversity.org.au:afd.taxon:0c3e2403-05c4-4a43-8019-30e6d657a283";

@Test
public void getMultiLayers() {
DistributionServiceImpl impl = DistributionServiceImpl.init(spatial_url);
try {
List<DistributionLayer> layers =
impl.findLayersByLsid(
"urn:lsid:biodiversity.org.au:afd.taxon:0c3e2403-05c4-4a43-8019-30e6d657a283");
assertSame(5, layers.size());
List<DistributionLayer> layers = impl.findLayersByLsid(lsidGreyNurseShark);
assertSame(1, layers.size());
} catch (Exception e) {
System.out.println(e.getMessage());
}
}

@Test
public void outliers() {
DistributionServiceImpl impl = DistributionServiceImpl.init(spatial_url);
try {
Expand All @@ -48,20 +39,18 @@ public void outliers() {
points.put("2eb7cda9-f248-4e9e-89b7-44db7312e58a", inPoint);

Map outPoint = new HashMap();
outPoint.put("decimalLatitude", -26.1);
outPoint.put("decimalLatitude", 26.1);
outPoint.put("decimalLongitude", 127.5);
points.put("6756a12e-d07c-4fc6-8637-a0036f0b76c9", outPoint);

Map<String, Double> results =
impl.outliers(
"urn:lsid:biodiversity.org.au:afd.taxon:4f3a5260-4f39-4393-a644-4d05b1c45f9b",
points);
Map<String, Double> results = impl.outliers(lsidGreyNurseShark, points);
assertSame(1, results.size());
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void multiLayers() {
DistributionServiceImpl impl = DistributionServiceImpl.init(spatial_url);
try {
Expand Down
13 changes: 10 additions & 3 deletions livingatlas/scripts/la-pipelines
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Usage:
$CMD [options] sample (<dr>...|all) $WHERE
$CMD [options] clustering (all) $WHERE
$CMD [options] jackknife (all) $WHERE
$CMD [options] outlier (all) $WHERE
$CMD [options] outlier (<dr>...|all) $WHERE
$CMD [options] solr (<dr>...|all) $WHERE
$CMD [options] solr-schema (<dr>...|all) $WHERE
$CMD [options] archive-list
Expand All @@ -135,12 +135,15 @@ Options:
----
$CMD $VER
License Apache-2.0
License Apache-2.0
EOF
)"

# Enable logging

if ($debug) ; then verbosity=6; else verbosity=5; fi
if [[ $PROD = true ]] ; then LIB_DIR=/usr/share/la-pipelines ; else LIB_DIR=. ; fi

source $LIB_DIR/logging_lib.sh $verbosity $no_colors $dr

STEP=Initial
Expand Down Expand Up @@ -682,7 +685,7 @@ function clustering () {
}

function outlier () {
local dr="*" ltype="$TYPE"
local dr="$1" ltype="$TYPE"
CLASS=au.org.ala.pipelines.beam.DistributionOutlierPipeline

logStepStart "Outlier" $dr
Expand All @@ -695,7 +698,7 @@ function outlier () {
spark-embed-pipeline $ltype $SH_ARGS $CLASS $dr

SH_ARGS=outlier-sh-args.spark-cluster
spark-cluster-pipeline $ltype $SH_ARGS $CLASS $dr "add-jackknife $dr"
spark-cluster-pipeline $ltype $SH_ARGS $CLASS $dr "add-outlier $dr"

logStepEnd "Outlier" $dr $ltype $SECONDS
}
Expand Down Expand Up @@ -943,6 +946,10 @@ if ($jackknife || $do_all); then
fi
fi

if ($outlier || $do_all); then
do_step_simple outlier
fi

if ($solr || $do_all); then
do_step_simple solr
fi
Expand Down

0 comments on commit 321cbf8

Please sign in to comment.