Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue-967 #969

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import java.util.Collection;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
Expand Down Expand Up @@ -166,4 +168,22 @@ Map<String, Map<Column, String>> gets(Collection<? extends CharSequence> rows,
* @return transactions start timestamp allocated from Oracle.
*/
long getStartTimestamp();

default CompletableFuture<String> getsAsync(String row, Column column) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need Bytes getAsync(Bytes row, Column column) and Bytes getAsync(Bytes row, Column column) methods also. The string methods could just call the bytes methods.

return CompletableFuture.completedFuture(gets(row, column));
}

default CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
return CompletableFuture.completedFuture(gets(row, column, defaultValue));
}

default CompletableFuture<Bytes> getAsync(Bytes row, Column column) {
return CompletableFuture.completedFuture(get(row, column));
}

default CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
return CompletableFuture.completedFuture(get(row, column, defaultValue));
}


}
100 changes: 100 additions & 0 deletions modules/core/src/main/java/org/apache/fluo/core/impl/AsyncReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.core.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.google.common.collect.Collections2;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.core.impl.TransactionImpl;

public class AsyncReader {
private BlockingQueue<AsyncGet> asyncGetsQueue;
private ExecutorService executorService;
private TransactionImpl tx;

public AsyncReader(TransactionImpl tx) {
this.tx = tx;
asyncGetsQueue = new LinkedBlockingQueue<>();
executorService = Executors.newSingleThreadExecutor();
}

public CompletableFuture<Bytes> get(Bytes row, Column column) {
return get(row, column, null);
}

public CompletableFuture<Bytes> get(Bytes row, Column column, Bytes defaultValue) {
AsyncGet curAsyncGet = new AsyncGet(row, column, defaultValue);
asyncGetsQueue.add(curAsyncGet);

executorService.submit(() -> {
List<AsyncGet> getsList = new ArrayList<>();
asyncGetsQueue.drainTo(getsList);

try {
Collection<RowColumn> rowColumns = Collections2.transform(getsList, ag -> ag.rc);

Map<RowColumn, Bytes> getsMap = tx.get(rowColumns);

for (AsyncGet asyncGet : getsList) {
Bytes result = getsMap.get(asyncGet.rc);
asyncGet.res.complete(result == null ? asyncGet.defaultValue : result);
}
} catch (Exception e) {
for (AsyncGet asyncGet : getsList) {
asyncGet.res.completeExceptionally(e);
}
}
});

return curAsyncGet.res;
}

public CompletableFuture<String> gets(String row, Column column) {
return gets(row, column, null);
}

public CompletableFuture<String> gets(String row, Column column, String defaultValue) {
Bytes defaultValueBytes = defaultValue == null ? new Bytes() : Bytes.of(defaultValue);
return get(Bytes.of(row), column, defaultValueBytes).thenApply(b -> b.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I like the use of thenApply here.

}

public void close() {
executorService.shutdown();
}

static class AsyncGet {
RowColumn rc;
CompletableFuture<Bytes> res;
Bytes defaultValue;

public AsyncGet(Bytes row, Column column, Bytes defaultValue) {
rc = new RowColumn(row, column);
res = new CompletableFuture<>();
this.defaultValue = defaultValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.apache.fluo.core.async.SyncCommitObserver;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.exceptions.StaleScanException;
import org.apache.fluo.core.impl.AsyncReader;
import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.core.util.ByteUtil;
Expand Down Expand Up @@ -135,6 +137,8 @@ private static enum TxStatus {
private TransactorNode tnode = null;
private TxStatus status = TxStatus.OPEN;
private boolean commitAttempted = false;
private AsyncReader asyncReader = null;


// for testing
private boolean stopAfterPreCommit = false;
Expand Down Expand Up @@ -294,6 +298,33 @@ private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns,
return ret;
}

@Override
public CompletableFuture<Bytes> getAsync(Bytes row, Column column) {
return getAsyncReader().get(row, column);
}

@Override
public CompletableFuture<Bytes> getAsync(Bytes row, Column column, Bytes defaultValue) {
return getAsyncReader().get(row, column, defaultValue);
}

@Override
public CompletableFuture<String> getsAsync(String row, Column column) {
return getAsyncReader().gets(row, column);
}

@Override
public CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
return getAsyncReader().gets(row, column, defaultValue);
}

private AsyncReader getAsyncReader() {
if (asyncReader == null) {
asyncReader = new AsyncReader(this);
}
return asyncReader;
}

@Override
public ScannerBuilder scanner() {
checkIfOpen();
Expand Down Expand Up @@ -780,6 +811,10 @@ private Long getTransactorID() {
}

private synchronized void close(boolean checkForStaleScan) {
if (asyncReader != null) {
asyncReader.close();
}

if (status != TxStatus.CLOSED) {
status = TxStatus.CLOSED;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.integration.impl;

import java.util.concurrent.CompletableFuture;

import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.integration.ITBaseImpl;
import org.junit.Assert;
import org.junit.Test;

/**
* Tests TransactionImpl classes
*/
public class TransactionImplIT extends ITBaseImpl {

@Test
public void testgetsAsync() throws Exception {
try (Transaction tx = client.newTransaction()) {
tx.set("row1", new Column("col1"), "val1");
tx.set("row2", new Column("col2"), "val2");
tx.set("row3", new Column("col3"), "val3");

tx.commit();
}

try (Transaction tx = client.newTransaction()) {
CompletableFuture<String> res1 = tx.getsAsync("row1", new Column("col1"));
CompletableFuture<String> res2 = tx.getsAsync("row2", new Column("col2"), "foo");
CompletableFuture<String> res3 = tx.getsAsync("row3", new Column("col3"));
CompletableFuture<String> res4 = tx.getsAsync("row4", new Column("col4"), "val4");

Assert.assertEquals("val1", res1.get());
Assert.assertEquals("val2", res2.get());
Assert.assertEquals("val3", res3.get());
Assert.assertEquals("val4", res4.get());
}
}

@Test
public void testgetAsync() throws Exception {
Bytes row1 = Bytes.of("row1");
Bytes row2 = Bytes.of("row2");
Bytes row3 = Bytes.of("row3");
Bytes row4 = Bytes.of("row4");

Bytes val1 = Bytes.of("val1");
Bytes val2 = Bytes.of("val2");
Bytes val3 = Bytes.of("val3");
Bytes val4 = Bytes.of("val4");

try (Transaction tx = client.newTransaction()) {
tx.set(row1, new Column("col1"), val1);
tx.set(row2, new Column("col2"), val2);
tx.set(row3, new Column("col3"), val3);

tx.commit();
}

try (Transaction tx = client.newTransaction()) {
CompletableFuture<Bytes> res1 = tx.getAsync(row1, new Column("col1"));
CompletableFuture<Bytes> res2 = tx.getAsync(row2, new Column("col2"), Bytes.of("foo"));
CompletableFuture<Bytes> res3 = tx.getAsync(row3, new Column("col3"));
CompletableFuture<Bytes> res4 = tx.getAsync(row4, new Column("col4"), Bytes.of("val4"));

Assert.assertEquals(val1, res1.get());
Assert.assertEquals(val2, res2.get());
Assert.assertEquals(val3, res3.get());
Assert.assertEquals(val4, res4.get());
}
}


}