GHads mind

developers thoughts & finds

Archive for April 2011

Java CyclicBarrier and Queue usage


UPDATE: See below for an update about a race-condition I didn’t think of in the first place…

To update MongoDB from an Oracle database, I needed multiple threads to overcome the network latency to our MongoDB server cluster. Doing writes/updates with 4 threads gave 3-4 times better performance than using only one thread. But how do you implement such an update cleanly using the JDK 6 concurrent classes/utils?

Basically, one update should do the following steps:

  1. Determine all documents to update (partial updates also possible)
  2. Flag all those documents with a unique tag (using UUID for example)
  3. Run updates concurrently and for each updated document remove the update tag
  4. When all update threads are done, remove all still tagged documents and wait for the next update

This can be achieved by combining a CyclicBarrier, a Queue and an Executor, which also allows Thread/Object reuse, as long as the update process is guarded so that only one update runs at a time (using a Lock, for example).

Here is a basic example code that shows how the update works:

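import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class CyclicBarrierQueueTesting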
{

	private static BlockingQueue<String> queue;

	private static CyclicBarrier barrier;

	// volatile: written by the barrier action in a worker thread, polled by main
	private static volatile String updateTag;

	private static String poison = "+++ stop +++";

	public static void main(String[] args)
	{
		// prepare update thread pool
		int numberOfThreads = 4;
		queue = new LinkedBlockingQueue<String>();
		barrier = new CyclicBarrier(numberOfThreads, new Runnable()
		{
			public void run()
			{
				// delete all not updated with updateTag
				// ...
				log("Update done, remove not updated: updateTag= " + updateTag);
				updateTag = null;
			}
		});
		ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
		for (int i = 0; i < numberOfThreads; i++)
		{
			executor.submit(new Runner(i, poison, queue, barrier));
		}

		// import all
		barrier.reset();
		List<String> sqlPerLanguage = Collections.nCopies(47, "language");
		updateTag = UUID.randomUUID().toString();
		log("Update all, flagged all with updateTag: updateTag= " + updateTag);
		for (String language : sqlPerLanguage)
		{
			// create a sql query per language
			try
			{
				queue.put(language);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}
		// add poison for every thread
		for (int i = 0; i < numberOfThreads; i++)
		{
			try
			{
				queue.put(poison);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}

		// wait for next update (NOT needed in real circumstances)
		while (updateTag != null)
		{
			try
			{
				Thread.sleep(1000);
			}
			catch (InterruptedException e)
			{
			}
		}

		// update some pks
		barrier.reset();
		List<String> somePKs = Collections.nCopies(50, "pk");
		updateTag = UUID.randomUUID().toString();
		log("Update some pks, flagged all products for pks with updateTag: updateTag= " + updateTag);
		for (String pk : somePKs)
		{
			// create a sql query per 10 products
			try
			{
				queue.put(pk);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}
		// add poison for every thread
		for (int i = 0; i < numberOfThreads; i++)
		{
			try
			{
				queue.put(poison);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}

		// wait for next update (NOT needed in real circumstances)
		while (updateTag != null)
		{
			try
			{
				Thread.sleep(1000);
			}
			catch (InterruptedException e)
			{
			}
		}

		// shutdown
		log("shutdown");
		executor.shutdownNow();
	}

	public static void log(String str)
	{
		System.out.println(str);
	}

	private static class Runner
			implements Runnable
	{

		private int id;
		private String poison;
		private BlockingQueue<String> queue;
		private CyclicBarrier barrier;

		public Runner(int id, String poison, BlockingQueue<String> queue, CyclicBarrier barrier)
		{
			this.id = id;
			this.poison = poison;
			this.queue = queue;
			this.barrier = barrier;
		}

		public void run()
		{
			boolean run = true;
			while (run)
			{
				try
				{
					try
					{
						String query = queue.take();
						if (query.equals(poison))
						{
							barrier.await();
						}
						else
						{
							log("Thread executes update: id= " + id);
							Thread.sleep(100);
						}
					}
					catch (BrokenBarrierException e)
					{
						e.printStackTrace();
						run = false;
					}
				}
				catch (InterruptedException e)
				{
					run = false;
				}
			}
		}
	}
}

Let’s go top down over the code…

First we need to fix the number of threads and init the objects we want to use: a LinkedBlockingQueue and a CyclicBarrier with a Runnable provided. This Runnable is executed as soon as all Threads have called barrier.await(), indicating that the work for one update is done, so the documents that were not updated can then be removed. We also init an Executor and start the Threads which will do the update.
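To see the barrier action in isolation, here is a minimal sketch that is not part of the production code. The class name BarrierActionDemo and its run method are made up for illustration. It counts how often the Runnable fires when several workers pass the barrier for a number of rounds. It also shows that a CyclicBarrier re-arms itself after each trip, so the same barrier can serve one update after another.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not from the original post
class BarrierActionDemo
{
	// Runs `rounds` rounds with `parties` workers and returns how often the barrier action fired
	static int run(int parties, final int rounds)
	{
		final AtomicInteger actionRuns = new AtomicInteger();
		final CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable()
		{
			public void run()
			{
				// executed once per round, by the last thread that arrives
				actionRuns.incrementAndGet();
			}
		});
		ExecutorService executor = Executors.newFixedThreadPool(parties);
		for (int i = 0; i < parties; i++)
		{
			executor.submit(new Runnable()
			{
				public void run()
				{
					try
					{
						for (int r = 0; r < rounds; r++)
						{
							barrier.await(); // blocks until all parties have arrived
						}
					}
					catch (InterruptedException e)
					{
						Thread.currentThread().interrupt();
					}
					catch (BrokenBarrierException e)
					{
						// another party was interrupted while waiting; give up
					}
				}
			});
		}
		executor.shutdown();
		try
		{
			executor.awaitTermination(10, TimeUnit.SECONDS);
		}
		catch (InterruptedException e)
		{
			Thread.currentThread().interrupt();
		}
		return actionRuns.get();
	}

	public static void main(String[] args)
	{
		System.out.println("barrier action ran " + run(4, 2) + " times");
	}
}
```

With 4 workers and 2 rounds the action runs twice, once per round, no matter which worker happens to arrive last.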

Next we want to do a complete update. The way I provide information to the Threads is via SQL Strings, so each Thread executes the query and iterates the ResultSet to update the MongoDB documents. Here I’m “creating” 47 SQL queries. In our production code this is the point where our products are partitioned by language. This way our Products tables can be queried by multiple Threads without overlapping results. Don’t forget to reset the barrier, create an update tag and (not shown here) update all MongoDB documents with the update tag. Now the queries are added to the queue and we also add a special “poison” String per Thread. Because the poison Strings are queued last, a Thread that takes one knows that no work is left for this update.

Now all that’s left to do is wait until the update tag has been reset. In production code we return from the method, as the rest runs asynchronously. Of course we use a Lock mechanism to ensure only one update is in progress, but in the demo code we just wait. Next is an update of some of the documents, which works exactly as described above; only the SQL queries change.
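The guard itself is not shown in this post, so the following is only a sketch under assumptions. The class UpdateGuard and its methods tryBegin and end are hypothetical names. One detail matters here: the update is started by the calling thread but finished inside the barrier action, which runs in one of the worker threads. A ReentrantLock has to be unlocked by the thread that locked it, so this sketch swaps in a Semaphore with one permit, which may be released from any thread.

```java
import java.util.concurrent.Semaphore;

// Hypothetical guard, not from the original post
class UpdateGuard
{
	// one permit = at most one update in flight
	private final Semaphore permit = new Semaphore(1);

	// Called by the thread that wants to start an update; false means one is already running
	boolean tryBegin()
	{
		return permit.tryAcquire();
	}

	// Called from the barrier action once the last worker has finished
	void end()
	{
		permit.release();
	}

	public static void main(String[] args) throws InterruptedException
	{
		final UpdateGuard guard = new UpdateGuard();
		System.out.println(guard.tryBegin()); // true: first update starts
		System.out.println(guard.tryBegin()); // false: rejected while the first one runs
		Thread worker = new Thread(new Runnable()
		{
			public void run()
			{
				guard.end(); // released by a different thread, as the barrier action would do
			}
		});
		worker.start();
		worker.join();
		System.out.println(guard.tryBegin()); // true: the next update may start
	}
}
```

In the demo above, tryBegin() would be called before the update tag is created and end() would go into the barrier Runnable next to updateTag = null. Note that end() must be called exactly once per successful tryBegin(), since an extra release() would add a second permit.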

The next interesting point is the updating Thread. Here we have a run loop that exits when the Thread is interrupted by executor.shutdownNow(). The first thing the Thread does is try to take an SQL query from the queue. As long as the queue is empty, this blocks the Thread, so we have a clean wait. Once a Thread has taken the next String from the queue, it must compare it against the “poison”. If it is the poison, the Thread is done with this update and tells the barrier so. Otherwise the Thread executes the SQL query, updates the MongoDB documents and takes the next String from the queue.

That’s it. A clean implementation with helpful classes from the concurrent package. No notify/wait, no Thread.run and total reuse with Threads waiting for work. Basically just another variation of the producer/consumer pattern with a little twist: a barrier for all consumers which executes a Runnable when reached. Easy.

Enjoy and greetings,
Gerhard Balthasar

— UPDATE —

Unfortunately I overlooked a race condition when updating into an empty collection for the first time. Due to the nature of the find-modify-update approach, when run with multiple threads, two or more threads can query for the same document and find none. Each of them will then create a document, resulting in multiple documents with the same identifier. To prevent this behaviour we need to implement a locking mechanism:

	private ConcurrentMap<String, CountDownLatch> locks = new ConcurrentHashMap<String, CountDownLatch>();

	protected boolean lock(String pk)
	{
		// try to insert new latch and receive previously inserted latch
		CountDownLatch latch = locks.putIfAbsent(pk, new CountDownLatch(1));
		// if there was no latch before, the current thread can pass
		if (latch == null)
		{
			return false;
		}
		// else the thread must wait
		try
		{
			latch.await();
		}
		catch (InterruptedException e)
		{
			// restore the interrupt flag so the caller can still react to it
			Thread.currentThread().interrupt();
		}
		return true;
	}

	protected void unlock(String pk)
	{
		CountDownLatch latch = locks.get(pk);
		if (latch != null)
		{
			latch.countDown();
			locks.remove(pk);
		}
	}

It is used this way:

	boolean wasLocked = false;
	boolean created = false;
	DBObject next = collection.findOne(product);
	if (next == null)
	{
		// test if another thread already creates a new document
		wasLocked = lock(pk);
		if (wasLocked)
		{
			// if this thread was locked, another thread has created the document thus this thread is now able to find it
			next = collection.findOne(product);
		}
		else
		{
			// first thread to create the document
			created = true;
		}
	}
	// set all fields and save (incl. waiting for save complete across all ReplicaSets with getLastError())
	// ...
	// if this thread was not locked unlock now to release waiting threads for the same document
	if (created && !wasLocked)
	{
		unlock(pk);
	}

With this approach the first Thread passes the lock and every other Thread must wait on the latch. When the first Thread has saved the document, it releases all waiting Threads, which then issue a new find to fetch the new document and continue to update it.
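The latch handshake can be tried out without a database. The sketch below is a self-contained variation, not the production code: a ConcurrentHashMap stands in for the MongoDB collection, and the names KeyLatchDemo, upsert and run are invented for this example. It also adds one thing the snippet above does not have, namely a second lookup after winning the lock. That is my assumption about how to cover a thread whose first find ran before the creator saved but whose lock() call arrives only after the creator has already unlocked.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not from the original post
class KeyLatchDemo
{
	// stand-in for the MongoDB collection: pk -> document
	private final ConcurrentMap<String, String> store = new ConcurrentHashMap<String, String>();
	private final ConcurrentMap<String, CountDownLatch> locks = new ConcurrentHashMap<String, CountDownLatch>();
	private final AtomicInteger creations = new AtomicInteger();

	// returns true if the caller had to wait for another thread
	private boolean lock(String pk) throws InterruptedException
	{
		CountDownLatch latch = locks.putIfAbsent(pk, new CountDownLatch(1));
		if (latch == null)
		{
			return false;
		}
		latch.await();
		return true;
	}

	private void unlock(String pk)
	{
		CountDownLatch latch = locks.remove(pk);
		if (latch != null)
		{
			latch.countDown();
		}
	}

	void upsert(String pk) throws InterruptedException
	{
		if (store.get(pk) != null)
		{
			return; // document exists; a real update would modify it here
		}
		if (lock(pk))
		{
			return; // waited for the lock holder, so the document exists now
		}
		try
		{
			// assumption: look again after winning the lock (see lead-in)
			if (store.get(pk) == null)
			{
				creations.incrementAndGet();
				store.put(pk, "document");
			}
		}
		finally
		{
			unlock(pk);
		}
	}

	// lets `threads` threads upsert the same pk and returns how many documents were created
	static int run(int threads)
	{
		final KeyLatchDemo demo = new KeyLatchDemo();
		ExecutorService executor = Executors.newFixedThreadPool(threads);
		for (int i = 0; i < threads; i++)
		{
			executor.submit(new Runnable()
			{
				public void run()
				{
					try
					{
						demo.upsert("pk-1");
					}
					catch (InterruptedException e)
					{
						Thread.currentThread().interrupt();
					}
				}
			});
		}
		executor.shutdown();
		try
		{
			executor.awaitTermination(10, TimeUnit.SECONDS);
		}
		catch (InterruptedException e)
		{
			Thread.currentThread().interrupt();
		}
		return demo.creations.get();
	}

	public static void main(String[] args)
	{
		System.out.println("documents created: " + run(8));
	}
}
```

However the 8 threads interleave, only one of them creates the document. Keep in mind that such an in-process latch only coordinates threads inside one JVM.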

And oh… how to test for duplicates? A simple MapReduce plus count in the shell does the job:

    db.assortment.mapReduce(function () { emit(this.pk, 1); }, function (k, vals) { return Array.sum(vals); }, { out: "duplicates" })
    db.duplicates.count({ "value": { $gte: 2 }})

Big thanx to csanz’s posterous!

Greetz,
GHad
— UPDATE —

Written by ghads

April 12, 2011 at 3:01 pm

MongoDB and Java enums


Well, there is no built-in way in MongoDB/BSON to de-/serialize Java enums. At least I found none other than implementing the interface DBObject. Since there is no default implementation for enums, I started experimenting with implementing the interface directly on the enum, which would be a massive overhead when using multiple enums. So I decided to create a simple wrapper, like DBRef, which does the work for you:

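import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.bson.BSONObject;

import com.mongodb.DBObject;

/**
 *
 * Use for put into document: document.put(key, DBEnum.of(SomeEnum.VALUE));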
 * Use for get from document: DBEnum.to(SomeEnum.class, document.get(key));
 *
 */
public class DBEnum<T extends Enum<T>>
		implements DBObject
{

	private static final String KEY = "_enum";

	public static <U extends Enum<U>> DBEnum<U> of(U value)
	{
		return new DBEnum<U>(value);
	}

	public static <U extends Enum<U>> U to(Class<U> c, Object o)
	{
		if (o instanceof DBObject)
		{
			Object value = ((DBObject) o).get(KEY);
			if (value == null)
			{
				return null;
			}
			return Enum.valueOf(c, value.toString());
		}
		return null;
	}

	private final T value;

	private DBEnum(T value)
	{
		this.value = value;
	}

	public void markAsPartialObject() { }

	public boolean isPartialObject() { return false; }

	public Object put(String s, Object o) { return null; }

	public void putAll(BSONObject bsonObject) { }

	public void putAll(Map map) { }

	public Object get(String s)
	{
		// only the enum key is present in this wrapper
		return KEY.equals(s) ? value.name() : null;
	}

	public Map toMap()
	{
		return Collections.singletonMap(KEY, value.name());
	}

	public Object removeField(String s) { return null; }

	public boolean containsKey(String s)
	{
		return KEY.equals(s);
	}

	public boolean containsField(String s)
	{
		return KEY.equals(s);
	}

	public Set<String> keySet()
	{
		return Collections.singleton(KEY);
	}

	public String toString()
	{
		return "{ \"" + KEY + "\" : \"" + value.name() + "\" }";
	}
}

Using it this way (example main)…

		Mongo mongo = new Mongo();
		DB db = mongo.getDB("test");
		DBCollection collection = db.getCollection("enum");

		DBObject next = new BasicDBObject("key", DBEnum.of(SomeEnum.VALUE_1));
		collection.insert(next);

		next = collection.findOne(new BasicDBObject("key", DBEnum.of(SomeEnum.VALUE_1)));
		System.out.println(next);

		SomeEnum enumValue = DBEnum.to(SomeEnum.class, next.get("key"));
		System.out.println(enumValue);

… prints the expected results and creates the document:

{
	"_id" : ObjectId("4da421708db0535ebbd175c3"),
	"key" : {
		"_enum" : "VALUE_1"
	}
}

The only real downer is that the type must be used explicitly. And maybe someone wants to implement this directly in the MongoDB Java driver (and others) so the key could turn into “$enum” and become ‘official’…
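To look at the round trip without a running MongoDB, here is a small driver-free sketch of the same idea. It uses a plain Map in place of a DBObject, and the class EnumRoundTrip with its encode and decode methods is purely illustrative. It also makes the downer visible: only the constant name is stored, so the enum class has to be passed in again when reading.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical demo class, not part of DBEnum or the MongoDB driver
class EnumRoundTrip
{
	enum SomeEnum { VALUE_1, VALUE_2 }

	static final String KEY = "_enum";

	// encode: enum constant -> { "_enum" : "NAME" }
	static Map<String, Object> encode(Enum<?> value)
	{
		return Collections.<String, Object>singletonMap(KEY, value.name());
	}

	// decode: the target type must be given, because only the name was stored
	static <U extends Enum<U>> U decode(Class<U> type, Object stored)
	{
		if (!(stored instanceof Map))
		{
			return null;
		}
		Object name = ((Map<?, ?>) stored).get(KEY);
		return name == null ? null : Enum.valueOf(type, name.toString());
	}

	public static void main(String[] args)
	{
		Map<String, Object> stored = encode(SomeEnum.VALUE_1);
		System.out.println(stored);                         // {_enum=VALUE_1}
		System.out.println(decode(SomeEnum.class, stored)); // VALUE_1
	}
}
```

As with DBEnum.to, Enum.valueOf throws an IllegalArgumentException if the stored name is not a constant of the given type, for example after an enum constant has been renamed.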

Greetings,
GHad

Written by ghads

April 12, 2011 at 2:15 pm