layout | title | categories | parent | weight |
---|---|---|---|---|
post |
Map-Reduce Pattern - Executors Example |
SBP |
processing.html |
400 |
{% tip %}
Summary: the Map-Reduce Pattern - This example illustrates the usage of Executors Remoting (Service Executors) and Task Executors to process data in parallel.
Author: Shay Hassidim, Deputy CTO, GigaSpaces
Recently tested with GigaSpaces version: XAP 7.1
Last Update: Dec 2010
{% toc minLevel=1|maxLevel=1|type=flat|separator=pipe %}
{% endtip %}
The Map-Reduce pattern is widely used in distributed systems to process data in parallel. This example shows how to use Executors Remoting (Service Executors) and Task Executors to run your business logic in parallel on remote processes, each collocated with a space partition.
- Use Executors Remoting when you want to export service methods so that remote clients can invoke them.
- Use Task Executors when you want to ship business logic to the server side and execute it there.
In both cases, the business logic is invoked with a collocated space.
- Download the example and extract the zip file. Open your IDE and import the project files.
- Set the project libraries to point to the correct GigaSpaces libraries location. Make sure the project libraries list includes all the libraries located at `gigaspaces-xap\lib\required`.
{% toczone minLevel=2|maxLevel=2|type=flat|separator=pipe|location=top %} The example illustrates a simple map-reduce implementation.
A client writes some Account objects into the Data Grid. It then calculates the average balance of all the Accounts via a `DistributedTask` that is sent to each partition for execution.
The Task reads all the Account objects in the collocated partition, calculates their average balance, and returns the result to the client. The results returned from the partitions are aggregated at the client side (via the `DistributedTask` reducer implementation) and the final result is displayed.
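The flow described above can be sketched in plain Java without a running space. This is only a stand-in under stated assumptions, not GigaSpaces code: `MapReduceSketch`, `executeOnPartition`, `reduce`, and `accounts` are hypothetical names, each partition is modeled as a list of balances, and the sample balances are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the map and reduce steps; no GigaSpaces classes are used.
class MapReduceSketch {

    // "Map" step: what each partition computes locally (the average of its own balances).
    static int executeOnPartition(List<Integer> balances) {
        if (balances.isEmpty()) {
            return 0; // assumption: an empty partition reports 0
        }
        int total = 0;
        for (int balance : balances) {
            total += balance;
        }
        return total / balances.size();
    }

    // "Reduce" step: the client averages the per-partition results.
    static int reduce(List<Integer> partitionResults) {
        int total = 0;
        for (int result : partitionResults) {
            total += result;
        }
        return total / partitionResults.size();
    }

    // Hypothetical helper: builds `count` accounts that all hold the same balance.
    static List<Integer> accounts(int count, int balance) {
        List<Integer> balances = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            balances.add(balance);
        }
        return balances;
    }

    public static void main(String[] args) {
        List<Integer> partition1 = accounts(50, 490); // invented sample data
        List<Integer> partition2 = accounts(50, 500); // invented sample data
        List<Integer> results = Arrays.asList(
                executeOnPartition(partition1),
                executeOnPartition(partition2));
        System.out.println("Final result: " + reduce(results)); // prints "Final result: 495"
    }
}
```

In the real example the map step runs inside each partition and only the small per-partition result travels over the network; the sketch keeps everything in one process purely to show the data flow.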
The example code will have the following implemented:
- Task
- Client
The Task implements the `DistributedTask` interface. It includes the `execute` and the `reduce` methods:
{% highlight java %}
package org.test.executor;

import java.sql.Time;
import java.util.List;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.executor.DistributedTask;
import org.openspaces.core.executor.TaskGigaSpace;
import com.gigaspaces.annotation.pojo.SpaceRouting;
import com.gigaspaces.async.AsyncResult;
public class MyTask implements DistributedTask<Integer, Integer>{
@TaskGigaSpace
transient GigaSpace space;
public Integer execute() throws Exception {
Account templ = new Account();
Account accounts[] = space.readMultiple(templ , Integer.MAX_VALUE);
int total = 0;
for (Account account : accounts) {
total += account.getBalance();
}
Time t = new Time(System.currentTimeMillis());
System.out.println(t + " MyTask execute called at "+space.getSpace().getURL().getContainerName() +
" - total is:" + total );
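// Guard against an empty partition, which would otherwise cause a division by zero.
return accounts.length == 0 ? 0 : total / accounts.length;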
}
int routing;
@SpaceRouting
public Integer routing() {
return routing;
}
public Integer reduce(List<AsyncResult<Integer>> results) throws Exception {
Integer total_result =0;
int partitions=0;
for (AsyncResult<Integer> result : results) {
if (result.getException() != null) {
throw result.getException();
}
partitions++;
int temp_result = result.getResult().intValue();
total_result += temp_result ;
}
return total_result/partitions;
}
}
{% endhighlight %}
The client invokes the Task on the remote space in synchronous mode using the following:
{% highlight java %}
space = new UrlSpaceConfigurer("jini://*/*/space").space();
gigaSpace = new GigaSpaceConfigurer(space).gigaSpace();
AsyncFuture<Integer> future = gigaSpace.execute(new MyTask());
// get() blocks until the reduced result is available.
Integer result = future.get();
{% endhighlight %}
The client invokes the Task on the remote space in asynchronous mode using the following:
{% highlight java %}
space = new UrlSpaceConfigurer("jini://*/*/space").space();
gigaSpace = new GigaSpaceConfigurer(space).gigaSpace();
// The listener passed as the second argument is notified when the result arrives.
gigaSpace.execute(new MyTask(), new ExecutorTaskClientMain());
{% endhighlight %}
`ExecutorTaskClientMain` implements `AsyncFutureListener`:
{% highlight java %}
public void onResult(AsyncResult result) {
    System.out.println(new Time(System.currentTimeMillis()) + " - Client got Result:" + result.getResult());
}
{% endhighlight %}
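The difference between the two invocation styles above can be illustrated with standard `java.util.concurrent` classes. This is a rough analogy that assumes nothing about GigaSpaces internals: `CompletableFuture` stands in for the future returned by `execute`, and `SyncVsAsyncSketch` and `submit` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Analogy only: a blocking get() versus a callback, using JDK classes.
class SyncVsAsyncSketch {

    // Stand-in for submitting a task: the computation runs on another thread.
    static CompletableFuture<Integer> submit() {
        return CompletableFuture.supplyAsync(() -> (490 + 500) / 2);
    }

    public static void main(String[] args) throws Exception {
        // Synchronous style: the caller blocks until the result is available.
        Integer syncResult = submit().get();
        System.out.println("sync result: " + syncResult);

        // Asynchronous style: the caller registers a callback and carries on.
        CountDownLatch done = new CountDownLatch(1);
        submit().thenAccept(result -> {
            System.out.println("async result: " + result);
            done.countDown();
        });
        System.out.println("client keeps working while the task runs");

        // Wait for the callback so the JVM does not exit before it fires.
        done.await(5, TimeUnit.SECONDS);
    }
}
```

The relative order of the last two printed lines is not guaranteed, which is the point of the asynchronous style: the client thread is not tied to the completion of the task.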
Using IDE: Set up the run configuration in your IDE and click Run. This will start the clustered space within your IDE.
Using CLI: To start the clustered space with 2 partitions run the following:
{% highlight java %}
\gigaspaces-xap\bin\bin>puInstance -cluster schema=partitioned total_members=2 ..\deploy\templates\datagrid
{% endhighlight %}
When you start the space make sure you see both partitions started before you run the client:
{% highlight java %}
2010-12-17 14:02:20,453 INFO [com.gigaspaces.core.common] - Space [space_container1:space] with url [/./space?cluster_schema=partitioned&total_members=2&id=1&schema=default&groups=gigaspaces-7.1.2-XAPPremium-ga&state=started] started successfully
2010-12-17 14:03:04,187 INFO [com.gigaspaces.core.common] - Space [space_container2:space] with url [/./space?cluster_schema=partitioned&total_members=2&id=2&schema=default&groups=gigaspaces-7.1.2-XAPPremium-ga&state=started] started successfully
{% endhighlight %}
Run the Client Application (ExecutorTaskClientMain.java).
The `ExecutorTaskClientMain` requires one of the following as application arguments:
Synchronous mode:
{% highlight java %}
org.test.executor.ExecutorTaskClientMain sync
{% endhighlight %}
Asynchronous mode:
{% highlight java %}
org.test.executor.ExecutorTaskClientMain async
{% endhighlight %}
Output printed at the space side (one line per partition for each invocation):
{% highlight java %}
14:14:16 MyTask execute called at space_container1 - total is:24500
14:14:16 MyTask execute called at space_container2 - total is:25000
14:14:17 MyTask execute called at space_container1 - total is:24500
14:14:17 MyTask execute called at space_container2 - total is:25000
14:14:18 MyTask execute called at space_container1 - total is:24500
14:14:18 MyTask execute called at space_container2 - total is:25000
14:14:19 MyTask execute called at space_container1 - total is:24500
14:14:19 MyTask execute called at space_container2 - total is:25000
14:14:20 MyTask execute called at space_container2 - total is:25000
14:14:20 MyTask execute called at space_container1 - total is:24500
14:14:21 MyTask execute called at space_container1 - total is:24500
14:14:21 MyTask execute called at space_container2 - total is:25000
14:14:22 MyTask execute called at space_container2 - total is:25000
14:14:22 MyTask execute called at space_container1 - total is:24500
14:14:23 MyTask execute called at space_container1 - total is:24500
14:14:23 MyTask execute called at space_container2 - total is:25000
14:14:24 MyTask execute called at space_container1 - total is:24500
14:14:24 MyTask execute called at space_container2 - total is:25000
14:14:25 MyTask execute called at space_container1 - total is:24500
14:14:25 MyTask execute called at space_container2 - total is:25000
{% endhighlight %}
Output printed at the client side:
{% highlight java %}
Sync Executor example started
Log file: C:\gigaspaces-xap-premium-7.1.2-ga\logs\2010-12-17~14.14-gigaspaces-service-207.172.165.179-6516.log
14:14:16 - Client calling MyTask execute sync
14:14:16 - Client got Result:495
14:14:17 - Client calling MyTask execute sync
14:14:17 - Client got Result:495
14:14:18 - Client calling MyTask execute sync
14:14:18 - Client got Result:495
14:14:19 - Client calling MyTask execute sync
14:14:19 - Client got Result:495
14:14:20 - Client calling MyTask execute sync
14:14:20 - Client got Result:495
14:14:21 - Client calling MyTask execute sync
14:14:21 - Client got Result:495
14:14:22 - Client calling MyTask execute sync
14:14:22 - Client got Result:495
14:14:23 - Client calling MyTask execute sync
14:14:23 - Client got Result:495
14:14:24 - Client calling MyTask execute sync
14:14:24 - Client got Result:495
14:14:25 - Client calling MyTask execute sync
14:14:25 - Client got Result:495
{% endhighlight %}
You can view the space operations statistics by running `\gigaspaces-xap\bin\gs-ui`.
{% endtoczone %}
{% toczone minLevel=2|maxLevel=2|type=flat|separator=pipe|location=top %} The example has a clustered space with a collocated service running. A client invokes the service, and the results created at each partition are sent back to the client, where they are aggregated via the reducer implementation and displayed.
This example illustrates simple Service Executors usage in Synchronous mode and Asynchronous mode. Your code should have the following implemented:
- Service Interface
- Service Implementation
- Service Result Reducer (Client Side)
- Client
The Service Interface includes two methods: one invokes the service in synchronous mode and the other invokes it in asynchronous mode:
{% highlight java %}
import com.gigaspaces.async.AsyncFuture;

public interface IDataProcessor {
    // Synchronous invocation: returns the result directly.
    Integer processData(Object data);

    // Asynchronous invocation: returns a future for the result.
    AsyncFuture<Integer> asyncProcessData(Object data);
}
{% endhighlight %}
The Service Implementation includes some business logic for both of these methods:
{% highlight java %}
import java.sql.Time;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.cluster.ClusterInfo;
import org.openspaces.core.cluster.ClusterInfoContext;
import org.openspaces.core.context.GigaSpaceContext;
import org.openspaces.remoting.RemotingService;
import com.gigaspaces.async.AsyncFuture;

@RemotingService
public class DataProcessorService implements IDataProcessor {
@ClusterInfoContext
public ClusterInfo clusteinfo;
@GigaSpaceContext
transient GigaSpace gigaSpace;
public AsyncFuture<Integer> asyncProcessData(Object data)
{
return null;
}
public Integer processData(Object data) {
Account templ = new Account();
Account accounts[] = gigaSpace.readMultiple(templ , Integer.MAX_VALUE);
int total = 0;
for (Account account : accounts) {
total += account.getBalance();
}
Time t = new Time(System.currentTimeMillis());
System.out.println(t + " MyTask execute called at "+gigaSpace.getSpace().getURL().getContainerName() + " - total is:" + total );
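// Guard against an empty partition, which would otherwise cause a division by zero.
return accounts.length == 0 ? 0 : total / accounts.length;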
}
}
{% endhighlight %}
The `pu.xml` used to export the Service and start the space is shown below:
{% highlight xml %}
<context:component-scan base-package="org.test.executor"/>
<os-core:giga-space-context/>
<os-remoting:annotation-support />
<os-core:embedded-space id="space" name="space" />
<os-core:giga-space id="gigaSpace" space="space"/>
<os-remoting:service-exporter id="serviceExporter" />
{% endhighlight %}
{% note %}
The `context:component-scan`, `os-remoting:service-exporter`, and `os-remoting:annotation-support` elements allow the system to locate classes annotated with `RemotingService` and export them implicitly.
{% endnote %}
The Service Result Reducer is called at the client side and aggregates the results sent from all invoked services (one collocated with each space partition).
The Reducer implements the `RemoteResultReducer` interface:
{% highlight java %}
import org.openspaces.remoting.RemoteResultReducer;
import org.openspaces.remoting.SpaceRemotingInvocation;
import org.openspaces.remoting.SpaceRemotingResult;

public class DataProcessorServiceReducer implements RemoteResultReducer<Integer, Integer> {
public Integer reduce(SpaceRemotingResult<Integer>[] results, SpaceRemotingInvocation sri) throws Exception {
int total_result =0;
for (int i =0 ;i<results.length ; i++)
{
int temp_result = results[i].getResult().intValue();
total_result += temp_result ;
}
return total_result/results.length ;
}
}
{% endhighlight %}
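Averaging the per-partition averages, as the reducer above does, equals the overall average only when every partition holds the same number of objects. The following sketch shows one possible alternative, assuming each partition could return its sum and count instead of an average. `WeightedReduceSketch` and `Partial` are hypothetical names that are not part of the example or of the GigaSpaces API, and the sample numbers are invented.

```java
// Plain-Java comparison of two reduce strategies; no GigaSpaces classes are used.
class WeightedReduceSketch {

    // Hypothetical partial result carrying both the sum and the number of accounts.
    static final class Partial {
        final long sum;
        final int count;

        Partial(long sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Same strategy as the reducer above: average the per-partition averages.
    // Assumes every partition has at least one account.
    static int averageOfAverages(Partial[] partials) {
        int total = 0;
        for (Partial p : partials) {
            total += (int) (p.sum / p.count);
        }
        return total / partials.length;
    }

    // Alternative: combine sums and counts, then divide once.
    static int weightedAverage(Partial[] partials) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return count == 0 ? 0 : (int) (sum / count);
    }

    public static void main(String[] args) {
        Partial[] even = { new Partial(24500, 50), new Partial(25000, 50) };
        Partial[] skewed = { new Partial(1000, 10), new Partial(90000, 90) };

        System.out.println(averageOfAverages(even));   // prints 495
        System.out.println(weightedAverage(even));     // prints 495
        System.out.println(averageOfAverages(skewed)); // prints 550
        System.out.println(weightedAverage(skewed));   // prints 910
    }
}
```

With evenly sized partitions both strategies agree, so the simpler reducer is adequate for this example; the difference only matters when the data is unevenly distributed.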
The client invokes the service in Synchronous mode using the following:
{% highlight java %}
IJSpace space = new UrlSpaceConfigurer("jini://*/*/space").space();
GigaSpace gigaSpace = new GigaSpaceConfigurer(space).gigaSpace();
// broadcast() sends the invocation to all partitions and applies the reducer to their results.
IDataProcessor dataProcessor = new ExecutorRemotingProxyConfigurer<IDataProcessor>(gigaSpace, IDataProcessor.class)
        .broadcast(new DataProcessorServiceReducer())
        .proxy();
Integer result = dataProcessor.processData("A" + count);
System.out.println(new Time(System.currentTimeMillis()) + " - Client got Result:" + result.intValue());
{% endhighlight %}
- The client gets a proxy to the remote space.
- The client constructs a Service proxy using the `ExecutorRemotingProxyConfigurer`. The `DataProcessorServiceReducer` is passed in when constructing the Service proxy.
- The client invokes the service in Synchronous mode.
- The result is displayed.
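To make the idea of a broadcasting proxy with a reducer more concrete, here is a minimal plain-Java sketch built on `java.lang.reflect.Proxy`. It is not how `ExecutorRemotingProxyConfigurer` is implemented; it only mimics the observable behavior in a single process. `BroadcastProxySketch`, `DataProcessor`, `broadcastProxy`, and `average` are hypothetical names, and the two lambda "partitions" return invented values.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Single-process imitation of "invoke on every partition, then reduce".
class BroadcastProxySketch {

    // Same shape as the synchronous method of the service interface above.
    interface DataProcessor {
        Integer processData(Object data);
    }

    // Builds a proxy that calls every target and reduces the results to one value.
    // Only processData is expected to be called on the proxy in this sketch.
    static DataProcessor broadcastProxy(List<DataProcessor> targets,
                                        Function<List<Integer>, Integer> reducer) {
        InvocationHandler handler = (proxy, method, args) -> {
            List<Integer> results = new ArrayList<>();
            for (DataProcessor target : targets) {
                results.add((Integer) method.invoke(target, args));
            }
            return reducer.apply(results);
        };
        return (DataProcessor) Proxy.newProxyInstance(
                DataProcessor.class.getClassLoader(),
                new Class<?>[] { DataProcessor.class },
                handler);
    }

    // Reducer: averages the per-target results, like the reducer in the example.
    static Integer average(List<Integer> results) {
        int total = 0;
        for (int result : results) {
            total += result;
        }
        return total / results.size();
    }

    public static void main(String[] args) {
        DataProcessor partition1 = data -> 490; // invented per-partition result
        DataProcessor partition2 = data -> 500; // invented per-partition result

        DataProcessor proxy = broadcastProxy(
                Arrays.asList(partition1, partition2),
                BroadcastProxySketch::average);

        // The caller sees a single method call and a single reduced value.
        System.out.println("Client got Result:" + proxy.processData("A1")); // prints "Client got Result:495"
    }
}
```

The caller's code stays identical whether there is one target or many, which is what makes the broadcast-and-reduce approach convenient on the client side.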
Using IDE: Set up the run configuration in your IDE and click Run. This will start the clustered space and the Services within your IDE.
Using CLI: To start the clustered space with 2 partitions and export the Service run the following:
{% highlight java %}
\gigaspaces-xap\bin\bin>puInstance -cluster schema=partitioned total_members=2 \ExecutorExample\classes
{% endhighlight %}
Where `\ExecutorExample\classes` should include the processing unit `pu.xml` under `META-INF\spring\pu.xml` and the relevant Service class files.
When you start the space make sure you see both partitions started before you run the client:
{% highlight java %}
2010-12-17 14:02:20,453 INFO [com.gigaspaces.core.common] - Space [space_container1:space] with url [/./space?cluster_schema=partitioned&total_members=2&id=1&schema=default&groups=gigaspaces-7.1.2-XAPPremium-ga&state=started] started successfully
2010-12-17 14:03:04,187 INFO [com.gigaspaces.core.common] - Space [space_container2:space] with url [/./space?cluster_schema=partitioned&total_members=2&id=2&schema=default&groups=gigaspaces-7.1.2-XAPPremium-ga&state=started] started successfully
{% endhighlight %}
Run the Client Application (ExecutorClientMain.java) using the following:
{% highlight java %}
org.test.executor.ExecutorClientMain sync
{% endhighlight %}
Output printed at the space side (one line per partition for each invocation):
{% highlight java %}
14:12:18 MyTask execute called at space_container2 - total is:25000
14:12:18 MyTask execute called at space_container1 - total is:24500
14:12:19 MyTask execute called at space_container1 - total is:24500
14:12:19 MyTask execute called at space_container2 - total is:25000
14:12:20 MyTask execute called at space_container1 - total is:24500
14:12:21 MyTask execute called at space_container2 - total is:25000
14:12:22 MyTask execute called at space_container2 - total is:25000
14:12:22 MyTask execute called at space_container1 - total is:24500
14:12:23 MyTask execute called at space_container2 - total is:25000
14:12:23 MyTask execute called at space_container1 - total is:24500
14:12:24 MyTask execute called at space_container2 - total is:25000
14:12:24 MyTask execute called at space_container1 - total is:24500
14:12:25 MyTask execute called at space_container2 - total is:25000
14:12:25 MyTask execute called at space_container1 - total is:24500
14:12:26 MyTask execute called at space_container2 - total is:25000
14:12:26 MyTask execute called at space_container1 - total is:24500
14:12:27 MyTask execute called at space_container2 - total is:25000
14:12:27 MyTask execute called at space_container1 - total is:24500
{% endhighlight %}
Output printed at the client side:
{% highlight java %}
Sync Service Executor example started
Log file: C:\gigaspaces-xap-premium-7.1.2-ga\logs\2010-12-17~14.12-gigaspaces-service-207.172.165.179-6472.log
14:12:18 - Client calling sync dataProcessor
14:12:18 - Client got Result:495
14:12:19 - Client calling sync dataProcessor
14:12:19 - Client got Result:495
14:12:20 - Client calling sync dataProcessor
14:12:21 - Client got Result:495
14:12:22 - Client calling sync dataProcessor
14:12:22 - Client got Result:495
14:12:23 - Client calling sync dataProcessor
14:12:23 - Client got Result:495
14:12:24 - Client calling sync dataProcessor
14:12:24 - Client got Result:495
14:12:25 - Client calling sync dataProcessor
14:12:25 - Client got Result:495
14:12:26 - Client calling sync dataProcessor
14:12:26 - Client got Result:495
14:12:27 - Client calling sync dataProcessor
14:12:27 - Client got Result:495
{% endhighlight %}
You can view the space operations statistics by running `\gigaspaces-xap\bin\gs-ui`.
{% endtoczone %}