Enthusiastic Java, Scala and Haskell programmer with a long history of large and successful systems. Known author, speaker, motivator and coach. Jan is a DZone MVB and is not an employee of DZone and has posted 26 posts at DZone.

Akka Client, C++ Server Through RabbitMQ

01.31.2013

Over the next few weeks, I will tell you about all the discoveries I made in the project storm that made me skip blogging for three weeks. Today, we begin with an Akka AMQP client talking through RabbitMQ to C++ code on the other end.

The motivation

I have some image processing code in a static library that I needed to use in my Akka application. First, I grabbed JNI. That turned out to be a mistake, mainly because of my bad C++ code, which loved SIGSEGV. You can have as much actor supervision as you like, but when the JVM dies, it takes your actors with it. So, out with JNI, and in with AMQP. This allowed me to treat the C++ application as if it were an actor. Even better, the data I was sending to the C++ code was binary in nature, so AMQP was an excellent fit again.

The Akka code

The Akka code is trivial. You just need to grab the AMQP client (my clone at https://github.com/janm399/amqp-client), build it and use it in your project.

object Main extends App {
  // boot up Akka
  val actorSystem = ActorSystem()
  // prepare the AMQP connection factory
  val connectionFactory = new ConnectionFactory()
  connectionFactory.setHost("localhost")
  // connect to the AMQP exchange
  val amqpExchange = ExchangeParameters(name = "amq.direct", 
                                        exchangeType = "", passive = true)

  // create a "connection owner" actor, which will try to
  // reconnect automatically if the connection is lost
  val connection = actorSystem.actorOf(
                     Props(new ConnectionOwner(connectionFactory)))
  // make a RPC client
  val client = ConnectionOwner.createChildActor(
                     connection, Props(new RpcClient()))

  // mechanics
  implicit val timeout = Timeout(1000, TimeUnit.MILLISECONDS)

  (client ? Request(Publish(...) :: Nil)) onComplete {
    case response => ...
  }
}

All that we need to fill in is the message that we are sending and what we’re doing with the response. I shall leave the second to your imagination, but let’s for now focus on the message.

We will be sending a direct message: we wish to establish point-to-point communication. (When you ask for a response, your client will get its own private queue, where the server will place the response.) We will use RabbitMQ’s default direct exchange, amq.direct. In addition to the exchange, you need the routing key, so that RabbitMQ knows which queue to place the request in. But what value do we use?

Let’s create the queue and call it cppdemo. Then we need to bind the queue to an exchange and a routing key. In our case, the message will reach the cppdemo queue when sent to the amq.direct exchange with the cppdemo.basic routing key.
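To make the binding rule concrete, here is a toy model of how a direct exchange picks a queue. It is only a sketch: the Bindings class and its bind and route methods are names I made up for illustration, not part of RabbitMQ or any client library. A direct exchange delivers a message to the queues whose binding key equals the message's routing key exactly.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy model of direct-exchange bindings:
// (exchange, routing key) -> names of the bound queues.
class Bindings {
 public:
  // Record that `queue` is bound to `exchange` under `routing_key`.
  void bind(const std::string& queue, const std::string& exchange,
            const std::string& routing_key) {
    table_[std::make_pair(exchange, routing_key)].push_back(queue);
  }

  // Queues that would receive a message published to `exchange` with
  // `routing_key`; empty when no binding matches exactly.
  std::vector<std::string> route(const std::string& exchange,
                                 const std::string& routing_key) const {
    auto it = table_.find(std::make_pair(exchange, routing_key));
    return it == table_.end() ? std::vector<std::string>() : it->second;
  }

 private:
  std::map<std::pair<std::string, std::string>,
           std::vector<std::string> > table_;
};
```

With the binding from the text in place, route("amq.direct", "cppdemo.basic") yields cppdemo, while a message sent with the routing key cppdemo matches nothing.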

Back to our Scala code, then:

...
(client ? Request(Publish("amq.direct", "cppdemo.basic", ...) :: Nil)) onComplete {
  case response => ...
}
...

All that remains are the bytes that make up the message…

C++

And down the rabbit hole we go. Before we jump into the C++ code, and before we complete the Akka code, we need to do some setup.

Tooling

We shall use Boost and the RabbitMQ C and C++ clients at https://github.com/alanxz/rabbitmq-c and https://github.com/alanxz/SimpleAmqpClient. You will also need cmake. We will be needing static libraries for both the RabbitMQ clients, so we build rabbitmq-c by

cmake . -DBUILD_STATIC_LIBS=true
cmake --build .
sudo cmake --build . --target install

And follow on with the C++ SimpleAmqpClient

cmake . -DBUILD_SHARED_LIBS=false 
cmake --build .
sudo cmake --build . --target install

The main code

We’re now ready to write our C++ code. We will establish the connection to the RabbitMQ server and set up an RPC server. We bind our server to the same queue and listen for messages that conform to the following structure:

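// headers used by the C++ snippets in this section
#include <stdint.h>
#include <iostream>
#include <string>
#include <boost/exception/all.hpp>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

using namespace AmqpClient;

const int32_t message_signature = 0x1000acca;
// sorry, no k in hex!

typedef struct {
  int32_t signature;
  int32_t size1;
  int32_t size2;
} message_header_t;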
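Because the server reads this struct straight out of the message bytes, both ends must agree on the byte order of the three integers. The article does not fix a wire order, so the following is only a sketch that assumes little-endian (what an x86 server sees natively); put_le32, get_le32, encode_header and decode_header are hypothetical helper names. Encoding and decoding byte by byte keeps the result independent of the host's own byte order.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Same fields as the header structure in the article.
struct message_header_t {
  int32_t signature;
  int32_t size1;
  int32_t size2;
};

const int32_t message_signature = 0x1000acca;

// Append a 32-bit value, least significant byte first (assumed wire order).
void put_le32(std::vector<uint8_t>& out, int32_t value) {
  uint32_t v = static_cast<uint32_t>(value);
  for (int shift = 0; shift < 32; shift += 8)
    out.push_back(static_cast<uint8_t>((v >> shift) & 0xff));
}

// Read a 32-bit little-endian value starting at data[offset].
// The caller must make sure that four bytes are available there.
int32_t get_le32(const std::vector<uint8_t>& data, std::size_t offset) {
  uint32_t v = 0;
  for (int i = 0; i < 4; ++i)
    v |= static_cast<uint32_t>(data[offset + i]) << (8 * i);
  return static_cast<int32_t>(v);
}

// Serialise the header into exactly 12 bytes.
std::vector<uint8_t> encode_header(const message_header_t& h) {
  std::vector<uint8_t> out;
  put_le32(out, h.signature);
  put_le32(out, h.size1);
  put_le32(out, h.size2);
  return out;
}

// Parse a header from the first 12 bytes of data.
message_header_t decode_header(const std::vector<uint8_t>& data) {
  message_header_t h;
  h.signature = get_le32(data, 0);
  h.size1 = get_le32(data, 4);
  h.size2 = get_le32(data, 8);
  return h;
}
```

Under this assumption the signature travels as the bytes ca ac 00 10, which is a handy pattern to look for when inspecting a message that the server rejects.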

In addition to the messages, we define some error type that is returned from our processing function.

struct ProcessingError: virtual boost::exception { };
typedef 
  boost::error_info<struct errinfo_message_, std::string const> errinfo_message;

Now, onwards to our main code, which I shall leave without the careful dissection. It is very easy and you should be able to follow it.

std::string process(BasicMessage::ptr_t request) {
  const amqp_bytes_t& bytes = request->getAmqpBody();
  if (bytes.len < sizeof(message_header_t)) 
    throw ProcessingError() << errinfo_message("message too small");
  const message_header_t* header = 
    static_cast<message_header_t*>(bytes.bytes);
  if (header->signature != message_signature) 
    throw ProcessingError() << errinfo_message("bad signature");
  
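  if (header->size1 < 0 || header->size2 < 0)
    throw ProcessingError() << errinfo_message("negative size");

  // the header is sane; the body must hold exactly size1 + size2 bytes
  size_t totalSize = sizeof(message_header_t) + 
                     header->size1 + header->size2;
  if (bytes.len != totalSize) 
    throw ProcessingError() << errinfo_message("bad message size");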

  return "it worked!";
}
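The checks in process can be tried out without a broker. The sketch below repeats them on a plain byte buffer; validate is a hypothetical function of mine, not part of SimpleAmqpClient, and it reports problems as strings instead of throwing so that it needs no Boost. It copies the header out with memcpy rather than casting the pointer, so it makes no assumption about how the buffer is aligned, and it also rejects negative sizes.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Same layout and signature as in the article.
struct message_header_t {
  int32_t signature;
  int32_t size1;
  int32_t size2;
};

const int32_t message_signature = 0x1000acca;

// Returns an empty string when the buffer holds a well-formed message,
// otherwise a short description of the first problem found.
std::string validate(const void* data, std::size_t len) {
  if (len < sizeof(message_header_t)) return "message too small";

  // Copy the header out of the buffer; the buffer may be unaligned.
  message_header_t header;
  std::memcpy(&header, data, sizeof(header));

  if (header.signature != message_signature) return "bad signature";
  if (header.size1 < 0 || header.size2 < 0) return "negative size";

  // 64-bit arithmetic, so two large sizes cannot overflow the sum.
  uint64_t total = sizeof(message_header_t) +
                   static_cast<uint64_t>(header.size1) +
                   static_cast<uint64_t>(header.size2);
  if (len != total) return "bad message size";
  return "";
}
```

Like the original, this reads the integers in the host's byte order, so the sender has to write them in that same order.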

int main() {
  try {
    Channel::ptr_t channel = Channel::Create();
    
    channel->BindQueue("cppdemo", "amq.direct", "cppdemo.basic");
    
    std::string tag;
    tag = channel->BasicConsume("cppdemo", "", true, true, false, 2);
    
    while (true) {
      // consume the message
      Envelope::ptr_t env = channel->BasicConsumeMessage(tag);
      BasicMessage::ptr_t request = env->Message();
      try {
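        std::string body = process(request);
        BasicMessage::ptr_t response = BasicMessage::Create(body);
        // assumes the reply queue is bound to amq.direct; any queue can also
        // be reached through the default exchange ("") under its own name
        channel->BasicPublish("amq.direct", request->ReplyTo(), response);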
      } catch (ProcessingError &e) {
        const std::string* msg = 
          boost::get_error_info<errinfo_message>(e);
        std::cerr << (*msg) << std::endl;
      }      
    }
  } catch (std::runtime_error &e) {
    std::cout << "Error " << e.what() << std::endl;
  }
  
}

In short, we wait for messages; when one arrives, we check that its structure is OK, do some processing and send back a response. The brilliant thing is that we can start as many separate processes of this program as we like; if one dies (perhaps with my favourite SIGSEGV), the messages still waiting in the queue are not lost and the other processes carry on. Simples!

Back to Scala

Back in the comfortable Scala world, we just have to construct the message correctly. Remember: we have the signature 0x1000acca, followed by the two sizes and then as many bytes as the two sizes add up to. And so, we will do just that:

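import java.nio.{ByteBuffer, ByteOrder}

// 3 x 4 header bytes, then size1 + size2 = 2 payload bytes; little-endian
// to match a C++ server on x86 that reads the struct directly.
// putInt writes all four bytes; OutputStream.write(int) writes only one.
val buffer = ByteBuffer.allocate(3 * 4 + 2).order(ByteOrder.LITTLE_ENDIAN)
buffer.putInt(0x1000acca)   // signature
buffer.putInt(1)            // size1
buffer.putInt(1)            // size2
buffer.put(0xa.toByte)      // the size1 bytes
buffer.put(0xb.toByte)      // the size2 bytes

val request = Publish("amq.direct", "cppdemo.basic", buffer.array())
(client ? Request(request :: Nil)) onComplete {
  case Success(r: Response) =>
    println("*** " + new String(r.deliveries.head.body, "UTF-8"))
  case other => println("*** request failed: " + other)
}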

As I said before, I will leave how you process the response to you. You may wish to extract the body from the deliveries, or you may want to do some other funky processing–over to you!

Summary

So, the good news is that having to use native code in your JVM-based applications does not have to immediately mean JNI; and Akka is a particularly good fit for messaging infrastructures. Drop in the necessary AMQP client code, build the C++ code and you’re all up and running.

Published at DZone with permission of Jan Machacek, author and DZone MVB. (source)
