GHads mind

developers thoughts & finds

Posts Tagged ‘MongoDB

Java CyclicBarrier and Queue usage

leave a comment »

UPDATE: See below for an update about a race-condition I didn’t think of in the first place…

For updating MongoDB from OracleDB I needed a way with multiple threads to overcome networklatency to our MongoDB Server Cluster. Doing writes/updates with 4 threads resulted in a 3-4 times better performance than using only one thread. But how to implement such an update behavior in a good way using JDK6 concurrent classes/utils?

Basicly one update should do the following steps:

  1. Determine all documents to update (partial updates also possible)
  2. Flag all those documents with a unique tag (using UUID for example)
  3. Run updates concurrently and for each updated document remove the update tag
  4. When all update threads are done, remove all still tagged documents and wait for the next update

This can be achieved by combining a CyclicBarrier, a Queue and an Executor also allowing Thread/Object reuse as long as the update progress is guarded in such a way that only one update runs at one time (using Lock for example).

Here is a basic example code that shows how the update works:

public class CyclicBarrierQueueTesting
{

	private static BlockingQueue<String> queue;

	private static CyclicBarrier barrier;

	private static String updateTag;

	private static String poison = "+++ stop +++";

	public static void main(String[] args)
	{
		// prepare update thread pool
		int numberOfThreads = 4;
		queue = new LinkedBlockingQueue<String>();
		barrier = new CyclicBarrier(numberOfThreads, new Runnable()
		{
			public void run()
			{
				// delete all not updated with updateTag
				// ...
				log("Update done, remove not updated: updateTag= " + updateTag);
				updateTag = null;
			}
		});
		ExecutorService executor = Executors.newFixedThreadPool(4);
		for (int i = 0; i < numberOfThreads; i++)
		{
			executor.submit(new Runner(i, poison, queue, barrier));
		}

		// import all
		barrier.reset();
		List<String> sqlPerLanguge = Collections.nCopies(47, "language");
		updateTag = UUID.randomUUID().toString();
		log("Update all, flagged all with updateTag: updateTag= " + updateTag);
		for (String language : sqlPerLanguge)
		{
			// create a sql query per language
			try
			{
				queue.put(language);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}
		// add poison for every thread
		for (int i = 0; i < numberOfThreads; i++)
		{
			try
			{
				queue.put(poison);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}

		// wait for next update (NOT needed in real circumstances)
		while (updateTag != null)
		{
			try
			{
				Thread.sleep(1000);
			}
			catch (InterruptedException e)
			{
			}
		}

		// update some pks
		barrier.reset();
		List<String> somePKs = Collections.nCopies(50, "pk");
		updateTag = UUID.randomUUID().toString();
		log("Update some pks, flagged all products for pks with updateTag: updateTag= " + updateTag);
		for (String pk : somePKs)
		{
			// create a sql query per 10 products
			try
			{
				queue.put(pk);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}
		// add poison for every thread
		for (int i = 0; i < numberOfThreads; i++)
		{
			try
			{
				queue.put(poison);
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
		}

		// wait for next update (NOT needed in real circumstances)
		while (updateTag != null)
		{
			try
			{
				Thread.sleep(1000);
			}
			catch (InterruptedException e)
			{
			}
		}

		// shutdown
		log("shutdown");
		executor.shutdownNow();
	}

	public static void log(String str)
	{
		System.out.println(str);
	}

	private static class Runner
			implements Runnable
	{

		private int id;
		private String poison;
		private BlockingQueue<String> queue;
		private CyclicBarrier barrier;

		public Runner(int id, String poison, BlockingQueue<String> queue, CyclicBarrier barrier)
		{
			this.id = id;
			this.poison = poison;
			this.queue = queue;
			this.barrier = barrier;
		}

		public void run()
		{
			boolean run = true;
			while (run)
			{
				try
				{
					try
					{
						String query = queue.take();
						if (query.equals(poison))
						{
							barrier.await();
						}
						else
						{
							log("Thread executes update: id= " + id);
							Thread.sleep(100);
						}
					}
					catch (BrokenBarrierException e)
					{
						e.printStackTrace();
						run = false;
					}
				}
				catch (InterruptedException e)
				{
					run = false;
				}
			}
		}
	}
}

Let’s go top down over the code…

First we need to fix the number of threads and init the objects we want to use: A LinkedBlockingQueue and a CyclicBarrier with a Runnable provided. This Runnable will be executed as soon as all Threads have called barrier.await() indicating the work for one update is done. So then the not updated documents can be removed. Also we init an Executor and start the Threads which will do the update.

Next we want to do a complete update. The way I provide information to the Threads is via SQL-Strings so each Thread executes the query and iterates the ResultSet for updating the MongoDB documents. Here I’m “creating” 47 SQL-Queries. In our production code this is the point where our products are partitionated by language. This way our Products tables can be queried by multiple Threads without overlapping results. Don’t forget to reset the barrier, create an update tag and (not shown here) update all MongoDB documents with the update tag. Now the queries are added into the queue and we also add a special “poison” String per Thread. This String tells the Thread that the update is done when it reaches the end of the queue.

Now all there’s left to do is waiting until the update tag has been resetted. In production code we return from the method as the rest runs async. Of course we use a Lock mechanism to ensure only one update is in progress but in the demo code we just wait. Next is an update on some of the documents which works exactly like described above, only the SQL-Queries change.

The next interesting point is the updating Thread. Here we have a run loop that breaks when interupted by executor.shutdownNow(). First thing the thread does is trying to take on SQL-Query from the queue. As long as the queue is empty this blocks the Thread so we have a clean wait. If there is something in the queue and a Thread has taken the next String, we must compare against the “poison”. If it is the poisen, the thread can stop and tell the barrier so. Else the thread executes the SQL-Query and updates the MongoDB documents and takes next from queue.

That’s it. A clean implementation with helpful classes from the concurrent package. No notify/wait, no Thread.run and total reuse with Threads waiting for work. Basicly just another way of the producer/consumer pattern with a little twist: a barrier for all consumers which when reached executes a Runnable. Easy.

Enjoy and greetings,
Gerhard Balthasar

— UPDATE —

Unfortuatly I overlooked a race-condition when updating the first time into an empty collection. Due to the nature of the find-modify-update way when run with multiple threads, two or more threads can query for the same document and will find none. So both threads will create a document resulting in multiple documents with the same identifier. To prevent this behaviour we need to implement a locking mechanism:

	private ConcurrentMap<String, CountDownLatch> locks = new ConcurrentHashMap<String, CountDownLatch>();

	protected boolean lock(String pk)
	{
		// try to insert new latch and receive prevously inserted latch
		CountDownLatch latch = locks.putIfAbsent(pk, new CountDownLatch(1));
		// if there was no latch before, the current thread can pass
		if (latch == null)
		{
			return false;
		}
		// else the thread must wait
		try
		{
			latch.await();
		}
		catch (InterruptedException e)
		{
			// ignore
		}
		return true;
	}

	protected void unlock(String pk)
	{
		CountDownLatch latch = locks.get(pk);
		if (latch != null)
		{
			latch.countDown();
			locks.remove(pk);
		}
	}

It is used this way:

	DBObject next = collection.findOne(product);
	if (next == null)
	{
		// test if another thread already creates a new document
		wasLocked = lock(pk);
		if (wasLocked)
		{
			// if this thread was locked, another thread has created the document thus this thread is now able to find it
			next = collection.findOne(product);
		}
		else
		{
			// first thread to create the document
			created = true;
		}
	}
	// set all fields and save (incl. waiting for save complete across all ReplicaSets with getLastError())
	// ...
	// if this thread was not locked unlock now to release waiting threads for the same document
	if (created && !wasLocked)
	{
		unlock(pk);
	}

With this approach the first Thread passes the lock, every other Thread must wait upon the latch. When the first Thread has saved the document it releases all other waiting Threads that then issue a new find to catch the new document and continue to update it.

And oh… how to test for duplicates? A simple MapReduce plus count in the shell does the job:

    db.assortment.mapReduce(function () { emit(this.pk, 1); }, function (k, vals) { return Array.sum(vals); }, { out: "duplicates" })
    db.duplicates.count({ "value": { $gte: 2 }})

Big thanx to csanz’s posterous!

Greetz,
GHad
— UPDATE —

Written by ghads

April 12, 2011 at 3:01 pm

MongoDB and Java enums

leave a comment »

Well, there is no way in MongoDB/BSON to de-/serialize Java enums. At least I found none but to implement the Interface DBObject. Having no default implementation I started experimenting with implementing the Interface directly at the enum which would be a massive overhead when using multiple enums. So I decided to create a simple Wrapper like DBRef which does the work for you:

/**
 *
 * Use for put into document: document.put(key, DBEmum.of(SomeEnum.VALUE));
 * Use for get from document: DBEnum.to(SomeEnum.class, document.get(key));
 *
 */
public class DBEnum<T extends Enum<T>>
		implements DBObject
{

	private static final String KEY = "_enum";

	public static <U extends Enum<U>> DBEnum<U> of(U value)
	{
		return new DBEnum<U>(value);
	}

	public static <U extends Enum<U>> U to(Class<U> c, Object o)
	{
		if (o instanceof DBObject)
		{
			Object value = ((DBObject) o).get(KEY);
			if (value == null)
			{
				return null;
			}
			return Enum.valueOf(c, value.toString());
		}
		return null;
	}

	private final T value;

	private DBEnum(T value)
	{
		this.value = value;
	}

	public void markAsPartialObject() { }

	public boolean isPartialObject() { return false; }

	public Object put(String s, Object o) { return null; }

	public void putAll(BSONObject bsonObject) { }

	public void putAll(Map map) { }

	public Object get(String s)
	{
		return value.name();
	}

	public Map toMap()
	{
		return Collections.singletonMap(KEY, value.name());
	}

	public Object removeField(String s) { return null; }

	public boolean containsKey(String s)
	{
		return KEY.equals(s);
	}

	public boolean containsField(String s)
	{
		return KEY.equals(s);
	}

	public Set<String> keySet()
	{
		return Collections.singleton(KEY);
	}

	public String toString()
	{
		return "{ \"" + KEY + "\" : \"" + value.name() + "\" }";
	}
}

Using it this way (example main)…

		Mongo mongo = new Mongo();
		DB db = mongo.getDB("test");
		DBCollection collection = db.getCollection("enum");

		DBObject next = new BasicDBObject("key", DBEnum.of(SomeEnum.VALUE_1));
		collection.insert(next);

		next = collection.findOne(new BasicDBObject("key", DBEnum.of(SomeEnum.VALUE_1)));
		System.out.println(next);

		SomeEnum enumValue = DBEnum.to(SomeEnum.class, next.get("key"));
		System.out.println(enumValue);

… prints the expected results and creates the document:

{
	"_id" : ObjectId("4da421708db0535ebbd175c3"),
	"key" : {
		"_enum" : "VALUE_1"
	}
}

Only real downer is the type must be used explictly. And maybe someone wants to implement this directly into the MongoDB Java driver (and others) so the key could turn to “$enum” and become ‘official’…

Greetings,
GHad

Written by ghads

April 12, 2011 at 2:15 pm

Testing MapReduce (MongoDB) in Java

leave a comment »

Yesterday a co-worker of mine thought about using MapReduce in MongoDB for calculating visibility of products for our company shop. Not convinced that an operation that basicly just groups is the right tool for the job I wanted a way to test MapReduce operations without the hassle to setup a MongoDB with example data for each co-worker and using Javascript from our Java environment (think of debugging…). So I thought about providing a tiny implementation in Java that just behaves like MapReduce in MongoDB (no paralization). This way all of my co-workers can play with their operations and check the practicability of their solution.

So today I quickly coded the following class, which imho also helps to explain the way MapReduce works to newbies:

public abstract class MapReduce<In, Out> {

	public static <Input, Output> Map<Object, Output> execute(MapReduce<Input, Output> operation, Collection<Input> input) {
		if (operation == null || input == null || input.isEmpty()) throw new IllegalArgumentException();

		// map input to emit values
		for (Input in : input) {
			operation.map(in);
		}

		// reduce emitted values
		Map<Object, Output> result = new HashMap<Object, Output>();
		for (Map.Entry<Object, Collection<Output>> entry : operation.emits.entrySet()) {
			Object key = entry.getKey();
			Output reduced = operation.reduce(key, entry.getValue());
			result.put(key, reduced);
		}
		return result;
	}

	private Map<Object, Collection<Out>> emits = new HashMap<Object, Collection<Out>>();

	protected void emit(Object key, Out value) {
		Collection<Out> forKey = emits.get(key);
		if (forKey == null) {
			forKey = new ArrayList<Out>();
			emits.put(key, forKey);
		}
		forKey.add(value);
	}

	public abstract void map(In input);

	public abstract Out reduce(Object key, Collection<Out> emits);

}

So we have an abstract class that defines a “map” and a “reduce” method which must be implemented by the operation. We have an internal method “emit” which must be called from the map method (map phase) to add an entry for the reduce phase per key/unique value (grouping). And we have a static method “execute” to call the abstract methods for a collection of input elements, first for each element the map-method then for all emited values the reduce-method per key.

To test your MapReduce operation the implementation is easy:

public class MapReduce_Test {

	public static class Customer {
		public String name;
	}

	public static class GroupByName extends MapReduce<Customer, Integer> {

		@Override
		public void map(Customer input) {
			emit(input.name, 1);
		}

		@Override
		public Integer reduce(Object key, Collection<Integer> emits) {
			int count = 0;
			for (Integer i : emits) {
				count = count + i;
			}
			return count;
		}
	}

	public static void main(String[] args) {
		// create some customers
		List<Customer> input = new ArrayList<Customer>();
		for (int i = 0; i < 1000; i++) {
			Customer in = new Customer();
			in.name = "name" + (i % 6);
			input.add(in);
		}

		// execute MapReduce operation
		Map<Object, Integer> result = MapReduce.execute(new GroupByName(), input);

		// show result
		System.out.println(result);
	}

}

Here we’re just group by customers name and count the number of occurencies for each name. The output of this Test is: {name1=167, name5=166, name2=167, name3=167, name4=166, name0=167}

So for my co-workers it’s easier to test their operations for MapReduce and if the operation gets the desired output for the given input, we can now transform the map/reduce methods from Java to JavaScript saving JavaScript debug time and headscratching woes 😉

Greetz,
GHad

Written by ghads

February 17, 2011 at 11:17 am