GHads mind

developers thoughts & finds


Blocking parallel Runnable execution in Java


Been quite busy lately… But finally a new post:

Ok, let’s get started. For a new approach to updating MongoDB in parallel, I’m partitioning an SQL query and want the partial queries to execute in parallel on worker threads. But I have actions before and after the execution point, so I must wait until the last of the workers is done. This can be done with the method from the previous post, but only in a completely asynchronous way. What I want here is a blocking approach:

  • Put together a list of Runnable workers
  • Call a method to execute those workers in parallel
  • The method blocks until all workers are finished

As a bonus, I do not want to use any concurrency-specific constructs inside the threads. A direct call to a worker’s run() method should behave exactly like a call to the new method with only this worker in the parameter list.

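	import java.util.Collection;
	import java.util.concurrent.CountDownLatch;
	import java.util.concurrent.ExecutorService;
	import java.util.concurrent.Executors;

	public static void execute(Collection<Runnable> workers)
			throws Exception
	{
		if (workers == null || workers.isEmpty()) {
			throw new IllegalArgumentException("null or empty");
		}
		// Keeps one exception from a failed worker (if several fail, only one survives).
		// It is an array so that the anonymous class below can write to it.
		final Exception[] exception = new Exception[]{ null };
		final CountDownLatch latch = new CountDownLatch(workers.size());
		// Twice as many threads as workers: one waiting thread plus one worker thread each.
		final ExecutorService threadPool = Executors.newFixedThreadPool(workers.size() * 2);
		try
		{
			for (final Runnable worker : workers)
			{
				threadPool.submit(new Runnable()
				{
					public void run()
					{
						try
						{
							// get() blocks until the worker is done; a failure
							// arrives here as an ExecutionException.
							threadPool.submit(worker).get();
						}
						catch (Exception e)
						{
							exception[0] = e;
						}
						finally
						{
							latch.countDown();
						}
					}
				});
			}
			System.out.println("...now lock");
			// Blocks until every worker has counted the latch down.
			latch.await();
		}
		finally
		{
			// Release the pool even if await() was interrupted.
			threadPool.shutdownNow();
		}
		if (exception[0] != null) {
			throw exception[0];
		}
	}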

Step by step: First the argument is checked to ensure that no null or empty collection is processed. Next a final exception array is initialized to capture at least one exception if a worker thread fails. This can be extended to capture one exception per worker by creating an array with the size of the workers collection and throwing a MultiException if any of the workers threw an exception (this is just an example of how it could work).
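The per-worker variant could look roughly like the sketch below. Note the assumptions: MultiException is a hypothetical class defined right here (it is not part of the JDK), and executeCollecting is just an illustrative name. To keep it short, each worker is run directly inside one pool thread instead of using the double-sized pool, and lambdas (Java 8+) are used for the demo workers.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MultiExceptionSketch {

    // Hypothetical exception type (not a JDK class) that bundles all worker failures.
    static class MultiException extends Exception {
        private final List<Throwable> causes;

        MultiException(List<Throwable> causes) {
            super(causes.size() + " worker(s) failed");
            this.causes = causes;
        }

        List<Throwable> getCauses() {
            return causes;
        }
    }

    static void executeCollecting(Collection<Runnable> workers)
            throws InterruptedException, MultiException {
        if (workers == null || workers.isEmpty()) {
            throw new IllegalArgumentException("null or empty");
        }
        // One slot per worker; each slot is written by exactly one thread.
        final Throwable[] failures = new Throwable[workers.size()];
        final CountDownLatch latch = new CountDownLatch(workers.size());
        final ExecutorService threadPool = Executors.newFixedThreadPool(workers.size());
        try {
            int i = 0;
            for (final Runnable worker : workers) {
                final int slot = i++;
                threadPool.execute(new Runnable() {
                    public void run() {
                        try {
                            worker.run();
                        } catch (Throwable t) {
                            failures[slot] = t; // remember this worker's failure
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            // countDown() happens-before await() returns, so the slots are visible here.
            latch.await();
        } finally {
            threadPool.shutdownNow();
        }
        List<Throwable> collected = new ArrayList<Throwable>();
        for (Throwable t : failures) {
            if (t != null) {
                collected.add(t);
            }
        }
        if (!collected.isEmpty()) {
            throw new MultiException(collected);
        }
    }

    public static void main(String[] args) throws Exception {
        List<Runnable> workers = new ArrayList<Runnable>();
        workers.add(() -> { });
        workers.add(() -> { throw new IllegalStateException("first failure"); });
        workers.add(() -> { throw new IllegalArgumentException("second failure"); });
        try {
            executeCollecting(workers);
        } catch (MultiException e) {
            System.out.println(e.getCauses().size()); // prints 2
        }
    }
}
```

Giving every worker its own array slot avoids the race of several threads overwriting a single shared entry.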

A final latch is the blocking part. When the current thread reaches the latch.await() call, it waits until the latch has been counted down to 0. As the latch starts at the number of workers, the current thread unblocks and continues execution once all workers are done.
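To see the latch on its own, here is a minimal sketch without any thread pool. The class and method names (LatchSketch, runAndWait) are made up for illustration, and the counter increment merely stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchSketch {

    // Starts workerCount threads and blocks until every one has counted down.
    static int runAndWait(int workerCount) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(workerCount);
        final AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet(); // stands in for the real work
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        latch.await();         // blocks while the count is greater than zero
        return finished.get(); // every worker has finished by now
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndWait(3)); // prints 3
    }
}
```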

For worker execution a thread pool is created with twice as many threads as there are workers. Why? For each worker there is a thread that puts the worker into the executor (threadPool.submit(worker)) and waits for the worker to finish (the .get()). That’s what the iteration over the workers does, and that’s why all variables are final. When the worker has finished, either through normal execution or with an exception, the latch is counted down. So when all workers are done, the execute method continues after the latch.await() line. Now the executor is shut down, and if any exception occurred it is thrown.
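One detail of submit(worker).get() is worth a small sketch: get() blocks until the worker is done, and if the worker threw, get() reports that as an ExecutionException whose cause is the original exception. So the exception rethrown by the execute method above is that wrapper, not the worker's own exception. The names FutureGetSketch and runAndReport are illustrative only.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureGetSketch {

    // Runs the worker on a pool thread, waits for it, and returns its failure (or null).
    static Throwable runAndReport(Runnable worker) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = pool.submit(worker);
            future.get(); // blocks until the worker has finished
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the exception the worker itself threw
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndReport(() -> { })); // prints null
        Throwable t = runAndReport(() -> { throw new IllegalStateException("boom"); });
        System.out.println(t); // prints java.lang.IllegalStateException: boom
    }
}
```

If the caller should see the worker's own exception, unwrapping it with getCause() as shown here is one option.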

That’s it: any Runnable can be executed without implementing a special mechanism inside it, as many workers as needed can be executed in parallel, and the execute method blocks until all workers are done. Mission accomplished 🙂
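For completeness, a usage sketch. It carries a condensed copy of the execute method from this post so that it runs on its own; the workers just sleep briefly as a stand-in for one partition of the query, and ExecuteUsageSketch and runDemo are illustrative names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ExecuteUsageSketch {

    // Condensed copy of the execute method from this post.
    static void execute(Collection<Runnable> workers) throws Exception {
        if (workers == null || workers.isEmpty()) {
            throw new IllegalArgumentException("null or empty");
        }
        final Exception[] exception = new Exception[] { null };
        final CountDownLatch latch = new CountDownLatch(workers.size());
        final ExecutorService threadPool = Executors.newFixedThreadPool(workers.size() * 2);
        for (final Runnable worker : workers) {
            threadPool.submit(new Runnable() {
                public void run() {
                    try {
                        threadPool.submit(worker).get();
                    } catch (Exception e) {
                        exception[0] = e;
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        latch.await();
        threadPool.shutdownNow();
        if (exception[0] != null) {
            throw exception[0];
        }
    }

    // Builds workerCount plain Runnables, runs them, and returns how many finished.
    static int runDemo(int workerCount) throws Exception {
        final AtomicInteger done = new AtomicInteger();
        List<Runnable> workers = new ArrayList<Runnable>();
        for (int i = 0; i < workerCount; i++) {
            workers.add(() -> {
                try {
                    Thread.sleep(50); // stand-in for one partition of the query
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.incrementAndGet();
            });
        }
        // "before" actions would go here
        execute(workers); // blocks until all workers are done
        // "after" actions would go here
        return done.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo(4)); // prints 4
    }
}
```

The workers contain nothing concurrency-specific apart from the demo counter, so each one could just as well be called directly via run().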

Greetz,
GHad

Written by ghads

December 14, 2011 at 12:18 pm