diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index ad473c28..99794fc9 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -1,35 +1,46 @@
name: Release
-on:
- workflow_dispatch
+permissions:
+ contents: write
+on: workflow_dispatch
jobs:
release:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v2
- - uses: actions/cache@v1
- with:
- path: ~/.m2/repository
- key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
- restore-keys: |
- ${{ runner.os }}-maven-
- - name: Set up JDK 11
- uses: actions/setup-java@v1
- with:
- java-version: 11
- - name: Release
- uses: qcastel/github-actions-maven-release@v1.11.1
- with:
- release-branch-name: "master"
- maven-args: "-P sonatype"
- git-release-bot-name: "release-bot"
- git-release-bot-email: "release-bot@fleetpin.co.nz"
-
- gpg-enabled: "true"
- gpg-key-id: ${{ secrets.GPG_KEY_ID }}
- gpg-key: ${{ secrets.GPG_KEY }}
-
- maven-repo-server-id: sonatype
- maven-repo-server-username: ${{ secrets.MVN_REPO_PRIVATE_REPO_USER }}
- maven-repo-server-password: ${{ secrets.MVN_REPO_PRIVATE_REPO_PASSWORD }}
-
- access-token: ${{ secrets.GITHUB_TOKEN }}
+ - uses: actions/checkout@v4
+
+ - uses: actions/cache@v4
+ with:
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ ${{ runner.os }}-maven-
+ - name: Set up JDK 21
+ uses: actions/setup-java@v4
+ with:
+ java-version: 21
+ distribution: zulu
+ - uses: whelk-io/maven-settings-xml-action@v22
+ with:
+ servers: >
+ [
+ { "id": "sonatype", "username": "${{ secrets.MVN_REPO_PRIVATE_REPO_USER }}", "password": "${{ secrets.MVN_REPO_PRIVATE_REPO_PASSWORD }}" }
+ ]
+ - name: set name
+ run: |
+ git config --global user.name "release-bot";
+ git config --global user.email "release-bot@fleetpin.co.nz";
+
+ - name: add key
+ run: |
+ echo "${{ secrets.GPG_KEY }}" | base64 -d > private.key
+ gpg --batch --import ./private.key
+ rm ./private.key
+ gpg --list-secret-keys --keyid-format LONG
+
+ - name: prepare
+ run: |
+ mvn release:prepare -Dusername=${{ secrets.GITHUB_TOKEN }} -P sonatype
+
+ - name: release
+ run: |
+ mvn release:perform -Dusername=${{ secrets.GITHUB_TOKEN }} -P sonatype
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index bee3c8fe..75e8a838 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -1,25 +1,26 @@
name: Test
-on:
+on:
push:
jobs:
build:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v2
- - uses: actions/cache@v1
- with:
- path: ~/.m2/repository
- key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
- restore-keys: |
- ${{ runner.os }}-maven-
- - name: Set up JDK 11
- uses: actions/setup-java@v1
- with:
- java-version: 11
- - name: Build with Maven
- run: mvn -B test --file pom.xml
- - uses: ashley-taylor/junit-report-annotations-action@master
- if: always()
- with:
- access-token: ${{ secrets.GITHUB_TOKEN }}
+ - uses: actions/checkout@v4
+ - uses: actions/cache@v4
+ with:
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ ${{ runner.os }}-maven-
+ - name: Set up JDK 21
+ uses: actions/setup-java@v4
+ with:
+ java-version: 21
+ distribution: zulu
+ - name: Build with Maven
+ run: mvn -B test --file pom.xml
+ - uses: ashley-taylor/junit-report-annotations-action@master
+ if: always()
+ with:
+ access-token: ${{ secrets.GITHUB_TOKEN }}
diff --git a/graphql-database-dynmodb-history-lambda/pom.xml b/graphql-database-dynmodb-history-lambda/pom.xml
index 2e172789..d3b4a691 100644
--- a/graphql-database-dynmodb-history-lambda/pom.xml
+++ b/graphql-database-dynmodb-history-lambda/pom.xml
@@ -5,7 +5,7 @@
com.fleetpin
graphql-database-manager
- 0.2.30-SNAPSHOT
+ 3.0.4-SNAPSHOT
graphql-database-dynmodb-history-lambda
@@ -37,9 +37,9 @@
- https://github.com/fleetpin/graphql-dynamodb-manager
- scm:git:https://github.com/fleetpin/graphql-dynamodb-manager.git
- scm:git:https://github.com/fleetpin/graphql-dynamodb-manager.git
+ https://github.com/ashley-taylor/graphql-dynamodb-manager
+ scm:git:https://github.com/ashley-taylor/graphql-dynamodb-manager.git
+ scm:git:https://github.com/ashley-taylor/graphql-dynamodb-manager.git
HEAD
@@ -72,24 +72,20 @@
graphql-database-manager-dynamo
${project.parent.version}
-
com.amazonaws
- aws-java-sdk-dynamodb
- 1.11.724
+ aws-lambda-java-events-sdk-transformer
+ 3.1.0
com.amazonaws
- aws-lambda-java-core
- 1.1.0
+ aws-lambda-java-events
+ 3.14.0
-
com.amazonaws
- aws-lambda-java-events
- 2.2.7
+ aws-lambda-java-core
+ 1.2.3
@@ -98,7 +94,7 @@
org.apache.maven.plugins
maven-dependency-plugin
- 2.10
+ 3.8.0
copy
@@ -129,7 +125,7 @@
attach-sources
- jar
+ jar-no-fork
@@ -137,7 +133,7 @@
org.apache.maven.plugins
maven-javadoc-plugin
- 3.1.1
+ 3.10.0
attach-javadocs
@@ -150,7 +146,7 @@
org.apache.maven.plugins
maven-gpg-plugin
- 1.6
+ 3.2.6
sign-artifacts
@@ -164,7 +160,7 @@
org.sonatype.plugins
nexus-staging-maven-plugin
- 1.6.7
+ 1.7.0
true
sonatype
diff --git a/graphql-database-dynmodb-history-lambda/src/main/java/com/fleetpin/graphql/database/dynamo/history/lambda/DynamoUtil.java b/graphql-database-dynmodb-history-lambda/src/main/java/com/fleetpin/graphql/database/dynamo/history/lambda/DynamoUtil.java
deleted file mode 100644
index 6fad3e55..00000000
--- a/graphql-database-dynmodb-history-lambda/src/main/java/com/fleetpin/graphql/database/dynamo/history/lambda/DynamoUtil.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.fleetpin.graphql.database.dynamo.history.lambda;
-
-import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
-import software.amazon.awssdk.services.dynamodb.model.Identity;
-import software.amazon.awssdk.services.dynamodb.model.Record;
-import software.amazon.awssdk.services.dynamodb.model.StreamRecord;
-
-public class DynamoUtil {
-
- public static Record toV2(DynamodbStreamRecord record) {
- var builder = Record.builder();
- builder.awsRegion(record.getAwsRegion());
- builder.dynamodb(toV2(record.getDynamodb()));
- builder.eventID(record.getEventID());
- builder.eventName(record.getEventName());
- builder.eventSource(record.getEventSource());
- builder.eventVersion(record.getEventVersion());
- builder.userIdentity(toV2(record.getUserIdentity()));
- return builder.build();
- }
-
- public static Identity toV2(com.amazonaws.services.dynamodbv2.model.Identity userIdentity) {
- if (userIdentity == null) {
- return null;
- }
- var builder = Identity.builder();
- builder.principalId(userIdentity.getPrincipalId());
- builder.type(userIdentity.getType());
- return builder.build();
- }
-
- public static StreamRecord toV2(com.amazonaws.services.dynamodbv2.model.StreamRecord dynamodb) {
- var builder = StreamRecord.builder();
-
- builder.approximateCreationDateTime(dynamodb.getApproximateCreationDateTime().toInstant());
- builder.keys(toV2(dynamodb.getKeys()));
- builder.newImage(toV2(dynamodb.getNewImage()));
- builder.oldImage(toV2(dynamodb.getOldImage()));
- builder.sequenceNumber(dynamodb.getSequenceNumber());
- builder.sizeBytes(dynamodb.getSizeBytes());
- builder.streamViewType(dynamodb.getStreamViewType());
-
- return builder.build();
- }
-
- public static Map toV2(Map attribute) {
- if (attribute == null) {
- return null;
- }
- Map toReturn = new HashMap<>();
- attribute.forEach((key, value) -> toReturn.put(key, toV2(value)));
- return toReturn;
- }
-
- public static AttributeValue toV2(com.amazonaws.services.dynamodbv2.model.AttributeValue value) {
- if (value.getB() != null) {
- return AttributeValue.builder().b(SdkBytes.fromByteBuffer(value.getB())).build();
- }
- if (value.getBOOL() != null) {
- return AttributeValue.builder().bool(value.getBOOL()).build();
- }
- if (value.getNULL() != null) {
- return AttributeValue.builder().nul(value.getNULL()).build();
- }
- if (value.getBS() != null) {
- return AttributeValue.builder().bs(value.getBS().stream().map(SdkBytes::fromByteBuffer).collect(Collectors.toList())).build();
- }
- if (value.getL() != null) {
- return AttributeValue.builder().l(value.getL().stream().map(DynamoUtil::toV2).collect(Collectors.toList())).build();
- }
- if (value.getM() != null) {
- return AttributeValue.builder().m(toV2(value.getM())).build();
- }
- if (value.getN() != null) {
- return AttributeValue.builder().n(value.getN()).build();
- }
- if (value.getNS() != null) {
- return AttributeValue.builder().ns(value.getNS()).build();
- }
- if (value.getS() != null) {
- return AttributeValue.builder().s(value.getS()).build();
- }
- if (value.getSS() != null) {
- return AttributeValue.builder().ss(value.getSS()).build();
- }
- throw new RuntimeException("Unknown type " + value);
- }
-}
diff --git a/graphql-database-dynmodb-history-lambda/src/main/java/com/fleetpin/graphql/database/dynamo/history/lambda/HistoryLambda.java b/graphql-database-dynmodb-history-lambda/src/main/java/com/fleetpin/graphql/database/dynamo/history/lambda/HistoryLambda.java
index fc162e9c..654ca24f 100644
--- a/graphql-database-dynmodb-history-lambda/src/main/java/com/fleetpin/graphql/database/dynamo/history/lambda/HistoryLambda.java
+++ b/graphql-database-dynmodb-history-lambda/src/main/java/com/fleetpin/graphql/database/dynamo/history/lambda/HistoryLambda.java
@@ -1,3 +1,15 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
package com.fleetpin.graphql.database.dynamo.history.lambda;
import static java.util.stream.Collectors.groupingBy;
@@ -5,14 +17,13 @@
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
+import com.amazonaws.services.lambda.runtime.events.transformers.v2.DynamodbEventTransformer;
import com.fleetpin.graphql.database.manager.dynamo.HistoryUtil;
import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Stream;
-import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
@@ -22,39 +33,42 @@ public HistoryLambda() {}
public abstract String getTableName();
- public abstract DynamoDbAsyncClient getClient();
+ public abstract DynamoDbClient getClient();
@Override
public Void handleRequest(DynamodbEvent input, Context context) {
- var records = input.getRecords().stream().map(DynamoUtil::toV2);
+ var records = DynamodbEventTransformer.toRecordsV2(input);
process(records);
return null;
}
- public void process(Stream records) {
+ public void process(List records) {
int chunkSize = 25;
AtomicInteger counter = new AtomicInteger();
var chunks = HistoryUtil
- .toHistoryValue(records)
+ .toHistoryValue(records.stream())
.map(item -> WriteRequest.builder().putRequest(builder -> builder.item(item)).build())
.collect(groupingBy(x -> counter.getAndIncrement() / chunkSize))
.values();
- var futures = chunks
- .stream()
+ chunks
+ .parallelStream()
.filter(chunk -> !chunk.isEmpty())
- .map(chunk -> {
+ .forEach(chunk -> {
var items = new HashMap>();
items.put(getTableName(), chunk);
- return getClient().batchWriteItem(builder -> builder.requestItems(items));
- })
- .toArray(CompletableFuture[]::new);
-
- try {
- CompletableFuture.allOf(futures).get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
+ writeItems(items);
+ });
+ }
+
+ private void writeItems(Map> items) {
+ var response = getClient().batchWriteItem(builder -> builder.requestItems(items));
+
+ var unprocessed = response.unprocessedItems();
+
+ if (!unprocessed.isEmpty()) {
+ writeItems(unprocessed);
}
}
}
diff --git a/graphql-database-manager-core/pom.xml b/graphql-database-manager-core/pom.xml
index 8c3e059d..454a8a79 100644
--- a/graphql-database-manager-core/pom.xml
+++ b/graphql-database-manager-core/pom.xml
@@ -5,13 +5,13 @@
com.fleetpin
graphql-database-manager
- 0.2.30-SNAPSHOT
+ 3.0.4-SNAPSHOT
graphql-database-manager-core
- 5.6.0
+ 5.11.0
UTF-8
@@ -29,9 +29,9 @@
- https://github.com/fleetpin/graphql-dynamodb-manager
- scm:git:https://github.com/fleetpin/graphql-dynamodb-manager.git
- scm:git:https://github.com/fleetpin/graphql-dynamodb-manager.git
+ https://github.com/ashley-taylor/graphql-dynamodb-manager
+ scm:git:https://github.com/ashley-taylor/graphql-dynamodb-manager.git
+ scm:git:https://github.com/ashley-taylor/graphql-dynamodb-manager.git
HEAD
@@ -62,10 +62,16 @@
com.fleetpin
graphql-builder
- 0.1.1
+ 3.0.2
provided
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ ${jackson.version}
+
+
org.junit.jupiter
junit-jupiter
@@ -78,7 +84,7 @@
org.apache.maven.plugins
maven-dependency-plugin
- 2.10
+ 3.8.0
copy
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DataWriter.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DataWriter.java
index 4992f017..98517f8d 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DataWriter.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DataWriter.java
@@ -1,16 +1,20 @@
package com.fleetpin.graphql.database.manager;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import java.util.function.Function;
public class DataWriter {
private final Function, CompletableFuture> bulkWriter;
private final List toPut = new ArrayList<>();
+ private final Consumer> handleFuture;
- public DataWriter(Function, CompletableFuture> bulkWriter) {
+ public DataWriter(Function, CompletableFuture> bulkWriter, Consumer> handleFuture) {
this.bulkWriter = bulkWriter;
+ this.handleFuture = handleFuture;
}
public int dispatchSize() {
@@ -40,6 +44,7 @@ public CompletableFuture put(String organisationId, T entit
synchronized (toPut) {
toPut.add(putValue);
}
+ handleFuture.accept(future);
return future;
}
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Database.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Database.java
index 86c56529..eac2f4fa 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Database.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Database.java
@@ -15,12 +15,20 @@
import com.fleetpin.graphql.database.manager.access.ForbiddenWriteException;
import com.fleetpin.graphql.database.manager.access.ModificationPermission;
import com.fleetpin.graphql.database.manager.util.BackupItem;
+import com.fleetpin.graphql.database.manager.util.HistoryBackupItem;
import com.fleetpin.graphql.database.manager.util.TableCoreUtil;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -30,6 +38,8 @@
@SuppressWarnings("unchecked")
public class Database {
+ public static ExecutorService VIRTUAL_THREAD_POOL = Executors.newVirtualThreadPerTaskExecutor();
+
private String organisationId;
private final DatabaseDriver driver;
@@ -40,10 +50,13 @@ public class Database {
private final Function> putAllow;
+ private final AtomicInteger submitted;
+
Database(String organisationId, DatabaseDriver driver, ModificationPermission putAllow) {
this.organisationId = organisationId;
this.driver = driver;
this.putAllow = putAllow;
+ this.submitted = new AtomicInteger();
items =
new TableDataLoader<>(
@@ -52,7 +65,8 @@ public class Database {
return driver.get(keys);
},
DataLoaderOptions.newOptions().setMaxBatchSize(driver.maxBatchSize())
- )
+ ),
+ this::handleFuture
); // will auto call global
queries =
@@ -62,7 +76,8 @@ public class Database {
return merge(keys.stream().map(driver::query));
},
DataLoaderOptions.newOptions().setBatchingEnabled(false)
- )
+ ),
+ this::handleFuture
); // will auto call global
queryHistories =
@@ -72,10 +87,11 @@ public class Database {
return merge(keys.stream().map(driver::queryHistory));
},
DataLoaderOptions.newOptions().setBatchingEnabled(false)
- )
+ ),
+ this::handleFuture
); // will auto call global
- put = new DataWriter(driver::bulkPut);
+ put = new DataWriter(driver::bulkPut, this::handleFuture);
}
public CompletableFuture> query(Class type, Function, QueryBuilder> func) {
@@ -119,10 +135,18 @@ public CompletableFuture> takeBackup(String organisationId) {
return driver.takeBackup(organisationId);
}
+ public CompletableFuture> takeHistoryBackup(String organisationId) {
+ return driver.takeHistoryBackup(organisationId);
+ }
+
public CompletableFuture restoreBackup(List entities) {
return driver.restoreBackup(entities);
}
+ public CompletableFuture restoreHistoryBackup(List entities) {
+ return driver.restoreHistoryBackup(entities);
+ }
+
public CompletableFuture> delete(String organisationId, Class clazz) {
return driver.delete(organisationId, clazz);
}
@@ -219,7 +243,7 @@ public CompletableFuture deleteLinks(T entity) {
if (!allow) {
throw new ForbiddenWriteException("Delete links not allowed for " + TableCoreUtil.table(entity.getClass()) + " with id " + entity.getId());
}
- //impact of clearing links to tricky
+ // impact of clearing links to tricky
items.clearAll();
queries.clearAll();
return driver.deleteLinks(organisationId, entity);
@@ -236,18 +260,18 @@ public CompletableFuture destroyOrganisation(final String organisationI
* @param database entity type to update
* @param entity revision must match database or request will fail
* @return updated entity with the revision incremented by one
- * CompletableFuture will fail with a RevisionMismatchException
+ * CompletableFuture will fail with a RevisionMismatchException
*/
public CompletableFuture put(T entity) {
return put(entity, true);
}
/**
- * @param database entity type to update
+ * @param database entity type to update
* @param entity revision must match database or request will fail
- * @param check Will only pass if the entity revision matches what is currently in the database
+ * @param check Will only pass if the entity revision matches what is currently in the database
* @return updated entity with the revision incremented by one
- * CompletableFuture will fail with a RevisionMismatchException
+ * CompletableFuture will fail with a RevisionMismatchException
*/
public CompletableFuture put(T entity, boolean check) {
return putAllow
@@ -296,24 +320,18 @@ private CompletableFuture> merge(Stream> stream
});
}
- private static final Executor DELAYER = CompletableFuture.delayedExecutor(10, TimeUnit.MILLISECONDS);
-
- @SuppressWarnings("rawtypes")
- public void start(CompletableFuture> toReturn) {
- if (toReturn.isDone()) {
- return;
+ private void start() {
+ if (items.dispatchDepth() > 0) {
+ items.dispatch();
}
-
- if (items.dispatchDepth() > 0 || queries.dispatchDepth() > 0 || queryHistories.dispatchDepth() > 0 || put.dispatchSize() > 0) {
- CompletableFuture[] all = new CompletableFuture[] { items.dispatch(), queries.dispatch(), queryHistories.dispatch(), put.dispatch() };
- CompletableFuture
- .allOf(all)
- .whenComplete((response, error) -> {
- //go around again
- start(toReturn);
- });
- } else {
- CompletableFuture.supplyAsync(() -> null, DELAYER).acceptEither(toReturn, __ -> start(toReturn));
+ if (queries.dispatchDepth() > 0) {
+ queries.dispatch();
+ }
+ if (queryHistories.dispatchDepth() > 0) {
+ queryHistories.dispatch();
+ }
+ if (put.dispatchSize() > 0) {
+ put.dispatch();
}
}
@@ -395,6 +413,41 @@ public String newId() {
}
public Set getLinkIds(Table entity, Class extends Table> type) {
- return Collections.unmodifiableSet(TableAccess.getTableLinks(entity).get(TableCoreUtil.table(type)));
+ var links = TableAccess.getTableLinks(entity).get(TableCoreUtil.table(type));
+ if (links == null) {
+ return Collections.emptySet();
+ }
+ return Collections.unmodifiableSet(links);
+ }
+
+ private CompletableFuture> handleFuture(CompletableFuture> future) {
+ if (future.isDone()) {
+ return future;
+ }
+ while (true) {
+ var current = submitted.get();
+ if (current == 0) {
+ if (submitted.compareAndSet(0, 1)) {
+ run();
+ } else {
+ continue;
+ }
+ } else {
+ if (submitted.compareAndSet(current, current + 1)) {
+ return future.thenApplyAsync(t -> t, VIRTUAL_THREAD_POOL);
+ }
+ }
+ }
+ }
+
+ private void run() {
+ VIRTUAL_THREAD_POOL.submit(() -> {
+ var start = submitted.get();
+ start();
+ if (submitted.compareAndSet(start, 0)) {
+ return;
+ }
+ run();
+ });
}
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseDriver.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseDriver.java
index fadeefa6..b8bb75aa 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseDriver.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseDriver.java
@@ -13,10 +13,12 @@
package com.fleetpin.graphql.database.manager;
import com.fleetpin.graphql.database.manager.util.BackupItem;
-import com.google.common.collect.HashMultimap;
+import com.fleetpin.graphql.database.manager.util.HistoryBackupItem;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
public abstract class DatabaseDriver {
@@ -42,8 +44,12 @@ public abstract CompletableFuture> getViaLinks(
public abstract CompletableFuture restoreBackup(List entities);
+ public abstract CompletableFuture restoreHistoryBackup(List entities);
+
public abstract CompletableFuture> takeBackup(String organisationId);
+ public abstract CompletableFuture> takeHistoryBackup(String organisationId);
+
public abstract CompletableFuture> queryHistory(DatabaseQueryHistoryKey key);
public abstract CompletableFuture> queryGlobal(Class type, String value);
@@ -78,7 +84,7 @@ protected void setLinks(final T entity, final String type, fin
entity.setLinks(type, groupIds);
}
- protected HashMultimap getLinks(final T entity) {
+ protected Map> getLinks(final T entity) {
return entity.getLinks();
}
@@ -93,7 +99,7 @@ protected void setUpdatedAt(final T entity, final Instant upda
protected void setSource(
final T entity,
final String sourceTable,
- final HashMultimap links,
+ final Map> links,
final String sourceOrganisationId
) {
entity.setSource(sourceTable, links, sourceOrganisationId);
@@ -106,4 +112,6 @@ protected String getSourceTable(final T entity) {
protected DatabaseKey createDatabaseKey(final String organisationId, final Class type, final String id) {
return new DatabaseKey<>(organisationId, type, id);
}
+
+ protected abstract ScanResult startTableScan(TableScanQuery tableScanQuery, int segment, Object from);
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseKey.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseKey.java
index 9aff2433..98d2e3fe 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseKey.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseKey.java
@@ -12,6 +12,7 @@
package com.fleetpin.graphql.database.manager;
+import com.fleetpin.graphql.database.manager.util.TableCoreUtil;
import java.util.Objects;
public class DatabaseKey {
@@ -22,7 +23,7 @@ public class DatabaseKey {
DatabaseKey(String organisationId, Class type, String id) {
this.organisationId = organisationId;
- this.type = type;
+ this.type = TableCoreUtil.baseClass(type);
this.id = id;
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseManager.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseManager.java
index 74edaee5..e814f90a 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseManager.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/DatabaseManager.java
@@ -2,6 +2,7 @@
import com.fleetpin.graphql.database.manager.access.ModificationPermission;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
public abstract class DatabaseManager {
@@ -18,4 +19,16 @@ public Database getDatabase(String organisationId) {
public Database getDatabase(String organisationId, ModificationPermission putAllow) {
return new Database(organisationId, dynamoDb, putAllow);
}
+
+ public VirtualDatabase getVirtualDatabase(String organisationId) {
+ return new VirtualDatabase(getDatabase(organisationId));
+ }
+
+ public VirtualDatabase getVirtualDatabase(String organisationId, ModificationPermission putAllow) {
+ return new VirtualDatabase(getDatabase(organisationId, putAllow));
+ }
+
+ public TableScanner startTableScan(Function builder) {
+ return new TableScanner(builder.apply(new TableScanQueryBuilder()).build(), dynamoDb, this);
+ }
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Query.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Query.java
index 508e66d3..fff053c4 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Query.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Query.java
@@ -8,15 +8,28 @@ public class Query {
private final String startsWith;
private final String after;
private final Integer limit;
+ private final Integer threadCount;
+ private final Integer threadIndex;
- Query(Class type, String startsWith, String after, Integer limit) {
+ Query(Class type, String startsWith, String after, Integer limit, Integer threadCount, Integer threadIndex) {
if (type == null) {
throw new RuntimeException("type can not be null, did you forget to call .on(Table::class)?");
}
+
+ if (threadCount != null && !isPowerOfTwo(threadCount)) {
+ throw new RuntimeException("Thread count must be a power of two");
+ }
+
+ if ((threadCount != null && threadIndex == null) || (threadCount == null && threadIndex != null)) {
+ throw new RuntimeException("Thread count and thread index must both be defined if you are doing a parallel request");
+ }
+
this.type = type;
this.startsWith = startsWith;
this.after = after;
this.limit = limit;
+ this.threadCount = threadCount;
+ this.threadIndex = threadIndex;
}
public Class getType() {
@@ -35,13 +48,21 @@ public Integer getLimit() {
return limit;
}
+ public Integer getThreadCount() {
+ return threadCount;
+ }
+
+ public Integer getThreadIndex() {
+ return threadIndex;
+ }
+
public boolean hasLimit() {
return getLimit() != null;
}
@Override
public int hashCode() {
- return Objects.hash(after, limit, startsWith, type);
+ return Objects.hash(after, limit, startsWith, type, threadIndex, threadCount);
}
@Override
@@ -54,7 +75,15 @@ public boolean equals(Object obj) {
Objects.equals(after, other.after) &&
Objects.equals(limit, other.limit) &&
Objects.equals(startsWith, other.startsWith) &&
- Objects.equals(type, other.type)
+ Objects.equals(type, other.type) &&
+ Objects.equals(threadCount, other.threadCount) &&
+ Objects.equals(threadIndex, other.threadIndex)
);
}
+
+ static boolean isPowerOfTwo(int n) {
+ if (n == 0) return false;
+
+ return (int) (Math.ceil((Math.log(n) / Math.log(2)))) == (int) (Math.floor(((Math.log(n) / Math.log(2)))));
+ }
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/QueryBuilder.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/QueryBuilder.java
index 701a5701..5828e441 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/QueryBuilder.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/QueryBuilder.java
@@ -8,6 +8,8 @@ public class QueryBuilder {
private String startsWith;
private String after;
private Integer limit;
+ private Integer threadIndex;
+ private Integer threadCount;
private QueryBuilder(Class type) {
this.type = type;
@@ -28,13 +30,23 @@ public QueryBuilder after(String from) {
return this;
}
+ public QueryBuilder threadCount(Integer threadCount) {
+ this.threadCount = threadCount;
+ return this;
+ }
+
+ public QueryBuilder threadIndex(Integer threadIndex) {
+ this.threadIndex = threadIndex;
+ return this;
+ }
+
public QueryBuilder applyMutation(Consumer> mutator) {
mutator.accept((QueryBuilder) this);
return (QueryBuilder) this;
}
public Query build() {
- return new Query(type, startsWith, after, limit);
+ return new Query(type, startsWith, after, limit, threadCount, threadIndex);
}
public static QueryBuilder create(Class type) {
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/QueryHistoryBuilder.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/QueryHistoryBuilder.java
index 034f8520..f8f221f5 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/QueryHistoryBuilder.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/QueryHistoryBuilder.java
@@ -1,7 +1,6 @@
package com.fleetpin.graphql.database.manager;
import com.fleetpin.graphql.database.manager.util.HistoryCoreUtil;
-import com.google.common.base.Preconditions;
import java.time.Instant;
import java.util.function.Consumer;
@@ -55,16 +54,22 @@ public QueryHistoryBuilder applyMutation(Consumer> mut
}
public QueryHistory build() {
- Preconditions.checkArgument(HistoryCoreUtil.hasHistory(type), "Can only do history when history annotation is present.");
- Preconditions.checkArgument(!(id != null && startsWith != null), "ID and StartsWith cannot both be set.");
- Preconditions.checkArgument(!(id == null && startsWith == null), "ID or StartsWith must be set.");
+ checkArgument(HistoryCoreUtil.hasHistory(type), "Can only do history when history annotation is present.");
+ checkArgument(!(id != null && startsWith != null), "ID and StartsWith cannot both be set.");
+ checkArgument(!(id == null && startsWith == null), "ID or StartsWith must be set.");
if (fromRevision != null || toRevision != null) {
- Preconditions.checkArgument(fromUpdatedAt == null && toUpdatedAt == null, "Revision and CreatedAt cannot both be set.");
- Preconditions.checkArgument(startsWith == null, "StartsWith can only be used with updatedAt.");
+ checkArgument(fromUpdatedAt == null && toUpdatedAt == null, "Revision and CreatedAt cannot both be set.");
+ checkArgument(startsWith == null, "StartsWith can only be used with updatedAt.");
}
return new QueryHistory(type, startsWith, id, fromRevision, toRevision, fromUpdatedAt, toUpdatedAt);
}
+ private void checkArgument(boolean pass, String msg) {
+ if (!pass) {
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
public static QueryHistoryBuilder create(Class type) {
return new QueryHistoryBuilder(type);
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/ScanResult.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/ScanResult.java
new file mode 100644
index 00000000..6b679292
--- /dev/null
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/ScanResult.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.fleetpin.graphql.database.manager;
+
+import java.util.ArrayList;
+import java.util.function.Consumer;
+
+public record ScanResult(ArrayList- > items, Object next) {
+ public record Item(String organisationId, T entity, Consumer replace, Consumer delete) {}
+}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/ScanUpdater.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/ScanUpdater.java
new file mode 100644
index 00000000..5064843e
--- /dev/null
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/ScanUpdater.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.fleetpin.graphql.database.manager;
+
+import com.fleetpin.graphql.database.manager.ScanResult.Item;
+import java.util.function.BiConsumer;
+
+public record ScanUpdater(Class> type, BiConsumer, T> updater) {
+ public static class ScanContext {
+
+ private VirtualDatabase virtualDatabase;
+ private Item item;
+
+ protected ScanContext(VirtualDatabase virtualDatabase, Item item) {
+ this.virtualDatabase = virtualDatabase;
+ this.item = item;
+ }
+
+ public VirtualDatabase getVirtualDatabase() {
+ return virtualDatabase;
+ }
+
+ public void delete() {
+ item.delete().accept(item.entity());
+ }
+
+ public void replace(T entity) {
+ item.replace().accept(entity);
+ }
+ }
+}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Table.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Table.java
index f2d6bfff..35e48514 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Table.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/Table.java
@@ -15,10 +15,13 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fleetpin.graphql.builder.annotations.GraphQLIgnore;
import com.fleetpin.graphql.builder.annotations.Id;
-import com.google.common.collect.HashMultimap;
import java.time.Instant;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
public abstract class Table {
@@ -34,7 +37,7 @@ public abstract class Table {
private String sourceOrganistaionId;
@JsonIgnore
- private HashMultimap links = HashMultimap.create();
+ private Map> links = new HashMap<>();
@Id
public String getId() {
@@ -91,7 +94,7 @@ String getSourceTable() {
return sourceTable;
}
- void setSource(String sourceTable, HashMultimap links, String sourceOrganisationId) {
+ void setSource(String sourceTable, Map> links, String sourceOrganisationId) {
//so bad data does not cause error
if (createdAt == null) {
createdAt = Instant.MIN;
@@ -111,13 +114,12 @@ String getSourceOrganisationId() {
}
void setLinks(String type, Collection groupIds) {
- this.links.removeAll(type);
- this.links.putAll(type, groupIds);
+ this.links.put(type, new HashSet<>(groupIds));
}
@JsonIgnore
@GraphQLIgnore
- HashMultimap getLinks() {
+ Map> getLinks() {
return links;
}
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableAccess.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableAccess.java
index f8b54141..5f2c6adf 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableAccess.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableAccess.java
@@ -1,12 +1,13 @@
package com.fleetpin.graphql.database.manager;
-import com.google.common.collect.HashMultimap;
+import java.util.Map;
+import java.util.Set;
public interface TableAccess {
public static void setTableSource(
final T table,
final String sourceTable,
- final HashMultimap links,
+ final Map> links,
final String sourceOrganisationId
) {
table.setSource(sourceTable, links, sourceOrganisationId);
@@ -16,7 +17,7 @@ public static String getTableSourceOrganisation(final T table)
return table.getSourceOrganisationId();
}
- public static HashMultimap getTableLinks(final T table) {
+ public static Map> getTableLinks(final T table) {
return table.getLinks();
}
}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableDataLoader.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableDataLoader.java
index 0287607b..29e39ff3 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableDataLoader.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableDataLoader.java
@@ -2,24 +2,29 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.dataloader.DataLoader;
public class TableDataLoader {
private final DataLoader loader;
+ private final Function, CompletableFuture>> handleFuture;
- TableDataLoader(DataLoader loader) {
+ TableDataLoader(DataLoader loader, Function, CompletableFuture>> handleFuture) {
this.loader = loader;
+ this.handleFuture = handleFuture;
}
public CompletableFuture load(K key) {
- return (CompletableFuture) loader.load(key);
+ var future = loader.load(key);
+ return (CompletableFuture) this.handleFuture.apply(future);
}
public CompletableFuture
> loadMany(List keys) {
- //annoying waste of memory/cpu to get around cast :(
- return loader.loadMany(keys).thenApply(r -> r.stream().map(t -> (T) t).collect(Collectors.toList()));
+ // annoying waste of memory/cpu to get around cast :(
+ var future = loader.loadMany(keys).thenApply(r -> r.stream().map(t -> (T) t).collect(Collectors.toList()));
+ return (CompletableFuture>) this.handleFuture.apply(future);
}
public void clear(K key) {
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanQuery.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanQuery.java
new file mode 100644
index 00000000..5078ee76
--- /dev/null
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanQuery.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.fleetpin.graphql.database.manager;
+
+import java.util.List;
+
+public record TableScanQuery(TableScanMonitor monitor, Integer parallelism, List> updaters) {
+ interface TableScanMonitor {
+ public void onScanSegmentStart(int segment, int itemCount, Object from);
+
+ public void onScanSegmentComplete(int segment);
+ }
+}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanQueryBuilder.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanQueryBuilder.java
new file mode 100644
index 00000000..9b8aac56
--- /dev/null
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanQueryBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.fleetpin.graphql.database.manager;
+
+import com.fleetpin.graphql.database.manager.ScanUpdater.ScanContext;
+import com.fleetpin.graphql.database.manager.TableScanQuery.TableScanMonitor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+public class TableScanQueryBuilder {
+
+ private int parallelism = Runtime.getRuntime().availableProcessors() * 5;
+ private List> updaters = new ArrayList<>();
+ private TableScanMonitor monitor;
+
+ public TableScanQueryBuilder parallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ public TableScanQueryBuilder updater(ScanUpdater> updater) {
+ updaters.add(updater);
+ return this;
+ }
+
+ public TableScanQueryBuilder updater(Class type, BiConsumer, T> updater) {
+ updaters.add(new ScanUpdater(type, updater));
+ return this;
+ }
+
+ public TableScanQuery build() {
+ return new TableScanQuery(monitor, parallelism, updaters);
+ }
+}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanner.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanner.java
new file mode 100644
index 00000000..441d1365
--- /dev/null
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/TableScanner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.fleetpin.graphql.database.manager;
+
+import com.fleetpin.graphql.database.manager.ScanResult.Item;
+import com.fleetpin.graphql.database.manager.ScanUpdater.ScanContext;
+import java.util.ArrayList;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+public class TableScanner {
+
+ private final TableScanQuery query;
+ private final DatabaseDriver driver;
+ private final DatabaseManager databaseManager;
+
+ public TableScanner(TableScanQuery query, DatabaseDriver driver, DatabaseManager databaseManager) {
+ this.query = query;
+ this.driver = driver;
+ this.databaseManager = databaseManager;
+ }
+
+ public CompletableFuture start() {
+ var workers = new ArrayList>();
+ for (int i = 0; i < query.parallelism(); i++) {
+ var segment = i;
+ workers.add(CompletableFuture.supplyAsync(new ScannerWorker(segment), Database.VIRTUAL_THREAD_POOL));
+ }
+ return CompletableFuture.allOf(workers.toArray(CompletableFuture[]::new));
+ }
+
+ private CompletableFuture process(Item item) {
+ for (var updater : query.updaters()) {
+ if (updater.type().isAssignableFrom(item.entity().getClass())) {
+ @SuppressWarnings("unchecked")
+ ScanUpdater update = (ScanUpdater) updater;
+ return CompletableFuture.runAsync(
+ () -> {
+ var virtualDatabase = databaseManager.getVirtualDatabase(item.organisationId());
+ update.updater().accept(new ScanContext(virtualDatabase, item), item.entity());
+ },
+ Database.VIRTUAL_THREAD_POOL
+ );
+ }
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private class ScannerWorker implements Supplier {
+
+ private final int segment;
+
+ public ScannerWorker(int segment) {
+ this.segment = segment;
+ }
+
+ @Override
+ public ScannerWorker get() {
+ Object from = null;
+ do {
+ var scan = driver.startTableScan(query, segment, from);
+ if (query.monitor() != null) {
+ query.monitor().onScanSegmentStart(segment, scan.items().size(), from);
+ }
+ from = scan.next();
+
+ var workers = new ArrayList>();
+ for (var item : scan.items()) {
+ workers.add(process(item));
+ }
+
+ CompletableFuture.allOf(workers.toArray(CompletableFuture[]::new)).join();
+ } while (from != null);
+ if (query.monitor() != null) {
+ query.monitor().onScanSegmentComplete(segment);
+ }
+
+ return this;
+ }
+ }
+}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/VirtualDatabase.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/VirtualDatabase.java
new file mode 100644
index 00000000..8071adef
--- /dev/null
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/VirtualDatabase.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.fleetpin.graphql.database.manager;
+
+import com.fleetpin.graphql.database.manager.util.BackupItem;
+import com.fleetpin.graphql.database.manager.util.HistoryBackupItem;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+public class VirtualDatabase {
+
+ private final Database database;
+
+ public VirtualDatabase(Database database) {
+ this.database = database;
+ }
+
+ public List delete(String organisationId, Class type) {
+ return database.delete(organisationId, type).join();
+ }
+
+ public T delete(T entity, boolean deleteLinks) {
+ return database.delete(entity, deleteLinks).join();
+ }
+
+ public List getLinks(final Table entry, Class target) {
+ return database.getLinks(entry, target).join();
+ }
+
+ public Boolean destroyOrganisation(final String organisationId) {
+ return database.destroyOrganisation(organisationId).join();
+ }
+
+ public T get(Class type, String id) {
+ return database.get(type, id).join();
+ }
+
+ public List get(Class type, List ids) {
+ return database.get(type, ids).join();
+ }
+
+ public T getLink(final Table entry, Class target) {
+ return database.getLink(entry, target).join();
+ }
+
+ public Set getLinkIds(Table entity, Class extends Table> type) {
+ return database.getLinkIds(entity, type);
+ }
+
+ public Optional getLinkOptional(final Table entry, Class target) {
+ return database.getLinkOptional(entry, target).join();
+ }
+
+ public Optional getOptional(Class type, String id) {
+ return database.getOptional(type, id).join();
+ }
+
+ public String getSourceOrganisationId(Table entity) {
+ return database.getSourceOrganisationId(entity);
+ }
+
+ public T link(T entity, Class extends Table> class1, String targetId) {
+ return database.link(entity, class1, targetId).join();
+ }
+
+ public T links(T entity, Class extends Table> class1, List targetIds) {
+ return database.links(entity, class1, targetIds).join();
+ }
+
+ public String newId() {
+ return database.newId();
+ }
+
+ public T put(T entity) {
+ return database.put(entity).join();
+ }
+
+ public T put(T entity, boolean check) {
+ return database.put(entity, check).join();
+ }
+
+ public T putGlobal(T entity) {
+ return database.putGlobal(entity).join();
+ }
+
+ public List query(Class type) {
+ return database.query(type).join();
+ }
+
+ public List query(Query query) {
+ return database.query(query).join();
+ }
+
+ public List query(Class type, Function, QueryBuilder> func) {
+ return database.query(type, func).join();
+ }
+
+ public List queryGlobal(Class type, String id) {
+ return database.queryGlobal(type, id).join();
+ }
+
+ public T queryGlobalUnique(Class type, String id) {
+ return database.queryGlobalUnique(type, id).join();
+ }
+
+ public List queryHistory(QueryHistory query) {
+ return database.queryHistory(query).join();
+ }
+
+ public List querySecondary(Class type, String id) {
+ return database.querySecondary(type, id).join();
+ }
+
+ public T querySecondaryUnique(Class type, String id) {
+ return database.querySecondaryUnique(type, id).join();
+ }
+
+ public Void restoreBackup(List entities) {
+ return database.restoreBackup(entities).join();
+ }
+
+ public Void restoreHistoryBackup(List entities) {
+ return database.restoreHistoryBackup(entities).join();
+ }
+
+ public void setOrganisationId(String organisationId) {
+ database.setOrganisationId(organisationId);
+ }
+
+ public List takeBackup(String organisationId) {
+ return database.takeBackup(organisationId).join();
+ }
+
+ public List takeHistoryBackup(String organisationId) {
+ return database.takeHistoryBackup(organisationId).join();
+ }
+
+ public T takeHistoryBackup(final T entity, final Class extends Table> type, final String targetId) {
+ return database.unlink(entity, type, targetId).join();
+ }
+}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/BackupItem.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/BackupItem.java
index 257f00d6..8f4d33dd 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/BackupItem.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/BackupItem.java
@@ -12,16 +12,15 @@
package com.fleetpin.graphql.database.manager.util;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.HashMultimap;
import java.util.Map;
+import java.util.Set;
public interface BackupItem {
String getTable();
- Map getItem();
+ Map getItem();
- HashMultimap getLinks();
+ Map> getLinks();
String getId();
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/HistoryBackupItem.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/HistoryBackupItem.java
new file mode 100644
index 00000000..c61401ec
--- /dev/null
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/HistoryBackupItem.java
@@ -0,0 +1,15 @@
+package com.fleetpin.graphql.database.manager.util;
+
+import java.nio.ByteBuffer;
+
+public interface HistoryBackupItem extends BackupItem {
+ String getOrganisationIdType();
+
+ byte[] getIdRevision();
+
+ byte[] getIdDate();
+
+ byte[] getStartsWithUpdatedAt();
+
+ Long getUpdatedAt();
+}
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/TableCoreUtil.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/TableCoreUtil.java
index 27bdcbe3..cde76597 100644
--- a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/TableCoreUtil.java
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/util/TableCoreUtil.java
@@ -10,17 +10,26 @@
public final class TableCoreUtil {
public static String table(Class extends Table> type) {
+ var tmp = baseClass(type);
+ var name = tmp.getDeclaredAnnotation(TableName.class);
+ if (name == null) {
+ return type.getSimpleName().toLowerCase() + "s";
+ } else {
+ return name.value();
+ }
+ }
+
+ public static Class baseClass(Class type) {
Class> tmp = type;
TableName name = null;
while (name == null && tmp != null) {
name = tmp.getDeclaredAnnotation(TableName.class);
+ if (name != null) {
+ return (Class) tmp;
+ }
tmp = tmp.getSuperclass();
}
- if (name == null) {
- return type.getSimpleName().toLowerCase() + "s";
- } else {
- return name.value();
- }
+ return type;
}
public static CompletableFuture> all(List> collect) {
diff --git a/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/virtual/VirtualDataRunner.java b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/virtual/VirtualDataRunner.java
new file mode 100644
index 00000000..29cdc20e
--- /dev/null
+++ b/graphql-database-manager-core/src/main/java/com/fleetpin/graphql/database/manager/virtual/VirtualDataRunner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.fleetpin.graphql.database.manager.virtual;
+
+import com.fleetpin.graphql.builder.DataFetcherRunner;
+import com.fleetpin.graphql.builder.annotations.Context;
+import com.fleetpin.graphql.database.manager.Database;
+import com.fleetpin.graphql.database.manager.VirtualDatabase;
+import graphql.schema.DataFetcher;
+import java.lang.reflect.Method;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+public class VirtualDataRunner implements DataFetcherRunner {
+
+ @Override
+ public DataFetcher> manage(Method method, DataFetcher> fetcher) {
+ for (var parameter : method.getParameterTypes()) {
+ if (parameter.isAssignableFrom(VirtualDatabase.class) || hasContext(parameter)) {
+ var isCompletableFuture = CompletionStage.class.isAssignableFrom(method.getReturnType());
+ return env -> {
+ var result = CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return fetcher.get(env);
+ } catch (Exception e) {
+ if (e instanceof RuntimeException runtime) {
+ throw runtime;
+ }
+ throw new RuntimeException(e);
+ }
+ },
+ Database.VIRTUAL_THREAD_POOL
+ );
+
+ if (isCompletableFuture) {
+ return result.thenCompose(r -> (CompletionStage>) r);
+ }
+
+ return result;
+ };
+ }
+ }
+
+ return fetcher;
+ }
+
+ private boolean hasContext(Class> parameter) {
+ if (parameter.isAnnotationPresent(Context.class)) {
+ return true;
+ }
+
+ for (var inter : parameter.getInterfaces()) {
+ if (hasContext(inter)) {
+ return true;
+ }
+ }
+ var parent = parameter.getSuperclass();
+ if (parent != null && hasContext(parent)) {
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/graphql-database-manager-core/src/test/java/com/fleetpin/graphql/database/manager/virtual/VirtualDataRunnerTest.java b/graphql-database-manager-core/src/test/java/com/fleetpin/graphql/database/manager/virtual/VirtualDataRunnerTest.java
new file mode 100644
index 00000000..6f1f6293
--- /dev/null
+++ b/graphql-database-manager-core/src/test/java/com/fleetpin/graphql/database/manager/virtual/VirtualDataRunnerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.fleetpin.graphql.database.manager.virtual;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.fleetpin.graphql.builder.annotations.Context;
+import com.fleetpin.graphql.database.manager.VirtualDatabase;
+import graphql.schema.DataFetcher;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.junit.jupiter.api.Test;
+
+@SuppressWarnings("unchecked")
+public class VirtualDataRunnerTest {
+
+ @Test
+ public void testNormalMethod() throws Exception {
+ var runner = new VirtualDataRunner();
+
+ DataFetcher> fetcher = g -> "test";
+
+ var replacement = runner.manage(String.class.getMethod("toLowerCase"), fetcher);
+
+ // no context or database so returns itself
+ assertEquals(fetcher, replacement);
+
+ assertEquals("test", replacement.get(null));
+ }
+
+ @Test
+ public void testContext() throws Exception {
+ var runner = new VirtualDataRunner();
+
+ DataFetcher> fetcher = g -> "test";
+
+ var replacement = runner.manage(VirtualDataRunnerTest.class.getMethod("needsContext", TestContext.class), fetcher);
+
+ assertNotEquals(fetcher, replacement);
+
+ CompletableFuture future = (CompletableFuture) replacement.get(null);
+
+ assertEquals("test", future.join());
+ }
+
+ @Test
+ public void testDatabase() throws Exception {
+ var runner = new VirtualDataRunner();
+
+ DataFetcher> fetcher = g -> "test";
+
+ var replacement = runner.manage(VirtualDataRunnerTest.class.getMethod("needsDb", VirtualDatabase.class), fetcher);
+
+ assertNotEquals(fetcher, replacement);
+
+ CompletableFuture future = (CompletableFuture) replacement.get(null);
+
+ assertEquals("test", future.join());
+ }
+
+ @Test
+ public void testException() throws Exception {
+ var runner = new VirtualDataRunner();
+
+ var exception = new RuntimeException("failed");
+ DataFetcher> fetcher = g -> {
+ throw exception;
+ };
+
+ var replacement = runner.manage(VirtualDataRunnerTest.class.getMethod("needsDb", VirtualDatabase.class), fetcher);
+
+ assertNotEquals(fetcher, replacement);
+
+ CompletableFuture future = (CompletableFuture) replacement.get(null);
+
+ var got = assertThrows(CompletionException.class, future::join);
+ assertEquals(exception, got.getCause());
+ }
+
+ @Context
+ record TestContext() {}
+
+ public static String needsContext(TestContext context) {
+ return "";
+ }
+
+ public static String needsDb(VirtualDatabase db) {
+ return "";
+ }
+}
diff --git a/graphql-database-manager-dynamo/pom.xml b/graphql-database-manager-dynamo/pom.xml
index a4f7a6f3..aca71a3b 100644
--- a/graphql-database-manager-dynamo/pom.xml
+++ b/graphql-database-manager-dynamo/pom.xml
@@ -5,14 +5,14 @@
com.fleetpin
graphql-database-manager
- 0.2.30-SNAPSHOT
+ 3.0.4-SNAPSHOT
graphql-database-manager-dynamo
- 5.6.0
+ 5.11.0
UTF-8
@@ -30,9 +30,9 @@
- https://github.com/fleetpin/graphql-dynamodb-manager
- scm:git:https://github.com/fleetpin/graphql-dynamodb-manager.git
- scm:git:https://github.com/fleetpin/graphql-dynamodb-manager.git
+ https://github.com/ashley-taylor/graphql-dynamodb-manager
+ scm:git:https://github.com/ashley-taylor/graphql-dynamodb-manager.git
+ scm:git:https://github.com/ashley-taylor/graphql-dynamodb-manager.git
HEAD
@@ -60,6 +60,16 @@
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ com.google.guava
+ guava
+ 33.3.1-jre
+
software.amazon.awssdk
dynamodb
@@ -68,7 +78,7 @@
com.fleetpin
graphql-builder
- 0.1.5
+ 3.0.2
provided
@@ -90,7 +100,7 @@
org.apache.maven.plugins
maven-dependency-plugin
- 2.10
+ 3.8.0