
Commit 2ddab83

Added recordedByID to the HDFS query builder and related classes.
muttcg committed Mar 13, 2020
1 parent 4b8d8b2 commit 2ddab83
Showing 8 changed files with 146 additions and 44 deletions.
=== File 1 of 8 ===
@@ -70,7 +70,10 @@ public static String getHiveType(Term term) {
* Checks if the term is stored as a Hive array.
*/
public static boolean isHiveArray(Term term) {
return GbifTerm.mediaType == term || GbifTerm.issue == term || GbifInternalTerm.networkKey == term;
return GbifTerm.mediaType == term
|| GbifTerm.issue == term
|| GbifInternalTerm.networkKey == term
|| GbifTerm.recordedByID == term;
}

/**
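Reviewer note: with this change recordedByID is treated like the other array-typed terms, so getHiveType (named in the hunk header above) should map it to an ARRAY<STRING> column. Expected behaviour after the commit, as a quick sketch:

HiveColumnsUtils.isHiveArray(GbifTerm.recordedByID);   // true (new in this commit)
HiveColumnsUtils.isHiveArray(GbifTerm.mediaType);      // true (unchanged)
HiveColumnsUtils.isHiveArray(DwcTerm.recordedBy);      // false (plain string column)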
=== File 2 of 8 ===
@@ -63,7 +63,8 @@ public class TermUtils {

private static final Set<? extends Term> COMPLEX_TYPE = ImmutableSet.of(GbifTerm.mediaType,
GbifTerm.issue,
GbifInternalTerm.networkKey);
GbifInternalTerm.networkKey,
GbifTerm.recordedByID);

private static final Set<? extends Term> INTERPRETED_DOUBLE = ImmutableSet.of(DwcTerm.decimalLatitude,
DwcTerm.decimalLongitude,
@@ -159,6 +160,7 @@ public class TermUtils {
GbifTerm.depthAccuracy,
GbifInternalTerm.unitQualifier,
GbifTerm.issue,
GbifTerm.recordedByID,
DcTerm.references,
GbifTerm.datasetKey,
GbifTerm.publishingCountry,
=== File 3 of 8 (new file) ===
@@ -0,0 +1,77 @@
package org.gbif.occurrence.common.json;

import java.io.IOException;
import java.util.List;

import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.map.type.CollectionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;

/**
* Utility class to serialize and deserialize lists of strings from/to JSON.
*/
public class ListStringSerDeserUtils {

private static final Logger LOG = LoggerFactory.getLogger(ListStringSerDeserUtils.class);
private static final String SER_ERROR_MSG = "Unable to serialize list of string objects to JSON";
private static final String DESER_ERROR_MSG = "Unable to deserialize String into list of string objects";

private static final ObjectMapper MAPPER = new ObjectMapper();

static {
// Don't change this section; the methods used here guarantee backwards compatibility with Jackson 1.8.8
MAPPER.configure(DeserializationConfig.Feature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
MAPPER.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
MAPPER.getSerializationConfig().setSerializationInclusion(JsonSerialize.Inclusion.ALWAYS);
}

private static final CollectionType LIST_STRINGS_TYPE =
MAPPER.getTypeFactory().constructCollectionType(List.class, String.class);


private ListStringSerDeserUtils() {
// private constructor
}

/**
* Converts the list of string objects into a JSON string.
*/
public static String toJson(List<String> strings) {
try {
if (strings != null && !strings.isEmpty()) {
return MAPPER.writeValueAsString(strings);
}
} catch (IOException e) {
logAndRethrow(SER_ERROR_MSG, e);
}
return "";
}

/**
* Converts a JSON string into a list of string objects.
*/
public static List<String> fromJson(String strings) {
try {
return MAPPER.readValue(strings, LIST_STRINGS_TYPE);
} catch (IOException e) {
logAndRethrow(DESER_ERROR_MSG, e);
}
return null;
}

/**
* Logs an error and re-throws the exception.
*/
private static void logAndRethrow(String message, Throwable throwable) {
LOG.error(message, throwable);
Throwables.propagate(throwable);
}


}
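Reviewer note: a minimal round-trip sketch of the new utility (sample values are made up; assumes java.util.Arrays and java.util.List are imported):

List<String> ids = Arrays.asList("ORCID:0000-0001-2345-6789", "OTHER:ab12cd");
String json = ListStringSerDeserUtils.toJson(ids);          // pretty-printed JSON array
List<String> back = ListStringSerDeserUtils.fromJson(json);
// back.equals(ids) -> true; toJson(null) and toJson(an empty list) both return ""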
=== File 4 of 8 ===
@@ -5,7 +5,6 @@
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -75,7 +74,6 @@ public static Map<String, String> buildInterpretedOccurrenceMap(Occurrence occur
interpretedOccurrence.put(GbifTerm.typifiedName.simpleName(), occurrence.getTypifiedName());
interpretedOccurrence.put(GbifTerm.lastParsed.simpleName(), getSimpleValue(occurrence.getLastParsed()));
interpretedOccurrence.put(GbifTerm.lastInterpreted.simpleName(), getSimpleValue(occurrence.getLastInterpreted()));
interpretedOccurrence.put(GbifTerm.recordedByID.simpleName(), getSimpleValue(occurrence.getRecordedByIds()));

Optional.ofNullable(occurrence.getVerbatimField(DcTerm.identifier))
.ifPresent(x -> interpretedOccurrence.put(DcTerm.identifier.simpleName(), x));
@@ -138,6 +136,7 @@ public static Map<String, String> buildInterpretedOccurrenceMap(Occurrence occur

extractOccurrenceIssues(occurrence).ifPresent(issues -> interpretedOccurrence.put(GbifTerm.issue.simpleName(), issues));
extractMediaTypes(occurrence).ifPresent(mediaTypes -> interpretedOccurrence.put(GbifTerm.mediaType.simpleName(), mediaTypes));
extractRecordedByIds(occurrence).ifPresent(uids -> interpretedOccurrence.put(GbifTerm.recordedByID.simpleName(), uids));

// Sampling
interpretedOccurrence.put(DwcTerm.sampleSizeUnit.simpleName(), occurrence.getSampleSizeUnit());
@@ -220,17 +219,6 @@ private static String getSimpleValue(Object value) {
return null;
}

/**
* Transforms a list of UserIdentifiers into a simple string, like:
* ORCID:1312312,METADATA:123123,OTHER:12312
*/
private static String getSimpleValue(List<UserIdentifier> recordedByIds) {
return recordedByIds.stream()
.map(x -> x.getType().name() + ":" + x.getValue())
.collect(Collectors.joining(","));
}


/**
* Checks whether the occurrence record is a repatriated record.
*/
@@ -244,6 +232,15 @@ private static Optional<String> getRepatriated(Occurrence occurrence) {
return Optional.empty();
}

/**
* Extracts the recordedByID values from the record.
*/
private static Optional<String> extractRecordedByIds(Occurrence occurrence) {
return Optional.ofNullable(occurrence.getRecordedByIds())
.map(uis -> uis.stream().map(UserIdentifier::getValue)
.collect(Collectors.joining(";")));
}

/**
* Extracts the media types from the record.
*/
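Reviewer note: this is a small format change, not just a move. The removed getSimpleValue overload produced comma-separated TYPE:value pairs, while the new extractRecordedByIds keeps only the values and joins them with semicolons. With hypothetical identifiers whose type/value pairs are ORCID/0000-0001-2345-6789 and OTHER/ab12cd:

Optional<String> joined = extractRecordedByIds(occurrence);
// former output (removed getSimpleValue):
//   "ORCID:0000-0001-2345-6789,OTHER:ab12cd"
// new output (extractRecordedByIds):
//   Optional.of("0000-0001-2345-6789;ab12cd")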
=== File 5 of 8 ===
@@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import com.google.common.base.CharMatcher;
import com.google.common.base.Throwables;
@@ -100,13 +101,8 @@ public class HiveQueryVisitor {
// where query to execute a select all
private static final String ALL_QUERY = "true";

private static final String MEDIATYPE_CONTAINS_FMT = "array_contains(" +
HiveColumnsUtils.getHiveColumn(GbifTerm.mediaType) + ",'%s')";
private static final String ISSUE_CONTAINS_FMT = "array_contains(" +
HiveColumnsUtils.getHiveColumn(GbifTerm.issue) + ",'%s')";

private static final String NETWORK_KEY_CONTAINS_FMT = "array_contains(" +
HiveColumnsUtils.getHiveColumn(GbifInternalTerm.networkKey) + ",'%s')";
private static final Function<Term, String> ARRAY_FN = t ->
"array_contains(" + HiveColumnsUtils.getHiveColumn(t) + ",'%s')";

private static final String HIVE_ARRAY_PRE = "ARRAY";

@@ -186,6 +182,7 @@ public class HiveQueryVisitor {
.put(OccurrenceSearchParameter.RELATIVE_ORGANISM_QUANTITY, GbifTerm.relativeOrganismQuantity)
.put(OccurrenceSearchParameter.COLLECTION_KEY, GbifInternalTerm.collectionKey)
.put(OccurrenceSearchParameter.INSTITUTION_KEY, GbifInternalTerm.institutionKey)
.put(OccurrenceSearchParameter.RECORDED_BY_ID, GbifTerm.recordedByID)
.build();

private final Joiner commaJoiner = Joiner.on(", ").skipNulls();
@@ -313,11 +310,13 @@ public void visit(EqualsPredicate predicate) throws QueryBuildingException {
appendTaxonKeyFilter(predicate.getValue());
} else if (OccurrenceSearchParameter.MEDIA_TYPE == predicate.getKey()) {
Optional.ofNullable(VocabularyUtils.lookupEnum(predicate.getValue(), MediaType.class))
.ifPresent( mediaType -> builder.append(String.format(MEDIATYPE_CONTAINS_FMT, mediaType.name())));
.ifPresent(mediaType -> builder.append(String.format(ARRAY_FN.apply(GbifTerm.mediaType), mediaType.name())));
} else if (OccurrenceSearchParameter.ISSUE == predicate.getKey()) {
builder.append(String.format(ISSUE_CONTAINS_FMT, predicate.getValue().toUpperCase()));
builder.append(String.format(ARRAY_FN.apply(GbifTerm.issue), predicate.getValue().toUpperCase()));
} else if (OccurrenceSearchParameter.NETWORK_KEY == predicate.getKey()) {
builder.append(String.format(NETWORK_KEY_CONTAINS_FMT, predicate.getValue()));
builder.append(String.format(ARRAY_FN.apply(GbifInternalTerm.networkKey), predicate.getValue()));
} else if (OccurrenceSearchParameter.RECORDED_BY_ID == predicate.getKey()) {
builder.append(String.format(ARRAY_FN.apply(GbifTerm.recordedByID), predicate.getValue()));
} else {
visitSimplePredicate(predicate, EQUALS_OPERATOR);
}
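Reviewer note: the three per-term *_CONTAINS_FMT constants collapse into the single ARRAY_FN template, and each call site fills in the value via String.format. A sketch of the produced SQL fragment (the actual column name comes from HiveColumnsUtils.getHiveColumn; a placeholder is shown here):

String template = ARRAY_FN.apply(GbifTerm.recordedByID);
// -> "array_contains(<recordedByID column>,'%s')"
String clause = String.format(template, "0000-0001-2345-6789");
// -> "array_contains(<recordedByID column>,'0000-0001-2345-6789')"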
=== File 6 of 8 ===
@@ -104,7 +104,8 @@ public enum Group {
Pair.of(Group.INTERPRETED, DwcTerm.establishmentMeans),
Pair.of(Group.INTERPRETED, GbifTerm.lastInterpreted),
Pair.of(Group.INTERPRETED, GbifTerm.mediaType),
Pair.of(Group.INTERPRETED, GbifTerm.issue)
Pair.of(Group.INTERPRETED, GbifTerm.issue),
Pair.of(Group.INTERPRETED, GbifTerm.recordedByID)
);

/**
@@ -136,7 +137,7 @@ public enum Group {
Pair.of(Group.INTERPRETED, GbifTerm.speciesKey),
Pair.of(Group.INTERPRETED, DcTerm.license)
);

/**
* The terms that will be included in the species list for downloads
*/
=== File 7 of 8 ===
@@ -33,7 +33,8 @@ public final class HiveDataTypes {
public static final String TYPE_ARRAY_STRING = "ARRAY<STRING>";
// An index of types for terms, if used in the interpreted context
private static final Map<Term, String> TYPED_TERMS;
private static final Set<Term> ARRAY_STRING_TERMS = ImmutableSet.of(GbifTerm.mediaType, GbifTerm.issue, GbifInternalTerm.networkKey);
private static final Set<Term> ARRAY_STRING_TERMS =
ImmutableSet.of(GbifTerm.mediaType, GbifTerm.issue, GbifInternalTerm.networkKey, GbifTerm.recordedByID);

// dates are all stored as BigInt
private static final Set<Term> BIGINT_TERMS = ImmutableSet.of(
=== File 8 of 8 ===
@@ -1,9 +1,20 @@
package org.gbif.occurrence.persistence.util;

import com.google.common.base.Splitter;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.gbif.api.model.common.Identifier;
import org.gbif.api.model.common.MediaObject;
import org.gbif.api.model.occurrence.Occurrence;
import org.gbif.api.model.occurrence.UserIdentifier;
import org.gbif.api.model.occurrence.VerbatimOccurrence;
import org.gbif.api.util.ClassificationUtils;
import org.gbif.api.util.VocabularyUtils;
@@ -30,33 +41,27 @@
import org.gbif.hbase.util.ResultReader;
import org.gbif.occurrence.common.TermUtils;
import org.gbif.occurrence.common.json.ExtensionSerDeserUtils;
import org.gbif.occurrence.common.json.ListStringSerDeserUtils;
import org.gbif.occurrence.common.json.MediaSerDeserUtils;
import org.gbif.occurrence.persistence.api.Fragment;
import org.gbif.occurrence.persistence.hbase.Columns;
import org.gbif.occurrence.persistence.hbase.ExtResultReader;

import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import javax.validation.ValidationException;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import javax.annotation.Nullable;
import javax.validation.ValidationException;

/**
* A utility class to build object models from the HBase occurrence "row".
*/
@@ -243,6 +248,7 @@ public static Occurrence buildOccurrence(@Nullable Result row) {
occ.setIdentifiers(extractIdentifiers(key, row));
occ.setIssues(extractIssues(row));
occ.setMedia(buildMedia(row));
occ.setRecordedByIds(buildRecordedByIds(row));

// It should be replaced by License.fromString(value).orNull(), but Guava version conflicts prevent that
occ.setLicense(VocabularyUtils.lookupEnum(ExtResultReader.getString(row, DcTerm.license), License.class));
@@ -369,4 +375,20 @@ public static List<MediaObject> buildMedia(Result result) {
return media;
}

private static List<UserIdentifier> buildRecordedByIds(Result result) {
List<UserIdentifier> ids = null;
String idsJson = ExtResultReader.getString(result, Columns.column(GbifTerm.recordedByID));
if (idsJson != null && !idsJson.isEmpty()) {
try {
ids = Optional.ofNullable(ListStringSerDeserUtils.fromJson(idsJson))
.map(x -> x.stream().map(UserIdentifier::new).collect(Collectors.toList()))
.orElse(Collections.emptyList());
} catch (Exception e) {
LOG.warn("Unable to deserialize recordedById objects from hbase", e);
}
}

return ids;
}

}
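Reviewer note: two observations on buildRecordedByIds. It returns null, not an empty list, when the column is missing or blank; and the read path pairs with ListStringSerDeserUtils, so the cell is expected to hold a JSON array of strings, each passed to the single-argument UserIdentifier constructor. A sketch with an assumed cell value (the stored string format is an assumption; it is not shown in this diff):

String idsJson = "[\"ORCID:0000-0001-2345-6789\",\"OTHER:ab12cd\"]"; // hypothetical cell content
List<UserIdentifier> ids = ListStringSerDeserUtils.fromJson(idsJson).stream()
    .map(UserIdentifier::new)
    .collect(Collectors.toList());
// ids.size() == 2; a malformed cell is caught, logged, and buildRecordedByIds returns null.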
