Skip to content

Commit

Permalink
#622 init(distribution from indexrecord)
Browse files Browse the repository at this point in the history
  • Loading branch information
qifeng-bai committed Nov 24, 2021
1 parent 8baa86b commit 3262fad
Show file tree
Hide file tree
Showing 8 changed files with 426 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package au.org.ala.distribution;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Getter;
import lombok.Setter;

/**
* Represent a field in the sampling service. A field is a property derived from a spatial layer.
*/
@Getter
@Setter
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class DistributionLayer {
private int gid;
private String data_resource_uid;
private String scientific;
private String pid;
private String type;
private String genus_name;
private String lsid;
private float area_km;
private String common_nam;
private int spcode;
private String genus_lsid;
private String bounding_box;
private String group_name;
private Boolean endemic;
private String image_url;
private String family_lsid;
private String specific_n;
private String wmsurl;
private String family;
private int geom_idx;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package au.org.ala.distribution;

import java.util.List;
import java.util.Map;
import retrofit2.Call;
import retrofit2.http.*;

/** Simple client to the ALA sampling service. */
public interface DistributionService {

/** Return expert distribution layers in the ALA spatial portal */
@GET("distribution/")
Call<List<DistributionLayer>> getLayers();

@GET("distribution/lsids/{id}")
Call<List<DistributionLayer>> getLayersByLsid(
@Path("id") String id, @Query("nowkt") String nowkt);

/**
* @param lsid
* @param points Map<uuid, <decimalLatitude,decimalLongitude>>
* @return
*/
@POST("distribution/outliers/{id}")
Call<Map<String, Double>> outliers(
@Path("id") String lsid, @Body Map<String, Map<String, Double>> points);

// @FormUrlEncoded
// @POST("distribution/outliers/{id}")
// Call<DistributionRequest> outliers(@Path(value="id",encoded = true) String lsid,
// @Field("pointsJson") Map<String, Map<String, Double>> points);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package au.org.ala.distribution;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.converter.jackson.JacksonConverterFactory;

@Slf4j
public class DistributionServiceImpl implements Serializable {

private Retrofit retrofit;
private DistributionService service;

private DistributionServiceImpl(String baseUrl) {
ObjectMapper om =
new ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
JacksonConverterFactory jcf = JacksonConverterFactory.create(om);
retrofit = new Retrofit.Builder().baseUrl(baseUrl).addConverterFactory(jcf).build();
service = retrofit.create(DistributionService.class);
}

public static DistributionServiceImpl init(String baseUrl) {
// set up sampling service
return new DistributionServiceImpl(baseUrl);
}

public List<DistributionLayer> getLayers() throws IOException, ExpertDistributionException {
// Response<List<DistributionLayer>> response =
// distributionService.getLayersByLsid("urn:lsid:biodiversity.org.au:afd.taxon:4f3a5260-4f39-4393-a644-4d05b1c45f92", "false").execute();
Response<List<au.org.ala.distribution.DistributionLayer>> response =
service.getLayers().execute();
int code = response.code();
if (code >= 200 && code < 300) {
List<au.org.ala.distribution.DistributionLayer> layers = response.body();
return layers;
} else {
errorHandler(code, response);
return null;
}
}

public List<DistributionLayer> findLayersByLsid(String lsid)
throws IOException, ExpertDistributionException {
Response<List<DistributionLayer>> response = service.getLayersByLsid(lsid, "false").execute();
int code = response.code();
if (code >= 200 && code < 300) {
List<au.org.ala.distribution.DistributionLayer> layers = response.body();
return layers;
} else {
errorHandler(code, response);
return null;
}
}

public Map<String, Double> outliers(String lsid, Map<String, Map<String, Double>> points)
throws IOException, ExpertDistributionException {
Response<Map<String, Double>> response = service.outliers(lsid, points).execute();
int code = response.code();
if (code >= 200 && code < 300) {
Map<String, Double> results = response.body();
return results;
} else {
errorHandler(code, response);
return null;
}
}

/**
* Todo Handle error information Need to sync with Spatial Service
*
* @param code
* @param response
* @throws IOException
* @throws ExpertDistributionException
*/
private void errorHandler(int code, Response<?> response)
throws IOException, ExpertDistributionException {
String errorBody = response.errorBody().string();
if (code >= 400 && code < 500) {
throw new ExpertDistributionException(errorBody);
} else {
throw new RuntimeException(errorBody);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package au.org.ala.distribution;

public class ExpertDistributionException extends Exception {
public ExpertDistributionException(String errorMessage) {
super(errorMessage);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package au.org.ala.pipelines.interpreters;

import static org.gbif.pipelines.core.utils.ModelUtils.*;

import au.org.ala.pipelines.parser.CoordinatesParser;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.gbif.dwc.terms.DwcTerm;
import org.gbif.kvs.geocode.LatLng;
import org.gbif.pipelines.core.parsers.common.ParsedField;
import org.gbif.pipelines.io.avro.ALADistributionRecord;
import org.gbif.pipelines.io.avro.ExtendedRecord;
import org.gbif.pipelines.io.avro.IndexRecord;
import org.gbif.pipelines.io.avro.Record;

/*
* living atlases.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ALADistributionInterpreter {

public static void interpretOccurrenceID(IndexRecord ir, ALADistributionRecord dr) {
dr.setOccurrenceID(ir.getId());
}

public static void interpretLocation(IndexRecord ir, ALADistributionRecord dr) {
String latlng = ir.getLatLng();
String[] coordinates = latlng.split(",");
dr.setDecimalLatitude(Double.parseDouble(coordinates[0]));
dr.setDecimalLongitude(Double.parseDouble(coordinates[1]));
}

public static void interpretSpeciesId(IndexRecord ir, ALADistributionRecord dr) {
dr.setSpeciesID(ir.getTaxonID());
}

/*
* Interprete from verbatim
*/
public static void interpretOccurrenceID(ExtendedRecord er, ALADistributionRecord dr) {
String value = extractNullAwareValue(er, DwcTerm.occurrenceID);
if (!Strings.isNullOrEmpty(value)) {
dr.setOccurrenceID(value);
}
}

public static void interpretLocation(ExtendedRecord er, ALADistributionRecord dr) {
ParsedField<LatLng> parsedLatLon = CoordinatesParser.parseCoords(er);
addIssue(dr, parsedLatLon.getIssues());

if (parsedLatLon.isSuccessful()) {
LatLng latlng = parsedLatLon.getResult();
dr.setDecimalLatitude(latlng.getLatitude());
dr.setDecimalLongitude(latlng.getLongitude());
}
}

public static void interpretSpeciesId(ExtendedRecord er, ALADistributionRecord dr) {
String value = extractNullAwareValue(er, DwcTerm.taxonConceptID);
if (!Strings.isNullOrEmpty(value)) {
dr.setSpeciesID(value);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package au.org.ala.pipelines.options;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

/**
* Options for pipelines that run against spatial service for Expert distribution layer calculation.
*/
public interface DistributionPipelineOptions extends AllDatasetsPipelinesOptions {

@Description("Base URL for Spatial service")
@Default.String("https://spatial.ala.org.au/ws/")
String getBaseUrl();

void setBaseUrl(String baseUrl);

@Description("Default batch size")
@Default.Integer(25000)
Integer getBatchSize();

void setBatchSize(Integer batchSize);

@Description("Keep download sampling CSVs")
@Default.Integer(1000)
Integer getBatchStatusSleepTime();

void setBatchStatusSleepTime(Integer batchStatusSleepTime);
}
Loading

0 comments on commit 3262fad

Please sign in to comment.