Skip to content

Commit

Permalink
#781 clustering otherCatalogNumbers (PR #800)
Browse files Browse the repository at this point in the history
  • Loading branch information
timrobertson100 authored Oct 27, 2022
2 parents 301f69a + a685577 commit f4f6e93
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ minicluster
release.properties
sdks/models/src/main/avro/table/
examples/dwca-to-elasticsearch/configs/*.json
**/dependency-reduced-pom.xml

# la-pipelines debian package
livingatlas/debian/*.debhelper
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.gbif.pipelines.clustering;

import static org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships.concatIfEligible;
import static org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships.hashOrNull;
import static org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships.isEligibleCode;
import static org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships.isNumeric;

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.gbif.pipelines.core.parsers.clustering.OccurrenceFeatures;
import org.gbif.pipelines.core.parsers.clustering.OccurrenceRelationships;

/** Utility functions for hashing records to pregroup. */
public class HashUtilities {

/**
* Hashes the occurrenceID, catalogCode and otherCatalogNumber into a Set of codes. The
* catalogNumber is constructed as it's code and also prefixed in the commonly used format of
* IC:CC:CN and IC:CN. No code in the resulting set will be numeric as the cardinality of e.g.
* catalogNumber=100 is too high to compute, and this is intended to be used to detect explicit
* relationships with otherCatalogNumber.
*
* @return a set of hashes suitable for grouping specimen related records without other filters.
*/
public static Set<String> hashCatalogNumbers(OccurrenceFeatures o) {
Stream<String> cats =
Stream.of(
hashOrNull(o.getCatalogNumber(), false),
hashOrNull(o.getOccurrenceID(), false),
concatIfEligible(
":", o.getInstitutionCode(), o.getCollectionCode(), o.getCatalogNumber()),
concatIfEligible(":", o.getInstitutionCode(), o.getCatalogNumber()));

if (o.getOtherCatalogNumbers() != null) {
cats =
Stream.concat(cats, o.getOtherCatalogNumbers().stream().map(c -> hashOrNull(c, false)));
}

return cats.map(OccurrenceRelationships::normalizeID)
.filter(c -> isEligibleCode(c) && !isNumeric(c))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.gbif.pipelines.clustering

import java.io.File

import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
Expand Down Expand Up @@ -116,17 +115,31 @@ object Cluster {

filteredIds.foreach(id => {
id match {
case Some(id) => {
records.append(
Row(
r.getAs[Long]("gbifId"),
r.getAs[String]("datasetKey"),
r.getAs[Integer]("speciesKey") + "|" + OccurrenceRelationships.normalizeID(id)
))
}
case Some(id) =>
val speciesKey = Option(r.getAs[Int]("speciesKey"))
if (!speciesKey.isEmpty) {
records.append(
Row(
r.getAs[Long]("gbifId"),
r.getAs[String]("datasetKey"),
speciesKey + "|" + OccurrenceRelationships.normalizeID(id)
))
}
case None => // skip
}
})

// a special treatment for catalogNumber and otherCatalogNumber overlap
val features = new RowOccurrenceFeatures(r)
val hashedIDs = HashUtilities.hashCatalogNumbers(features) // normalised, non-numeric codes
hashedIDs.foreach(id => {
records.append(
Row(
r.getAs[Long]("gbifId"),
r.getAs[String]("datasetKey"),
id // only the ID overlap is suffice here
))
})
}
records
})(hashEncoder).toDF()
Expand All @@ -144,7 +157,9 @@ object Cluster {
val taxonKey = Option(r.getAs[Int]("taxonKey"))
val typeStatus = Option(r.getAs[Seq[String]]("typeStatus"))
val recordedBy = Option(r.getAs[Seq[String]]("recordedBy"))
if (!lat.isEmpty && !lng.isEmpty && !year.isEmpty && !month.isEmpty && !day.isEmpty) {
val speciesKey = Option(r.getAs[Int]("speciesKey"))

if (!lat.isEmpty && !lng.isEmpty && !year.isEmpty && !month.isEmpty && !day.isEmpty && !speciesKey.isEmpty) {
records.append(
Row(
r.getAs[Long]("gbifId"),
Expand Down Expand Up @@ -182,13 +197,25 @@ object Cluster {
val deduplicatedHashedRecords = hashAll.union(hashSpecimenIds).dropDuplicates()

// persist for debugging, enable for further processing in SQL
deduplicatedHashedRecords.write.saveAsTable(hiveTableHashed) // for diagnostics in hive
deduplicatedHashedRecords.createOrReplaceTempView("DF_hashed")
deduplicatedHashedRecords.write.saveAsTable(hiveTableHashed + "_all") // for diagnostics in hive
deduplicatedHashedRecords.createOrReplaceTempView("DF_hashed_all")

// defend against NxN runtime issues by capping the number of records in a candidate group to 10000
val hashCounts = deduplicatedHashedRecords.groupBy("hash").count().withColumnRenamed("count", "c");
hashCounts.createOrReplaceTempView("DF_hash_counts")
val hashedFiltered = sql("""
SELECT t1.gbifID, t1.datasetKey, t1.hash
FROM DF_hashed_all t1 JOIN DF_hash_counts t2 ON t1.hash=t2.hash
WHERE t2.c <= 10000
""")
hashedFiltered.write.saveAsTable(hiveTableHashed) // for diagnostics in hive
hashedFiltered.createOrReplaceTempView("DF_hashed")

// Cross join to distinct pairs of records spanning 2 datasets
val candidates = sql("""
SELECT t1.gbifId as id1, t1.datasetKey as ds1, t2.gbifId as id2, t2.datasetKey as ds2
FROM DF_hashed t1 JOIN DF_hashed t2 ON t1.hash = t2.hash
FROM
DF_hashed t1 JOIN DF_hashed t2 ON t1.hash = t2.hash
WHERE
t1.gbifId < t2.gbifId AND
t1.datasetKey != t2.datasetKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ SELECT
recordedBy, recordedByID,
ext_multimedia
FROM occurrence
WHERE speciesKey IS NOT NULL
"""

case class SimpleOccurrence(gbifID: String, decimalLatitude: Double)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@

import com.google.common.annotations.VisibleForTesting;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.gbif.pipelines.core.parsers.clustering.RelationshipAssertion.FeatureAssertion;

/** Generates relationship assertions for occurrence records. */
Expand All @@ -21,7 +18,12 @@ public class OccurrenceRelationships {
private static final int THRESHOLD_IN_DAYS = 1;

// A list of IDs that are excluded for comparison
private static final List<String> idOmitList = newIdOmitList();
private static final Set<String> idOmitList = newIdOmitList();

private static final Set<String> SPECIMEN_BORS =
new HashSet<>(
Arrays.asList(
"PRESERVED_SPECIMEN", "LIVING_SPECIMEN", "FOSSIL_SPECIMEN", "MATERIAL_CITATION"));

/** Will either generate an assertion with justification or return null. */
public static <T extends OccurrenceFeatures> RelationshipAssertion<T> generate(T o1, T o2) {
Expand All @@ -33,6 +35,7 @@ public static <T extends OccurrenceFeatures> RelationshipAssertion<T> generate(T
// generate "facts"
compareTaxa(o1, o2, assertion);
compareIdentifiers(o1, o2, assertion);
compareCatalogNumbers(o1, o2, assertion); // more specific than identifiers
compareDates(o1, o2, assertion);
compareCollectors(o1, o2, assertion);
compareCoordinates(o1, o2, assertion);
Expand Down Expand Up @@ -84,6 +87,13 @@ public static <T extends OccurrenceFeatures> RelationshipAssertion<T> generate(T
});
}

// Relax rules to accommodate well-formed otherCatalogNumber assertions
// See https://github.com/gbif/pipelines/issues/781
if (SPECIMEN_BORS.contains(o1.getBasisOfRecord())
&& SPECIMEN_BORS.contains(o2.getBasisOfRecord())) {
passConditions.add(new FeatureAssertion[] {OTHER_CATALOG_NUMBERS_OVERLAP});
}

// always exclude things on different location or date
if (assertion.justificationDoesNotContain(DIFFERENT_DATE, DIFFERENT_COUNTRY)) {
// for any ruleset that matches we generate the assertion
Expand Down Expand Up @@ -238,27 +248,78 @@ private static <T extends OccurrenceFeatures> void compareCountry(
static <T extends OccurrenceFeatures> void compareIdentifiers(
OccurrenceFeatures o1, OccurrenceFeatures o2, RelationshipAssertion<T> assertion) {
// ignore case and [-_., ] chars
// otherCatalogNumbers is not parsed, but a good addition could be to explore that
Set<String> intersection =
o1.listIdentifiers().stream()
.filter(Objects::nonNull)
.map(OccurrenceRelationships::normalizeID)
.filter(Objects::nonNull)
.collect(Collectors.toSet());

Set<String> toMatch =
o2.listIdentifiers().stream()
.filter(Objects::nonNull)
.map(OccurrenceRelationships::normalizeID)
.filter(Objects::nonNull)
.collect(Collectors.toSet());

intersection.retainAll(toMatch);
intersection.removeAll(idOmitList);
Set<String> filtered =
intersection.stream().filter(c -> !idOmitList.contains(c)).collect(Collectors.toSet());

if (!intersection.isEmpty()) {
if (!filtered.isEmpty()) {
assertion.collect(IDENTIFIERS_OVERLAP);
}
}

/**
* Detects if either of the catalogNumbers formatted in various ways (CN, IC:CN and IC:CC:CN) is
* present in the other records otherCatalogNumber. This is intended to detect clearly defined
* assertion where one publisher provides e.g. otherCatalogNumber=KU:MAMM:X123 and the other
* record carries those values in the individual fields.
*/
static <T extends OccurrenceFeatures> void compareCatalogNumbers(
OccurrenceFeatures o1, OccurrenceFeatures o2, RelationshipAssertion<T> assertion) {

if (catalogNumberOverlaps(
o1.getInstitutionCode(),
o1.getCollectionCode(),
o1.getCatalogNumber(),
o2.getOtherCatalogNumbers())
|| catalogNumberOverlaps(
o2.getInstitutionCode(),
o2.getCollectionCode(),
o2.getCatalogNumber(),
o1.getOtherCatalogNumbers())) {
assertion.collect(OTHER_CATALOG_NUMBERS_OVERLAP);
}
}

/**
* Returns true if the ic:cc:cn and provided target overlaps, after formatting rules are applied:
*
* <ol>
* <li>The target codes may not start with the common Cat. or Cat# prefixes (ignoring case)
* <li>Whitespace and delimited characters such as :/_ etc are ignored in the comparison
* </ol>
*/
static boolean catalogNumberOverlaps(String ic, String cc, String cn, List<String> target) {
if (target == null) return false;

Set<String> codes =
Stream.of(concatIfEligible(":", ic, cc, cn))
.map(OccurrenceRelationships::normalizeID)
.filter(c -> isEligibleCode(c) && !isNumeric(c))
.collect(Collectors.toSet());

Set<String> targetCodes =
target.stream()
.map(c -> c.replaceFirst("^[Cc]at[.#]", "")) // remove common prefixes of Cat. Cat#
.map(OccurrenceRelationships::normalizeID)
.filter(c -> isEligibleCode(c) && !isNumeric(c))
.collect(Collectors.toSet());

targetCodes.retainAll(codes);
return !targetCodes.isEmpty();
}

static boolean equalsAndNotNull(Object o1, Object o2) {
return o1 != null && Objects.equals(o1, o2);
}
Expand Down Expand Up @@ -306,34 +367,86 @@ public static String normalizeID(String id) {
return null;
}

/** Returns a concatenated string only when all atoms are eligible, otherwise null. */
public static String concatIfEligible(String separator, String... s) {
if (Arrays.stream(s).allMatch(a -> isEligibleCode(a))) {
return String.join(
separator,
Arrays.stream(s).map(OccurrenceRelationships::normalizeID).collect(Collectors.toList()));
} else {
return null;
}
}

/** Return the code if eligible or null */
public static String hashOrNull(String code, boolean allowNumeric) {
if (allowNumeric) {
return isEligibleCode(code) ? OccurrenceRelationships.normalizeID(code) : null;
} else {
return isEligibleCode(code) && !isNumeric(code)
? OccurrenceRelationships.normalizeID(code)
: null;
}
}

/** Return true if the code is not in the exclusion list or null. */
public static boolean isEligibleCode(String code) {
if (code == null
|| code.length() == 0
|| idOmitList.contains(OccurrenceRelationships.normalizeID(code))) {
return false;
} else {
return true;
}
}

public static boolean isNumeric(String s) {
if (s == null) return false;
try {
Double.parseDouble(s);
return true;
} catch (NumberFormatException nfe) {
return false;
}
}

/** Creates a new exclusion list for IDs. See https://github.com/gbif/pipelines/issues/309. */
public static List<String> newIdOmitList() {
return Arrays.asList(
null,
"",
"[]",
"*",
"--",
normalizeID("NO APLICA"),
normalizeID("NA"),
normalizeID("NO DISPONIBLE"),
normalizeID("NO DISPONIBL"),
normalizeID("NO NUMBER"),
normalizeID("UNKNOWN"),
normalizeID("s.n."),
normalizeID("Unknown s.n."),
normalizeID("Unreadable s.n."),
normalizeID("se kommentar"),
normalizeID("inget id"),
normalizeID("x"),
normalizeID("Anonymous s.n."),
normalizeID("Collector Number: s.n."),
normalizeID("No Number"),
normalizeID("Anonymous"),
normalizeID("None"),
normalizeID("No Field Number"),
normalizeID("not recorded"),
normalizeID("s.l."),
normalizeID("s.c."));
public static Set<String> newIdOmitList() {
return new HashSet(
Arrays.asList(
null,
"",
"[]",
"*",
"--",
normalizeID("NO APLICA"),
normalizeID("NA"),
normalizeID("NO DISPONIBLE"),
normalizeID("NO DISPONIBL"),
normalizeID("NO NUMBER"),
normalizeID("UNKNOWN"),
normalizeID("SN"),
normalizeID("ANONYMOUS"),
normalizeID("NONE"),
normalizeID("s.n."),
normalizeID("Unknown s.n."),
normalizeID("Unreadable s.n."),
normalizeID("se kommentar"),
normalizeID("inget id"),
normalizeID("x"),
normalizeID("Anonymous s.n."),
normalizeID("Collector Number: s.n."),
normalizeID("No Number"),
normalizeID("Anonymous"),
normalizeID("None"),
normalizeID("No Field Number"),
normalizeID("not recorded"),
normalizeID("s.l."),
normalizeID("s.c."),
normalizeID("present"),
normalizeID("Undef/entomo"),
normalizeID("s/nº"),
normalizeID("undef"),
normalizeID("no data")));
}
}
Loading

0 comments on commit f4f6e93

Please sign in to comment.