Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate distance to expert distribution layers #631

Merged
merged 53 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
3262fad
#622 init(distribution from indexrecord)
qifeng-bai Nov 24, 2021
e13929a
Merge branch 'dev' into 622_distribution
qifeng-bai Nov 24, 2021
c873356
#622 improve(using indexrecord)
qifeng-bai Nov 25, 2021
5f08a8f
#622 impl(add outlier to solr index)
qifeng-bai Nov 25, 2021
09d3408
#622 refactor(Use DistributionOutlier)
qifeng-bai Nov 26, 2021
28cbab8
#622 refactor
qifeng-bai Nov 26, 2021
8d41ef8
#622 impl(index distance to edl to solr)
qifeng-bai Nov 28, 2021
238d2b9
#622 impl(la-pipeline script)
qifeng-bai Nov 29, 2021
3cf2ecb
#622 fix(change 'step' to OUTLIER)
qifeng-bai Dec 2, 2021
dbfe4d1
#622 fix(temporary remove outlier tests)
qifeng-bai Dec 3, 2021
14fd378
#622 fix(scipt and write to /all and /datasetId)
qifeng-bai Dec 10, 2021
402a8e9
#622 fix(filter out records without location and taxon)
qifeng-bai Dec 15, 2021
ba10d56
#622 fix(using logback, not log4j )
qifeng-bai Dec 16, 2021
27436e8
#622 doc(how to use external logback config)
qifeng-bai Dec 16, 2021
f6f7712
Merge branch 'dev' into 622_distribution
qifeng-bai Dec 16, 2021
7a9ee6c
#622 fix(use logback.xml after dev update)
qifeng-bai Dec 16, 2021
578f75f
#622 impl(only output records which have outlier)
qifeng-bai Dec 20, 2021
c80bbd3
#622 fix(urlencode lsid)
qifeng-bai Jan 9, 2022
378064e
#622 fix(remove log4j config from script)
qifeng-bai Jan 14, 2022
81b891a
#622 impl(Only calculated new added records)
qifeng-bai Jan 21, 2022
3bb724b
#622 fix(loading avro from an empty folder)
qifeng-bai Jan 23, 2022
eda9310
#622 improve(counters)
qifeng-bai Feb 2, 2022
4f3a8a0
#622 improve(counter)
qifeng-bai Feb 2, 2022
1f75c10
#622 init(distribution from indexrecord)
qifeng-bai Nov 24, 2021
cd4b8a6
#622 improve(using indexrecord)
qifeng-bai Nov 25, 2021
08ce46d
#622 impl(add outlier to solr index)
qifeng-bai Nov 25, 2021
e3e4643
#622 refactor(Use DistributionOutlier)
qifeng-bai Nov 26, 2021
080eb7e
#622 refactor
qifeng-bai Nov 26, 2021
1e96020
#622 impl(index distance to edl to solr)
qifeng-bai Nov 28, 2021
2ab2666
#622 impl(la-pipeline script)
qifeng-bai Nov 29, 2021
1bc87a8
#622 fix(change 'step' to OUTLIER)
qifeng-bai Dec 2, 2021
b03d6cd
#622 fix(temporary remove outlier tests)
qifeng-bai Dec 3, 2021
321cbf8
#622 fix(scipt and write to /all and /datasetId)
qifeng-bai Dec 10, 2021
c1da691
#622 fix(filter out records without location and taxon)
qifeng-bai Dec 15, 2021
7c36ac6
#622 fix(using logback, not log4j )
qifeng-bai Dec 16, 2021
63c2556
#622 doc(how to use external logback config)
qifeng-bai Dec 16, 2021
92783d1
#622 fix(use logback.xml after dev update)
qifeng-bai Dec 16, 2021
8a46afe
#622 impl(only output records which have outlier)
qifeng-bai Dec 20, 2021
11f3a4e
#622 fix(urlencode lsid)
qifeng-bai Jan 9, 2022
6bf5a94
#622 fix(remove log4j config from script)
qifeng-bai Jan 14, 2022
e4a6237
#622 impl(Only calculated new added records)
qifeng-bai Jan 21, 2022
2e86fd4
#622 fix(loading avro from an empty folder)
qifeng-bai Jan 23, 2022
01aa1dd
#622 improve(counters)
qifeng-bai Feb 2, 2022
5e51baa
#622 improve(counter)
qifeng-bai Feb 2, 2022
fd02ee3
Ensure all outlier records are written to AVRO (not just new ones)
djtfmartin Feb 3, 2022
8e193fc
Merge branch '622_distribution' of https://github.com/gbif/pipelines …
qifeng-bai Feb 3, 2022
bd31546
Merge pull request #657 from gbif/622_distribution_dm
qifeng-bai Feb 16, 2022
241bd35
Merge branch '622_distribution' of https://github.com/gbif/pipelines …
qifeng-bai Feb 16, 2022
dd59900
#622 fix(generate single output file)
qifeng-bai Feb 16, 2022
c092852
fix for #680 and a fix for missing properties when indexed generate w…
djtfmartin Mar 2, 2022
5b657fa
Merge branch '622_distribution' into 622_distribution_dm
djtfmartin Mar 2, 2022
d05e9fb
Merge pull request #681 from gbif/622_distribution_dm
qifeng-bai Mar 2, 2022
87668c5
#622 refactor(comments and add withoutSharding)
qifeng-bai Mar 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions livingatlas/configs/la-pipelines-local-hdfs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ sample-avro:
index:
inputPath: hdfs://localhost:8020/pipelines-data
zkHost: localhost:9983
outlier:
inputPath: hdfs://localhost:8020/pipelines-data
targetPath: hdfs://localhost:8020/pipelines-outlier

10 changes: 10 additions & 0 deletions livingatlas/configs/la-pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ sampling:
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
runner: SparkRunner

# Calculate distance to the expert distribution layers
# class:au.org.ala.pipelines.beam.DistributionOutlierPipeline
outlier:
appName: Expert distribution outliers for {datasetId}
baseUrl: https://spatial.ala.org.au/ws/
targetPath: &outlierTargetPath '{fsPath}/pipelines-outlier'
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
runner: SparkRunner

# class: au.org.ala.pipelines.beam.ALAInterpretedToSensitivePipeline
sensitive:
appName: Sensitive Data Processing for {datasetId}
Expand All @@ -192,6 +201,7 @@ solr:
allDatasetsInputPath: '{fsPath}/pipelines-all-datasets'
jackKnifePath: "{fsPath}/pipelines-jackknife"
clusteringPath: "{fsPath}/pipelines-clustering"
outlierPath: *outlierTargetPath
solrCollection: biocache
includeSampling: false
includeJackKnife: false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package au.org.ala.distribution;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Getter;
import lombok.Setter;

/**
* Represent a field in the sampling service. A field is a property derived from a spatial layer.
*/
@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class DistributionLayer {
private int gid;
private String data_resource_uid;
private String scientific;
private String pid;
private String type;
private String genus_name;
private String lsid;
private float area_km;
private String common_nam;
private int spcode;
private String genus_lsid;
private String bounding_box;
private String group_name;
private Boolean endemic;
private String image_url;
private String family_lsid;
private String specific_n;
private String wmsurl;
private String family;
private int geom_idx;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package au.org.ala.distribution;

import java.util.List;
import java.util.Map;
import retrofit2.Call;
import retrofit2.http.*;

/** Simple client to the ALA sampling service. */
public interface DistributionService {

/** Return expert distribution layers in the ALA spatial portal */
@GET("distribution/")
Call<List<DistributionLayer>> getLayers();

@GET("distribution/lsids/{id}")
Call<List<DistributionLayer>> getLayersByLsid(
@Path("id") String id, @Query("nowkt") String nowkt);

/**
* @param lsid
* @param points Map<uuid, <decimalLatitude,decimalLongitude>>
* @return
*/
@POST("distribution/outliers/{id}")
Call<Map<String, Double>> outliers(
@Path("id") String lsid, @Body Map<String, Map<String, Double>> points);

// @FormUrlEncoded
// @POST("distribution/outliers/{id}")
// Call<DistributionRequest> outliers(@Path(value="id",encoded = true) String lsid,
// @Field("pointsJson") Map<String, Map<String, Double>> points);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package au.org.ala.distribution;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.converter.jackson.JacksonConverterFactory;

@Slf4j
public class DistributionServiceImpl implements Serializable {

private Retrofit retrofit;
private DistributionService service;

private DistributionServiceImpl(String baseUrl) {
ObjectMapper om =
new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
JacksonConverterFactory jcf = JacksonConverterFactory.create(om);
retrofit = new Retrofit.Builder().baseUrl(baseUrl).addConverterFactory(jcf).build();
service = retrofit.create(DistributionService.class);
}

public static DistributionServiceImpl init(String baseUrl) {
// set up sampling service
return new DistributionServiceImpl(baseUrl);
}

public List<DistributionLayer> getLayers() throws IOException, ExpertDistributionException {
// Response<List<DistributionLayer>> response =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove commented code

// distributionService.getLayersByLsid("urn:lsid:biodiversity.org.au:afd.taxon:4f3a5260-4f39-4393-a644-4d05b1c45f92", "false").execute();
Response<List<au.org.ala.distribution.DistributionLayer>> response =
service.getLayers().execute();
int code = response.code();
if (code >= 200 && code < 300) {
List<au.org.ala.distribution.DistributionLayer> layers = response.body();
return layers;
} else {
errorHandler(code, response);
return null;
}
}

public List<DistributionLayer> findLayersByLsid(String lsid)
throws IOException, ExpertDistributionException {
Response<List<DistributionLayer>> response = service.getLayersByLsid(lsid, "false").execute();
int code = response.code();
if (code >= 200 && code < 300) {
List<au.org.ala.distribution.DistributionLayer> layers = response.body();
return layers;
} else {
errorHandler(code, response);
return null;
}
}

public Map<String, Double> outliers(String lsid, Map<String, Map<String, Double>> points)
throws IOException, ExpertDistributionException {
Response<Map<String, Double>> response = service.outliers(lsid, points).execute();
int code = response.code();
if (code >= 200 && code < 300) {
Map<String, Double> results = response.body();
return results;
} else {
errorHandler(code, response);
return null;
}
}

/**
* Todo Handle error information Need to sync with Spatial Service
*
* @param code
* @param response
* @throws IOException
* @throws ExpertDistributionException
*/
private void errorHandler(int code, Response<?> response)
throws IOException, ExpertDistributionException {
String errorBody = response.errorBody().string();
if (code >= 400 && code < 500) {
throw new ExpertDistributionException(errorBody);
} else {
throw new RuntimeException(errorBody);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package au.org.ala.distribution;

public class ExpertDistributionException extends Exception {
public ExpertDistributionException(String errorMessage) {
super(errorMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package au.org.ala.pipelines.beam;

import au.org.ala.pipelines.options.DistributionOutlierPipelineOptions;
import au.org.ala.pipelines.transforms.DistributionOutlierTransform;
import au.org.ala.pipelines.util.VersionInfo;
import au.org.ala.utils.ALAFsUtils;
import au.org.ala.utils.CombinedYamlConfiguration;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.PCollection;
import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
import org.gbif.pipelines.io.avro.*;
import org.slf4j.MDC;

/**
* A pipeline that calculate distance to the expert distribution layers (EDL)
*
* <p>distanceOutOfELD: 0 -> inside of EDL, -1: -> No EDLs. >0 -> out of EDL
*
* <p>Example: java au.org.ala.pipelines.beam.DistributionOutlierPipeline
* --config=/data/la-pipelines/config/la-pipelines.yaml --fsPath=/data
*
* <p>Running with Jar java
* -Dlog4j.configuration=file://../pipelines/src/main/resources/log4j-colorized.properties
* -Dlog4j.configurationFile=file://../pipelines/src/main/resources/log4j-colorized.properties -cp
* ../pipelines/target/pipelines-2.10.0-SNAPSHOT-shaded.jar
* au.org.ala.pipelines.beam.DistributionOutlierPipeline
* --config=/data/la-pipelines/config/la-pipelines.yaml,la-pipelines-local.yaml --fsPath=/data
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DistributionOutlierPipeline {

public static void main(String[] args) throws Exception {
VersionInfo.print();
CombinedYamlConfiguration conf = new CombinedYamlConfiguration(args);
String[] combinedArgs = conf.toArgs("general", "outlier");

DistributionOutlierPipelineOptions options =
PipelinesOptionsFactory.create(DistributionOutlierPipelineOptions.class, combinedArgs);
MDC.put("datasetId", options.getDatasetId());
MDC.put("attempt", options.getAttempt().toString());
MDC.put("step", "OUTLIER");
PipelinesOptionsFactory.registerHdfs(options);
run(options);
System.exit(0);
}

public static void run(DistributionOutlierPipelineOptions options) throws Exception {

// Create output path
// default: {fsPath}/pipelines-outlier
// or {fsPath}/pipelines-outlier/{datasetId}
String outputPath = ALAFsUtils.buildPathOutlierUsingTargetPath(options);

log.info("Adding step 1: Collecting index records");
Pipeline p = Pipeline.create(options);

PCollection<IndexRecord> indexRecords = ALAFsUtils.loadIndexRecords(options, p);

DistributionOutlierTransform distributionTransform =
new DistributionOutlierTransform(options.getBaseUrl());

log.info("Adding step 2: calculating outliers index");
PCollection<DistributionOutlierRecord> kvRecords =
indexRecords
.apply("Key by species", distributionTransform.toKv())
.apply("Grouping by species", GroupByKey.create())
.apply(
"Calculating outliers based on the species",
distributionTransform.calculateOutlier())
.apply("Flatten records", Flatten.iterables());

log.info("Adding step 3: writing to outliers");

kvRecords.apply(
"Write to file",
AvroIO.write(DistributionOutlierRecord.class)
.to(outputPath + "/outlier")
.withoutSharding()
.withSuffix(".avro"));

// kvRecords
// .apply("to String", distributionTransform.flatToString())
// .apply(
// "Write to text",
// TextIO.write().to(outputPath+"/outlier").withoutSharding().withSuffix(".txt"));

log.info("Running the pipeline");
PipelineResult result = p.run();
result.waitUntilFinish();
}
}
Loading