Java Scatter Gather – CompletableFuture to create async thread and join results (allOf)

There are various options to implement Scatter-Gather: simple thread management, the Spring Reactor package, or a plain old Java implementation.

I chose to write this article with an example based on plain old Java's CompletableFuture because it comes in handy without any external jars and utilities.

In this article I will cover the definition of Scatter-Gather and a simple CompletableFuture-based implementation of it.

First of all, it is important to understand what Scatter-Gather is.

What is Scatter-Gather?

Scatter-Gather is a messaging pattern, available for example as a routing message processor in the Mule ESB runtime, that sends a request message to multiple targets concurrently. It then collects the responses from all routes and aggregates them back into a single response.

Use-case Example

When your application needs to process several similar, independent tasks and does so sequentially, the response time grows with every task, which leads to customer drop-outs.

We can certainly improve this by scattering the tasks, that is, executing them on parallel threads, and then, when all threads are complete, merging the output and sending it back to the calling function.

In contrast to sequential execution of activities, Scatter-Gather scatters the task execution and gathers the results before moving forward.
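To make the contrast concrete, here is a minimal sketch of both approaches. The class name, the slowTask helper and the 200 ms delay are my own assumptions for illustration and are not part of the downloadable example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Hypothetical demo class; all names here are illustrative only.
class SequentialVsScatterGather {

    // Simulates an independent task that takes about 200 ms.
    static String slowTask(String input) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return input.toUpperCase();
    }

    // Sequential: each task starts only after the previous one has finished.
    static List<String> runSequentially(List<String> inputs) {
        return inputs.stream()
                .map(SequentialVsScatterGather::slowTask)
                .collect(Collectors.toList());
    }

    // Scatter-gather: start every task first, then wait for all and merge.
    static List<String> runScatterGather(List<String> inputs) {
        // Scatter: one asynchronous task per input.
        List<CompletableFuture<String>> futures = inputs.stream()
                .map(in -> CompletableFuture.supplyAsync(() -> slowTask(in)))
                .collect(Collectors.toList());
        // Wait until every task has completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // Gather: results come back in input order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        long t0 = System.nanoTime();
        System.out.println(runSequentially(inputs));
        long t1 = System.nanoTime();
        System.out.println(runScatterGather(inputs));
        long t2 = System.nanoTime();
        System.out.println("sequential ms: " + (t1 - t0) / 1_000_000);
        System.out.println("scatter-gather ms: " + (t2 - t1) / 1_000_000);
    }
}
```

Both methods return the same list. On a multi-core machine the scatter-gather run usually finishes sooner, but the exact timings depend on how many threads are available.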

[Figure: Java Scatter Gather]

Real-World Example

Real-world examples are price comparison sites like www.trivago.com or www.skyscanner.com, which connect to various service providers using parallel threads and merge the results before showing them in the user’s browser.

I hope the concept is clear. Now I will show a very simple example of how to build this with plain Java utilities.

How to write Scatter-Gather with plain Java?

This is a running example, and its source can also be downloaded from my public GitHub repository at https://github.com/gupta1vipin/java-scatter-gather

InputPojo.java is the input object to be processed during async method execution

package com.scattergather.demo;
public class InputPojo {
	private String name;
	private Integer age;
	public InputPojo(String namex, int agex) {
		this.name=namex;
		this.age=agex;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public Integer getAge() {
		return age;
	}
	public void setAge(Integer age) {
		this.age = age;
	}
	@Override
	public String toString() {
		return "InputPojo [name=" + name + ", age=" + age + "]";
	}
}

OutputPojo.java is the expected output object (could be replaced with any object)

package com.scattergather.demo;
public class OutputPojo{
	public OutputPojo(String value) {
		this.value = value;
	}
	private String value;
	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}
	@Override
	public String toString() {
		return "OutputPojo [value=" + value + "]";
	}
}

ScatterGatherUtil.java contains the scatter-gather logic. You need to override the perform method to provide your own implementation.

In the perform method, write the code that you want to run in scatter-gather fashion for each object in the input list.

package com.scattergather.demo;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

/**
 * The Class ScatterGatherUtil. Provides a basic implementation to execute a
 * process/function asynchronously. {@link #processAsync(List)} submits one
 * asynchronous task per input object, waits for all of them to complete and
 * finally returns the combined results as a list.
 *
 * One needs to override {@link #perform(Object)} to provide the behavior to
 * be executed in scatter-gather mode.
 *
 * <b>Note - This utility needs improvement to handle exceptions and
 * timeout scenarios </b>
 *
 * @param <O> the generic type, Output type object
 * @param <I> the generic type, Input Type Object
 *
 * @author Vipin Gupta
 */
public abstract class ScatterGatherUtil<O, I> {

	/**
	 * Accepts a list of input objects, runs {@link #perform(Object)} for each
	 * of them asynchronously and combines the results, scatter-gather style.
	 *
	 * Please note that {@link #perform(Object)} must be implemented by the
	 * subclass before calling this function.
	 *
	 * @param inputList the input list
	 * @return the list of results, in the same order as the input list
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 * @throws ExecutionException if any of the asynchronous tasks fails
	 */
	public List<O> processAsync(final List<I> inputList) throws InterruptedException, ExecutionException {

		final List<CompletableFuture<O>> futures = inputList.stream()
				        .map(inputObj -> performAsCompletableFuture(inputObj))
				        .collect(Collectors.toList());

		final CompletableFuture<Void> allFutures = CompletableFuture.allOf(
				futures.toArray(new CompletableFuture[futures.size()])
		);
		
		final CompletableFuture<List<O>> allOfFutures = allFutures.thenApply(v -> {
			   return futures.stream()
			           .map(future -> future.join())
			           .collect(Collectors.toList());
		});

		return allOfFutures.get();
	}

	/**
	 * Perform as completable future.
	 *
	 * @param inputObject the input object
	 * @return the completable future
	 */
	protected CompletableFuture<O> performAsCompletableFuture(final I inputObject) {
		CompletableFuture<O> result = CompletableFuture.supplyAsync(() -> perform(inputObject));
		result.thenRun(  
				() -> System.out.println("Computation finished for thread : "+Thread.currentThread().getId()));
		return result;
	}

	/**
	 * Perform function executes the behavior provided to it by its
	 * implementation class.
	 *
	 * @param inputObject the input object
	 * @return the output type
	 */
	public abstract O perform(I inputObject);

}
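The note in the class comment says the utility still needs exception and timeout handling. Below is one possible sketch of that, assuming a fallback value is acceptable when a task fails. SafeScatterGather and its parameters are hypothetical names of mine and are not part of the repository.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the original repository.
class SafeScatterGather {

    // Runs the task for every input asynchronously. A task that throws
    // contributes the fallback value instead of failing the whole batch.
    // If the batch is not done within timeoutMillis, a TimeoutException
    // is thrown; tasks that are already running are not stopped by this.
    static <I, O> List<O> processAsync(List<I> inputs, Function<I, O> task,
                                       O fallback, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {

        List<CompletableFuture<O>> futures = inputs.stream()
                .map(in -> CompletableFuture.supplyAsync(() -> task.apply(in))
                        .exceptionally(ex -> fallback)) // per-task error handling
                .collect(Collectors.toList());

        // Bounded wait for the whole batch.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .get(timeoutMillis, TimeUnit.MILLISECONDS);

        // All futures are complete here, so join() returns immediately.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception {
        // 10 / 0 throws ArithmeticException, so the third result is the fallback.
        List<Integer> results = processAsync(Arrays.asList(1, 2, 0), x -> 10 / x, -1, 1000);
        System.out.println(results); // prints [10, 5, -1]
    }
}
```

Whether a fallback value, a partial result or a failed batch is the right reaction depends on the use case, so treat this as a starting point only.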

ScatterGatherTest.java is the test class, which provides a sample implementation of the ScatterGatherUtil utility.

package com.scattergather.demo;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class ScatterGatherTest {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		testMe();
	}
	private static void testMe() throws InterruptedException, ExecutionException {
		ScatterGatherUtil<OutputPojo, InputPojo> testMe = new ScatterGatherUtil<OutputPojo, InputPojo>() {
			
			@Override
			public OutputPojo perform(InputPojo inputPojo) {
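				try {
					System.out.println("new Thread started..." + Thread.currentThread().getId());
					// Simulate slow work for some of the inputs only.
					if (inputPojo.getAge() == 1 || inputPojo.getAge() == 3 || inputPojo.getAge() == 6) {
						TimeUnit.SECONDS.sleep(inputPojo.getAge());
					}
				} catch (InterruptedException e) {
					// Restore the interrupt flag so callers can still see the interruption.
					Thread.currentThread().interrupt();
					e.printStackTrace();
				}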
				return new OutputPojo(inputPojo.getName());
			}
		};
		List<InputPojo> inputList = ProvideMeData.prepareInputData();
		List<OutputPojo> outputList = testMe.processAsync(inputList);
		System.out.println(outputList);
	}
}
class ProvideMeData {
	 public static List<InputPojo> prepareInputData() {
			final InputPojo mp1 =  new InputPojo("mp1", 1);
			final InputPojo mp2 =  new InputPojo("mp2", 2);
			final InputPojo mp3 =  new InputPojo("mp3", 3);
			final InputPojo mp4 =  new InputPojo("mp4", 4);
			final InputPojo mp5 =  new InputPojo("mp5", 5);
			final InputPojo mp6 =  new InputPojo("mp6", 6);

			List<InputPojo> inputList = new ArrayList<InputPojo>();
			inputList.add(mp1);
			inputList.add(mp2);
			inputList.add(mp3);
			inputList.add(mp4);
			inputList.add(mp5);
			inputList.add(mp6);
			return inputList;
		}
}
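One thing to keep in mind with the test above: CompletableFuture.supplyAsync without an executor argument normally runs tasks on the shared ForkJoinPool.commonPool(), whose size is derived from the number of available processors. Blocking tasks such as the sleep calls may therefore wait for a free thread. A minimal sketch of passing a dedicated pool instead follows; ExecutorScatterGather and the threads parameter are assumptions of mine, not part of the repository.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical variant, not part of the original repository.
class ExecutorScatterGather {

    // Runs every task on a dedicated fixed-size pool instead of the common pool.
    static <I, O> List<O> processAsync(List<I> inputs, Function<I, O> task, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Scatter: the second argument tells supplyAsync where to run the task.
            List<CompletableFuture<O>> futures = inputs.stream()
                    .map(in -> CompletableFuture.supplyAsync(() -> task.apply(in), pool))
                    .collect(Collectors.toList());
            // Wait for all tasks, then gather the results in input order.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown(); // release the threads once the batch is done
        }
    }

    public static void main(String[] args) {
        List<String> lines = processAsync(Arrays.asList(1, 2, 3, 4, 5, 6),
                n -> n + " ran on " + Thread.currentThread().getName(), 6);
        lines.forEach(System.out::println);
    }
}
```

Creating a pool per call keeps the sketch short. In a real application you would more likely create the executor once and reuse it.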

If you like this blog post on how to build Java Scatter Gather with CompletableFuture, give it a like.

Furthermore, to read more about Java 8, 9, 10 and 11 features, check http://www.hybriscx.com/category/java-11/ and http://www.hybriscx.com/java-8/

Hope you find it helpful. Thanks!
