Prototype adding async get methods to Transaction #967

Closed
keith-turner opened this issue Nov 8, 2017 · 7 comments · Fixed by #1099

@keith-turner
Contributor

keith-turner commented Nov 8, 2017

Fetching multiple cells in the Fluo tour describes the get methods Fluo provides for quickly reading multiple cells. While using these methods is faster than calling get(row, col) sequentially, they can be a bit cumbersome. Asynchronous get methods could achieve the same performance; however, I am not convinced they would be less cumbersome. I have been thinking about this idea for a while, but I have yet to convince myself it's a fully baked or good idea. I am opening this issue to share my thoughts, not to advocate for this feature.

Suppose the following methods were added to SnapshotBase. These methods would queue up the get operation in the background and return immediately.

  CompletableFuture<String> getsAsync(String row, Column column);
  CompletableFuture<String> getsAsync(String row, Column column, String defaultValue);

Using these methods, this process() method from a Fluo Tour exercise solution could be written as follows.

  public void process(TransactionBase tx, String row, Column col) throws Exception {

    // Use Future here instead of CompletableFuture because it's shorter and has the
    // needed get method. This should be much faster than calling three blocking
    // get methods.
    Future<String> content = tx.getsAsync(row, CONTENT_COL);
    Future<String> status = tx.getsAsync(row, REF_STATUS_COL);
    Future<String> processed = tx.getsAsync(row, PROCESSED_COL, "false");

    // Instead of status.equals(...) below, we have to call status.get().equals(...).
    // Same with processed.
    if (status.get().equals("referenced") && processed.get().equals("false")) {
      adjustCounts(tx, +1, tokenize(content.get()));
      tx.set(row, PROCESSED_COL, "true");
    }

    if (status.get().equals("unreferenced")) {
      for (Column c : new Column[] {PROCESSED_COL, CONTENT_COL, REF_COUNT_COL, REF_STATUS_COL})
        tx.delete(row, c);

      if (processed.get().equals("true")) {
        adjustCounts(tx, -1, tokenize(content.get()));
      }
    }
  }

The method above is only one line shorter, and I think having to call status.get() instead of just using status is a bit more cumbersome and possibly bug-prone. For example, status.equals("referenced") above would compile (because equals takes Object), but it would always return false, since a Future is never equal to a String.
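To see the pitfall concretely, here is a small JDK-only sketch (no Fluo classes; the literal "referenced" stands in for a value that getsAsync would have read). CompletableFuture does not override equals, so comparing the future itself to a String falls back to identity and is always false.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureEqualsPitfall {
  // Returns {buggy, correct}: comparing the Future itself vs. comparing its value.
  static boolean[] compare() {
    Future<String> status = CompletableFuture.completedFuture("referenced");
    // Compiles because equals(Object) accepts anything, but a Future never equals a String.
    boolean buggy = status.equals("referenced");
    boolean correct;
    try {
      // Correct: unwrap the value first.
      correct = status.get().equals("referenced");
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
    return new boolean[] {buggy, correct};
  }

  public static void main(String[] args) {
    boolean[] r = compare();
    System.out.println("status.equals: " + r[0] + ", status.get().equals: " + r[1]);
  }
}
```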

The adjustCounts method from a Tour exercise solution could be rewritten as follows.

  // This method reads the current counts for the passed in words, and then writes out
  // the current count plus the delta for each word.
  private void adjustCounts(TransactionBase tx, int delta, List<String> words)
      throws Exception {

    List<Future<Void>> futures = new ArrayList<>();

    for (String word : new HashSet<>(words)) {
      Future<Void> future = tx.getsAsync("w:" + word, WORD_COUNT, "0")
          .thenApply(Integer::parseInt)
          .thenApply(count -> delta + count)
          .thenAccept(newCount -> {
            if (newCount == 0)
              tx.delete("w:" + word, WORD_COUNT);
            else
              tx.set("w:" + word, WORD_COUNT, newCount + "");
          });
      // Keep the future so we can wait for it below.
      futures.add(future);
    }

    // wait for all futures to finish; get() rethrows any failure
    for (Future<Void> future : futures) {
      future.get();
    }
  }

Personally I think this method is slightly easier to understand, given an understanding of CompletableFuture. I found CompletableFuture a bit daunting when I first looked at it, but it grew on me.

I am interested in seeing more use cases for these proposed async get methods. I am also interested in prototyping them in order to make it possible to experiment with them.

@jkosh44
Contributor

jkosh44 commented Nov 9, 2017

I'd like to try and put together some prototypes.

@keith-turner
Contributor Author

I have thought about two possible approaches to prototyping this. Both approaches would have the following.

  • An internal class called AsyncGet that contains a RowColumn and a CompletableFuture
  • A per-transaction queue of AsyncGet objects
  • Whenever getsAsync is called on a transaction, it creates an AsyncGet object and adds it to the queue
  • Something eventually processes the queue by taking all AsyncGet objects, calling the existing get(Collection<RowColumn>) method, and using the results to complete all of the futures.
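The shared pieces above could be sketched roughly as follows. This is a JDK-only sketch under stated assumptions: RowCol is a hypothetical stand-in for Fluo's RowColumn, the batchGet function is a hypothetical stand-in for the existing get(Collection<RowColumn>), and processQueue() is called explicitly because the two approaches below differ only in who calls it.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

// Hypothetical stand-in for Fluo's RowColumn: just a row and a column name.
final class RowCol {
  final String row;
  final String col;

  RowCol(String row, String col) {
    this.row = row;
    this.col = col;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof RowCol)) {
      return false;
    }
    RowCol other = (RowCol) o;
    return row.equals(other.row) && col.equals(other.col);
  }

  @Override
  public int hashCode() {
    return Objects.hash(row, col);
  }
}

// A queued read: the cell to fetch and the future to complete with its value.
final class AsyncGet {
  final RowCol rowCol;
  final CompletableFuture<String> future = new CompletableFuture<>();

  AsyncGet(RowCol rowCol) {
    this.rowCol = rowCol;
  }
}

// Per-transaction queue of AsyncGet objects, drained with one batch read.
final class AsyncGetQueue {
  private final ConcurrentLinkedQueue<AsyncGet> queue = new ConcurrentLinkedQueue<>();
  // Assumed batch read standing in for the existing get(Collection<RowColumn>).
  private final Function<Set<RowCol>, Map<RowCol, String>> batchGet;

  AsyncGetQueue(Function<Set<RowCol>, Map<RowCol, String>> batchGet) {
    this.batchGet = batchGet;
  }

  // Queues the read and returns immediately with an incomplete future.
  CompletableFuture<String> getsAsync(String row, String col) {
    AsyncGet asyncGet = new AsyncGet(new RowCol(row, col));
    queue.add(asyncGet);
    return asyncGet.future;
  }

  // Takes everything queued so far, reads it in one batch, and completes the futures.
  void processQueue() {
    List<AsyncGet> drained = new ArrayList<>();
    AsyncGet next;
    while ((next = queue.poll()) != null) {
      drained.add(next);
    }
    if (drained.isEmpty()) {
      return;
    }
    Set<RowCol> cells = new LinkedHashSet<>();
    for (AsyncGet g : drained) {
      cells.add(g.rowCol);
    }
    try {
      Map<RowCol, String> values = batchGet.apply(cells);
      // Cells missing from the result complete with null (an assumption of this sketch).
      for (AsyncGet g : drained) {
        g.future.complete(values.get(g.rowCol));
      }
    } catch (RuntimeException e) {
      for (AsyncGet g : drained) {
        g.future.completeExceptionally(e);
      }
    }
  }
}
```

With this shape, three getsAsync calls followed by a single processQueue() turn into one batch read.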

The difference between the two approaches is how the queue is processed. Below are two ways I have been thinking of processing the queue.

  • There is a single-threaded executor service per transaction (created the first time getsAsync is called). Each time something is added to the queue, a task is also submitted to the executor service. The task takes everything currently on the queue and processes it as described above. When a transaction is closed, this service is stopped.
  • The first time anything tries to use the result of any CompletableFuture returned by getsAsync, everything in the queue is processed. Ideally this would be done by the thread that is executing the transaction and trying to use the future. However, I am not sure this approach is even possible. I have not had enough time to explore CompletableFuture in depth, but from what I have seen so far it does not seem possible. I am writing it up because I think it's the ideal approach, even though it may not be possible with CompletableFuture.
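The first approach could look something like the following JDK-only sketch. It assumes a hypothetical batchGet function in place of get(Collection<RowColumn>) and identifies cells by a hypothetical "row:column" string instead of RowColumn; it does not attempt the second approach.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// First approach: a single-threaded executor per transaction drains the queue.
// Cells are identified by a hypothetical "row:column" string to keep the sketch short.
final class ExecutorAsyncReader implements AutoCloseable {
  private final ConcurrentLinkedQueue<Map.Entry<String, CompletableFuture<String>>> queue =
      new ConcurrentLinkedQueue<>();
  // Assumed batch read standing in for get(Collection<RowColumn>).
  private final Function<List<String>, Map<String, String>> batchGet;
  private ExecutorService executor; // created the first time getsAsync is called

  ExecutorAsyncReader(Function<List<String>, Map<String, String>> batchGet) {
    this.batchGet = batchGet;
  }

  synchronized CompletableFuture<String> getsAsync(String cell) {
    if (executor == null) {
      executor = Executors.newSingleThreadExecutor();
    }
    CompletableFuture<String> future = new CompletableFuture<>();
    queue.add(new AbstractMap.SimpleImmutableEntry<>(cell, future));
    // One drain task per enqueue. A task that finds the queue already empty does nothing;
    // reads queued while a batch is in flight are picked up by a later task.
    executor.execute(this::drainQueue);
    return future;
  }

  // Runs only on the executor's single thread.
  private void drainQueue() {
    List<Map.Entry<String, CompletableFuture<String>>> batch = new ArrayList<>();
    Map.Entry<String, CompletableFuture<String>> next;
    while ((next = queue.poll()) != null) {
      batch.add(next);
    }
    if (batch.isEmpty()) {
      return;
    }
    List<String> cells = new ArrayList<>();
    for (Map.Entry<String, CompletableFuture<String>> e : batch) {
      cells.add(e.getKey());
    }
    try {
      Map<String, String> values = batchGet.apply(cells);
      for (Map.Entry<String, CompletableFuture<String>> e : batch) {
        e.getValue().complete(values.get(e.getKey()));
      }
    } catch (RuntimeException ex) {
      for (Map.Entry<String, CompletableFuture<String>> e : batch) {
        e.getValue().completeExceptionally(ex);
      }
    }
  }

  // Called when the transaction is closed: stops the executor once queued tasks finish.
  @Override
  public synchronized void close() {
    if (executor != null) {
      executor.shutdown();
    }
  }
}
```

Because the executor has one thread, batches never overlap, and each batch picks up everything queued since the previous one.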

@jkosh44
Contributor

jkosh44 commented Nov 12, 2017

Hi Keith,

Sorry I haven't been able to look at this until today. My first question is: should I be implementing these methods in the SnapshotBase interface or in one of the implementing classes? If the latter, where are the implementing classes?

Thanks,
Joe

@keith-turner
Contributor Author

You could add a default method to SnapshotBase like the following.

  default CompletableFuture<String> getsAsync(String row, Column column) {
    return CompletableFuture.completedFuture(gets(row, column));
  }

This default method is actually synchronous and blocks. However, it provides a default implementation for any existing classes that implement the interface and do not provide this new method.
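A small JDK-only sketch makes the "synchronous and blocks" point concrete. MiniSnapshot is a hypothetical, pared-down stand-in for SnapshotBase (String columns, no Fluo types); the point is that the read has already happened, and the future is already complete, by the time getsAsync returns.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, pared-down stand-in for SnapshotBase.
interface MiniSnapshot {
  String gets(String row, String column);

  // Default: performs the blocking read immediately, then wraps the result.
  default CompletableFuture<String> getsAsync(String row, String column) {
    return CompletableFuture.completedFuture(gets(row, column));
  }
}

final class DefaultAsyncDemo {
  static int reads = 0;

  public static void main(String[] args) {
    MiniSnapshot snap = (row, column) -> {
      reads++;
      return row + "/" + column;
    };
    CompletableFuture<String> f = snap.getsAsync("r1", "status");
    // The read already ran on this thread and the future is already done.
    System.out.println(reads + " " + f.isDone() + " " + f.join());
  }
}
```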

You will want to override this in TransactionImpl and provide an implementation that is asynchronous. The TransactionImpl class is a bit too big; it needs to be refactored. You could put this async get functionality in its own class and have TransactionImpl call it. Maybe something like the following.

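class AsyncReader {
  private final BlockingQueue<AsyncGet> queue = new LinkedBlockingQueue<>();
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final TransactionImpl tx;

  AsyncReader(TransactionImpl tx) { this.tx = tx; }

  CompletableFuture<String> gets(String row, Column column) {
    // TODO: add an AsyncGet to the queue and submit a task that drains it
    throw new UnsupportedOperationException("TODO");
  }

  void close() {
    executor.shutdown(); // TODO: clean up any other resources
  }
}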

@jkosh44
Contributor

jkosh44 commented Nov 14, 2017

I'm a bit new to multi-threading so apologies if any of these questions seem basic.

I think I understand your idea and am going to start working on it. My question, though, is why can't we do something like this:

    default CompletableFuture<String> getsAsync(String row, Column column) {
      return CompletableFuture.supplyAsync(() -> gets(row, column));
    }

Is the idea that we want to run each get method sequentially but as a whole do them asynchronously from everything else?

@keith-turner
Contributor Author

keith-turner commented Nov 14, 2017

@jkosh44 there are two problems with the supplyAsync(() -> gets(row, column)) approach. First, the task will execute in a thread pool that is shared JVM-wide (by default, ForkJoinPool.commonPool()). The gets operation does network I/O. Executing an I/O task in this JVM-wide thread pool could block other tasks (like parallel stream operations and parallel sort operations).
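The first problem alone can be avoided by passing a dedicated executor to the two-argument CompletableFuture.supplyAsync(supplier, executor). In this JDK-only sketch, blockingGet is a hypothetical stand-in for a network-bound gets call and the pool size of 4 is arbitrary. It does nothing about the second problem: each call is still its own read.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DedicatedIoPool {
  // Hypothetical stand-in for a blocking, network-bound read.
  static String blockingGet(String row, String column) {
    try {
      Thread.sleep(10); // simulate RPC latency
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return row + "/" + column;
  }

  public static void main(String[] args) {
    // A pool reserved for I/O, so blocking reads never occupy the common pool
    // that the one-argument supplyAsync(supplier) uses by default.
    ExecutorService ioPool = Executors.newFixedThreadPool(4);
    try {
      CompletableFuture<String> f =
          CompletableFuture.supplyAsync(() -> blockingGet("r1", "status"), ioPool);
      System.out.println(f.join());
    } finally {
      ioPool.shutdown();
    }
  }
}
```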

Second, each gets(row, column) call will make a remote procedure call (RPC) to get data from an Accumulo server. If all of the async calls are batched into a single gets(Collection<RowColumn>) call, then this may result in fewer RPCs, depending on the number of servers the data is on.
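As a rough illustration of "depending on the number of servers", here is a simplified cost model (an assumption: one RPC per server touched by a batch, ignoring retries and result paging). The serverFor mapping and the row names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

final class RpcCount {
  // One RPC per cell when each gets(row, column) call is made separately.
  static int sequentialRpcs(List<String> rows) {
    return rows.size();
  }

  // Simplified model: a batched read costs one RPC per distinct server holding any row.
  static int batchedRpcs(List<String> rows, Function<String, String> serverFor) {
    Set<String> servers = new HashSet<>();
    for (String row : rows) {
      servers.add(serverFor.apply(row));
    }
    return servers.size();
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("w:apple", "w:banana", "w:cherry", "w:kiwi");
    // Hypothetical split: rows sorting before "w:c" live on tserver1, the rest on tserver2.
    Function<String, String> serverFor = r -> r.compareTo("w:c") < 0 ? "tserver1" : "tserver2";
    System.out.println(sequentialRpcs(rows) + " vs " + batchedRpcs(rows, serverFor)); // 4 vs 2
  }
}
```

If all rows happened to live on one server, the batch would need a single RPC; if every row were on a different server, batching would save nothing in this model.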

@keith-turner
Copy link
Contributor Author

Project Loom is something to consider when thinking about adding async methods to Fluo's public API.

http://cr.openjdk.java.net/~rpressler/loom/Loom-Proposal.html

Not sure what the timeline for Loom is.

This was linked to pull requests Jul 1, 2020
ctubbsii added this to the 2.0.0 milestone Jul 1, 2020

3 participants