-
Notifications
You must be signed in to change notification settings - Fork 140
[Detailed Tutorial] Building a simple recommendation engine with QBit (CallBack Blocking)
##overview
To really grasp QBit, one must grasp the concepts of a CallBack and queues. A CallBack is a way to get an async response in QBit from a microservice. You call a service method and it calls you back.
This wiki will walk you through the process of building a simple recommendation engine with QBit, in this example things are going to be very simple, and you will notice that we are blocking on loadUser
. This is bad for most apps, but we meant to show it here so that we can explain how to fix it in the next example.
You will build a simple recommendation engine with QBit; that will give a set of recommendations to users. When you run it you will get the following:
Recommendations for: Bob
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: Joe
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: Scott
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: William
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
In order to complete this example successfully you will need the following installed on your machine:
- Gradle; if you need help installing it, visit Installing Gradle.
- Your favorite IDE or text editor (we recommend [Intellig IDEA ] (https://www.jetbrains.com/idea/) latest version).
- [JDK ] (http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) 1.8 or later.
- Build and install QBit on your machine click [Building QBit ] (https://github.com/advantageous/qbit/wiki/%5BQuick-Start%5D-Building-QBit-the-microservice-lib-for-Java) for instrutions.
Now that your machine is all ready let's get started:
- [Download ] (https://github.com/fadihub/worker-callback-blocking/archive/master.zip) and unzip the source repository for this guide, or clone it using Git:
https://github.com/fadihub/worker-callback-blocking.git
Once this is done you can test the service, let's first explain the process:
This is just the User
object or the Domain object.
~/src/main/java/io.advantageous.qbit.example.recommendationengine/User
package io.advantageous.qbit.example.recommendationengine;
/* Domain object. */
public class User {
private final String userName;
public User(String userName){
this.userName = userName;
}
public String getUserName() {
return userName;
}
}
In a real scenario RecommendationService
is CPU bound intensive; it has to iterate through products and user data and pick a match. This is all done in real time based on the latest user activity, to the last second. What page did the users just view? What product did they just bookmark? What product did they buy? etc... This is real time analytics.
As you can see in the following code if per-say loadUser
has to look in a local cache, and if the user is not found, look in an off-heap cache and if not found it must ask for the user from the UserService which must check its caches and perhaps fallback to loading the user data from a database or from another services.
In other words, loadUser can potentially block on IO. this is very bad for most apps and shouldn't happen. When you block, you are blocking the thread that is handling all of the messages, events, method calls for this service. This will be fixed in a future example.
In this example RecommendationService
is very simple; it is just a POJO, simply loads a user and matches it with a set of recommendations.
~/src/main/java/io.advantageous.qbit.example.recommendationengine/RecommendationService
package io.advantageous.qbit.example.recommendationengine;
import io.advantageous.boon.Lists;
import io.advantageous.boon.cache.SimpleLRUCache;
import java.util.List;
public class RecommendationService {
private final SimpleLRUCache<String, User> users =
new SimpleLRUCache<>(10_000);
public List<Recommendation> recommend(final String userName) {
System.out.println("recommend called");
User user = users.get(userName);
if (user == null) {
user = loadUser(userName);
}
return runRulesEngineAgainstUser(user);
}
private User loadUser(String userName) {
return new User("bob"); //stubbed out... next example will use UserService
}
private List<Recommendation> runRulesEngineAgainstUser(final User user) {
return Lists.list(new Recommendation("Take a walk"), new Recommendation("Read a book"),
new Recommendation("Love more, complain less"));
}
}
A RecommendationService
has an interface, where others can call methods.
Calling methods on the client interface enqueues those method calls onto the ServiceQueue
for the service. You create a client interface from a ServiceQueue
as follows:
ServiceQueue recommendationServiceQueue = ...
/** Create the client interface from the recommendationServiceQueue.
RecommendationServiceClient recommendationServiceClient =
recommendationServiceQueue.createProxy(RecommendationServiceClient.class);
The ServiceQueue
manages threads/queues for a Service implementation so the service can be thread safe and fast.
~/src/main/java/io.advantageous.qbit.example.recommendationengine/RecommendationServiceClient
package io.advantageous.qbit.example.recommendationengine;
import io.advantageous.qbit.reactive.Callback;
import java.util.List;
/**
* @author rhightower
* on 2/20/15.
*/
public interface RecommendationServiceClient {
void recommend(final Callback<List<Recommendation>> recommendationsCallback,
final String userName);
}
The Callback is created as follows:
Callback<List<Recommendation>> callback = new Callback<List<Recommendation>>() {
@Override
public void accept(List<Recommendation> recommendations) {
System.out.println("recommendations " + recommendations);
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
}
};
This is how it is done using pre Java 8 style of inner classes.
Now when we call the RecommendationServiceClient
if the callback succeeds we get a an accept
call, and if it fails we get an onError
call. We call it on one thread, and it calls us back on another thread when it finishes.
creating Callback
with a Lambda 8 expression is as follows:
recommendationServiceClient.recommend(recommendations ->
System.out.println("recommendations " + recommendations), "Rick");
Here is the Recommendation object or Domain object.
~/src/main/java/io.advantageous.qbit.example.recommendationengine/Recommendation
package io.advantageous.qbit.example.recommendationengine;
/* Domain object. */
public class Recommendation {
private final String recommendation;
public Recommendation(String recommendation) {
this.recommendation = recommendation;
}
@Override
public String toString() {
return "Recommendation{" +
"recommendation='" + recommendation + '\'' +
'}';
}
}
Here is our PrototypeMain
to run our program. First we create the service:
/* Create the service. */
RecommendationService recommendationServiceImpl =
new RecommendationService();
Then wrap the service in a queue that manages threads so the service can be thread safe and fast:
/* Wrap the service in a queue. */
ServiceQueue recommendationServiceQueue = serviceBuilder()
.setServiceObject(recommendationServiceImpl)
.build().startServiceQueue().startCallBackHandler();
Create a proxy interface:
/* Create a proxy interface for the service. */
RecommendationServiceClient recommendationServiceClient =
recommendationServiceQueue.createProxy(RecommendationServiceClient.class);
Call the service with the proxy:
/* Call the service with the proxy. */
recommendationServiceClient.recommend(out::println, "Rick");
then finally flush the call:
/* Flush the call. This can be automated, but is a core concept. */
flushServiceProxy(recommendationServiceClient);
Sys.sleep(1000);
Periodically, you need to tell QBit to flush what you have done so far. Every time you tell it to flush, you are sending all of the methods you called from the last time you flushed. There are ways to get QBit to auto-flush, which will be covered later. When you use a QBit queue, you can specify a batch size. The bigger the batch the less time doing thread hand-off and less synchronizing queue internals across multiple threads. When the queue gets requests beyond this amount, it will batch them. You can also periodically, flush the methods when you feel its appropriate like after a related unit of work. This is microbatching and QBit supports it at its core. Therefore periodically (unless you setup auto flush or a batch size of 1), you need to flush your method call queue.
Also in this example we used a Lambda expression to make Callbacks call the service:
List<String> userNames = list("Bob", "Joe", "Scott", "William");
userNames.forEach( userName->
recommendationServiceClient.recommend(recommendations -> {
System.out.println("Recommendations for: " + userName);
recommendations.forEach(recommendation->
System.out.println("\t" + recommendation));
}, userName)
);
flushServiceProxy(recommendationServiceClient);
Sys.sleep(1000);
~/src/main/java/io.advantageous.qbit.example.recommendationengine/PrototypeMain
package io.advantageous.qbit.example.recommendationengine;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.service.ServiceQueue;
import java.util.List;
import static io.advantageous.boon.core.Lists.list;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;
import static java.lang.System.out;
/**
* Created by rhightower on 2/20/15.
*/
public class PrototypeMain {
public static void main(String... args) {
/* Create the service. */
RecommendationService recommendationServiceImpl =
new RecommendationService();
/* Wrap the service in a queue. */
ServiceQueue recommendationServiceQueue = serviceBuilder()
.setServiceObject(recommendationServiceImpl)
.build().startServiceQueue().startCallBackHandler();
/* Create a proxy interface for the service. */
RecommendationServiceClient recommendationServiceClient =
recommendationServiceQueue.createProxy(RecommendationServiceClient.class);
/* Call the service with the proxy. */
recommendationServiceClient.recommend(out::println, "Rick");
/* Flush the call. This can be automated, but is a core concept. */
flushServiceProxy(recommendationServiceClient);
Sys.sleep(1000);
/* Lambdas gone wild. */
List<String> userNames = list("Bob", "Joe", "Scott", "William");
userNames.forEach( userName->
recommendationServiceClient.recommend(recommendations -> {
System.out.println("Recommendations for: " + userName);
recommendations.forEach(recommendation->
System.out.println("\t" + recommendation));
}, userName)
);
flushServiceProxy(recommendationServiceClient);
Sys.sleep(1000);
}
}
Here is the build file.
apply plugin: 'idea'
apply plugin: 'java'
apply plugin: 'application'
sourceCompatibility = 1.8
version = '1.0'
repositories {
mavenLocal()
mavenCentral()
}
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
mainClassName = "io.advantageous.qbit.example.recommendationengine.PrototypeMain"
dependencies {
compile group: 'io.advantageous.qbit', name: 'qbit-jetty', version: '0.7.2'
compile group: 'javax.inject', name: 'javax.inject', version: '1'
compile('org.springframework.boot:spring-boot-starter-web:1.2.1.RELEASE') {
exclude module: 'spring-boot-starter-tomcat'
}
compile 'org.eclipse.jetty:jetty-webapp:9.+'
compile 'org.eclipse.jetty:jetty-jsp:9.+'
testCompile "junit:junit:4.11"
testCompile "org.slf4j:slf4j-simple:[1.7,1.8)"
}
With your terminal cd worker-callback-blocking
then gradle clean build
and finally gradle run
you should get the following:
Recommendations for: Bob
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: Joe
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: Scott
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: William
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
This was a brief introduction to Callbacks and how to use them with QBit. You have just built and tested a simple recommendation engine with QBit, see you in the next tutorial!
QBit Website What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Tutorials
- QBit tutorials
- Microservices Intro
- Microservice KPI Monitoring
- Microservice Batteries Included
- RESTful APIs
- QBit and Reakt Promises
- Resourceful REST
- Microservices Reactor
- Working with JSON maps and lists
__
Docs
Getting Started
- First REST Microservice
- REST Microservice Part 2
- ServiceQueue
- ServiceBundle
- ServiceEndpointServer
- REST with URI Params
- Simple Single Page App
Basics
- What is QBit?
- Detailed Overview of QBit
- High level overview
- Low-level HTTP and WebSocket
- Low level WebSocket
- HttpClient
- HTTP Request filter
- HTTP Proxy
- Queues and flushing
- Local Proxies
- ServiceQueue remote and local
- ManagedServiceBuilder, consul, StatsD, Swagger support
- Working with Service Pools
- Callback Builders
- Error Handling
- Health System
- Stats System
- Reactor callback coordination
- Early Service Examples
Concepts
REST
Callbacks and Reactor
Event Bus
Advanced
Integration
- Using QBit in Vert.x
- Reactor-Integrating with Cassandra
- Using QBit with Spring Boot
- SolrJ and service pools
- Swagger support
- MDC Support
- Reactive Streams
- Mesos, Docker, Heroku
- DNS SRV
QBit case studies
QBit 2 Roadmap
-- Related Projects
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Reactive Microservices
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting