Distributed Threadpool using Apache Ignite

Turn your isolated thread pool into a distributed thread pool within 5 minutes.

What is Apache Ignite?

Apache Ignite is a distributed database for high-performance computing with in-memory speed.

Apache Ignite uses RAM as its default storage and processing tier, which places it in the class of in-memory computing platforms. The disk tier is optional but, once enabled, holds the full data set, whereas the memory tier caches the full data set or part of it, depending on its capacity.

Major features include distributed SQL, multi-tier storage, a compute grid, machine learning, and more. In this blog post, we will see how to use Apache Ignite's built-in ExecutorService to process thread pool tasks in a distributed way.

Pre-requisites

  • Maven for dependency management.
  • JDK 8+
  • IDE of your choice. (We are using Spring Tool Suite.)

Setup

For the local setup, we need just a single dependency, ignite-core, which provides the Apache Ignite Core APIs required to run an Ignite node. We will use v2.11.0, the latest stable version at the time of writing.

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>2.11.0</version>
</dependency>

Threadpool Task Processing w/o Apache Ignite

To demonstrate, we will implement our own java.lang.Runnable class that prints some debugging output. We will create a thread pool of size 16 (why 16 is explained at the end of the post).

MyRunnable.java

package in.virendraoswal;

import org.apache.ignite.lang.IgniteRunnable;

public class MyRunnable implements IgniteRunnable {
    private static final long serialVersionUID = -8491962424023801613L;
    private int _index;

    MyRunnable(int index) {
        this._index = index;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
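        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }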
        System.out.println(String.format("Processed %d", _index));
    }
}

Our MyRunnable class sleeps for 500 ms to simulate processing, prints the name of the thread that ran it, and then logs which task index was processed.

Note that we implement org.apache.ignite.lang.IgniteRunnable, which extends both java.lang.Runnable and java.io.Serializable, so the same MyRunnable class works with the Ignite and the non-Ignite thread pool.
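Why serializability matters: a task submitted on one node may run in another JVM, so it has to survive being turned into bytes and back. The sketch below illustrates that idea with the JDK only. SerializableTask and IndexedTask are hypothetical stand-ins for IgniteRunnable and MyRunnable, and plain Java serialization stands in for whatever marshaller Ignite actually uses, so treat it as an illustration rather than a description of Ignite internals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TaskRoundTrip {
    // Serializes the task to bytes and reads it back, as if it had crossed a JVM boundary.
    static IndexedTask roundTrip(IndexedTask task) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(task);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (IndexedTask) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        IndexedTask copy = roundTrip(new IndexedTask(7));
        copy.run(); // runs the deserialized copy, not the original object
    }
}

// Hypothetical stand-in for IgniteRunnable: a Runnable that is also Serializable.
interface SerializableTask extends Runnable, Serializable {
}

// Hypothetical task mirroring MyRunnable: its only state is an int, so it serializes cleanly.
class IndexedTask implements SerializableTask {
    private static final long serialVersionUID = 1L;
    private final int index;

    IndexedTask(int index) {
        this.index = index;
    }

    int index() {
        return index;
    }

    @Override
    public void run() {
        System.out.println(String.format("Processed %d", index));
    }
}
```

The practical takeaway is to keep task fields small and serializable; a field holding, say, an open socket would break this round trip.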

ThreadPoolWithoutIgnite.java

Our thread pool without Apache Ignite is a straightforward, vanilla use of ExecutorService.

package in.virendraoswal;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolWithoutIgnite {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(16);

        long start = System.currentTimeMillis();

        for (int i = 1; i <= 50; i++) {
            service.submit(new MyRunnable(i));
        }

        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println(String.format("Time taken %dms.", (System.currentTimeMillis() - start)));
    }
}

Logic of the above implementation:

  1. Create a thread pool of 16 threads.
  2. Generate 50 dummy tasks and submit them.
  3. Wait for the tasks to complete.
  4. Output the time taken to process all tasks.
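One detail of step 3 is easy to miss: awaitTermination returns false if the timeout elapses before all tasks finish. The sketch below is a variant of the steps above that checks that return value and counts the tasks that actually completed. PoolRunCheck and runAndCount are hypothetical names introduced for illustration, and a plain sleeping lambda stands in for MyRunnable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PoolRunCheck {
    // Submits taskCount sleeping tasks to a fixed pool and returns how many completed.
    static int runAndCount(int poolSize, int taskCount, long sleepMillis) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(poolSize);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 1; i <= taskCount; i++) {
            futures.add(service.submit(() -> {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        service.shutdown();
        // false means the timeout elapsed first; stop whatever is still pending.
        if (!service.awaitTermination(1, TimeUnit.MINUTES)) {
            service.shutdownNow();
        }

        int done = 0;
        for (Future<?> future : futures) {
            if (future.isDone() && !future.isCancelled()) {
                done++;
            }
        }
        return done;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed tasks: " + runAndCount(16, 50, 500));
    }
}
```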

Output for above program ThreadPoolWithoutIgnite.java

We can see it took more than 2 seconds to process all tasks (~2.1 secs in this case).

1-ThreadpoolWithoutIgnite-Output.JPG
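The roughly 2 seconds is what simple arithmetic predicts: 16 threads can run at most 16 tasks at once, so 50 tasks need 4 "waves" of 500 ms each. A small sketch of that estimate follows. WaveEstimate is a hypothetical helper, and it ignores scheduling and startup overhead, which would account for the extra ~0.1 s.

```java
class WaveEstimate {
    // Lower bound on wall-clock time: tasks run in waves of at most `threads` at a time.
    static long estimateMillis(int tasks, int threads, long taskMillis) {
        int waves = (tasks + threads - 1) / threads; // ceiling of tasks / threads
        return waves * taskMillis;
    }

    public static void main(String[] args) {
        // 50 tasks on 16 threads: 4 waves of 500 ms each, prints 2000
        System.out.println(estimateMillis(50, 16, 500));
    }
}
```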

Distributed Threadpool with Apache Ignite

As part of this setup, we will create 2 Apache Ignite Nodes (effectively 2 JVMs) which form an Apache Ignite Cluster internally and process the same MyRunnable tasks across both JVMs i.e. Distributed Threadpool Processing.

IgniteNode1.java

package in.virendraoswal;

import java.util.Collections;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class IgniteNode1 {
    public static void main(String[] args) throws IgniteException, InterruptedException {
        IgniteConfiguration firstCfg = new IgniteConfiguration();
        firstCfg.setIgniteInstanceName("first");
        // Explicitly configure TCP discovery SPI to provide list of initial nodes
        // from the first cluster.
        TcpDiscoverySpi firstDiscoverySpi = new TcpDiscoverySpi();
        // Initial local port to listen to.
        firstDiscoverySpi.setLocalPort(48500);
        // Changing local port range. This is an optional action.
        firstDiscoverySpi.setLocalPortRange(20);
        TcpDiscoveryVmIpFinder firstIpFinder = new TcpDiscoveryVmIpFinder();
        // Addresses and port range of the nodes from the first cluster.
        // 127.0.0.1 can be replaced with actual IP addresses or host names.
        // The port range is optional.
        firstIpFinder.setAddresses(Collections.singletonList("127.0.0.1:48500..48520"));
        // Overriding IP finder.
        firstDiscoverySpi.setIpFinder(firstIpFinder);
        // Overriding discovery SPI.
        firstCfg.setDiscoverySpi(firstDiscoverySpi);
        // Starting a node.
        Ignite ignite = Ignition.start(firstCfg);
    }
}

IgniteNode1.java Implementation Logic

  1. Initialize an IgniteConfiguration, giving the node an instance name that uniquely identifies it.
  2. Configure a TcpDiscoverySpi with a TcpDiscoveryVmIpFinder so that nodes started within the given address and port range discover each other and form a cluster on startup.
  3. Start the Ignite node with this configuration.
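To make the address string in step 2 concrete, the sketch below expands "127.0.0.1:48500..48520" into individual host:port entries. This is not Ignite's own parser. AddressRange is a hypothetical helper, and it assumes that both ends of the range are inclusive and that the spec always contains a port.

```java
import java.util.ArrayList;
import java.util.List;

class AddressRange {
    // Expands "host:first..last" (or "host:port") into one "host:port" entry per port.
    // Assumption: both ends of the range are inclusive.
    static List<String> expand(String spec) {
        int colon = spec.lastIndexOf(':');
        String host = spec.substring(0, colon);
        String ports = spec.substring(colon + 1);

        int dots = ports.indexOf("..");
        int first = Integer.parseInt(dots < 0 ? ports : ports.substring(0, dots));
        int last = dots < 0 ? first : Integer.parseInt(ports.substring(dots + 2));

        List<String> result = new ArrayList<>();
        for (int port = first; port <= last; port++) {
            result.add(host + ":" + port);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> addresses = expand("127.0.0.1:48500..48520");
        System.out.println(addresses.size() + " candidate addresses, starting at " + addresses.get(0));
    }
}
```

Both nodes in this post run on the same machine and are given the same starting port, so the second node needs a free port elsewhere in the range, and that is why a range is used rather than a single port.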

IgniteNode1 Startup Logs

As you can see, the Ignite node has started, and the topology snapshot shows servers=1, meaning that only one node is currently part of the cluster.

2-IgniteNode1-Startup.JPG

We will now start IgniteNode2, which joins the cluster and also submits the tasks to be processed.

IgniteNode2.java

package in.virendraoswal;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;

public class IgniteNode2 {
    public static void main(String[] args) throws IgniteException, InterruptedException {
        IgniteConfiguration firstCfg = new IgniteConfiguration();
        firstCfg.setIgniteInstanceName("second");

        // Explicitly configure TCP discovery SPI to provide list of initial nodes
        // from the first cluster.
        TcpDiscoverySpi firstDiscoverySpi = new TcpDiscoverySpi();

        // Initial local port to listen to.
        firstDiscoverySpi.setLocalPort(48500);

        // Changing local port range. This is an optional action.
        firstDiscoverySpi.setLocalPortRange(20);

        TcpDiscoveryVmIpFinder firstIpFinder = new TcpDiscoveryVmIpFinder();

        // Addresses and port range of the nodes from the first cluster.
        // 127.0.0.1 can be replaced with actual IP addresses or host names.
        // The port range is optional.
        firstIpFinder.setAddresses(Collections.singletonList("127.0.0.1:48500..48520"));

        // Overriding IP finder.
        firstDiscoverySpi.setIpFinder(firstIpFinder);

        // Overriding discovery SPI.
        firstCfg.setDiscoverySpi(firstDiscoverySpi);

        // Starting a node.
        Ignite ignite = Ignition.start(firstCfg);

        ExecutorService service = ignite.executorService();

        long start = System.currentTimeMillis();

        for (int i = 1; i <= 50; i++) {
            service.submit(new MyRunnable(i));
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(String.format("Time taken %dms.", (System.currentTimeMillis() - start)));

        ignite.close();
    }

}

IgniteNode2.java Implementation Logic

  1. Repeat steps 1-3 from IgniteNode1.java, changing only the instance name.
  2. Obtain an ExecutorService from the Ignite instance.
  3. Submit 50 MyRunnable tasks to it.
  4. Wait for the tasks to be processed.
  5. Capture the time taken to process all tasks.

IgniteNode2.java startup logs

On startup of IgniteNode2.java, the topology snapshot is updated to servers=2, meaning the Ignite cluster now contains 2 nodes that can be used for processing.

3-IgniteNOde2-Startup.JPG

IgniteNode1.java startup logs will be updated to show the same topology of 2 nodes.

Processing Logs

IgniteNode1.java

As the logs below show, some of the MyRunnable tasks submitted by IgniteNode2 were processed on IgniteNode1.

4-IgnoteNode1-processing.JPG

IgniteNode2.java

IgniteNode2 processes the other half of the tasks, as the logs below show. Note also that the time taken to process the same tasks on the same machine, with the same thread pool size per node, has dropped by about 50% to roughly 1 second.

5-ignitenode2-processiong.JPG

A few points to remember:

  • If you look at the processing logs of both nodes, each processed 25 tasks. This is because Apache Ignite distributes tasks across the cluster in round-robin fashion by default, which we can override.
  • We used a thread pool size of 16 without Apache Ignite in the first part of the demonstration because Apache Ignite's public thread pool has 16 threads in this setup (its default is the larger of 8 and the number of available CPU cores). That lets us compare performance on similar terms.
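Both points can be checked with a little arithmetic. The sketch below simulates a strict round-robin assignment of 50 tasks to 2 nodes and then estimates the wall-clock time of the slowest node. RoundRobinSplit is a hypothetical model for illustration, not Ignite's load-balancing code, and it ignores network and scheduling overhead.

```java
import java.util.Arrays;

class RoundRobinSplit {
    // Assigns tasks 0..tasks-1 to nodes in strict rotation and returns the per-node counts.
    static int[] split(int tasks, int nodes) {
        int[] counts = new int[nodes];
        for (int task = 0; task < tasks; task++) {
            counts[task % nodes]++;
        }
        return counts;
    }

    // Estimated wall-clock time: the slowest node, each running waves of threadsPerNode tasks.
    static long estimateMillis(int tasks, int nodes, int threadsPerNode, long taskMillis) {
        long worst = 0;
        for (int count : split(tasks, nodes)) {
            long waves = (count + threadsPerNode - 1) / threadsPerNode; // ceiling division
            worst = Math.max(worst, waves * taskMillis);
        }
        return worst;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(split(50, 2))); // prints [25, 25]
        System.out.println(estimateMillis(50, 2, 16, 500)); // 2 waves of 500 ms, prints 1000
    }
}
```

Under this model, each node needs only 2 waves for its 25 tasks instead of the 4 waves a single 16-thread pool needs for all 50, which matches the observed drop from about 2 seconds to about 1 second.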

Voila, with a few changes you can convert your thread pool into a distributed thread pool with Apache Ignite. Adding more nodes to the cluster scales processing horizontally, without you having to worry about how tasks are distributed.

Thank you for reading. If you have made it this far, please like the article; it will encourage me to write more like it. Do share your suggestions, I appreciate honest feedback!
