From 21eeca0451505fe291ebf122afb800f39c1ec808 Mon Sep 17 00:00:00 2001 From: Siham Hussein Date: Wed, 17 Jul 2024 09:24:15 -0500 Subject: [PATCH 1/6] use loading cache in google cloud executor --- .../GoogleCloudIdempotentImportExecutor.java | 83 +++++++++++++------ 1 file changed, 59 insertions(+), 24 deletions(-) diff --git a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java index 84d94182a..56aa4cdb7 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java @@ -9,6 +9,9 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.datatransferproject.api.launcher.Monitor; @@ -22,6 +25,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import static java.lang.String.format; @@ -40,8 +44,8 @@ public class GoogleCloudIdempotentImportExecutor implements IdempotentImportExec private final ObjectMapper objectMapper; // These are all variables corresponding to the job state. 
Only initialized when setJobId() is called - private Map knownValues; - private Map errors; + private LoadingCache> knownValuesCache = null; + private LoadingCache> errorsCache = null; private UUID jobId; private String jobIdPrefix; @@ -69,12 +73,12 @@ public T executeOrThrowException( String idempotentId, String itemName, Callable callable) throws Exception { Preconditions.checkNotNull(jobId, "executing a callable before initialization of a job"); - if (knownValues.containsKey(idempotentId)) { + if (knownValues.get(idempotentId).isPresent()) { monitor.debug( - () -> - jobIdPrefix - + format("Using cached key %s from cache for %s", idempotentId, itemName)); - return (T) knownValues.get(idempotentId); + () -> + jobIdPrefix + + format("Using cached key %s from cache for %s", idempotentId, itemName)); + return (T) knownValues.get(idempotentId).get(); } try { @@ -98,54 +102,64 @@ public T executeOrThrowException( private void addResult(String idempotentId, T result) throws IOException { - knownValues.put(idempotentId, result); - try { Transaction transaction = datastore.newTransaction(); - transaction.put(createResultEntity(idempotentId, result)); - if (errors.containsKey(idempotentId)) { - // if the errors contain this key, that means the ID + if (errorsCache.get(idempotentId).isPresent()) { + // if the errors contain this key, that means the ID failed previously, + // since it is succeeding now, remove it from the errors datastore transaction.delete(getErrorKey(idempotentId, jobId)); - errors.remove(idempotentId); } transaction.commit(); } catch (DatastoreException e) { monitor.severe(() -> jobIdPrefix + "Error writing result to datastore: " + e); + } finally { + errorsCache.invalidate(idempotentId); + knownValuesCache.put(idempotentId, Optional.of(result)); } } private void addError(String idempotentId, ErrorDetail errorDetail) throws IOException { - errors.put(idempotentId, errorDetail); try { Transaction transaction = datastore.newTransaction(); 
transaction.put(createErrorEntity(idempotentId, errorDetail)); transaction.commit(); } catch (DatastoreException e) { monitor.severe(() -> jobIdPrefix + "Error writing ErrorDetails to datastore: " + e); + } finally { + errorsCache.put(idempotentId, Optional.of(errorDetail)); } } @Override public T getCachedValue(String idempotentId) throws IllegalArgumentException { - if (!knownValues.containsKey(idempotentId)) { - throw new IllegalArgumentException( - idempotentId - + " is not a known key, known keys: " - + Joiner.on(", ").join(knownValues.keySet())); + try { + if (knownValuesCache.get(idempotentId).isEmpty()) { + throw new IllegalArgumentException(idempotentId + " is not a known key"); + } + return (T) knownValuesCache.get(idempotentId).get(); + } catch (ExecutionException e) { + throw new IllegalStateException(e); } - return (T) knownValues.get(idempotentId); } @Override public boolean isKeyCached(String idempotentId) { - return knownValues.containsKey(idempotentId); + try { + return knownValuesCache.get(idempotentId).isPresent(); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } } @Override public Collection getErrors() { - return ImmutableList.copyOf(errors.values()); + try { + return ImmutableList.copyOf(getErrorDetailsForJob(jobId).values()); + } catch (IOException e) { + throw new IllegalStateException(e); + } } // In non-tests setJobId is only ever called once per executor, so the initialization of @@ -154,9 +168,30 @@ public Collection getErrors() { public void setJobId(UUID jobId) { Preconditions.checkNotNull(jobId); this.jobId = jobId; - this.knownValues = getKnownValuesForJob(jobId); - this.errors = getErrorDetailsForJob(jobId); jobIdPrefix = "Job " + jobId + ": "; + this.knownValuesCache = + CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterAccess(Duration.ofMinutes(30)) + .build( + new CacheLoader>() { + @Override + public Optional load(String idempotentId) { + return getKnownValue(idempotentId); + } + }); + + 
this.errorsCache = + CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterAccess(Duration.ofMinutes(30)) + .build( + new CacheLoader>() { + @Override + public Optional load(String idempotentId) { + return getError(idempotentId); + } + }); } private Map getKnownValuesForJob(UUID jobId) { From d60f3a29d15ca015bc4c083b6fb6c84060a2db8c Mon Sep 17 00:00:00 2001 From: Siham Hussein Date: Wed, 17 Jul 2024 09:43:36 -0500 Subject: [PATCH 2/6] add getError and getKnownValue methods for loading cache --- .../GoogleCloudIdempotentImportExecutor.java | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java index 56aa4cdb7..a072a3731 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java @@ -20,9 +20,10 @@ import java.io.IOException; import java.io.Serializable; -import java.util.Collection; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -73,12 +74,12 @@ public T executeOrThrowException( String idempotentId, String itemName, Callable callable) throws Exception { Preconditions.checkNotNull(jobId, "executing a callable before initialization of a job"); - if (knownValues.get(idempotentId).isPresent()) { + if (knownValuesCache.get(idempotentId).isPresent()) { monitor.debug( () -> jobIdPrefix + format("Using cached key %s from cache for %s", idempotentId, itemName)); - return (T) 
knownValues.get(idempotentId).get(); + return (T) knownValuesCache.get(idempotentId).get(); } try { @@ -194,6 +195,28 @@ public Optional load(String idempotentId) { }); } + private Optional getKnownValue(String idempotentId) { + Preconditions.checkNotNull(jobId); + try { + return Optional.of(datastore.get(getResultKey(idempotentId)).getString(RESULTS_FIELD)); + } catch (Exception e) { + return Optional.empty(); + } + } + + private Optional getError(String idempotentId) { + Preconditions.checkNotNull(jobId); + try { + Entity error = datastore.get(getErrorKey(idempotentId)); + return error == null + ? Optional.empty() + : Optional.of( + objectMapper.readerFor(ErrorDetail.class).readValue(error.getString(ERROR_FIELD))); + } catch (IOException e) { + return Optional.empty(); + } + } + private Map getKnownValuesForJob(UUID jobId) { Map dataStoreKnownValues = new HashMap<>(); Query query = From 83ea1377470ba79f692bcb0ec14895c62638a8d8 Mon Sep 17 00:00:00 2001 From: Siham Hussein Date: Wed, 17 Jul 2024 09:52:38 -0500 Subject: [PATCH 3/6] fix build error --- .../cloud/google/GoogleCloudIdempotentImportExecutor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java index a072a3731..70764edc8 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.Serializable; import java.time.Duration; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -207,7 
+208,7 @@ private Optional getKnownValue(String idempotentId) { private Optional getError(String idempotentId) { Preconditions.checkNotNull(jobId); try { - Entity error = datastore.get(getErrorKey(idempotentId)); + Entity error = datastore.get(getErrorKey(idempotentId, jobId)); return error == null ? Optional.empty() : Optional.of( From 2a630c4a48a9d615953e276880faff4dd2a14dd4 Mon Sep 17 00:00:00 2001 From: Siham Hussein Date: Wed, 17 Jul 2024 10:11:37 -0500 Subject: [PATCH 4/6] fix typo --- .../cloud/google/GoogleCloudIdempotentImportExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java index 70764edc8..29cb6093f 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java @@ -199,7 +199,7 @@ public Optional load(String idempotentId) { private Optional getKnownValue(String idempotentId) { Preconditions.checkNotNull(jobId); try { - return Optional.of(datastore.get(getResultKey(idempotentId)).getString(RESULTS_FIELD)); + return Optional.of(datastore.get(getResultsKey(idempotentId, jobId)).getString(RESULTS_FIELD)); } catch (Exception e) { return Optional.empty(); } From 52ae014780b16c4924810bb6246f7abae6fe8843 Mon Sep 17 00:00:00 2001 From: Siham Hussein Date: Wed, 17 Jul 2024 10:15:01 -0500 Subject: [PATCH 5/6] couple formatting fixes --- .../GoogleCloudIdempotentImportExecutor.java | 100 +++++++++--------- 1 file changed, 51 insertions(+), 49 deletions(-) diff --git 
a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java index 29cb6093f..567f17ee6 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java @@ -1,12 +1,13 @@ package org.datatransferproject.cloud.google; +import static java.lang.String.format; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.cloud.datastore.*; import com.google.cloud.datastore.StructuredQuery.CompositeFilter; import com.google.cloud.datastore.StructuredQuery.PropertyFilter; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; @@ -14,10 +15,6 @@ import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.datatransferproject.api.launcher.Monitor; -import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; -import org.datatransferproject.types.transfer.errors.ErrorDetail; - import java.io.IOException; import java.io.Serializable; import java.time.Duration; @@ -28,8 +25,9 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; - -import static java.lang.String.format; +import org.datatransferproject.api.launcher.Monitor; +import org.datatransferproject.spi.transfer.idempotentexecutor.IdempotentImportExecutor; +import 
org.datatransferproject.types.transfer.errors.ErrorDetail; public class GoogleCloudIdempotentImportExecutor implements IdempotentImportExecutor { @@ -45,7 +43,8 @@ public class GoogleCloudIdempotentImportExecutor implements IdempotentImportExec private final Monitor monitor; private final ObjectMapper objectMapper; - // These are all variables corresponding to the job state. Only initialized when setJobId() is called + // These are all variables corresponding to the job state. Only initialized when setJobId() is + // called private LoadingCache> knownValuesCache = null; private LoadingCache> errorsCache = null; private UUID jobId; @@ -77,9 +76,9 @@ public T executeOrThrowException( if (knownValuesCache.get(idempotentId).isPresent()) { monitor.debug( - () -> - jobIdPrefix - + format("Using cached key %s from cache for %s", idempotentId, itemName)); + () -> + jobIdPrefix + + format("Using cached key %s from cache for %s", idempotentId, itemName)); return (T) knownValuesCache.get(idempotentId).get(); } @@ -172,34 +171,35 @@ public void setJobId(UUID jobId) { this.jobId = jobId; jobIdPrefix = "Job " + jobId + ": "; this.knownValuesCache = - CacheBuilder.newBuilder() - .maximumSize(1000) - .expireAfterAccess(Duration.ofMinutes(30)) - .build( - new CacheLoader>() { - @Override - public Optional load(String idempotentId) { - return getKnownValue(idempotentId); - } - }); + CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterAccess(Duration.ofMinutes(30)) + .build( + new CacheLoader>() { + @Override + public Optional load(String idempotentId) { + return getKnownValue(idempotentId); + } + }); this.errorsCache = - CacheBuilder.newBuilder() - .maximumSize(1000) - .expireAfterAccess(Duration.ofMinutes(30)) - .build( - new CacheLoader>() { - @Override - public Optional load(String idempotentId) { - return getError(idempotentId); - } - }); + CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterAccess(Duration.ofMinutes(30)) + .build( + new CacheLoader>() { + 
@Override + public Optional load(String idempotentId) { + return getError(idempotentId); + } + }); } private Optional getKnownValue(String idempotentId) { Preconditions.checkNotNull(jobId); try { - return Optional.of(datastore.get(getResultsKey(idempotentId, jobId)).getString(RESULTS_FIELD)); + return Optional.of( + datastore.get(getResultsKey(idempotentId, jobId)).getString(RESULTS_FIELD)); } catch (Exception e) { return Optional.empty(); } @@ -210,8 +210,8 @@ private Optional getError(String idempotentId) { try { Entity error = datastore.get(getErrorKey(idempotentId, jobId)); return error == null - ? Optional.empty() - : Optional.of( + ? Optional.empty() + : Optional.of( objectMapper.readerFor(ErrorDetail.class).readValue(error.getString(ERROR_FIELD))); } catch (IOException e) { return Optional.empty(); @@ -260,7 +260,6 @@ private Map getErrorDetailsForJob(UUID jobId) { return datastoreKnownErrors; } - private Entity createResultEntity(String idempotentId, T result) throws IOException { return createResultEntity(idempotentId, this.jobId, result); @@ -270,10 +269,14 @@ private Entity createResultEntity(String idempotentId, Entity createResultEntity(String idempotentId, UUID jobId, T result) throws IOException { return GoogleCloudUtils.createEntityBuilder( - getResultsKey(idempotentId, jobId), - ImmutableMap.of( - RESULTS_FIELD, result, JOB_ID_FIELD, jobId.toString(), IDEMPOTENT_ID_FIELD, - idempotentId)) + getResultsKey(idempotentId, jobId), + ImmutableMap.of( + RESULTS_FIELD, + result, + JOB_ID_FIELD, + jobId.toString(), + IDEMPOTENT_ID_FIELD, + idempotentId)) .build(); } @@ -289,17 +292,16 @@ private Entity createErrorEntity(String idempotentId, ErrorDetail error) throws } @VisibleForTesting - Entity createErrorEntity(String idempotentId, UUID jobId, ErrorDetail error) - throws IOException { + Entity createErrorEntity(String idempotentId, UUID jobId, ErrorDetail error) throws IOException { return GoogleCloudUtils.createEntityBuilder( - getErrorKey(idempotentId, 
jobId), - ImmutableMap.of( - ERROR_FIELD, - objectMapper.writeValueAsString(error), - JOB_ID_FIELD, - jobId.toString(), - IDEMPOTENT_ID_FIELD, - idempotentId)) + getErrorKey(idempotentId, jobId), + ImmutableMap.of( + ERROR_FIELD, + objectMapper.writeValueAsString(error), + JOB_ID_FIELD, + jobId.toString(), + IDEMPOTENT_ID_FIELD, + idempotentId)) .build(); } From acd29bab139e2bee1d090f4ea2232a5c2aaa1ce9 Mon Sep 17 00:00:00 2001 From: Siham Hussein Date: Wed, 17 Jul 2024 10:26:39 -0500 Subject: [PATCH 6/6] fix exception catching --- .../cloud/google/GoogleCloudIdempotentImportExecutor.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java index 567f17ee6..dbab051a0 100644 --- a/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java +++ b/extensions/cloud/portability-cloud-google/src/main/java/org/datatransferproject/cloud/google/GoogleCloudIdempotentImportExecutor.java @@ -112,7 +112,7 @@ private void addResult(String idempotentId, T result) transaction.delete(getErrorKey(idempotentId, jobId)); } transaction.commit(); - } catch (DatastoreException e) { + } catch (DatastoreException | ExecutionException e) { monitor.severe(() -> jobIdPrefix + "Error writing result to datastore: " + e); } finally { errorsCache.invalidate(idempotentId); @@ -156,11 +156,7 @@ public boolean isKeyCached(String idempotentId) { @Override public Collection getErrors() { - try { - return ImmutableList.copyOf(getErrorDetailsForJob(jobId).values()); - } catch (IOException e) { - throw new IllegalStateException(e); - } + return ImmutableList.copyOf(getErrorDetailsForJob(jobId).values()); } // In non-tests setJobId is only ever 
called once per executor, so the initialization of