GHads mind

developers thoughts & finds

Posts Tagged ‘threads’

Blocking parallel Runnable execution in Java

leave a comment »

Been quite busy lately… But finally a new post:

OK, let’s get started. For a new approach to updating MongoDB in parallel, I’m partitioning a SQL query and want those queries to execute in parallel on worker threads. But I have actions before and after the execution point, so I must wait until the last of the workers is done. This can be done with the method from the previous post, but only in a completely asynchronous way. What I want here is a blocking way:

  • Put together a list of Runnable workers
  • Call a method to execute those workers in parallel
  • The method blocks until all workers are finished

As a bonus I do not want to use any concurrency-specific constructs inside the workers. A direct call to a worker’s run() method should behave exactly like a call to the new method with only this worker in the parameter list.

	public static void execute(Collection<Runnable> workers)
			throws Exception
	{
		if (workers == null || workers.isEmpty()) {
			throw new IllegalArgumentException("null or empty");
		}
		// captures the first exception thrown by any worker
		final Exception[] exception = new Exception[]{ null };
		final CountDownLatch latch = new CountDownLatch(workers.size());
		// double size: one submitting/waiting wrapper plus one worker per entry
		final ExecutorService threadPool = Executors.newFixedThreadPool(workers.size() * 2);
		for (final Runnable worker : workers)
		{
			threadPool.submit(new Runnable()
			{
				public void run()
				{
					try
					{
						// run the worker on the pool and wait for it to finish
						threadPool.submit(worker).get();
					}
					catch (Exception e)
					{
						exception[0] = e;
					}
					finally
					{
						latch.countDown();
					}
				}
			});
		}
		System.out.println("...now lock");
		// block until every worker has counted down
		latch.await();
		threadPool.shutdownNow();
		if (exception[0] != null) {
			throw exception[0];
		}
	}

Step by step: First the argument is checked to ensure no null or empty collection is processed. Next a final exception array is initialized to capture at least one exception if a worker thread fails. This can be extended to capture an exception per worker by creating an array the size of the workers collection and throwing a MultiException if any of the workers threw an exception (this is just an example of how it works).

A final latch is the blocking part. When the current thread reaches the latch.await() call, it waits until the latch has been counted down to 0. As the latch starts at the number of workers, the current thread unblocks and continues execution once all workers are done.

For worker execution a thread pool is created with double the number of worker threads. Why? For each worker there will be a thread that puts the worker into the executor (threadPool.submit(worker)) and waits for the worker to finish (the .get()). That’s what the iteration over the workers does, and that’s why all variables are final. When a worker finishes, either through normal execution or with an exception, the latch is counted down. So when all workers are done, the execute method continues after the latch.await() line. Now the executor is shut down, and if any exception occurred it is thrown.

That’s it: Any Runnable can be executed without the need to implement a special mechanism inside, as many workers as needed can be executed in parallel, and the execute method blocks until all workers are done. Mission accomplished 🙂
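As a side note: if you can wrap the workers as Callables, the JDK’s own ExecutorService.invokeAll gives the same blocking semantics out of the box. It does not fulfil the “plain Runnable” bonus requirement above, but for comparison here is a minimal, self-contained sketch (class and variable names are mine):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class InvokeAllDemo {
	public static void main(String[] args) throws Exception {
		final AtomicInteger done = new AtomicInteger();
		List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
		for (int i = 0; i < 4; i++) {
			tasks.add(new Callable<Void>() {
				public Void call() {
					done.incrementAndGet(); // the actual work would go here
					return null;
				}
			});
		}
		ExecutorService pool = Executors.newFixedThreadPool(4);
		pool.invokeAll(tasks); // blocks until every task has completed
		pool.shutdown();
		System.out.println(done.get()); // prints: 4
	}
}
```

invokeAll returns only when all Futures are done, so the counter is guaranteed to read 4 after the call.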

Greetz,
GHad

Written by ghads

December 14, 2011 at 12:18 pm

Java CyclicBarrier and Queue usage

leave a comment »

UPDATE: See below for an update about a race-condition I didn’t think of in the first place…

For updating MongoDB from OracleDB I needed a way to use multiple threads to overcome network latency to our MongoDB server cluster. Doing writes/updates with 4 threads resulted in 3-4 times better performance than using only one thread. But how to implement such an update behavior in a good way using the JDK 6 concurrent classes/utilities?

Basically one update should perform the following steps:

  1. Determine all documents to update (partial updates also possible)
  2. Flag all those documents with a unique tag (using UUID for example)
  3. Run updates concurrently and for each updated document remove the update tag
  4. When all update threads are done, remove all still tagged documents and wait for the next update

This can be achieved by combining a CyclicBarrier, a Queue and an Executor, also allowing thread/object reuse, as long as the update process is guarded so that only one update runs at a time (using a Lock, for example).

Here is a basic example code that shows how the update works:

public class CyclicBarrierQueueTesting
{

	private static BlockingQueue<String> queue;

	private static CyclicBarrier barrier;

	private static volatile String updateTag;

	private static String poison = "+++ stop +++";

	public static void main(String[] args)
	{
		// prepare update thread pool
		int numberOfThreads = 4;
		queue = new LinkedBlockingQueue<String>();
		barrier = new CyclicBarrier(numberOfThreads, new Runnable()
		{
			public void run()
			{
				// delete all not updated with updateTag
				// ...
				log("Update done, remove not updated: updateTag= " + updateTag);
				updateTag = null;
			}
		});
		ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
		for (int i = 0; i < numberOfThreads; i++)
		{
			executor.submit(new Runner(i, poison, queue, barrier));
		}

		// import all
		barrier.reset();
		List<String> sqlPerLanguage = Collections.nCopies(47, "language");
		updateTag = UUID.randomUUID().toString();
		log("Update all, flagged all with updateTag: updateTag= " + updateTag);
		for (String language : sqlPerLanguage)
		{
			// create a sql query per language
			try
			{
				queue.put(language);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}
		// add poison for every thread
		for (int i = 0; i < numberOfThreads; i++)
		{
			try
			{
				queue.put(poison);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}

		// wait for next update (NOT needed in real circumstances)
		while (updateTag != null)
		{
			try
			{
				Thread.sleep(1000);
			}
			catch (InterruptedException e)
			{
			}
		}

		// update some pks
		barrier.reset();
		List<String> somePKs = Collections.nCopies(50, "pk");
		updateTag = UUID.randomUUID().toString();
		log("Update some pks, flagged all products for pks with updateTag: updateTag= " + updateTag);
		for (String pk : somePKs)
		{
			// create a sql query per 10 products
			try
			{
				queue.put(pk);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}
		// add poison for every thread
		for (int i = 0; i < numberOfThreads; i++)
		{
			try
			{
				queue.put(poison);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}

		// wait for next update (NOT needed in real circumstances)
		while (updateTag != null)
		{
			try
			{
				Thread.sleep(1000);
			}
			catch (InterruptedException e)
			{
			}
		}

		// shutdown
		log("shutdown");
		executor.shutdownNow();
	}

	public static void log(String str)
	{
		System.out.println(str);
	}

	private static class Runner
			implements Runnable
	{

		private int id;
		private String poison;
		private BlockingQueue<String> queue;
		private CyclicBarrier barrier;

		public Runner(int id, String poison, BlockingQueue<String> queue, CyclicBarrier barrier)
		{
			this.id = id;
			this.poison = poison;
			this.queue = queue;
			this.barrier = barrier;
		}

		public void run()
		{
			boolean run = true;
			while (run)
			{
				try
				{
					try
					{
						String query = queue.take();
						if (query.equals(poison))
						{
							barrier.await();
						}
						else
						{
							log("Thread executes update: id= " + id);
							Thread.sleep(100);
						}
					}
					catch (BrokenBarrierException e)
					{
						e.printStackTrace();
						run = false;
					}
				}
				catch (InterruptedException e)
				{
					run = false;
				}
			}
		}
	}
}

Let’s go top down over the code…

First we fix the number of threads and initialize the objects we want to use: a LinkedBlockingQueue and a CyclicBarrier with a Runnable provided. This Runnable is executed as soon as all threads have called barrier.await(), indicating that the work for one update is done; at that point the documents that were not updated can be removed. We also initialize an Executor and start the threads that will do the update.

Next we want to do a complete update. The way I provide information to the threads is via SQL strings, so each thread executes its query and iterates over the ResultSet to update the MongoDB documents. Here I’m “creating” 47 SQL queries. In our production code this is the point where our products are partitioned by language; this way our products tables can be queried by multiple threads without overlapping results. Don’t forget to reset the barrier, create an update tag and (not shown here) flag all MongoDB documents with the update tag. Now the queries are added to the queue, and we also add one special “poison” String per thread. This String tells a thread that the update is done when it reaches the end of the queue.

Now all that’s left to do is wait until the update tag has been reset. In production code we return from the method, as the rest runs asynchronously. Of course we use a Lock mechanism to ensure only one update is in progress, but in the demo code we just wait. Next is an update on some of the documents, which works exactly as described above; only the SQL queries change.

The next interesting point is the updating thread. Here we have a run loop that breaks when interrupted by executor.shutdownNow(). The first thing the thread does is try to take an SQL query from the queue. As long as the queue is empty this blocks the thread, so we have a clean wait. If there is something in the queue and the thread has taken the next String, we must compare it against the “poison”. If it is the poison, the thread signals the barrier that it is done with this update round and then waits for the next one. Otherwise the thread executes the SQL query, updates the MongoDB documents and takes the next item from the queue.

That’s it. A clean implementation with helpful classes from the concurrent package. No notify/wait, no Thread.run, and full reuse, with threads waiting for work. Basically just another take on the producer/consumer pattern with a little twist: a barrier for all consumers which, when reached, executes a Runnable. Easy.
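The barrier-action twist can be shown in isolation. Here is a minimal, self-contained sketch (class and message names are mine): three threads arrive at the barrier, and the provided Runnable fires exactly once when the last one arrives.

```java
import java.util.concurrent.CyclicBarrier;

public class BarrierActionDemo {
	public static void main(String[] args) throws Exception {
		final int parties = 3;
		// the barrier action runs exactly once, after the last party arrives
		final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
			public void run() {
				System.out.println("all consumers done");
			}
		});
		Thread[] threads = new Thread[parties];
		for (int i = 0; i < parties; i++) {
			threads[i] = new Thread(new Runnable() {
				public void run() {
					try {
						barrier.await(); // blocks until all parties arrive
					} catch (Exception ignore) {
					}
				}
			});
			threads[i].start();
		}
		for (Thread t : threads) {
			t.join(); // prints: all consumers done
		}
	}
}
```

The barrier action runs in one of the awaiting threads before any of them are released, which is exactly the hook used above to remove the not-updated documents.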

Enjoy and greetings,
Gerhard Balthasar

— UPDATE —

Unfortunately I overlooked a race condition when updating into an empty collection for the first time. Due to the nature of the find-modify-save approach, when run with multiple threads, two or more threads can query for the same document and find none. Both threads will then create a document, resulting in multiple documents with the same identifier. To prevent this behaviour we need to implement a locking mechanism:

	private ConcurrentMap<String, CountDownLatch> locks = new ConcurrentHashMap<String, CountDownLatch>();

	protected boolean lock(String pk)
	{
		// try to insert a new latch and receive the previously inserted latch, if any
		CountDownLatch latch = locks.putIfAbsent(pk, new CountDownLatch(1));
		// if there was no latch before, the current thread can pass
		if (latch == null)
		{
			return false;
		}
		// else the thread must wait
		try
		{
			latch.await();
		}
		catch (InterruptedException e)
		{
			// ignore
		}
		return true;
	}

	protected void unlock(String pk)
	{
		CountDownLatch latch = locks.get(pk);
		if (latch != null)
		{
			latch.countDown();
			locks.remove(pk);
		}
	}

It is used this way:

	DBObject next = collection.findOne(product);
	if (next == null)
	{
		// test if another thread already creates a new document
		wasLocked = lock(pk);
		if (wasLocked)
		{
			// if this thread was locked, another thread has created the document thus this thread is now able to find it
			next = collection.findOne(product);
		}
		else
		{
			// first thread to create the document
			created = true;
		}
	}
	// set all fields and save (incl. waiting for save complete across all ReplicaSets with getLastError())
	// ...
	// if this thread was not locked unlock now to release waiting threads for the same document
	if (created && !wasLocked)
	{
		unlock(pk);
	}

With this approach the first thread passes the lock; every other thread must wait on the latch. When the first thread has saved the document, it releases all waiting threads, which then issue a new find to fetch the new document and continue updating it.
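The per-key latch can be exercised in isolation. Below is a minimal sketch mirroring the lock/unlock methods above (the demo class, the “doc-1” key and the order-tracking are mine): the first caller passes immediately, a second thread blocks on the latch until unlock releases it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

public class PerKeyLockDemo {
	private static final ConcurrentMap<String, CountDownLatch> locks =
			new ConcurrentHashMap<String, CountDownLatch>();

	// returns false for the first thread on a key, true for threads that had to wait
	static boolean lock(String pk) {
		CountDownLatch latch = locks.putIfAbsent(pk, new CountDownLatch(1));
		if (latch == null) {
			return false; // first thread for this key passes
		}
		try {
			latch.await(); // later threads wait until the first releases
		} catch (InterruptedException e) {
			// ignore, as in the original
		}
		return true;
	}

	static void unlock(String pk) {
		CountDownLatch latch = locks.get(pk);
		if (latch != null) {
			latch.countDown();
			locks.remove(pk);
		}
	}

	public static void main(String[] args) throws Exception {
		final StringBuilder order = new StringBuilder();
		boolean first = lock("doc-1"); // first caller passes immediately
		order.append(first ? "waited" : "passed");
		Thread waiter = new Thread(new Runnable() {
			public void run() {
				boolean w = lock("doc-1"); // blocks until unlock below
				order.append(w ? "/waited" : "/passed");
			}
		});
		waiter.start();
		Thread.sleep(200); // give the waiter time to block on the latch
		unlock("doc-1"); // release the waiting thread
		waiter.join();
		System.out.println(order); // prints: passed/waited
	}
}
```

Note the sleep is only there so the demo reliably reaches the blocked state; in the real update code the ordering comes from the find/save sequence, not from timing.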

And oh… how to test for duplicates? A simple MapReduce plus count in the shell does the job:

    db.assortment.mapReduce(function () { emit(this.pk, 1); }, function (k, vals) { return Array.sum(vals); }, { out: "duplicates" })
    db.duplicates.count({ "value": { $gte: 2 }})

Big thanx to csanz’s posterous!

Greetz,
GHad
— UPDATE —

Written by ghads

April 12, 2011 at 3:01 pm