Develop design for asynchronous observers #968

Open
keith-turner opened this issue Nov 8, 2017 · 0 comments

Currently when a Fluo worker thread executes an observer, the execution flow looks something like the following.

  • Worker Thread 1 : execute Observer process method for transaction 1
  • Worker Thread 1 : process method executes get on transaction 1. Wait for this to return
  • Worker Thread 1 : process method executes get on transaction 1. Wait for this to return
  • Worker Thread 1 : process method executes get on transaction 1. Wait for this to return
  • Worker Thread 1 : process method returns, transaction is queued for asynchronous commit

While the get methods above are executing, the worker thread is idle. I am curious whether we could design an asynchronous API for observers that allows the worker thread to do other work while a get is in flight.
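
To make the difference concrete, here is a minimal, self-contained sketch of the two flows. It does not use Fluo: slowGet and slowGetAsync are hypothetical stand-ins for a remote read, and the 50 ms sleep is an arbitrary assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BlockingVsAsync {

  // Hypothetical stand-in for a remote read; NOT a Fluo API.
  static String slowGet(String key) {
    try {
      TimeUnit.MILLISECONDS.sleep(50);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "value-of-" + key;
  }

  // Hypothetical async variant: the read runs on the supplied pool.
  static CompletableFuture<String> slowGetAsync(String key, ExecutorService io) {
    return CompletableFuture.supplyAsync(() -> slowGet(key), io);
  }

  // Current flow: the calling thread waits for each read in turn.
  static String blockingReads() {
    return slowGet("a") + "," + slowGet("b") + "," + slowGet("c");
  }

  // Proposed flow: start all three reads and return a future right away.
  static CompletableFuture<String> asyncReads(ExecutorService io) {
    CompletableFuture<String> a = slowGetAsync("a", io);
    CompletableFuture<String> b = slowGetAsync("b", io);
    CompletableFuture<String> c = slowGetAsync("c", io);
    return a.thenCombine(b, (x, y) -> x + "," + y)
        .thenCombine(c, (xy, z) -> xy + "," + z);
  }

  public static void main(String[] args) {
    ExecutorService io = Executors.newFixedThreadPool(3);
    try {
      System.out.println(blockingReads());
      CompletableFuture<String> pending = asyncReads(io);
      // The caller could do other work here while the reads are in flight.
      System.out.println(pending.join());
    } finally {
      io.shutdown();
    }
  }
}
```

Note that in this sketch the waiting is only moved onto another thread pool. A real throughput gain would need reads that are non-blocking all the way down.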

One possible design would be to have a new observer interface like the following.

public interface AsyncObserver {
  /**
   * This method should immediately return a completable future.  When the returned 
   * future completes, the transaction will be committed.
   */
  CompletableFuture<Void> process(TransactionBase tx, Bytes row, Column col);
}
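
As a rough sketch of how a worker might drive such an observer: call process, return immediately, and commit only when the returned future completes normally. Everything here is an assumption for illustration. Tx and SimpleAsyncObserver are hypothetical, stripped-down stand-ins (row and column omitted), and closing the transaction on failure is one possible policy, not something decided in this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncWorkerSketch {

  // Hypothetical minimal stand-in for a Fluo transaction.
  interface Tx {
    void commit();
    void close();
  }

  // Hypothetical simplified form of the proposed AsyncObserver.
  interface SimpleAsyncObserver {
    CompletableFuture<Void> process(Tx tx);
  }

  // Starts the observer and returns without waiting. The commit (or close)
  // happens later, on whichever thread completes the observer's future.
  static CompletableFuture<Void> run(SimpleAsyncObserver observer, Tx tx) {
    CompletableFuture<Void> processed;
    try {
      processed = observer.process(tx);
    } catch (RuntimeException e) {
      // Treat a synchronous failure like an asynchronous one.
      processed = new CompletableFuture<>();
      processed.completeExceptionally(e);
    }
    return processed.whenComplete((v, err) -> {
      if (err == null) {
        tx.commit();
      } else {
        tx.close(); // assumed policy: abandon the transaction on failure
      }
    });
  }

  // Test double that records which calls were made.
  static class RecordingTx implements Tx {
    final List<String> events = new ArrayList<>();

    @Override
    public void commit() {
      events.add("commit");
    }

    @Override
    public void close() {
      events.add("close");
    }
  }

  public static void main(String[] args) {
    RecordingTx tx = new RecordingTx();
    CompletableFuture<Void> pending = new CompletableFuture<>();
    run(t -> pending, tx);
    System.out.println("before completion: " + tx.events);
    pending.complete(null);
    System.out.println("after completion: " + tx.events);
  }
}
```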

Below is an example AsyncObserver that is a rewrite of the ContentObserver from a Fluo Tour exercise solution. The example uses the getAsync methods proposed in #967.

 public static class ContentObserver implements AsyncObserver
  {

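    private List<String> tokenize(String document) {
      // TODO: tokenization is left unimplemented in this example
      throw new UnsupportedOperationException("TODO");
    }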
    
    private CompletableFuture<Void> adjustCounts(TransactionBase tx, int delta, 
                                                 List<String> words) {

      List<CompletableFuture<Void>> futures = new ArrayList<>();

      for (String word : new HashSet<>(words)) {
        CompletableFuture<Void> future = tx.getsAsync("w:" + word, WORD_COUNT, "0")
            .thenApply(Integer::parseInt)
            .thenApply(count -> delta + count)
            .thenAccept(newCount -> {
              if (newCount == 0)
                tx.delete("w:" + word, WORD_COUNT);
              else
                tx.set("w:" + word, WORD_COUNT, newCount + "");
            });
        futures.add(future);
      }

      // Return a future that is only complete after all async word operations 
      // are complete. Yuck, why isn't there an allOf method that takes a 
      // Collection?
      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
    }
    
    @Override
    public CompletableFuture<Void> process(TransactionBase tx, Bytes row, Column col) {
      
      CompletableFuture<String> content = tx.getsAsync(row, CONTENT_COL);
      CompletableFuture<String> status = tx.getsAsync(row, REF_STATUS_COL);
      CompletableFuture<String> processed = tx.getsAsync(row, PROCESSED_COL, "false");
      
      // After all three of the operations complete, then the lambda passed to thenCompose
      // will either do nothing or return another async operation to compute word counts.
      // Inside the lambda, join() is used rather than get() because get() throws checked
      // exceptions that a lambda cannot propagate. The futures are already complete at
      // that point, so join() does not block.

      return CompletableFuture.allOf(content, status, processed).thenCompose(v -> {
        int delta = 0;
        if (status.join().equals("referenced") && processed.join().equals("false")) {
          tx.set(row, PROCESSED_COL, "true");
          delta = 1;
        }

        if (status.join().equals("unreferenced")) {
          for (Column c : new Column[] {PROCESSED_COL, CONTENT_COL, 
                                        REF_COUNT_COL, REF_STATUS_COL})
            tx.delete(row, c);

          if (processed.join().equals("true")) {
            delta = -1;
          }
        }

        if (delta == 0) {
          // nothing to do, so return a completed operation
          return CompletableFuture.completedFuture(null);
        } else {
          // return another async operation to update word counts
          return adjustCounts(tx, delta, tokenize(content.join()));
        }
      });
    }
  }
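
On the missing Collection overload of allOf that the comment in adjustCounts complains about: a small helper could hide the array conversion. This is only a sketch, and FutureUtil is a hypothetical name, not an existing Fluo or JDK class.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class FutureUtil {

  // Returns a future that completes once every future in the collection has
  // completed. Delegates to the varargs CompletableFuture.allOf, so an empty
  // collection yields an already completed future.
  static CompletableFuture<Void> allOf(Collection<? extends CompletableFuture<?>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }

  public static void main(String[] args) {
    CompletableFuture<String> first = new CompletableFuture<>();
    CompletableFuture<Integer> second = new CompletableFuture<>();
    List<CompletableFuture<?>> futures = Arrays.asList(first, second);

    CompletableFuture<Void> all = allOf(futures);
    first.complete("done");
    System.out.println("after first: " + all.isDone());
    second.complete(42);
    System.out.println("after second: " + all.isDone());
  }
}
```

With such a helper, the last line of adjustCounts could shrink to a single call that takes the futures list directly.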

The purpose of having this API would be to increase throughput. To realistically achieve this, Accumulo would need to support async methods as mentioned in #651.
