#1084 sync living atlas module
Adam Collins committed Aug 4, 2024
1 parent fab5f99 commit 9690406
Showing 14 changed files with 77 additions and 36 deletions.
2 changes: 1 addition & 1 deletion livingatlas/migration/pom.xml
@@ -16,7 +16,7 @@
<!-- this library is not in the parent POM, but is in use by gbif/crawler
which uses version 1.58
-->
<spark.version>3.3.1</spark.version>
<spark.version>3.4.0</spark.version>
<jcommander.version>1.78</jcommander.version>
<sonar.skip>true</sonar.skip>
</properties>
5 changes: 3 additions & 2 deletions livingatlas/pipelines/.scalafmt.conf
@@ -1,2 +1,3 @@
version = 2.7.5
maxColumn = 120
version = 3.7.3
maxColumn = 120
runner.dialect = scala212
44 changes: 29 additions & 15 deletions livingatlas/pipelines/pom.xml
@@ -33,18 +33,15 @@

<!-- Tools-->
<commons-configuration.version>1.6</commons-configuration.version>
<netty.version>3.9.9.Final</netty.version>
<netty-all.version>4.1.42.Final</netty-all.version>
<!-- <netty.version>3.9.9.Final</netty.version>-->
<netty-all.version>4.1.100.Final</netty-all.version>
<snakeyaml.version>1.25</snakeyaml.version>
<streamex.version>0.7.2</streamex.version>
<jcommander.version>1.78</jcommander.version>
<layers-store.version>2.0.3</layers-store.version>
<json-simple.version>1.1.1</json-simple.version>
<hadoop-compress.version>1.2</hadoop-compress.version>

<!-- GBIF parent currently using 2.4.0 -->
<spark.embedded.version>2.4.5</spark.embedded.version>

<!-- Plugins -->
<org.jetbrains.annotations>13.0</org.jetbrains.annotations>
<git-commit-id-plugin.version>2.2.4</git-commit-id-plugin.version>
@@ -58,10 +55,8 @@
<checker-qual.version>3.9.1</checker-qual.version>
<snappy-java.version>1.1.8.4</snappy-java.version>

<spark.version>3.3.1</spark.version>
<hadoop.version>3.3.4</hadoop.version>
<hadoop.version>3.4.0</hadoop.version>
<maven-scala-plugin.version>4.8.0</maven-scala-plugin.version>
<scala.version>2.12.17</scala.version>
</properties>

<build>
@@ -184,7 +179,7 @@
<configuration combine.self="override">
<java>
<googleJavaFormat>
<version>1.7</version>
<version>${googleJavaFormat.version}</version>
<style>GOOGLE</style>
</googleJavaFormat>
</java>
@@ -252,6 +247,12 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-solr</artifactId>
<version>${apache.beam.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -268,11 +269,11 @@
<artifactId>commons-configuration</artifactId>
<version>${commons-configuration.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>${netty.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.netty</groupId>-->
<!-- <artifactId>netty</artifactId>-->
<!-- <version>${netty.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
@@ -303,6 +304,13 @@
<groupId>org.gbif.registry</groupId>
<artifactId>registry-ws-client</artifactId>
</exclusion>
<!-- DO NOT REMOVE -->
<!-- This is required for integration tests in some environments. -->
<!-- The integration test error is a java.lang.UnsatisfiedLinkError for a JNA path -->
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>jna</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -737,7 +745,7 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper-version}</version>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<groupId>org.jboss.netty</groupId>
@@ -832,6 +840,12 @@
<artifactId>hadoop-compress</artifactId>
<version>${hadoop-compress.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.23.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
@@ -11,12 +11,16 @@
public class ShapeFile implements Serializable {
/** Path to the shape file */
String path;

/** The name field to use from the shape file. */
String field;

/** URL to source of the shapefile */
String source;

/** Intersect buffer 0.1 = 11km, 0.135 = 15km, 0.18 = 20km */
Double intersectBuffer = 0.18;

/** Intersect mapping to allow intersected values to be mapped to different values, e.g. CX -> AU */
Map<String, String> intersectMapping;
}
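
The intersectBuffer above is expressed in decimal degrees; the figures in its comment (0.1 ≈ 11 km, 0.135 ≈ 15 km, 0.18 ≈ 20 km) follow from roughly 111 km per degree of latitude. A minimal, self-contained Java sketch of that arithmetic, for illustration only (the class and constant names are hypothetical and not part of the changed code):

// Approximate conversion behind the intersectBuffer comment: decimal degrees to kilometres.
public final class BufferMaths {
  // One degree of latitude is roughly 111 km; illustrative constant, not from the codebase.
  private static final double KM_PER_DEGREE = 111.0;

  static double bufferInKm(double bufferInDegrees) {
    return bufferInDegrees * KM_PER_DEGREE;
  }

  public static void main(String[] args) {
    System.out.println(bufferInKm(0.1));   // ~11.1 km
    System.out.println(bufferInKm(0.135)); // ~15.0 km
    System.out.println(bufferInKm(0.18));  // ~20.0 km
  }
}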
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.gbif.pipelines.common.beam.metrics.MetricsHandler;
import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
import org.gbif.pipelines.core.pojo.HdfsConfigs;
@@ -296,10 +297,17 @@ public void processElement(ProcessContext c) {
PipelineResult result = pipeline.run();
result.waitUntilFinish();

MetricsHandler.saveCountersToFile(
HdfsConfigs.create(options.getHdfsSiteConfig(), options.getCoreSiteConfig()),
jackknifePath + "/metrics.yaml",
result.metrics());
String path = jackknifePath + "/metrics.yaml";
String countersInfo = MetricsHandler.getCountersInfo(result.metrics());
FileSystem fs =
FsUtils.getFileSystem(
HdfsConfigs.create(options.getHdfsSiteConfig(), options.getCoreSiteConfig()), path);
try {
FsUtils.createFile(fs, path, countersInfo);
log.info("Metadata was written to a file - {}", path);
} catch (IOException ex) {
log.warn("Write pipelines metadata file", ex);
}

log.info("3. Pipeline has been finished");
}
@@ -20,7 +20,7 @@
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.directory.api.util.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.gbif.dwc.terms.DwcTerm;
import org.gbif.pipelines.common.beam.options.PipelinesOptionsFactory;
@@ -174,8 +174,8 @@ public void processElement(
.withOtherCatalogNumbers(otherCatalogNumbers);

// specimen only hashes
if (Strings.isNotEmpty(speciesKey)
&& Strings.isNotEmpty(basisOfRecord)
if (StringUtils.isNotEmpty(speciesKey)
&& StringUtils.isNotEmpty(basisOfRecord)
&& specimenBORs.contains(basisOfRecord)) {

Stream<String> ids =
@@ -188,7 +188,8 @@ public void processElement(
// output hashes for each combination
ids.filter(
value ->
!Strings.isEmpty(value) && !omitIds.contains(value.toUpperCase()))
!StringUtils.isEmpty(value)
&& !omitIds.contains(value.toUpperCase()))
.distinct()
.collect(Collectors.toList())
.forEach(
@@ -225,14 +226,14 @@ public void processElement(
}

// 2. type status hashkeys
if (Strings.isNotEmpty(taxonKey) && typeStatus != null) {
if (StringUtils.isNotEmpty(taxonKey) && typeStatus != null) {
for (String t : typeStatus) {
out.output(builder.withHashKey(taxonKey + "|" + t).build());
}
}

// 3. taxonKey|year|recordedBy hashkeys
if (Strings.isNotEmpty(taxonKey) && year != null && recordedBy != null) {
if (StringUtils.isNotEmpty(taxonKey) && year != null && recordedBy != null) {
out.output(
builder.withHashKey(taxonKey + "|" + year + "|" + recordedBy).build());
}
@@ -95,6 +95,7 @@ public static void main(String[] args) throws Exception {
// FIXME: Issue logged here: https://github.com/AtlasOfLivingAustralia/la-pipelines/issues/105
System.exit(0);
}

/**
* Includes the following steps:
*
@@ -149,12 +149,16 @@ public boolean coordinatesMatchCentre(
}
}

/** @return size of centres */
/**
* @return size of centres
*/
public int size() {
return centres.size();
}

/** @return keys */
/**
* @return keys
*/
public Set<String> keys() {
return centres.keySet();
}
@@ -639,7 +639,9 @@ object PredicateExportDwCAPipeline {
extensionFileName: String,
extensionFields: Array[String]
): Elem = {
<extension rowType={extensionUri} encoding="UTF-8" fieldsTerminatedBy="\t" linesTerminatedBy="\r\n" fieldsEnclosedBy="&quot;" ignoreHeaderLines="1">
<extension rowType={
extensionUri
} encoding="UTF-8" fieldsTerminatedBy="\t" linesTerminatedBy="\r\n" fieldsEnclosedBy="&quot;" ignoreHeaderLines="1">
<files>
<location>{extensionFileName}.txt</location>
</files>
@@ -669,7 +671,9 @@
extensionFileName: String,
extensionFields: Array[String]
): Elem = {
<extension rowType={extensionUri} encoding="UTF-8" fieldsTerminatedBy="\t" linesTerminatedBy="\r\n" fieldsEnclosedBy="&quot;" ignoreHeaderLines="1">
<extension rowType={
extensionUri
} encoding="UTF-8" fieldsTerminatedBy="\t" linesTerminatedBy="\r\n" fieldsEnclosedBy="&quot;" ignoreHeaderLines="1">
<files>
<location>{extensionFileName}.txt</location>
</files>
@@ -698,7 +702,9 @@
val coreFileName =
coreURI.substring(coreURI.lastIndexOf("/") + 1).toLowerCase
val metaXml = <archive xmlns="http://rs.tdwg.org/dwc/text/">
<core rowType={coreURI} encoding="UTF-8" fieldsTerminatedBy="\t" linesTerminatedBy="\r\n" fieldsEnclosedBy="&quot;" ignoreHeaderLines="1">
<core rowType={
coreURI
} encoding="UTF-8" fieldsTerminatedBy="\t" linesTerminatedBy="\r\n" fieldsEnclosedBy="&quot;" ignoreHeaderLines="1">
<files>
<location>{coreFileName}.txt</location>
</files>
@@ -266,6 +266,7 @@ public void testCountryEEZ1() {

assertFalse(resp.getLocations().isEmpty());
}

// -35.482884,146.804061
@Test
public void testCountryEEZ2() {
@@ -15,6 +15,7 @@
public class NameCheckKVStoreTestIT {

@ClassRule public static IntegrationTestUtils itUtils = IntegrationTestUtils.getInstance();

/**
* Tests the Get operation on {@link KeyValueCache} that wraps a simple KV store backed by a
* HashMap.
2 changes: 1 addition & 1 deletion pom.xml
@@ -101,7 +101,7 @@
<gbif-wrangler.version>0.5-SNAPSHOT</gbif-wrangler.version>
<gbif-occurrence.version>0.195.0-H3-SNAPSHOT</gbif-occurrence.version>
<vocabulary-lookup.version>1.0.0</vocabulary-lookup.version>
<registry.version>3.96.6-SNAPSHOT</registry.version>
<registry.version>3.96.6.1-SNAPSHOT</registry.version>
<gbif-common-ws.version>1.26</gbif-common-ws.version>
<gbif-postal-service.version>1.9.0-SNAPSHOT</gbif-postal-service.version>
<gbif-common-spreadsheet.version>0.2</gbif-common-spreadsheet.version>
@@ -271,7 +271,7 @@ public void transformationTest() {
null,
"POLYGON((100000 515000,100000 520000,105000 520000,105000 515000,100000 515000))",
"EPSG:28992",
"POLYGON ((52.619749292808244 4.575033022857827, 52.66468072273537 4.574203170903049, 52.665162889286556 4.648106265726084, 52.6202308261076 4.648860682668264, 52.619749292808244 4.575033022857827))"
"POLYGON ((52.619749292808244 4.575033022857827, 52.664680722735376 4.574203170903048, 52.665162889286556 4.648106265726084, 52.6202308261076 4.648860682668264, 52.619749292808244 4.575033022857827))"
};

final MetadataRecord mdr =
@@ -396,7 +396,7 @@ public void footprintWKTTest() {
LocationRecord.newBuilder()
.setId("1")
.setFootprintWKT(
"POLYGON ((52.619749292808244 4.575033022857827, 52.66468072273537 4.574203170903049, 52.665162889286556 4.648106265726084, 52.6202308261076 4.648860682668264, 52.619749292808244 4.575033022857827))")
"POLYGON ((52.619749292808244 4.575033022857827, 52.664680722735376 4.574203170903048, 52.665162889286556 4.648106265726084, 52.6202308261076 4.648860682668264, 52.619749292808244 4.575033022857827))")
.build();

// When