diff --git a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java index 6f4d1ff96..416357365 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java +++ b/modules/api/src/main/java/org/apache/fluo/api/client/SnapshotBase.java @@ -17,7 +17,9 @@ import java.util.Collection; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.fluo.api.client.scanner.ScannerBuilder; import org.apache.fluo.api.data.Bytes; @@ -166,4 +168,22 @@ Map> gets(Collection rows, * @return transactions start timestamp allocated from Oracle. */ long getStartTimestamp(); + + default CompletableFuture getsAsync(String row, Column column) { + return CompletableFuture.completedFuture(gets(row, column)); + } + + default CompletableFuture getsAsync(String row, Column column, String defaultValue) { + return CompletableFuture.completedFuture(gets(row, column, defaultValue)); + } + + default CompletableFuture getAsync(Bytes row, Column column) { + return CompletableFuture.completedFuture(get(row, column)); + } + + default CompletableFuture getAsync(Bytes row, Column column, Bytes defaultValue) { + return CompletableFuture.completedFuture(get(row, column, defaultValue)); + } + + } diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java b/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java new file mode 100644 index 000000000..c549c5aa1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.core.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.collect.Collections2; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.core.impl.TransactionImpl; + +public class AsyncReader { + private BlockingQueue asyncGetsQueue; + private ExecutorService executorService; + private TransactionImpl tx; + + public AsyncReader(TransactionImpl tx) { + this.tx = tx; + asyncGetsQueue = new LinkedBlockingQueue<>(); + executorService = Executors.newSingleThreadExecutor(); + } + + public CompletableFuture get(Bytes row, Column column) { + return get(row, column, null); + } + + public CompletableFuture get(Bytes row, Column column, Bytes defaultValue) { + AsyncGet curAsyncGet = new AsyncGet(row, column, defaultValue); + asyncGetsQueue.add(curAsyncGet); + + executorService.submit(() -> { + List getsList = new ArrayList<>(); + asyncGetsQueue.drainTo(getsList); + + try { + Collection rowColumns = Collections2.transform(getsList, ag -> ag.rc); + + Map getsMap = tx.get(rowColumns); + + for (AsyncGet asyncGet : getsList) { + Bytes result = getsMap.get(asyncGet.rc); + asyncGet.res.complete(result == null ? asyncGet.defaultValue : result); + } + } catch (Exception e) { + for (AsyncGet asyncGet : getsList) { + asyncGet.res.completeExceptionally(e); + } + } + }); + + return curAsyncGet.res; + } + + public CompletableFuture gets(String row, Column column) { + return gets(row, column, null); + } + + public CompletableFuture gets(String row, Column column, String defaultValue) { + Bytes defaultValueBytes = defaultValue == null ? new Bytes() : Bytes.of(defaultValue); + return get(Bytes.of(row), column, defaultValueBytes).thenApply(b -> b.toString()); + } + + public void close() { + executorService.shutdown(); + } + + static class AsyncGet { + RowColumn rc; + CompletableFuture res; + Bytes defaultValue; + + public AsyncGet(Bytes row, Column column, Bytes defaultValue) { + rc = new RowColumn(row, column); + res = new CompletableFuture<>(); + this.defaultValue = defaultValue; + } + } +} diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java index 0abdafedc..0dd4ea7b4 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java @@ -26,6 +26,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; @@ -75,6 +76,7 @@ import org.apache.fluo.core.async.SyncCommitObserver; import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException; import org.apache.fluo.core.exceptions.StaleScanException; +import org.apache.fluo.core.impl.AsyncReader; import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl; import org.apache.fluo.core.oracle.Stamp; import org.apache.fluo.core.util.ByteUtil; @@ -135,6 +137,8 @@ private static enum TxStatus { private TransactorNode tnode = null; private TxStatus status = TxStatus.OPEN; private boolean commitAttempted = false; + private AsyncReader asyncReader = null; + // for testing private boolean stopAfterPreCommit = false; @@ -294,6 +298,33 @@ private Map getImpl(Bytes row, Set columns, return ret; } + @Override + public CompletableFuture getAsync(Bytes row, Column column) { + return getAsyncReader().get(row, column); + } + + @Override + public CompletableFuture getAsync(Bytes row, Column column, Bytes defaultValue) { + return getAsyncReader().get(row, column, defaultValue); + } + + @Override + public CompletableFuture getsAsync(String row, Column column) { + return getAsyncReader().gets(row, column); + } + + @Override + public CompletableFuture getsAsync(String row, Column column, String defaultValue) { + return getAsyncReader().gets(row, column, defaultValue); + } + + private AsyncReader getAsyncReader() { + if (asyncReader == null) { + asyncReader = new AsyncReader(this); + } + return asyncReader; + } + @Override public ScannerBuilder scanner() { checkIfOpen(); @@ -780,6 +811,10 @@ private Long getTransactorID() { } private synchronized void close(boolean checkForStaleScan) { + if (asyncReader != null) { + asyncReader.close(); + } + if (status != TxStatus.CLOSED) { status = TxStatus.CLOSED; diff --git a/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java new file mode 100644 index 000000000..ff1f38706 --- /dev/null +++ b/modules/integration/src/test/java/org/apache/fluo/integration/impl/TransactionImplIT.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.integration.impl; + +import java.util.concurrent.CompletableFuture; + +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.integration.ITBaseImpl; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests TransactionImpl classes + */ +public class TransactionImplIT extends ITBaseImpl { + + @Test + public void testgetsAsync() throws Exception { + try (Transaction tx = client.newTransaction()) { + tx.set("row1", new Column("col1"), "val1"); + tx.set("row2", new Column("col2"), "val2"); + tx.set("row3", new Column("col3"), "val3"); + + tx.commit(); + } + + try (Transaction tx = client.newTransaction()) { + CompletableFuture res1 = tx.getsAsync("row1", new Column("col1")); + CompletableFuture res2 = tx.getsAsync("row2", new Column("col2"), "foo"); + CompletableFuture res3 = tx.getsAsync("row3", new Column("col3")); + CompletableFuture res4 = tx.getsAsync("row4", new Column("col4"), "val4"); + + Assert.assertEquals("val1", res1.get()); + Assert.assertEquals("val2", res2.get()); + Assert.assertEquals("val3", res3.get()); + Assert.assertEquals("val4", res4.get()); + } + } + + @Test + public void testgetAsync() throws Exception { + Bytes row1 = Bytes.of("row1"); + Bytes row2 = Bytes.of("row2"); + Bytes row3 = Bytes.of("row3"); + Bytes row4 = Bytes.of("row4"); + + Bytes val1 = Bytes.of("val1"); + Bytes val2 = Bytes.of("val2"); + Bytes val3 = Bytes.of("val3"); + Bytes val4 = Bytes.of("val4"); + + try (Transaction tx = client.newTransaction()) { + tx.set(row1, new Column("col1"), val1); + tx.set(row2, new Column("col2"), val2); + tx.set(row3, new Column("col3"), val3); + + tx.commit(); + } + + try (Transaction tx = client.newTransaction()) { + CompletableFuture res1 = tx.getAsync(row1, new Column("col1")); + CompletableFuture res2 = tx.getAsync(row2, new Column("col2"), Bytes.of("foo")); + CompletableFuture res3 = tx.getAsync(row3, new Column("col3")); + CompletableFuture res4 = tx.getAsync(row4, new Column("col4"), Bytes.of("val4")); + + Assert.assertEquals(val1, res1.get()); + Assert.assertEquals(val2, res2.get()); + Assert.assertEquals(val3, res3.get()); + Assert.assertEquals(val4, res4.get()); + } + } + + +}