-
Notifications
You must be signed in to change notification settings - Fork 140
[DOC] Working with Service Pools working with SOLRJ from a service pool
In a truly reactive word, one can expect that all APIs are async. However, at times we have to integrate with legacy services and legacy APIs like JDBC.
There are times when you will need worker pools. If you are dealing with IO and the API is not async, then you will want to wrap the API in a service that you can access from a Service pool.
In this example, we will use SOLRJ API to access SOLR.
public class SolrServiceImpl implements SolrService {
/**
* Create SolrCalypsoDataStore with config file.
*
* @param solrConfig solrConfig
*/
public SolrServiceImpl(final SolrConfig solrConfig, ...) {
logger.info("SOLR Calypso Exporter Service init {}", solrConfig);
healthServiceAsync.register(HEALTH_NAME, 20, TimeUnit.SECONDS);
this.solrConfig = solrConfig;
connect();
}
...
/**
* Connect to solr.
*/
private void connect() {
...
}
@Override
public void storeEvent(Event event) {
store(event);
}
@Override
public void storeTimeSeries(TimeSeries timeSeries) { store(timeSeries);}
@Override
public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
callback.accept(doGet(queryParams));
}
private boolean store(final Object data) {
logger.info("store():: importing calypso data event into solr {}",
data);
if (connectedToSolr) {
SolrInputDocument doc = SolrServiceHelper.getSolrDocument(data);
try {
UpdateResponse ur = client.add(doc);
if (solrConfig.isForceCommit()) {
client.commit();
}
} catch (Exception e) {
...
}
return true;
} else {
...
return false;
}
}
/**
* Proxy the request to solr
* @param queryParams query params
* @return
*/
public String doGet(@RequestParam(value = "q", required = true) String queryParams) {
queryParams = queryParams.replaceAll("\\n", "");
logger.debug("Processing query params: {} ", queryParams);
String solrQueryUrl = this.solrConfig.getSolrQueryUrl() + queryParams;
logger.info("solr request Built {} ", solrQueryUrl);
String result = null;
try {
result = IOUtils.toString(new URI(solrQueryUrl));
} catch (IOException | URISyntaxException e) {
logger.error("Failed to get solr response for queryUrl {} ", solrQueryUrl, e);
}
return result;
}
@QueueCallback(QueueCallbackType.SHUTDOWN)
public void stop() {
logger.info("Solr Client stopped");
try {
this.client.close();
this.connectedToSolr = false;
} catch (IOException e) {
logger.warn("Exception while closing the solr client ", e);
}
}
}
Pretty simple. Mainly for an example. Now we want to access this from multiple threads since SOLR can block.
To do this we will use a RoundRobinServiceWorkerBuilder
which creates a RoundRobinServiceWorker
. To get more background on workers in QBit read sharded service workers and service workers.
A RoundRobinServiceWorker
is a start-able service dispatcher (Startable
, ServiceMethodDispatcher
) which can be registered with a ServiceBundle
. A ServiceMethodDispatcher
is an object that can dispatch method calls to a service.
final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder();
final CassandraService cassandraService = new CassandraService(config.cassandra);
/* Create the round robin dispatcher with 16 threads. */
final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
.roundRobinServiceWorkerBuilder().setWorkerCount(16);
/* Register a callback to create instances. */
roundRobinServiceWorkerBuilder.setServiceObjectSupplier(()
-> new SolrServiceImpl(config.solr));
/* Build and start the dispatcher. */
final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
serviceMethodDispatcher.start();
/* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
bundle.start();
/* Create other end points and register them with service endpoint server. */
final SolrServiceEndpoint solrServiceEndpoint = new SolrServiceEndpoint(solrWorkers);
final EventStorageService eventStorageService = new EventStorageService(cassandraService);
//final EventManager eventManager = managedServiceBuilder.getEventManager(); In 0.8.16+
final EventManager eventManager = QBit.factory().systemEventManager();
final IngestionService ingestionService = new IngestionService(eventManager);
managedServiceBuilder.getEndpointServerBuilder().setUri("/").build()
.initServices( cassandraService,
eventStorageService,
ingestionService,
solrServiceEndpoint
)
.startServer();
Notice this code that creates a RoundRobinServiceWorkerBuilder
.
/* Create the round robin dispatcher with 16 threads. */
final RoundRobinServiceWorkerBuilder roundRobinServiceWorkerBuilder = RoundRobinServiceWorkerBuilder
.roundRobinServiceWorkerBuilder().setWorkerCount(16);
Above we are creating the builder and setting the number of workers for the round robin dispatcher. The default is to set the number equal to the number of available CPUs. Next we need to tell the builder how to create the service impl objects as follows:
/* Register a callback to create instances. */
roundRobinServiceWorkerBuilder.setServiceObjectSupplier(()
-> new SolrServiceImpl(config.solr));
NOTE: Note that you use RoundRobinServiceWorkerBuilder
when the services are stateless (other than connection state) and you use ShardedServiceWorkerBuilder
if you must maintain sharded state (caches or some such).
A ServiceBundle
knows how to deal with a collection of addressable ServiceMethodDispatcher
s. Thus to use the RoundRobinServiceWorker
we need to use a service bundle. Therefore, we create a service bundle and register the service worker with it.
/* Build and start the dispatcher. */
final ServiceMethodDispatcher serviceMethodDispatcher = roundRobinServiceWorkerBuilder.build();
serviceMethodDispatcher.start();
/* Create a service bundle and register the serviceMethodDispatcher with the bundle. */
final ServiceBundle bundle = managedServiceBuilder.createServiceBundleBuilder().setAddress("/").build();
bundle.addServiceConsumer("/solrWorkers", serviceMethodDispatcher);
final SolrService solrWorkers = bundle.createLocalProxy(SolrService.class, "/solrWorkers");
bundle.start();
Service bundles do not auto flush, and we are using an interface from a service bundle from our SolrServiceEndpoint
instance. Therefore, we should use a Reactor
. A QBit Reactor
is owned by a service that is siting behind a service queue (ServiceQueue
). You can register services to be flushed with a reactor
, you can register for repeating jobs with the reactor
, and you can coordinate callbacks with the reactor
. The reactor
has a process method that needs to be periodically called during idle times, when batch limits (queue is full) are met and when the queue is empty. We do that by calling the process method as follows:
@RequestMapping(value = "/storage/solr", method = RequestMethod.ALL)
public class SolrServiceEndpoint {
private final SolrService solrService;
private final Reactor reactor;
public SolrServiceEndpoint(final SolrService solrService) {
this.solrService = solrService;
reactor = ReactorBuilder.reactorBuilder().build();
reactor.addServiceToFlush(solrService);
}
@OnEvent(IngestionService.NEW_EVENT_CHANNEL)
public void storeEvent(final Event event) {
solrService.storeEvent(event);
}
@OnEvent(IngestionService.NEW_TIMESERIES_CHANNEL)
public void storeTimeSeries(final TimeSeries timeSeries) {
solrService.storeTimeSeries(timeSeries);
}
/**
* Proxy the request to solr
*
* @param queryParams
* @return
*/
@RequestMapping(value = "/get", method = RequestMethod.GET)
public void get(final Callback<String> callback, final @RequestParam(value = "q", required = true) String queryParams) {
solrService.get(callback, queryParams);
}
@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
public void process() {
reactor.process();
}
}
Notice that the process
method of SolrServiceEndpoint
uses the QueueCallBack
annotation and enums (@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT}
), and then all it does it call reactor.process
. In the constructor, we registered the solrService
service proxy with the reactor
.
public SolrServiceEndpoint(final SolrService solrService) {
this.solrService = solrService;
reactor = ReactorBuilder.reactorBuilder().build();
reactor.addServiceToFlush(solrService);
}
QBit Website What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Tutorials
- QBit tutorials
- Microservices Intro
- Microservice KPI Monitoring
- Microservice Batteries Included
- RESTful APIs
- QBit and Reakt Promises
- Resourceful REST
- Microservices Reactor
- Working with JSON maps and lists
__
Docs
Getting Started
- First REST Microservice
- REST Microservice Part 2
- ServiceQueue
- ServiceBundle
- ServiceEndpointServer
- REST with URI Params
- Simple Single Page App
Basics
- What is QBit?
- Detailed Overview of QBit
- High level overview
- Low-level HTTP and WebSocket
- Low level WebSocket
- HttpClient
- HTTP Request filter
- HTTP Proxy
- Queues and flushing
- Local Proxies
- ServiceQueue remote and local
- ManagedServiceBuilder, consul, StatsD, Swagger support
- Working with Service Pools
- Callback Builders
- Error Handling
- Health System
- Stats System
- Reactor callback coordination
- Early Service Examples
Concepts
REST
Callbacks and Reactor
Event Bus
Advanced
Integration
- Using QBit in Vert.x
- Reactor-Integrating with Cassandra
- Using QBit with Spring Boot
- SolrJ and service pools
- Swagger support
- MDC Support
- Reactive Streams
- Mesos, Docker, Heroku
- DNS SRV
QBit case studies
QBit 2 Roadmap
-- Related Projects
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Reactive Microservices
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting