From 321cbf8c04dc38305b71556850b7c8cd2eef6530 Mon Sep 17 00:00:00 2001 From: Qifeng Date: Fri, 10 Dec 2021 14:56:35 +1100 Subject: [PATCH] #622 fix(script and write to /all and /datasetId) --- .../configs/la-pipelines-local-hdfs.yaml | 4 +++ livingatlas/configs/la-pipelines.yaml | 10 ++++++ .../beam/DistributionOutlierPipeline.java | 17 ++++++++- .../beam/IndexRecordToSolrPipeline.java | 10 +++++- .../java/au/org/ala/utils/ALAFsUtils.java | 9 +++-- .../ala/outlier/DistributionOutlierTest.java | 35 +++++++------------ livingatlas/scripts/la-pipelines | 13 +++++-- 7 files changed, 68 insertions(+), 30 deletions(-) diff --git a/livingatlas/configs/la-pipelines-local-hdfs.yaml b/livingatlas/configs/la-pipelines-local-hdfs.yaml index 0a7d4e7f76..d86b69926f 100644 --- a/livingatlas/configs/la-pipelines-local-hdfs.yaml +++ b/livingatlas/configs/la-pipelines-local-hdfs.yaml @@ -66,3 +66,7 @@ sample-avro: index: inputPath: hdfs://localhost:8020/pipelines-data zkHost: localhost:9983 +outlier: + inputPath: hdfs://localhost:8020/pipelines-data + targetPath: hdfs://localhost:8020/pipelines-outlier + diff --git a/livingatlas/configs/la-pipelines.yaml b/livingatlas/configs/la-pipelines.yaml index 4e42da5e47..d7b96579de 100644 --- a/livingatlas/configs/la-pipelines.yaml +++ b/livingatlas/configs/la-pipelines.yaml @@ -167,6 +167,15 @@ sampling: allDatasetsInputPath: '{fsPath}/pipelines-all-datasets' runner: SparkRunner +# Calculate distance to the expert distribution layers +# class:au.org.ala.pipelines.beam.DistributionOutlierPipeline +outlier: + appName: Expert distribution outliers for {datasetId} + baseUrl: https://spatial.ala.org.au/ws/ + targetPath: &outlierTargetPath '{fsPath}/pipelines-outlier' + allDatasetsInputPath: '{fsPath}/pipelines-all-datasets' + runner: SparkRunner + # class: au.org.ala.pipelines.beam.ALAInterpretedToSensitivePipeline sensitive: appName: Sensitive Data Processing for {datasetId} @@ -192,6 +201,7 @@ solr: allDatasetsInputPath: 
'{fsPath}/pipelines-all-datasets' jackKnifePath: "{fsPath}/pipelines-jackknife" clusteringPath: "{fsPath}/pipelines-clustering" + outlierPath: *outlierTargetPath solrCollection: biocache includeSampling: false includeJackKnife: false diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java index ad0460d2aa..b59141f55b 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/DistributionOutlierPipeline.java @@ -24,6 +24,13 @@ * *

Example: java au.org.ala.pipelines.beam.DistributionOutlierPipeline * --config=/data/la-pipelines/config/la-pipelines.yaml --fsPath=/data + * + *

Running with Jar java + * -Dlog4j.configuration=file://../pipelines/src/main/resources/log4j-colorized.properties + * -Dlog4j.configurationFile=file://../pipelines/src/main/resources/log4j-colorized.properties -cp + * ../pipelines/target/pipelines-2.10.0-SNAPSHOT-shaded.jar + * au.org.ala.pipelines.beam.DistributionOutlierPipeline + * --config=/data/la-pipelines/config/la-pipelines.yaml,la-pipelines-local.yaml --fsPath=/data */ @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) @@ -69,13 +76,21 @@ public static void run(DistributionOutlierPipelineOptions options) throws Except distributionTransform.calculateOutlier()) .apply("Flatten records", Flatten.iterables()); + log.info("Adding step 3: writing to outliers"); + kvRecords.apply( "Write to file", AvroIO.write(DistributionOutlierRecord.class) - .to(outputPath + "/outliers") + .to(outputPath + "/outlier") .withoutSharding() .withSuffix(".avro")); + // kvRecords + // .apply("to String", distributionTransform.flatToString()) + // .apply( + // "Write to text", + // TextIO.write().to(outputPath+"/outlier").withoutSharding().withSuffix(".txt")); + log.info("Running the pipeline"); PipelineResult result = p.run(); result.waitUntilFinish(); diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java index ce21419052..88a801e344 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/pipelines/beam/IndexRecordToSolrPipeline.java @@ -653,7 +653,15 @@ public KV apply(Relationships input) { private static PCollection> loadOutlierRecords( SolrPipelineOptions options, Pipeline p) { - String path = PathBuilder.buildPath(options.getOutlierPath(), "*" + AVRO_EXTENSION).toString(); + + String dataResourceFolder = options.getDatasetId(); + if (dataResourceFolder == null || 
"all".equalsIgnoreCase(dataResourceFolder)) { + dataResourceFolder = "all"; + } + + String path = + PathBuilder.buildPath(options.getOutlierPath(), dataResourceFolder, "*" + AVRO_EXTENSION) + .toString(); log.info("Loading outlier from {}", path); return p.apply(AvroIO.read(DistributionOutlierRecord.class).from(path)) diff --git a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java index 16deb1b6a9..aaa5630f7f 100644 --- a/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java +++ b/livingatlas/pipelines/src/main/java/au/org/ala/utils/ALAFsUtils.java @@ -71,7 +71,10 @@ public static String buildPathSamplingUsingTargetPath(AllDatasetsPipelinesOption return PathBuilder.buildDatasetAttemptPath(options, "sampling", false); } - /** Build a path to outlier records. */ + /** + * Build a path to outlier records. {fsPath}/pipelines-outlier/{datasetId} + * {fsPath}/pipelines-outlier/all + */ public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions options) throws IOException { // default: {fsPath}/pipelines-outlier @@ -84,11 +87,13 @@ public static String buildPathOutlierUsingTargetPath(AllDatasetsPipelinesOptions // {fsPath}/pipelines-outlier/{datasetId} if (options.getDatasetId() != null && !"all".equalsIgnoreCase(options.getDatasetId())) { outputPath = PathBuilder.buildPath(outputPath, options.getDatasetId()).toString(); + } else { + // {fsPath}/pipelines-outlier/all + outputPath = PathBuilder.buildPath(outputPath, "all").toString(); } // delete previous runs FsUtils.deleteIfExist(options.getHdfsSiteConfig(), options.getCoreSiteConfig(), outputPath); ALAFsUtils.createDirectory(fs, outputPath); - return outputPath; } diff --git a/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java b/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java index ad5bbfbb29..2ed9594e90 100644 --- 
a/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java +++ b/livingatlas/pipelines/src/test/java/au/org/ala/outlier/DistributionOutlierTest.java @@ -7,35 +7,26 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.junit.Test; public class DistributionOutlierTest { - String spatial_url = "http://devt.ala.org.au:8080/ws/"; - // String spatial_url = "https://spatial.ala.org.au/ws/"; - - public void getLayer() { - DistributionServiceImpl impl = DistributionServiceImpl.init(spatial_url); - try { - List layers = - impl.findLayersByLsid( - "urn:lsid:biodiversity.org.au:afd.taxon:4f3a5260-4f39-4393-a644-4d05b1c45f9b"); - assertSame(1, layers.size()); - } catch (Exception e) { - System.out.println(e.getMessage()); - } - } + // String spatial_url = "http://devt.ala.org.au:8080/ws/"; + String spatial_url = "https://spatial-test.ala.org.au/ws/"; + String lsidGreyNurseShark = + "urn:lsid:biodiversity.org.au:afd.taxon:0c3e2403-05c4-4a43-8019-30e6d657a283"; + @Test public void getMultiLayers() { DistributionServiceImpl impl = DistributionServiceImpl.init(spatial_url); try { - List layers = - impl.findLayersByLsid( - "urn:lsid:biodiversity.org.au:afd.taxon:0c3e2403-05c4-4a43-8019-30e6d657a283"); - assertSame(5, layers.size()); + List layers = impl.findLayersByLsid(lsidGreyNurseShark); + assertSame(1, layers.size()); } catch (Exception e) { System.out.println(e.getMessage()); } } + @Test public void outliers() { DistributionServiceImpl impl = DistributionServiceImpl.init(spatial_url); try { @@ -48,20 +39,18 @@ public void outliers() { points.put("2eb7cda9-f248-4e9e-89b7-44db7312e58a", inPoint); Map outPoint = new HashMap(); - outPoint.put("decimalLatitude", -26.1); + outPoint.put("decimalLatitude", 26.1); outPoint.put("decimalLongitude", 127.5); points.put("6756a12e-d07c-4fc6-8637-a0036f0b76c9", outPoint); - Map results = - impl.outliers( - "urn:lsid:biodiversity.org.au:afd.taxon:4f3a5260-4f39-4393-a644-4d05b1c45f9b", - 
points); + Map results = impl.outliers(lsidGreyNurseShark, points); assertSame(1, results.size()); } catch (Exception e) { e.printStackTrace(); } } + @Test public void multiLayers() { DistributionServiceImpl impl = DistributionServiceImpl.init(spatial_url); try { diff --git a/livingatlas/scripts/la-pipelines b/livingatlas/scripts/la-pipelines index ffb7e6f72e..fffc864e21 100755 --- a/livingatlas/scripts/la-pipelines +++ b/livingatlas/scripts/la-pipelines @@ -111,7 +111,7 @@ Usage: $CMD [options] sample (...|all) $WHERE $CMD [options] clustering (all) $WHERE $CMD [options] jackknife (all) $WHERE - $CMD [options] outlier (all) $WHERE + $CMD [options] outlier (...|all) $WHERE $CMD [options] solr (...|all) $WHERE $CMD [options] solr-schema (...|all) $WHERE $CMD [options] archive-list @@ -135,12 +135,15 @@ Options: ---- $CMD $VER License Apache-2.0 +License Apache-2.0 EOF )" # Enable logging + if ($debug) ; then verbosity=6; else verbosity=5; fi if [[ $PROD = true ]] ; then LIB_DIR=/usr/share/la-pipelines ; else LIB_DIR=. ; fi + source $LIB_DIR/logging_lib.sh $verbosity $no_colors $dr STEP=Initial @@ -682,7 +685,7 @@ function clustering () { } function outlier () { - local dr="*" ltype="$TYPE" + local dr="$1" ltype="$TYPE" CLASS=au.org.ala.pipelines.beam.DistributionOutlierPipeline logStepStart "Outlier" $dr @@ -695,7 +698,7 @@ function outlier () { spark-embed-pipeline $ltype $SH_ARGS $CLASS $dr SH_ARGS=outlier-sh-args.spark-cluster - spark-cluster-pipeline $ltype $SH_ARGS $CLASS $dr "add-jackknife $dr" + spark-cluster-pipeline $ltype $SH_ARGS $CLASS $dr "add-outlier $dr" logStepEnd "Outlier" $dr $ltype $SECONDS } @@ -943,6 +946,10 @@ if ($jackknife || $do_all); then fi fi +if ($outlier || $do_all); then + do_step_simple outlier +fi + if ($solr || $do_all); then do_step_simple solr fi