
[Quick Start] Building a simple recommendation engine with QBit (CallBack nonBlocking)

fadihub edited this page Jun 8, 2015 · 3 revisions

## Overview

To really grasp QBit, you need to grasp two concepts: callbacks and queues. A CallBack is how you get an async response from a microservice in QBit: you call a service method, and it calls you back when the result is ready. There are two golden rules to the Queue club:

- Don't block.

- Use a callback if you are not ready to handle events/methods, and continue handling events/methods that you are ready for.
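
The two rules above can be sketched in plain Java, without QBit. This is only an illustration under stated assumptions: `GreetingService` is a hypothetical class, `java.util.function.Consumer` stands in for QBit's `Callback`, and a single-thread executor stands in for a service queue. The caller never waits for a return value; the result arrives later through the callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/* Hypothetical stand-in for a queue-backed service; this is not QBit's API. */
class GreetingService {

    /* One worker thread handles submitted calls in order, like a service queue. */
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    /* Returns immediately; the result is delivered later through the callback. */
    void greet(Consumer<String> callback, String name) {
        queue.submit(() -> callback.accept("Hello, " + name));
    }

    /* Stops accepting calls and waits up to one second for queued calls to finish. */
    boolean shutdown() {
        queue.shutdown();
        try {
            return queue.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class CallbackDemo {
    public static void main(String... args) {
        GreetingService service = new GreetingService();
        List<String> results = Collections.synchronizedList(new ArrayList<>());

        /* Neither call blocks; each result is collected by the callback. */
        service.greet(results::add, "Bob");
        service.greet(results::add, "Joe");

        service.shutdown();
        results.forEach(System.out::println);
    }
}
```

The caller's thread is free to keep working between `greet` and the callback, which is the point of both rules.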

Building a simple recommendation engine with QBit - CallBack nonBlocking

This wiki walks you through building a simple recommendation engine with QBit. In the [previous example](https://github.com/advantageous/qbit/wiki/%5BQuick-Start%5D-Building-a-simple-recommendation-engine-with-QBit-(CallBack-Blocking)), we saw that loadUser is blocking, which can tie up the threads that handle all the messages. This example fixes the blocking issue while keeping things simple.

What you will build

You will build a simple recommendation engine with QBit that gives a set of recommendations to users. When you run it, you will get the following:

Recommendations for: Bob
	Recommendation{recommendation='Take a walk'}
	Recommendation{recommendation='Read a book'}
	Recommendation{recommendation='Love more, complain less'}
Recommendations for: Joe
	Recommendation{recommendation='Take a walk'}
	Recommendation{recommendation='Read a book'}
	Recommendation{recommendation='Love more, complain less'}
Recommendations for: Scott
	Recommendation{recommendation='Take a walk'}
	Recommendation{recommendation='Read a book'}
	Recommendation{recommendation='Love more, complain less'}
Recommendations for: William
	Recommendation{recommendation='Take a walk'}
	Recommendation{recommendation='Read a book'}
	Recommendation{recommendation='Love more, complain less'}

How to complete this guide

In order to complete this example successfully you will need the following installed on your machine:

Now that your machine is ready, let's get started. Clone the example repository:

https://github.com/fadihub/worker-callback-nonblocking.git

Once this is done, you can test the service. First, here are the code listings.

The process is explained in more detail under [[Detailed Tutorial] Building a simple recommendation engine with QBit - CallBack nonBlocking](https://github.com/advantageous/qbit/wiki/%5BDetailed-Tutorial%5D-Building-a-simple-recommendation-engine-with-QBit-(CallBack-nonBlocking)).

User.java Listing

~/src/main/java/io.advantageous.qbit.example.recommendationengine/User

package io.advantageous.qbit.example.recommendationengine;

/* Domain object. */
public class User {

    private final String userName;

    public User(String userName){
        this.userName = userName;
    }

    public String getUserName() {
        return userName;
    }
}

Recommendation.java Listing

package io.advantageous.qbit.example.recommendationengine;


/* Domain object. */
public class Recommendation {

    private final String recommendation;

    public Recommendation(String recommendation) {
        this.recommendation = recommendation;
    }


    @Override
    public String toString() {
        return "Recommendation{" +
                "recommendation='" + recommendation + '\'' +
                '}';
    }
}

UserDataServiceClient.java Listing

package io.advantageous.qbit.example.recommendationengine;

import io.advantageous.qbit.reactive.Callback;

public interface UserDataServiceClient {

    void loadUser(Callback<User> callBack, String userId);
}

UserDataService.java Listing

package io.advantageous.qbit.example.recommendationengine;


import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.boon.core.Sys;

import java.util.ArrayList;
import java.util.List;

import static io.advantageous.boon.Boon.puts;


public class UserDataService {


    private final List<Runnable> userLoadCallBacks = new ArrayList<>(1_000);

    public void loadUser(final Callback<User> callBack, final String userId) {

        puts("UserDataService :: loadUser called", userId);
        userLoadCallBacks.add(
                new Runnable() {
                    @Override
                    public void run() {
                        callBack.accept(new User(userId));
                    }
                });

    }


    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.LIMIT})
    public void pretendToDoIO() {
        Sys.sleep(100);

        if (userLoadCallBacks.size()==0) {
            return;
        }
        for (Runnable runnable : userLoadCallBacks) {
            runnable.run();
        }
        userLoadCallBacks.clear();

    }




}
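
The UserDataService listing above parks each callback as a `Runnable` and completes the parked callbacks in a batch from `pretendToDoIO`, which the `@QueueCallback` annotation ties to the `EMPTY` and `LIMIT` queue events. A minimal, QBit-free sketch of that park-then-complete pattern follows. `DeferredLoader`, `completePending`, and the `"user:"` prefix are hypothetical names for illustration, and `Consumer` stands in for QBit's `Callback`. Unlike the listing, this sketch copies the batch before running it, so a callback that parks new work does not modify the list being iterated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/* Hypothetical, QBit-free version of the park-then-complete pattern. */
class DeferredLoader {

    private final List<Runnable> pending = new ArrayList<>();

    /* Records the work and returns at once; nothing is delivered yet. */
    void load(Consumer<String> callback, String userId) {
        pending.add(() -> callback.accept("user:" + userId));
    }

    /* Stand-in for the idle hook: runs every parked callback and reports how many ran. */
    int completePending() {
        List<Runnable> batch = new ArrayList<>(pending);
        pending.clear();
        batch.forEach(Runnable::run);
        return batch.size();
    }
}

class DeferredLoaderDemo {
    public static void main(String... args) {
        DeferredLoader loader = new DeferredLoader();
        loader.load(System.out::println, "Bob");
        loader.load(System.out::println, "Joe");

        /* Nothing has printed so far; the callbacks fire only here. */
        loader.completePending();
    }
}
```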

RecommendationServiceClient.java Listing

package io.advantageous.qbit.example.recommendationengine;

import io.advantageous.qbit.reactive.Callback;

import java.util.List;

/**
 * @author  rhightower
 * on 2/20/15.
 */
public interface RecommendationServiceClient {


    void recommend(final Callback<List<Recommendation>> recommendationsCallback,
                   final String userName);
}

RecommendationService.java Listing

package io.advantageous.qbit.example.recommendationengine;


import io.advantageous.boon.Lists;
import io.advantageous.boon.cache.SimpleLRUCache;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

public class RecommendationService {


    private final SimpleLRUCache<String, User> users =
            new SimpleLRUCache<>(10_000);


    private BlockingQueue<Runnable> callbacks = new ArrayBlockingQueue<Runnable>(10_000);
    private UserDataServiceClient userDataService;


    public RecommendationService(UserDataServiceClient userDataService) {
        this.userDataService = userDataService;
    }


    public void recommend(final Callback<List<Recommendation>> recommendationsCallback,
                          final String userName) {


        System.out.println("recommend called");

        User user = users.get(userName);

        if (user == null) {
            userDataService.loadUser(
                    loadedUser -> {
                        handleLoadFromUserDataService(loadedUser, recommendationsCallback);
                    }, userName);
        } else {
            recommendationsCallback.accept(runRulesEngineAgainstUser(user));
        }

    }

    /**
     * Handle deferred recommendations based on user loads.
     */
    private void handleLoadFromUserDataService(final User loadedUser,
                                               final Callback<List<Recommendation>> recommendationsCallback) {

        /** Add a runnable to the callbacks list. */
        callbacks.add(() -> {
            List<Recommendation> recommendations = runRulesEngineAgainstUser(loadedUser);
            recommendationsCallback.accept(recommendations);
        });
    }


    @QueueCallback({
            QueueCallbackType.EMPTY,
            QueueCallbackType.START_BATCH,
            QueueCallbackType.LIMIT})
    private void handleCallbacks() {

        flushServiceProxy(userDataService);
        Runnable runnable = callbacks.poll();

        while (runnable != null) {
            runnable.run();
            runnable = callbacks.poll();
        }
    }

    /* Fake CPU intensive operation. */
    private List<Recommendation> runRulesEngineAgainstUser(final User user) {
        return Lists.list(new Recommendation("Take a walk"), new Recommendation("Read a book"),
                new Recommendation("Love more, complain less"));
    }

}
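
The `handleCallbacks` method in the listing above drains the queue with `poll()`, which returns `null` on an empty queue instead of waiting, so the loop never blocks. The sketch below isolates that drain loop in plain Java. `CallbackDrain` and its method names are hypothetical. One deliberate difference from the listing: it enqueues with `offer()`, which returns `false` when an `ArrayBlockingQueue` is full, whereas `add()` throws `IllegalStateException` in that case.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/* Hypothetical helper that isolates the non-blocking drain loop. */
class CallbackDrain {

    private final BlockingQueue<Runnable> callbacks;

    CallbackDrain(int capacity) {
        this.callbacks = new ArrayBlockingQueue<>(capacity);
    }

    /* Queues the callback; returns false instead of throwing when the queue is full. */
    boolean defer(Runnable callback) {
        return callbacks.offer(callback);
    }

    /* Runs queued callbacks until poll() reports an empty queue; returns how many ran. */
    int drain() {
        int ran = 0;
        Runnable next = callbacks.poll();
        while (next != null) {
            next.run();
            ran++;
            next = callbacks.poll();
        }
        return ran;
    }
}

class CallbackDrainDemo {
    public static void main(String... args) {
        CallbackDrain drain = new CallbackDrain(10);
        drain.defer(() -> System.out.println("first deferred callback"));
        drain.defer(() -> System.out.println("second deferred callback"));
        System.out.println("ran: " + drain.drain());
    }
}
```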

PrototypeMain.java Listing

package io.advantageous.qbit.example.recommendationengine;

import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.service.ServiceQueue;

import java.util.List;

import static io.advantageous.boon.core.Lists.list;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * Created by rhightower on 2/20/15.
 */
public class PrototypeMain {

    public static void main(String... args) {



        /* Create user data service and client proxy. */
        ServiceQueue userDataService = serviceBuilder()
                .setServiceObject(new UserDataService())
                .build().startServiceQueue();
        userDataService.startCallBackHandler();
        UserDataServiceClient userDataServiceClient = userDataService
                .createProxy(UserDataServiceClient.class);



        /* Create recommendation service and client proxy. */
        RecommendationService recommendationServiceImpl =
                new RecommendationService(userDataServiceClient);
        ServiceQueue recommendationServiceQueue = serviceBuilder()
                .setServiceObject(recommendationServiceImpl)
                .build().startServiceQueue().startCallBackHandler();

        RecommendationServiceClient recommendationServiceClient =
                recommendationServiceQueue.createProxy(RecommendationServiceClient.class);


        /* Use recommendationServiceClient for 4 recommendations for
          Bob, Joe, Scott and William. */
        List<String> userNames = list("Bob", "Joe", "Scott", "William");

        userNames.forEach( userName->
                        recommendationServiceClient.recommend(recommendations -> {
                            System.out.println("Recommendations for: " + userName);
                            recommendations.forEach(recommendation->
                                    System.out.println("\t" + recommendation));
                        }, userName)
        );

        flushServiceProxy(recommendationServiceClient);
        Sys.sleep(1000);

    }
}
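
In PrototypeMain above, `Sys.sleep(1000)` appears to be there to keep the main thread alive long enough for the callbacks to print. One alternative to a fixed sleep, sketched here in plain Java without QBit, is to count the callbacks down with a `CountDownLatch` and wait only as long as needed. `LatchWaitDemo`, `recommendAsync`, and `runAndWait` are hypothetical names, and the single-thread executor is an assumed stand-in for a service queue.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchWaitDemo {

    /* Hypothetical async call: answers on the worker thread through the callback. */
    static void recommendAsync(ExecutorService worker,
                               Consumer<String> callback,
                               String userName) {
        worker.submit(() -> callback.accept("Take a walk, " + userName));
    }

    /* Fires one call per user, then waits until every callback ran or the timeout passed. */
    static boolean runAndWait(List<String> userNames, long timeoutMillis) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(userNames.size());
        try {
            for (String userName : userNames) {
                recommendAsync(worker, recommendation -> {
                    System.out.println(recommendation);
                    done.countDown();
                }, userName);
            }
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String... args) {
        boolean allDone = runAndWait(List.of("Bob", "Joe", "Scott", "William"), 1000);
        System.out.println("all callbacks received: " + allDone);
    }
}
```

The timeout keeps the program from hanging if a callback never arrives, which a bare `await()` would not.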

Test The Service

In your terminal, run `cd worker-callback-nonblocking`,

then `gradle clean build`, and finally `gradle run`. You should get the following:

Recommendations for: Bob
	Recommendation{recommendation='Take a walk'}
	Recommendation{recommendation='Read a book'}
	Recommendation{recommendation='Love more, complain less'}
Recommendations for: Joe
	Recommendation{recommendation='Take a walk'}
	Recommendation{recommendation='Read a book'}
	Recommendation{recommendation='Love more, complain less'}
Recommendations for: Scott
	Recommendation{recommendation='Take a walk'}
	Recommendation{recommendation='Read a book'}
	Recommendation{recommendation='Love more, complain less'}
Recommendations for: William
	Recommendation{recommendation='Take a walk'}
	Recommendation{recommendation='Read a book'}
	Recommendation{recommendation='Love more, complain less'}

Summary

You have built and tested the non-blocking version of the recommendation engine with QBit. See you in the next tutorial!
