Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue-967 #969

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import java.util.Collection;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.fluo.api.client.scanner.ScannerBuilder;
import org.apache.fluo.api.data.Bytes;
Expand Down Expand Up @@ -165,4 +167,14 @@ Map<String, Map<Column, String>> gets(Collection<? extends CharSequence> rows,
* @return transactions start timestamp allocated from Oracle.
*/
long getStartTimestamp();

default CompletableFuture<String> getsAsync(String row, Column column) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need Bytes getAsync(Bytes row, Column column) and Bytes getAsync(Bytes row, Column column) methods also. The string methods could just call the bytes methods.

return CompletableFuture.supplyAsync(() -> gets(row, column));
}

default CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
return CompletableFuture.supplyAsync(() -> gets(row, column, defaultValue));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you see my comment on the issue about supplyAsync? These request will be put on a JVM wide thread pool and these request do I/O. So they may block other unrelated work in the JVM, but not completely sure. I will try to look into this some more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did see but I thought it was OK for the default method to be blocking as long as the implementations weren't blocking. I can remove the default implementation all together.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The supplyAsync method will run the task on the ForkJoin common pool which is a JVM wide pool. I was thinking the default method should just do something like the following to avoid this.

  return CompletableFuture.completedFuture(gets(row, column, defaultValue));

}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.core.impl;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.core.impl.TransactionImpl;

public class AsyncReader {
private PriorityQueue<AsyncGet> getsQueue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend using a BlockingQueue and not a PriorityQueue. No need to sort request.

private ExecutorService executorService;
private TransactionImpl tx;

public AsyncReader(TransactionImpl tx) {
this.tx = tx;
getsQueue = new PriorityQueue<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could create a LinkedBlockingQueue

executorService = Executors.newSingleThreadExecutor();
}

public CompletableFuture<String> gets(String row, Column column) {
return gets(row, column, null);
}

public CompletableFuture<String> gets(String row, Column column, String defaultValue) {
AsyncGet curAsyncGet = new AsyncGet(row, column, defaultValue);
getsQueue.add(curAsyncGet);
executorService.submit(() -> {
int i = 0;
Copy link
Contributor

@keith-turner keith-turner Nov 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the face of concurrency this lambda is not correct. I would recommend something like the following and I will try to explain why.

  //use arraylist instead of hashset because it has better performance and no need for unique
  List <AsyncGet> gets = new ArrayList();  

  asyncGets.drainTo(gets);

  //not sure if I have the syntax below correct, but I think Guava has this method
  Collection<RowColumn> rowColumns = Collections2.transform(gets, ag -> ag.rc); 
  Map<RowColumn, Bytes> getsMap = tx.get(rowColumns);

  //process everything we drained off the queue... do not want to touch the queue as new things may have been added.
  for(AsyncGet asynGet : gets) {
    Bytes result = getsMap.get(asyncGet.rc);
    asyncGet.res.complete(result);  //TODO handle default value
  }

With the current code if the execservice had multiple threads then multiple threads could process the same AsyncGet request. Using drainTo with LinkedBlockingQueue avoids this. This is not true for all collections, but this particular collection impl gets a lock in drainTo.

Collection<RowColumn> rowColumns = new HashSet<>();
for (AsyncGet asyncGet : getsQueue) {
rowColumns.add(asyncGet.rc);
}
Map<RowColumn, Bytes> getsMap = tx.get(rowColumns);

while (!getsQueue.isEmpty()) {
AsyncGet asyncGet = getsQueue.poll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned above, should not access queue here. Consider the case where things were added to the queue after get was called above. For those cases null or defaultValue may be incorrectly returned.

Bytes bytesRes = getsMap.get(asyncGet.rc);
asyncGet.res.complete(bytesRes == null ? defaultValue : bytesRes.toString());
}
});
return curAsyncGet.res;
}

public void close() {

}

class AsyncGet {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make this static. Since it is not static it has an extra hidden reference to AsyncReader.this which is not used and makes the objects slightly larger.

RowColumn rc;
CompletableFuture<String> res;
String defaultValue;

public AsyncGet(String row, Column column, String defaultValue) {
rc = new RowColumn(row, column);
res = new CompletableFuture<>();
this.defaultValue = defaultValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.apache.fluo.core.async.SyncCommitObserver;
import org.apache.fluo.core.exceptions.AlreadyAcknowledgedException;
import org.apache.fluo.core.exceptions.StaleScanException;
import org.apache.fluo.core.impl.AsyncReader;
import org.apache.fluo.core.impl.scanner.ScannerBuilderImpl;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.core.util.ColumnUtil;
Expand Down Expand Up @@ -117,6 +119,8 @@ private static enum TxStatus {
private TransactorNode tnode = null;
private TxStatus status = TxStatus.OPEN;
private boolean commitAttempted = false;
private AsyncReader asyncReader = null;


// for testing
private boolean stopAfterPreCommit = false;
Expand Down Expand Up @@ -265,6 +269,24 @@ private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
return ret;
}

@Override
public CompletableFuture<String> getsAsync(String row, Column column) {
if (asyncReader == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could shorten these method with a method like the following

private AsyncReader getAsyncReader() {
  if (asyncReader == null) {
    asyncReader = new AsyncReader(this);
  }
}

Then could do the following below

  return getAsyncReader().gets(row, column);

asyncReader = new AsyncReader(this);
}

return asyncReader.gets(row, column);
}

@Override
public CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) {
if (asyncReader == null) {
asyncReader = new AsyncReader(this);
}

return asyncReader.gets(row, column, defaultValue);
}

@Override
public ScannerBuilder scanner() {
checkIfOpen();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.core.impl;

import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.core.client.FluoClientImpl;
import org.junit.Assert;
import org.junit.Test;

public class TransactionImplTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should be in a different place. Fluo has Integration test (called ITs) that run against a real Fluo instance (backed by real Accumulo and Zookeeper instances). These test are at fluo/modules/integration/src/test/java/org/apache/fluo/integration. I would recommend putting your test there under client or impl and extending ITBaseImpl. After that each test should have a FluoClient called client available. So should be able to do the following.

@Test
 public void testBasic() {
     try (Transaction tx = client.newTransaction()) {
          tx.set("row1", new Column("col1"), "val1");
          tx.set("row2", new Column("col2"), "val2");
          tx.set("row3", new Column("col3"), "val3");
 
          tx.commit();
     }

     try (Transaction tx = client.newTransaction()) {
         //do async gets....
     }
     
}

@Test
public void testBasic() {
// FluoConfiguration fluoConfig = new FluoConfiguration();
// // fluoConfig.setApplicationName(config.getApplicationName());
// // fluoConfig.setInstanceZookeepers(config.getInstanceZookeepers());
// Environment env = new Environment(fluoConfig);
// TransactionBase tx = new TestTransaction(env);

// /*try (FluoClient client = FluoFactory.newClient(fluoConfig);
// Transaction tx = client.newTransaction()) {*/

// tx.set("row1", new Column("col1"), "val1");
// tx.set("row2", new Column("col2"), "val2");
// tx.set("row3", new Column("col3"), "val3");

// tx.commit();

// try {
// Assert.assertEquals("val1", tx.getsAsync("row1", new Column("col1")).get());
// Assert.assertEquals("val2", tx.getsAsync("row2", new Column("col2")).get());
// Assert.assertEquals("val3", tx.getsAsync("row3", new Column("col3")).get());

// Assert.assertEquals("val4", tx.getsAsync("row4", new Column("col4"), "val4").get());
// } catch (Exception e) {
// System.out.println(e.toString());
// }
// /*}*/
}
}