Oct 24, 2011

Apache Hama realtime processing

Hi there,

today's post is about realtime processing with Apache Hama.
One day, Edward told me about a guy who told him that he uses Hama for realtime processing.

At first this sounds quite strange, because BSP is inherently used (just like MapReduce) for batch processing. It has several advantages over MapReduce, especially in graph and mathematical use cases.

I think this new "feature" is the greatest advantage over MapReduce.
Let me clarify a bit how it works.

First you have some tasks which are going to be our so-called event collectors. In my example this will be a single master task.
The trick is that the event collectors wait for new events to arrive, or even poll for new events, inside a while loop. That is something which is possible in MapReduce, too.

Now you can build a producer/consumer pattern on top of this: your event collectors send messages to computation tasks, which then do some computation on the data they have just received. This allows you to do more complex stream analysis in near-realtime.
We will see this in an example a bit later.
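
Before we get to the Hama version, the pattern itself can be sketched in plain Java. This is only an illustration and does not use Hama at all: the class name CollectorSketch, the in-memory queues standing in for BSP messages, and the STOP marker (added so the demo terminates) are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CollectorSketch {

  // Hypothetical end-of-stream marker so the demo terminates;
  // a real collector would loop forever.
  static final String STOP = "__STOP__";

  /** Event collector: hands each event to one worker inbox, round robin. */
  static void collect(List<String> events, List<BlockingQueue<String>> inboxes) {
    int next = 0;
    for (String event : events) {
      inboxes.get(next).add(event);
      next = (next + 1) % inboxes.size();
    }
    for (BlockingQueue<String> inbox : inboxes) {
      inbox.add(STOP);
    }
  }

  /** Computation task: blocks until work arrives, then processes it. */
  static List<String> consume(BlockingQueue<String> inbox) {
    List<String> processed = new ArrayList<>();
    try {
      while (true) {
        String event = inbox.take(); // waits for new "food"
        if (STOP.equals(event)) {
          return processed;
        }
        processed.add(event.toUpperCase()); // placeholder computation
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return processed;
    }
  }

  /** Starts the workers, runs the collector, and returns each worker's results. */
  static List<List<String>> run(List<String> events, int workers) {
    List<BlockingQueue<String>> inboxes = new ArrayList<>();
    List<List<String>> results = new ArrayList<>();
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < workers; i++) {
      BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
      List<String> out = new ArrayList<>();
      Thread t = new Thread(() -> out.addAll(consume(inbox)));
      inboxes.add(inbox);
      results.add(out);
      threads.add(t);
      t.start();
    }
    collect(events, inboxes);
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return results;
  }

  public static void main(String[] args) {
    // prints [[A, C], [B]]
    System.out.println(run(Arrays.asList("a", "b", "c"), 2));
  }
}
```

The collector never touches a filesystem: work reaches the computation tasks through messages only, which is the property the BSP version relies on.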

Why is this better than a MapReduce job?
If you run a MapReduce job, you can poll for available data inside a while loop, too. But without a messaging system between the tasks you are forced to write your data into HDFS to make it available to a larger number of tasks that parallelize your workload.
Since Hadoop has lots of job scheduling and setup overhead, this is not realtime anymore. It degenerates to batch processing.
For those of you who are familiar with Giraph: it is similar to such a MapReduce job, in which tasks exchange messages with other MapReduce tasks. Sadly, Giraph focuses only on graph computing and depends strongly on input from the filesystem.

Example: Realtime Twitter Message Processing 
Yes, we can analyse Twitter data streams in BSP in realtime!
What do we need?
  • Twitter Library, in this case Twitter4J
  • Latest Hama, in this case a 0.4.0 snapshot. You can use 0.3.0 as well, with minor code changes.
Let's dive directly into it and look at how to set up the job.

    HamaConfiguration conf = new HamaConfiguration();
    // set the user we want to analyse
    conf.set("twitter.user.name", "tjungblut");
    // I'm always testing in localmode so I use 2 tasks.
    conf.set("bsp.local.tasks.maximum", "2");
    BSPJob bsp = new BSPJob(conf);
    bsp.setJobName("Twitter stream processing");

I think this is pretty standard; the trick here is to set the username of the person you want to analyse.
In my case this is my Twitter nick, "tjungblut".

I omit the setup method and the fields here; if you have questions about what I've done there, feel free to comment on this post.

The real (time) processing
Let's step directly to the processing and to how we mimic the producer/consumer pattern.
The idea is simple: a master task polls for new "Tweets" and sends them directly to our computation tasks (the field is named otherPeers and contains all tasks but the master task).
This happens while our computation tasks are waiting for new "food" to arrive.
Once our computation tasks get a message, they can directly start with their computation.

Let's see how the master task does the job:

  public void bsp(BSPPeer bspPeer) throws IOException, KeeperException,
      InterruptedException {

    if (isMaster) {
      while (true) {
        // this should get us the latest 20 tweets of this user
        List<Status> statuses = twitter.getUserTimeline(userName);
        for (Status s : statuses) {
          // deduplicate
          if (alreadyProcessedStatusses.add(s)) {
            System.out.println("Got new status from: "
                + s.getUser().getName() + " with message " + s.getText());
            // we distribute messages to the other peers for
            // processing via user id partitioning
            // so a task gets all messages for a user
            bspPeer.send(
                otherPeers[(int) (s.getUser().getId() % otherPeers.length)],
                new LongMessage(s.getUser().getId(), s.getText()));
          }
        }
        // sync before we get new statuses again.
        bspPeer.sync();
      }
 ... // computation task stuff

Note: I've omitted a lot of details (try/catch blocks) and premature optimizations, which can be found in the code.

As you can see, the event collector (aka master task) polls the Twitter API to get the newest tweets of a given user.
The master then sends the new messages to our computation tasks.
Note that there is a simple trick to distribute the work across the tasks: the user id modulo the number of computation tasks selects the receiver. In our case we listen to just a single user and have two tasks, so this does nothing but send every message directly to the one other task.
You can change this behaviour by either listening to the public timeline or by partitioning on the message id instead of the user id. I hope you get the gist ;)

In short: we are listening to a specific user, and therefore every message goes from the collector directly to the same computation task. In our case we have only 2 tasks; adding more tasks would just leave the extra ones idle the whole time.
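
To make the difference between the two partitioning keys concrete, here is a small sketch in plain Java. The class PartitionSketch, the helper peerIndex and the ids used are hypothetical and not part of the job above. The sketch uses Math.floorMod instead of the plain % operator so that a negative id cannot produce a negative array index.

```java
class PartitionSketch {

  /** Picks the peer index for a key; floorMod keeps the result non-negative. */
  static int peerIndex(long key, int numPeers) {
    return (int) Math.floorMod(key, (long) numPeers);
  }

  public static void main(String[] args) {
    int numPeers = 3;
    long userId = 4711L; // hypothetical user id
    // Partitioning by user id sends every message of this user to one peer,
    // while partitioning by status id spreads the messages over all peers.
    for (long statusId = 100; statusId < 104; statusId++) {
      System.out.println("status " + statusId
          + ": by user id -> peer " + peerIndex(userId, numPeers)
          + ", by status id -> peer " + peerIndex(statusId, numPeers));
    }
  }
}
```

Partitioning by user id keeps all tweets of one user on the same task, which is handy for per-user statistics. Partitioning by message id balances the load but scatters a user's tweets across tasks.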

Let's have a look at the slave task (aka computation task).

This is very simple:
    // we are not the master task... so let's do this:
    } else {
      while (true) {
        // wait for some work...
        bspPeer.sync();
        LongMessage message = null;
        while ((message = (LongMessage) bspPeer.getCurrentMessage()) != null) {
          System.out.println("Got work in form of text: " + message.getData()
              + " for the userid: " + message.getTag().longValue());
        }
      }
    }
  }

As you can see, this is a pretty simple consumer.
You could now add some logic to it. For example, you could track the communication between a person and others: how often, how much, and what content.
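
As a rough idea of what such logic could look like, here is a minimal sketch in plain Java that counts @mentions ("how often") and total text length ("how much") per received tweet text. The class MentionTracker and its methods are hypothetical; in the job above you would call process(message.getData()) where the consumer currently prints the text.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MentionTracker {

  // Simplified pattern: an @ followed by word characters.
  private static final Pattern MENTION = Pattern.compile("@(\\w+)");

  private final Map<String, Integer> mentionCounts = new TreeMap<>();
  private long totalChars = 0;

  /** Call once per received tweet text. */
  void process(String text) {
    totalChars += text.length();
    Matcher m = MENTION.matcher(text);
    while (m.find()) {
      // lower-case the name so @Edward and @edward count as the same person
      mentionCounts.merge(m.group(1).toLowerCase(), 1, Integer::sum);
    }
  }

  Map<String, Integer> mentionCounts() {
    return mentionCounts;
  }

  long totalChars() {
    return totalChars;
  }

  public static void main(String[] args) {
    MentionTracker tracker = new MentionTracker();
    tracker.process("@edward thanks for the hint!");
    tracker.process("Talking BSP with @Edward and @alice");
    // prints {alice=1, edward=2}
    System.out.println(tracker.mentionCounts());
  }
}
```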

In my case, this looks like this:

Console output of the realtime processing job

Note that the output appeared directly after the tweet had been sent.
Now, this is a really cool thing!

If you had unlimited access to the public timeline (sadly this is capped at 150 requests/h) and enough computational power in your cluster, you could compute your own trending topics!

Twitter's worldwide trending topics
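
A very naive version of "your own trending topics" could just count hashtags and keep the most frequent ones. The following plain Java sketch shows that idea for a single task. The class TrendingSketch and the method topHashtags are hypothetical names, and merging the counts of several computation tasks is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TrendingSketch {

  // Simplified pattern: a # followed by word characters.
  private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

  /** Returns the k most frequent hashtags; ties are broken alphabetically. */
  static List<String> topHashtags(List<String> texts, int k) {
    Map<String, Integer> counts = new HashMap<>();
    for (String text : texts) {
      Matcher m = HASHTAG.matcher(text);
      while (m.find()) {
        counts.merge(m.group(1).toLowerCase(), 1, Integer::sum);
      }
    }
    List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
    entries.sort((a, b) -> {
      int byCount = Integer.compare(b.getValue(), a.getValue());
      return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
    });
    List<String> top = new ArrayList<>();
    for (int i = 0; i < Math.min(k, entries.size()); i++) {
      top.add(entries.get(i).getKey());
    }
    return top;
  }

  public static void main(String[] args) {
    List<String> texts = Arrays.asList(
        "#hama rocks #bsp", "more #Hama", "#yarn and #bsp", "#hama");
    // prints [hama, bsp]
    System.out.println(topHashtags(texts, 2));
  }
}
```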

Of course you can do everything else you want to.

I hope this has been quite "illuminating" for you and shows you how to enable realtime processing if you have Hama.

Of course you can check out my source code on my GitHub. The class we just talked about is available here:


Have fun and good luck!

Apache Hama upcoming features

Hi all,

it is a pleasure for me to bring you a couple of new things and announcements in this blog post.

Apache Hama 0.4.0 is on its way, and I want to introduce several pieces of fanciness before we dive into realtime processing (which will be the follow-up blog post).
  1. Revised BSP execution flow
  2. Multiple Tasks per groom
  3. YARN integration

Revised BSP execution flow

The first point is a very good improvement. Writing a BSP is much more convenient now.
Let's take a look at the implementation of a BSP in Hama 0.3.0:

class OldBSP extends BSP {

  public void bsp(BSPPeerProtocol arg0) throws IOException, KeeperException,
      InterruptedException {
    // TODO Auto-generated method stub
  }

  public void setConf(Configuration conf) {
    // TODO Auto-generated method stub
  }

  public Configuration getConf() {
    // TODO Auto-generated method stub
    return null;
  }
}

You can see it in my Eclipse-generated subclass: you have to override plenty of methods.
Two of them seem very strange if you are not familiar with Hadoop. What is that configuration? And why do I need to set it in my code?

Well, this is now history. We have revised the design and are now shipping default implementations of every method in the BSP class.

Additionally we have added a setup and a cleanup method. Setup is now called before the computation starts, cleanup after your computation has been done.

Let's see:

public class NewBSP extends BSP {

  public void setup(BSPPeer peer) throws IOException, KeeperException,
      InterruptedException {
  }

  public void bsp(BSPPeer peer) throws IOException, KeeperException,
      InterruptedException {
  }

  public void cleanup(BSPPeer peer) {
  }
}


It is a lot more intuitive, isn't it? Now YOU control which methods you override, and it is fully transparent when the methods are called.

And the best side effect is that you can send messages and trigger a barrier sync while in setup!
This enables you to send initial messages to other tasks and distribute information which hasn't been set in the configuration.
BTW: Your job's configuration can now be obtained via peer.getConfiguration().
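
The execution flow described above can be pictured with a tiny stand-in in plain Java. This is not Hama's actual code: SketchBSP, runTask and the log list are hypothetical and only illustrate the call order and the idea of no-op defaults that you override selectively.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {

  /** Hypothetical stand-in for a BSP base class: every hook has a no-op default. */
  static class SketchBSP {
    void setup(List<String> log) {}
    void bsp(List<String> log) {}
    void cleanup(List<String> log) {}
  }

  /** Overrides only the computation and keeps the default setup and cleanup. */
  static class OnlyCompute extends SketchBSP {
    @Override
    void bsp(List<String> log) {
      log.add("bsp");
    }
  }

  /** Overrides all three hooks. */
  static class Full extends SketchBSP {
    @Override
    void setup(List<String> log) {
      log.add("setup");
    }

    @Override
    void bsp(List<String> log) {
      log.add("bsp");
    }

    @Override
    void cleanup(List<String> log) {
      log.add("cleanup");
    }
  }

  /** What a task runner might do: setup first, then the computation, then cleanup. */
  static List<String> runTask(SketchBSP job) {
    List<String> log = new ArrayList<>();
    job.setup(log);
    job.bsp(log);
    job.cleanup(log);
    return log;
  }

  public static void main(String[] args) {
    System.out.println(runTask(new OnlyCompute())); // prints [bsp]
    System.out.println(runTask(new Full()));        // prints [setup, bsp, cleanup]
  }
}
```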

Multiple Tasks per groom

Yeah, we made the step to multitasking. In Hama 0.3.0 we only had a single task inside the groom.
This didn't really utilize the machines, because while a single BSP task was executing, other cores might sit unused.
Like in Hadoop, this is now configurable per host, so you can set the number of tasks which should be executed on a machine.

YARN integration

If you don't know what YARN actually is, let me clarify a bit. YARN stands for Yet Another Resource Negotiator.
This is Hadoop's new architecture. If you want to learn more, have a look at Arun's slides here.

Now that you know what the new Hadoop is, I'm proud to tell you that Apache Hama will be a part of it.
We implemented our own application to fit the new YARN module and bring Hama to your Hadoop 0.23.0 cluster.
No more setup and configuration of additional daemons!

We managed to get a first BSP (Serialize Printing) running on a YARN cluster.

Serialize Printing on YARN
That is soo great!

We are still in development, so please follow HAMA-431 if you are interested.

Thanks for your attention, and please follow us on the mailing list! We are happy to answer any questions you have.