diff --git a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java index 84d94182a..dbab051a0 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java @@ -1,29 +1,33 @@ package org.datatransferproject.cloud.google; +import static java.lang.String.format; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.cloud.datastore.*; import com.google.cloud.datastore.StructuredQuery.CompositeFilter; import com.google.cloud.datastore.StructuredQuery.PropertyFilter; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.datatransferproject.api.launcher.Monitor; -import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; -import org.datatransferproject.types.transfer.errors.ErrorDetail; - import java.io.IOException; import java.io.Serializable; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; - -import static java.lang.String.format; +import java.util.concurrent.ExecutionException; +import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; +import org.datatransferproject.types.transfer.errors.ErrorDetail; public class GoogleCloudIdempotentImportExecutor implements IdempotentImportExecutor { @@ -39,9 +43,10 @@ public class GoogleCloudIdempotentImportExecutor implements IdempotentImportExec private final Monitor monitor; private final ObjectMapper objectMapper; - // These are all variables corresponding to the job state. Only initialized when setJobId() is called - private Map knownValues; - private Map errors; + // These are all variables corresponding to the job state. Only initialized when setJobId() is + // called + private LoadingCache> knownValuesCache = null; + private LoadingCache> errorsCache = null; private UUID jobId; private String jobIdPrefix; @@ -69,12 +74,12 @@ public T executeOrThrowException( String idempotentId, String itemName, Callable callable) throws Exception { Preconditions.checkNotNull(jobId, "executing a callable before initialization of a job"); - if (knownValues.containsKey(idempotentId)) { + if (knownValuesCache.get(idempotentId).isPresent()) { monitor.debug( () -> jobIdPrefix + format("Using cached key %s from cache for %s", idempotentId, itemName)); - return (T) knownValues.get(idempotentId); + return (T) knownValuesCache.get(idempotentId).get(); } try { @@ -98,54 +103,60 @@ public T executeOrThrowException( private void addResult(String idempotentId, T result) throws IOException { - knownValues.put(idempotentId, result); - try { Transaction transaction = datastore.newTransaction(); - transaction.put(createResultEntity(idempotentId, result)); - if (errors.containsKey(idempotentId)) { - // if the errors contain this key, that means the ID + if (errorsCache.get(idempotentId).isPresent()) { + // if the errors contain this key, that means the ID failed perviously, + // since it is succeeding now, remove it from the errors datastore transaction.delete(getErrorKey(idempotentId, jobId)); - errors.remove(idempotentId); } transaction.commit(); - } catch (DatastoreException e) { + } catch (DatastoreException | ExecutionException e) { monitor.severe(() -> jobIdPrefix + "Error writing result to datastore: " + e); + } finally { + errorsCache.invalidate(idempotentId); + knownValuesCache.put(idempotentId, Optional.of(result)); } } private void addError(String idempotentId, ErrorDetail errorDetail) throws IOException { - errors.put(idempotentId, errorDetail); try { Transaction transaction = datastore.newTransaction(); transaction.put(createErrorEntity(idempotentId, errorDetail)); transaction.commit(); } catch (DatastoreException e) { monitor.severe(() -> jobIdPrefix + "Error writing ErrorDetails to datastore: " + e); + } finally { + errorsCache.put(idempotentId, Optional.of(errorDetail)); } } @Override public T getCachedValue(String idempotentId) throws IllegalArgumentException { - if (!knownValues.containsKey(idempotentId)) { - throw new IllegalArgumentException( - idempotentId - + " is not a known key, known keys: " - + Joiner.on(", ").join(knownValues.keySet())); + try { + if (knownValuesCache.get(idempotentId).isEmpty()) { + throw new IllegalArgumentException(idempotentId + " is not a known key"); + } + return (T) knownValuesCache.get(idempotentId).get(); + } catch (ExecutionException e) { + throw new IllegalStateException(e); } - return (T) knownValues.get(idempotentId); } @Override public boolean isKeyCached(String idempotentId) { - return knownValues.containsKey(idempotentId); + try { + return knownValuesCache.get(idempotentId).isPresent(); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } } @Override public Collection getErrors() { - return ImmutableList.copyOf(errors.values()); + return ImmutableList.copyOf(getErrorDetailsForJob(jobId).values()); } // In non-tests setJobId is only ever called once per executor, so the initialization of @@ -154,9 +165,53 @@ public Collection getErrors() { public void setJobId(UUID jobId) { Preconditions.checkNotNull(jobId); this.jobId = jobId; - this.knownValues = getKnownValuesForJob(jobId); - this.errors = getErrorDetailsForJob(jobId); jobIdPrefix = "Job " + jobId + ": "; + this.knownValuesCache = + CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterAccess(Duration.ofMinutes(30)) + .build( + new CacheLoader>() { + @Override + public Optional load(String idempotentId) { + return getKnownValue(idempotentId); + } + }); + + this.errorsCache = + CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterAccess(Duration.ofMinutes(30)) + .build( + new CacheLoader>() { + @Override + public Optional load(String idempotentId) { + return getError(idempotentId); + } + }); + } + + private Optional getKnownValue(String idempotentId) { + Preconditions.checkNotNull(jobId); + try { + return Optional.of( + datastore.get(getResultsKey(idempotentId, jobId)).getString(RESULTS_FIELD)); + } catch (Exception e) { + return Optional.empty(); + } + } + + private Optional getError(String idempotentId) { + Preconditions.checkNotNull(jobId); + try { + Entity error = datastore.get(getErrorKey(idempotentId, jobId)); + return error == null + ? Optional.empty() + : Optional.of( + objectMapper.readerFor(ErrorDetail.class).readValue(error.getString(ERROR_FIELD))); + } catch (IOException e) { + return Optional.empty(); + } } private Map getKnownValuesForJob(UUID jobId) { @@ -201,7 +256,6 @@ private Map getErrorDetailsForJob(UUID jobId) { return datastoreKnownErrors; } - private Entity createResultEntity(String idempotentId, T result) throws IOException { return createResultEntity(idempotentId, this.jobId, result); @@ -211,10 +265,14 @@ private Entity createResultEntity(String idempotentId, Entity createResultEntity(String idempotentId, UUID jobId, T result) throws IOException { return GoogleCloudUtils.createEntityBuilder( - getResultsKey(idempotentId, jobId), - ImmutableMap.of( - RESULTS_FIELD, result, JOB_ID_FIELD, jobId.toString(), IDEMPOTENT_ID_FIELD, - idempotentId)) + getResultsKey(idempotentId, jobId), + ImmutableMap.of( + RESULTS_FIELD, + result, + JOB_ID_FIELD, + jobId.toString(), + IDEMPOTENT_ID_FIELD, + idempotentId)) .build(); } @@ -230,17 +288,16 @@ private Entity createErrorEntity(String idempotentId, ErrorDetail error) throws } @VisibleForTesting - Entity createErrorEntity(String idempotentId, UUID jobId, ErrorDetail error) - throws IOException { + Entity createErrorEntity(String idempotentId, UUID jobId, ErrorDetail error) throws IOException { return GoogleCloudUtils.createEntityBuilder( - getErrorKey(idempotentId, jobId), - ImmutableMap.of( - ERROR_FIELD, - objectMapper.writeValueAsString(error), - JOB_ID_FIELD, - jobId.toString(), - IDEMPOTENT_ID_FIELD, - idempotentId)) + getErrorKey(idempotentId, jobId), + ImmutableMap.of( + ERROR_FIELD, + objectMapper.writeValueAsString(error), + JOB_ID_FIELD, + jobId.toString(), + IDEMPOTENT_ID_FIELD, + idempotentId)) .build(); }