Distributed Threadpool using Apache Ignite
Turn your isolated thread pool into a distributed thread pool within 5 minutes.
What is Apache Ignite?
Apache Ignite is a distributed database for high-performance computing with in-memory speed.
Apache Ignite's database uses RAM as the default storage and processing tier, which places it in the class of in-memory computing platforms. The disk tier is optional. Once enabled, it holds the full data set, whereas the memory tier caches the full data set or part of it, depending on its capacity.
Major features include distributed SQL, multi-tier storage, a compute grid, machine learning, and more.
In this blog post, we will see how to quickly use Apache Ignite's built-in ExecutorService to process thread pool tasks in a distributed way.
Pre-requisites
- Maven for dependency management.
- JDK 8+
- IDE of your choice. (We are using Spring Tool Suite.)
Setup
For the local setup, we only need the single dependency below to use the Apache Ignite Core APIs and run an Ignite node. We will use v2.11.0, the latest stable version at the time of writing.
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>2.11.0</version>
</dependency>
Threadpool Task Processing w/o Apache Ignite
To demonstrate, we will implement our own java.lang.Runnable class for debugging purposes. We will also create a thread pool of size 16. We explain later why we use 16 as the pool size.
MyRunnable.java
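package in.virendraoswal;
import org.apache.ignite.lang.IgniteRunnable;
// IgniteRunnable extends java.lang.Runnable and java.io.Serializable, so this class works
// with both a plain ExecutorService and Ignite's distributed one.
public class MyRunnable implements IgniteRunnable {
    private static final long serialVersionUID = -8491962424023801613L;
    // Task number, used only for the debugging output.
    private final int index;
    MyRunnable(int index) {
        this.index = index;
    }
    @Override
    public void run() {
        try {
            // Simulate 500 ms of processing.
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, and stop early.
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println(String.format("Processed %d", index));
    }
}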
Our MyRunnable class sleeps for 500 ms to simulate processing. It then prints the thread name and some debugging console logs. The class implements org.apache.ignite.lang.IgniteRunnable, which extends java.lang.Runnable (as well as java.io.Serializable, so that Ignite can send the task to other nodes). As a result, the same MyRunnable class can be used with both the Ignite and the non-Ignite thread pool.
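Serializability matters because a distributed thread pool has to ship each task object from the submitting JVM to the JVM that runs it. The sketch below shows that idea with plain Java serialization: a task is turned into bytes and rebuilt as a fresh object that can be run. Ignite uses its own marshaller rather than plain Java serialization, so this only illustrates the concept. SerializableTaskDemo, SimpleTask and roundTrip are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableTaskDemo {

    // Hypothetical stand-in for MyRunnable: Runnable + Serializable, like IgniteRunnable.
    static class SimpleTask implements Runnable, Serializable {
        private static final long serialVersionUID = 1L;
        private final int index;

        SimpleTask(int index) {
            this.index = index;
        }

        int index() {
            return index;
        }

        @Override
        public void run() {
            System.out.println("Processed " + index);
        }
    }

    // Simulates shipping a task to another JVM: object -> bytes -> new object.
    static Runnable roundTrip(Serializable task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task); // sender side
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Runnable) in.readObject(); // receiver side
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Task could not be serialized", e);
        }
    }

    public static void main(String[] args) {
        Runnable copy = roundTrip(new SimpleTask(7));
        copy.run(); // prints "Processed 7"
    }
}
```

A class that only implements java.lang.Runnable could not be shipped this way, which is why the example uses IgniteRunnable rather than Runnable.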
ThreadPoolWithoutIgnite.java
Our thread pool without Apache Ignite is a straightforward, vanilla use of java.util.concurrent.ExecutorService.
package in.virendraoswal;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolWithoutIgnite {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(16);
        long start = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {
            service.submit(new MyRunnable(i));
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(String.format("Time taken %dms.", (System.currentTimeMillis() - start)));
    }
}
Logic for above implementation:
- Create a thread pool of 16 threads.
- Generate 50 dummy tasks and submit them.
- Wait for task completion.
- Output time taken to process all tasks.
Output for above program ThreadPoolWithoutIgnite.java
We can see it took more than 2 seconds to process all tasks (~2.1 secs in this case).
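The ~2 s figure follows from simple arithmetic. With 50 tasks and 16 threads, some thread must run at least ceil(50 / 16) = 4 tasks back to back. At 500 ms each, the run therefore cannot finish in less than 4 × 500 ms = 2000 ms. The remaining ~100 ms is presumably thread start-up and console-output overhead. Below is a minimal sketch of that lower bound. PoolTimeEstimate and minMillis are hypothetical names.

```java
public class PoolTimeEstimate {

    // Lower bound on wall-clock time for `tasks` equal-length tasks on `threads` threads:
    // by the pigeonhole principle, some thread runs ceil(tasks / threads) tasks in sequence.
    static long minMillis(int tasks, int threads, long taskMillis) {
        int perBusiestThread = (tasks + threads - 1) / threads; // integer ceiling division
        return perBusiestThread * taskMillis;
    }

    public static void main(String[] args) {
        // 50 tasks, 16 threads, 500 ms each
        System.out.println(minMillis(50, 16, 500) + " ms"); // prints "2000 ms"
    }
}
```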
Distributed Threadpool with Apache Ignite
For this setup, we will create two Apache Ignite nodes (effectively two JVMs). Together they form an Apache Ignite cluster and process the same MyRunnable tasks across both JVMs, i.e. distributed thread pool processing.
IgniteNode1.java
package in.virendraoswal;
import java.util.Collections;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
public class IgniteNode1 {
    public static void main(String[] args) throws IgniteException, InterruptedException {
        IgniteConfiguration firstCfg = new IgniteConfiguration();
        // Instance name that uniquely identifies this node.
        firstCfg.setIgniteInstanceName("first");
        // Explicitly configure the TCP discovery SPI to provide the list of initial
        // nodes of the cluster.
        TcpDiscoverySpi firstDiscoverySpi = new TcpDiscoverySpi();
        // Initial local port to listen on.
        firstDiscoverySpi.setLocalPort(48500);
        // Changing the local port range. This is an optional action.
        firstDiscoverySpi.setLocalPortRange(20);
        TcpDiscoveryVmIpFinder firstIpFinder = new TcpDiscoveryVmIpFinder();
        // Addresses and port range of the cluster's nodes.
        // 127.0.0.1 can be replaced with actual IP addresses or host names.
        // The port range is optional.
        firstIpFinder.setAddresses(Collections.singletonList("127.0.0.1:48500..48520"));
        // Overriding the IP finder.
        firstDiscoverySpi.setIpFinder(firstIpFinder);
        // Overriding the discovery SPI.
        firstCfg.setDiscoverySpi(firstDiscoverySpi);
        // Starting the node; it keeps running after main() returns.
        Ignite ignite = Ignition.start(firstCfg);
    }
}
IgniteNode1.java Implementation Logic
- Initialize the IgniteConfiguration class, specifying an instance name that uniquely identifies the node.
- Define a TcpDiscoverySpi with a TcpDiscoveryVmIpFinder, so that on startup, nodes within the given IP address and port range discover each other and form a cluster out of the box.
- Start the Ignite node with the defined configuration.
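The address string passed to setAddresses uses the host:from..to form, which Ignite treats as an inclusive port range. Each node therefore probes 21 candidate addresses (ports 48500 to 48520). IgniteNode1 binds 48500. When IgniteNode2 starts on the same machine, 48500 is already taken, so it should bind the next free port in its local range. That port is still among the probed addresses, so the two nodes discover each other. The hypothetical expand helper below only illustrates the range syntax; it is not Ignite's own parser.

```java
import java.util.ArrayList;
import java.util.List;

public class AddressRangeDemo {

    // Hypothetical helper (not an Ignite API): expands "host:from..to" into one
    // "host:port" entry per port, treating the range as inclusive on both ends.
    static List<String> expand(String spec) {
        int colon = spec.lastIndexOf(':');
        String host = spec.substring(0, colon);
        String ports = spec.substring(colon + 1);
        List<String> result = new ArrayList<>();
        int dots = ports.indexOf("..");
        if (dots < 0) {
            result.add(host + ":" + Integer.parseInt(ports)); // single port, no range
            return result;
        }
        int from = Integer.parseInt(ports.substring(0, dots));
        int to = Integer.parseInt(ports.substring(dots + 2));
        for (int port = from; port <= to; port++) {
            result.add(host + ":" + port);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> candidates = expand("127.0.0.1:48500..48520");
        System.out.println(candidates.size() + " candidates, from " + candidates.get(0)
                + " to " + candidates.get(candidates.size() - 1));
    }
}
```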
IgniteNode1 Startup Logs
As you can see, the Ignite node has started. The topology shows servers=1, meaning that only one node is currently part of the cluster.
We will now start Node 2. It will join Node 1 to form a cluster and then distribute tasks for processing.
IgniteNode2.java
package in.virendraoswal;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
public class IgniteNode2 {
    public static void main(String[] args) throws IgniteException, InterruptedException {
        IgniteConfiguration secondCfg = new IgniteConfiguration();
        // Instance name that uniquely identifies this node.
        secondCfg.setIgniteInstanceName("second");
        // Same discovery settings as IgniteNode1, so that both nodes find each other.
        TcpDiscoverySpi secondDiscoverySpi = new TcpDiscoverySpi();
        // Initial local port; if it is already taken (e.g. by IgniteNode1),
        // the next free port within the local port range is used.
        secondDiscoverySpi.setLocalPort(48500);
        secondDiscoverySpi.setLocalPortRange(20);
        TcpDiscoveryVmIpFinder secondIpFinder = new TcpDiscoveryVmIpFinder();
        // Addresses and port range of the cluster's nodes.
        // 127.0.0.1 can be replaced with actual IP addresses or host names.
        secondIpFinder.setAddresses(Collections.singletonList("127.0.0.1:48500..48520"));
        // Overriding the IP finder and the discovery SPI.
        secondDiscoverySpi.setIpFinder(secondIpFinder);
        secondCfg.setDiscoverySpi(secondDiscoverySpi);
        // Starting the node; it joins the cluster formed with IgniteNode1.
        Ignite ignite = Ignition.start(secondCfg);
        // Ignite's ExecutorService runs submitted tasks across the cluster's nodes.
        ExecutorService service = ignite.executorService();
        long start = System.currentTimeMillis();
        for (int i = 1; i <= 50; i++) {
            service.submit(new MyRunnable(i));
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(String.format("Time taken %dms.", (System.currentTimeMillis() - start)));
        // Stop this node; IgniteNode1 keeps running.
        ignite.close();
    }
}
IgniteNode2.java Implementation Logic
- The three steps described above for IgniteNode1.java are repeated, with the instance name "second".
- Obtain an ExecutorService from the created Ignite instance via ignite.executorService().
- Submit 50 MyRunnable tasks to the thread pool.
- Wait for Tasks to be processed.
- Capture Time taken to process all tasks.
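Because ignite.executorService() returns a standard java.util.concurrent.ExecutorService, the submit-and-wait logic can be written once against that interface. The sketch below is one alternative to shutdown()/awaitTermination(). It keeps the futures returned by submit() and waits on each one, so a failing task surfaces as an exception instead of being silently ignored, and the executor stays open for reuse. RunAllDemo and runAll are hypothetical names. The demo runs on a local pool. With Ignite, you would pass ignite.executorService() together with Serializable tasks such as MyRunnable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class RunAllDemo {

    // Submits every task, then blocks until each has finished; returns elapsed milliseconds.
    // Does not shut the executor down, so it can be reused afterwards.
    static long runAll(ExecutorService service, List<? extends Runnable> tasks) {
        long start = System.currentTimeMillis();
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable task : tasks) {
            futures.add(service.submit(task));
        }
        try {
            for (Future<?> future : futures) {
                future.get(); // waits for this task; rethrows its failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        }
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        // Local pool only: these method-reference tasks are not Serializable,
        // so they would not be suitable for Ignite's executor (use MyRunnable there).
        ExecutorService pool = Executors.newFixedThreadPool(16);
        AtomicInteger done = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            tasks.add(done::incrementAndGet);
        }
        try {
            long elapsed = runAll(pool, tasks);
            System.out.println(done.get() + " tasks finished in " + elapsed + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```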
IgniteNode2.java startup logs
When IgniteNode2.java starts, the Apache Ignite topology is updated to servers=2. This means the cluster now contains two nodes that can be used for processing.
The logs of IgniteNode1.java are also updated to show the same two-node topology.
Processing Logs
IgniteNode1.java
As the logs below show, MyRunnable tasks distributed by IgniteNode2 were also processed on IgniteNode1.
IgniteNode2.java
IgniteNode2 processes the other half of the tasks, as the logs below demonstrate. Note also the time taken to process the same tasks on the same machine, with the same thread pool size per node. It dropped by about 50%, to roughly 1 second.
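The halving matches a simple model. Assume the 50 tasks are dealt to the two nodes in turn (round-robin) and each node runs its share on 16 threads. Each node then receives 25 tasks, which need ceil(25 / 16) = 2 rounds of 500 ms, i.e. at least 1000 ms. The sketch below ignores network and serialization overhead. RoundRobinDemo, roundRobin and minMillis are hypothetical helpers, not Ignite APIs.

```java
import java.util.Arrays;

public class RoundRobinDemo {

    // Deals tasks to nodes in turn (task 0 -> node 0, task 1 -> node 1, ...)
    // and returns how many tasks each node receives.
    static int[] roundRobin(int tasks, int nodes) {
        int[] perNode = new int[nodes];
        for (int task = 0; task < tasks; task++) {
            perNode[task % nodes]++;
        }
        return perNode;
    }

    // Lower bound on wall-clock time when every node runs its share on its own pool:
    // the busiest node needs ceil(share / threadsPerNode) sequential rounds.
    static long minMillis(int[] perNode, int threadsPerNode, long taskMillis) {
        int busiest = Arrays.stream(perNode).max().orElse(0);
        int rounds = (busiest + threadsPerNode - 1) / threadsPerNode;
        return rounds * taskMillis;
    }

    public static void main(String[] args) {
        int[] perNode = roundRobin(50, 2);
        // prints "[25, 25] -> at least 1000 ms"
        System.out.println(Arrays.toString(perNode) + " -> at least "
                + minMillis(perNode, 16, 500) + " ms");
    }
}
```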
Few Points to remember:
- If you look at the processing logs of both nodes, each processed 25 tasks. This is because, by default, Apache Ignite load-balances tasks across the cluster in a round-robin fashion, which can be overridden.
- We used a thread pool size of 16 without Apache Ignite in the first part of the demonstration because Apache Ignite's public thread pool, which executes these tasks, had 16 threads on the machine used here. Its default size depends on the number of CPU cores. Matching the sizes lets us compare the performance of the two setups on similar terms.
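Ignite's thread pool documentation gives the default size of the public pool as max(8, number of CPU cores). A pool of 16 threads therefore corresponds to a machine with 16 available processors, and the value may differ on yours. The tiny sketch below reproduces that rule so you can check your own machine. defaultPublicPoolSize is a hypothetical helper, not an Ignite API. For a hardware-independent comparison, the size can instead be pinned on each node with IgniteConfiguration.setPublicThreadPoolSize(16).

```java
public class PublicPoolSize {

    // Mirrors the documented default for Ignite 2.x's public pool: max(8, CPU cores).
    // Hypothetical helper for illustration; Ignite computes this value internally.
    static int defaultPublicPoolSize(int cpuCores) {
        return Math.max(8, cpuCores);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors: " + cores
                + ", expected default public pool size: " + defaultPublicPoolSize(cores));
    }
}
```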
Voilà! With a few changes, you can convert your thread pool into a distributed thread pool with Apache Ignite. Each node you add to the cluster contributes its own thread pool, so the cluster should scale horizontally without you having to handle the distribution of tasks yourself.
Thank you for reading! If you have made it this far, please like the article; it encourages me to write more articles like this. Do share your suggestions; I appreciate your honest feedback!