-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue-967 #969
Issue-967 #969
Conversation
Created the following classes: AsyncReader TransactionImplTest
This is my first pass at implementing getsAsync, though I haven't had a good chance to look it over and test properly. I started to try and test it but was having trouble creating a transaction to test with. I'm going to give it another go tomorrow. |
Also need to implement close() still |
import org.apache.fluo.core.impl.TransactionImpl; | ||
|
||
public class AsyncReader { | ||
private PriorityQueue<AsyncGet> getsQueue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend using a BlockingQueue and not a PriorityQueue. No need to sort request.
|
||
public AsyncReader(TransactionImpl tx) { | ||
this.tx = tx; | ||
getsQueue = new PriorityQueue<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could create a LinkedBlockingQueue
AsyncGet curAsyncGet = new AsyncGet(row, column, defaultValue); | ||
getsQueue.add(curAsyncGet); | ||
executorService.submit(() -> { | ||
int i = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the face of concurrency this lambda is not correct. I would recommend something like the following and I will try to explain why.
//use arraylist instead of hashset because it has better performance and no need for unique
List <AsyncGet> gets = new ArrayList();
asyncGets.drainTo(gets);
//not sure if I have the syntax below correct, but I think Guava has this method
Collection<RowColumn> rowColumns = Collections2.transform(gets, ag -> ag.rc);
Map<RowColumn, Bytes> getsMap = tx.get(rowColumns);
//process everything we drained off the queue... do not want to touch the queue as new things may have been added.
for(AsyncGet asynGet : gets) {
Bytes result = getsMap.get(asyncGet.rc);
asyncGet.res.complete(result); //TODO handle default value
}
With the current code if the execservice had multiple threads then multiple threads could process the same AsyncGet request. Using drainTo with LinkedBlockingQueue avoids this. This is not true for all collections, but this particular collection impl gets a lock in drainTo.
Map<RowColumn, Bytes> getsMap = tx.get(rowColumns); | ||
|
||
while (!getsQueue.isEmpty()) { | ||
AsyncGet asyncGet = getsQueue.poll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I mentioned above, should not access queue here. Consider the case where things were added to the queue after get was called above. For those cases null or defaultValue may be incorrectly returned.
import org.junit.Assert; | ||
import org.junit.Test; | ||
|
||
public class TransactionImplTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test should be in a different place. Fluo has Integration test (called ITs) that run against a real Fluo instance (backed by real Accumulo and Zookeeper instances). These test are at fluo/modules/integration/src/test/java/org/apache/fluo/integration
. I would recommend putting your test there under client or impl and extending ITBaseImpl. After that each test should have a FluoClient called client available. So should be able to do the following.
@Test
public void testBasic() {
try (Transaction tx = client.newTransaction()) {
tx.set("row1", new Column("col1"), "val1");
tx.set("row2", new Column("col2"), "val2");
 tx.set("row3", new Column("col3"), "val3");

tx.commit();
}
try (Transaction tx = client.newTransaction()) {
//do async gets....
}
}
Changed getsAsync tasks to drain the AsyncGet Queue. Deleted TransactionImplTest and created TransactionImplIT.
Assert.assertEquals("val2", tx.getsAsync("row2", new Column("col2"), "foo").get()); | ||
Assert.assertEquals("val3", tx.getsAsync("row3", new Column("col3")).get()); | ||
Assert.assertEquals("val4", tx.getsAsync("row4", new Column("col4"), "val4").get()); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can make the test method throw an exception and drop this catch.
Assert.assertEquals("val4", tx.getsAsync("row4", new Column("col4"), "val4").get()); | ||
} catch (Exception e) { | ||
Assert.fail("getsAsync caused the following exception: " + e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be nice to also test the following :
try (Transaction tx = client.newTransaction()) {
Future<Bytes> val1 = tx.getsAsync("row1", new Column("col1"));
Future<Bytes> val2 = tx.getsAsync("row2", new Column("col2"), "foo");
Future<Bytes> val3 = tx.getsAsync("row3", new Column("col3"));
Future<Bytes> val4 = tx.getsAsync("row4", new Column("col4"), "val4");
//TODO check values
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jkosh44 did you see the comment above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, does the testgetAsync() method not cover these cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite, the test immediately calls get()
on the Future and get()
blocks so its never doing more than one async operation at time. I modified one of the test to do the following and ran it locally, I put in the timing info to see make sure the getAsync calls w/o the get() were fast.
try (Transaction tx = client.newTransaction()) {
long t1 = System.currentTimeMillis();
Assert.assertEquals("val1", tx.getsAsync("row1", new Column("col1")).get());
Assert.assertEquals("val2", tx.getsAsync("row2", new Column("col2"), "foo").get());
Assert.assertEquals("val3", tx.getsAsync("row3", new Column("col3")).get());
Assert.assertEquals("val4", tx.getsAsync("row4", new Column("col4"), "val4").get());
long t2 = System.currentTimeMillis();
System.out.println(t2-t1);
}
try (Transaction tx = client.newTransaction()) {
long t1 = System.currentTimeMillis();
CompletableFuture<String> val1 = tx.getsAsync("row1", new Column("col1"));
CompletableFuture<String> val2 = tx.getsAsync("row2", new Column("col2"), "foo");
CompletableFuture<String> val3 = tx.getsAsync("row3", new Column("col3"));
CompletableFuture<String> val4 = tx.getsAsync("row4", new Column("col4"), "val4");
long t2 = System.currentTimeMillis();
Assert.assertEquals("val1", val1.get());
Assert.assertEquals("val2", val2.get());
Assert.assertEquals("val3", val3.get());
Assert.assertEquals("val4", val4.get());
long t3 = System.currentTimeMillis();
System.out.println((t2-t1) + " " + (t3- t2));
}
@@ -165,4 +167,14 @@ | |||
* @return transactions start timestamp allocated from Oracle. | |||
*/ | |||
long getStartTimestamp(); | |||
|
|||
default CompletableFuture<String> getsAsync(String row, Column column) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will need Bytes getAsync(Bytes row, Column column)
and Bytes getAsync(Bytes row, Column column)
methods also. The string methods could just call the bytes methods.
|
||
asyncGetsQueue.drainTo(getsList); | ||
|
||
Collection<RowColumn> rowColumns = Collections2.transform(getsList, ag -> ag.rc); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its possible this will throw an exception. If it does, need to complete all of the futures exceptionally.
…nd added unit tests. Added an exception catch during async methods to complete exceptionally
Just a note, I noticed all of the get methods with Byte were called "get(...)" while all the get methods with Strings were called "gets(...)". I kept this consistent with the async methods. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jkosh44 this is looking good. I am thinking of holding this PR until after 1.2.0 is released. The reason I would like to do that is because there are some other places in the API where it would be nice to have async functionality, like LoaderExecutor and the commit method.
I would like all of the async changes to be considered together, and not done across releases. If we release this in 1.2.0 and then discover we would like to change this API while workiing on other async functionality then it can not be changed w/o deprecation.
Also I would like to do some performance testing of this change. When do that, I will post the results here.
executorService.shutdown(); | ||
} | ||
|
||
class AsyncGet { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could make this static. Since it is not static it has an extra hidden reference to AsyncReader.this
which is not used and makes the objects slightly larger.
|
||
public CompletableFuture<String> gets(String row, Column column, String defaultValue) { | ||
Bytes defaultValueBytes = defaultValue == null ? new Bytes() : Bytes.of(defaultValue); | ||
return get(Bytes.of(row), column, defaultValueBytes).thenApply(b -> b.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I like the use of thenApply here.
} | ||
} catch (Exception e) { | ||
for (AsyncGet asyncGet : getsList) { | ||
asyncGet.res.completeExceptionally(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If an exception were to occur in the for loop inside the try, then it is possible that some of the futures are already completed. That case seems unexpected, not sure what if anything we should do about that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about removing each AsyncGet object from getsList as we complete it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was looking at the javadoc and I don't think anything needs to be done. The javadoc for completeExceptionally says the following
If not already completed, causes invocations of get() and related methods to throw the given exception.
So it seems if its already completed, then nothing will happen.
|
||
@Override | ||
public CompletableFuture<String> getsAsync(String row, Column column) { | ||
if (asyncReader == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could shorten these method with a method like the following
private AsyncReader getAsyncReader() {
if (asyncReader == null) {
 asyncReader = new AsyncReader(this);
 }
}
Then could do the following below
return getAsyncReader().gets(row, column);
} | ||
|
||
default CompletableFuture<String> getsAsync(String row, Column column, String defaultValue) { | ||
return CompletableFuture.supplyAsync(() -> gets(row, column, defaultValue)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you see my comment on the issue about supplyAsync? These request will be put on a JVM wide thread pool and these request do I/O. So they may block other unrelated work in the JVM, but not completely sure. I will try to look into this some more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did see but I thought it was OK for the default method to be blocking as long as the implementations weren't blocking. I can remove the default implementation all together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The supplyAsync method will run the task on the ForkJoin common pool which is a JVM wide pool. I was thinking the default method should just do something like the following to avoid this.
return CompletableFuture.completedFuture(gets(row, column, defaultValue));
@jkosh44 I noticed your branch is based off an older commit. I usually add a git remote called upstream like the following. git remote add upstream https://github.com/apache/fluo.git Then with that I added I update my local master with the latest commits from upstream before branching for a new issue. # update all remote tracking branches
git fetch --all
git checkout master
# bring local master branch up to date with latest commits
git merge upstream/master
# create new branch from latest master
git checkout -b issue-xyz Don't worry about this PR being based on an older commit, its ok. |
…emoved from the list so it is not later completed excpetionally in the case of an exception
Is it too late to follow the git upstream steps for this issue? |
No, you can try. If you are new to git I would recommend experimenting in another branch. Try the following just to see what happens. Does the rebase command have conflicts?
|
for (AsyncGet asyncGet : getsList) { | ||
Bytes result = getsMap.get(asyncGet.rc); | ||
asyncGet.res.complete(result == null ? defaultValue : result); | ||
getsList.remove(asyncGet); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The following comments are moot because I don't think things needs to be removed based on earlier comment. Just sharing some knowledge in following comments.
This could cause a concurrent modification exception. The foreach has a hidden iterator. In java, many iterators will throw a concurrent modification exception if the collection is modified during iteration. Using a while loop with an explicit iterator and calling remove on the iterator is the best way to remove while iterating.
Also removing items from an arraylist could be an O(N^2) operation. In the situation of removing from the middle of a list a linked list may be better.
|
||
for (AsyncGet asyncGet : getsList) { | ||
Bytes result = getsMap.get(asyncGet.rc); | ||
asyncGet.res.complete(result == null ? defaultValue : result); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be asyncGet.defaultValue
Corrected errors involved with defaultValue No longer remove completed Futures from AsyncGet list
My latest commit should address all the issues mentioned. |
@keith-turner Also I never commented on the 1.2.0 release. That sounds good to me. If you need someone to work on other Async functionality let me know and I'd be happy to help. |
Created the following classes: AsyncReader TransactionImplTest
Changed getsAsync tasks to drain the AsyncGet Queue. Deleted TransactionImplTest and created TransactionImplIT.
…nd added unit tests. Added an exception catch during async methods to complete exceptionally
…emoved from the list so it is not later completed excpetionally in the case of an exception
Corrected errors involved with defaultValue No longer remove completed Futures from AsyncGet list
…e-967 Conflicts: modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
@jkosh44 these changes are looking good. I am still puzzling out the async story for Fluo. Currently there are a few areas I have identified and opened issue about.
|
I left a comment on #722 and will give it a go. On an unrelated note, I feel like all my commits fail on the continuous-integration Travis CI build. Is there a specific reason? I'm not really sure what I'm doing to cause this. |
The builds on travis are not always predictable. Sometimes this is an issue with Travis and sometimes its Fluo. I have been trying to correct Fluo's flakiness as we run into it. For your latest commit, the new IT is missing an import stmt for CompletableFuture. Lets see what happens with Travis after that is fixed. |
I squashed and rebased this and added a few comments in #1099. Looking at this, it would be nice to have in the next release. |
Just reviewed it and compared it to this one and it looks good to me |
This work was merged in #1099 |
Initial implementation for getsAsync()
Created the following classes:
AsyncReader
TransactionImplTest