
Enthusiastic Java, Scala and Haskell programmer with a long history of large and successful systems. Known author, speaker, motivator and coach. Jan is a DZone MVB and is not an employee of DZone and has posted 26 posts at DZone.

Websockets, AngularJS, RabbitMQ and C++ in Spring 4

09.24.2013

In this post, I will walk you through a Spring Framework 4-based application that makes the most of WebSockets, RabbitMQ and OpenCV to implement a simple computer vision application. It counts coins that an iOS application sends in a video stream over a binary WebSocket.

The Result

Perhaps strangely, I will begin by showing you the completed result. In the video, the iOS application talks to the Spring webapp over a binary WebSocket. The webapp decodes the video, processes each frame in the computer vision code behind RabbitMQ, receives the responses and pushes the results to an AngularJS application.

[Video: SpringOne2GX from Cake Solutions Ltd. on Vimeo]

[Figure: architecture diagram]

Getting Started

Let’s begin with the RabbitMQ / computer vision components. This is a typical Spring AMQP task. At the highest level, we’ll be creating the RecogService and the RecogServiceActivator. The RecogService is the entry point into our system. We submit chunks of the video stream (or full frames) and drive them through Spring Integration, then Spring AMQP, then RabbitMQ and the C++ application. The response travels back to Spring Integration and is finally received in the RecogServiceActivator implementation.

[Figure: overallcli]

The RecogService itself takes only a few lines of code:

class RecogService(recogChannel: MessageChannel) {

  private def sendWithContentType(contentType: String, 
                                  correlationId: CorrelationId, 
                                  chunk: ChunkData): Unit = {
    val message = MessageBuilder.
      withPayload(chunk).
      setCorrelationId(correlationId).
      setHeader("content-type", contentType).
      build()

    recogChannel.send(message)
  }

  // Curried: fix the correlationId once, then apply the result to each chunk.
  def imageChunk(correlationId: CorrelationId)(chunk: ChunkData) =
    sendWithContentType(ContentTypes.`image/*`, correlationId, chunk)

  def mjpegChunk(correlationId: CorrelationId)(chunk: ChunkData) =
    sendWithContentType(ContentTypes.`video/mjpeg`, correlationId, chunk)

}
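
The two public methods take their parameters in two lists, so a caller can fix the correlation id once (for example, once per client connection) and then treat the result as a plain function from chunk to Unit. The sketch below shows that usage. It is only an illustration: RecordingRecogService is a hypothetical stand-in that records what it would have sent instead of using a Spring MessageChannel, and the type aliases are my assumptions about CorrelationId and ChunkData.

```scala
object CurriedSendSketch {
  // Assumed aliases; the real CorrelationId and ChunkData types are not shown in the post.
  type CorrelationId = String
  type ChunkData = Array[Byte]

  // What the stand-in service would have sent to the channel.
  final case class Sent(contentType: String, correlationId: CorrelationId, size: Int)

  // Hypothetical stand-in for RecogService that needs no Spring on the classpath.
  class RecordingRecogService {
    private val buffer = scala.collection.mutable.ArrayBuffer.empty[Sent]

    def sent: List[Sent] = buffer.toList

    private def sendWithContentType(contentType: String,
                                    correlationId: CorrelationId,
                                    chunk: ChunkData): Unit = {
      buffer += Sent(contentType, correlationId, chunk.length)
      ()
    }

    def imageChunk(correlationId: CorrelationId)(chunk: ChunkData): Unit =
      sendWithContentType("image/*", correlationId, chunk)

    def mjpegChunk(correlationId: CorrelationId)(chunk: ChunkData): Unit =
      sendWithContentType("video/mjpeg", correlationId, chunk)
  }

  def demo(): List[Sent] = {
    val service = new RecordingRecogService
    // Apply the first parameter list only; the result is a ChunkData => Unit function.
    val sendFrame: ChunkData => Unit = service.mjpegChunk("session-1")
    sendFrame(Array[Byte](1, 2, 3))
    sendFrame(Array[Byte](4, 5))
    service.sent
  }
}
```

Every chunk sent through sendFrame carries the same correlation id, which is what lets the responses be matched to the right client later.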

Spring Integration

I have already given away part of the core of the RecogService: it depends on the MessageChannel, which it uses to send the chunks of the input data. The channel sends the messages down a *chain* (illustrated below):

[Figure: internal flow]

In the first step, we decode the chunk, potentially resulting in multiple frames. In Scala code, we turn a single ChunkData into a Collection[ImageData].

class ChunkDecoder(mjpegDecoder: MJPEGDecoder) {

  def decodeFrame(@Header correlationId: CorrelationId, 
                  @Header("content-type") contentType: String,
                  @Payload chunk: ChunkData): util.Collection[ImageData] = 
    contentType match {
      case `video/mjpeg` => decodeMJPEGFrames(correlationId, chunk)
      case `image/*`     => decodeSingleImage(correlationId, chunk)
    }

  private def decodeSingleImage(correlationId: CorrelationId, 
                                chunk: ChunkData): util.Collection[ImageData] = 
    Collections.singletonList(chunk)

  private def decodeMJPEGFrames(correlationId: CorrelationId, 
                                chunk: ChunkData): util.Collection[ImageData] = 
    mjpegDecoder.decodeFrames(correlationId, chunk)

}
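
The MJPEGDecoder is not shown in this post. To give an idea of why one chunk can yield zero, one or several frames, here is a minimal sketch of one possible framing approach. It assumes the stream is a plain concatenation of JPEG images, each starting with the SOI marker (0xFF 0xD8) and ending with the EOI marker (0xFF 0xD9). The object and method names are hypothetical and this is not the decoder the application uses. A real decoder would also have to cope with images that embed thumbnails, which carry their own SOI/EOI pairs.

```scala
object MjpegFramingSketch {
  private val MarkerPrefix = 0xFF.toByte
  private val SoiSecond = 0xD8.toByte // second byte of start-of-image
  private val EoiSecond = 0xD9.toByte // second byte of end-of-image

  // Index of the first marker 0xFF <second> at or after `from`, or -1 if absent.
  private def indexOfMarker(data: Array[Byte], second: Byte, from: Int): Int = {
    var i = math.max(from, 0)
    while (i + 1 < data.length) {
      if (data(i) == MarkerPrefix && data(i + 1) == second) return i
      i += 1
    }
    -1
  }

  /** Returns the complete frames found in `buffered ++ chunk`, plus the
    * bytes that must be kept and prepended to the next chunk. */
  def extractFrames(buffered: Array[Byte],
                    chunk: Array[Byte]): (List[Array[Byte]], Array[Byte]) = {
    val data = buffered ++ chunk

    @annotation.tailrec
    def loop(from: Int, frames: List[Array[Byte]]): (List[Array[Byte]], Array[Byte]) = {
      val start = indexOfMarker(data, SoiSecond, from)
      if (start < 0) {
        // No frame start yet; keep a trailing 0xFF in case the marker is split across chunks.
        val keepLast = data.drop(from).lastOption.contains(MarkerPrefix)
        (frames.reverse, if (keepLast) Array(MarkerPrefix) else Array.emptyByteArray)
      } else {
        val end = indexOfMarker(data, EoiSecond, start + 2)
        if (end < 0) (frames.reverse, data.drop(start)) // incomplete frame: buffer it
        else loop(end + 2, data.slice(start, end + 2) :: frames)
      }
    }

    loop(0, Nil)
  }
}
```

The caller keeps the returned remainder and passes it back in as buffered together with the next chunk, so a frame that straddles two chunks is emitted as soon as its end marker arrives.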


By now, we’ve decoded as many frames as we could given the new chunk of data. However, the following components do not operate on collections of frames, but on a single frame. Therefore, we must split the Message of Collection[ImageData] into as many Messages of ImageData as there are elements in the collection. By default, Spring Integration’s *splitter* splits a Message with a Collection[A]-type payload into multiple Messages, each containing one element of the original collection. We then take each decoded frame and pass it on to the AMQP outbound gateway, remembering to map all outgoing and incoming headers. We also set the reply timeout, which bounds how long we wait for the native code to respond.
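
To make the splitting step concrete, here is a small model of it in plain Scala. Msg is a hypothetical, much simplified stand-in for a Spring Integration Message, and split only mimics the behaviour described above: one output message per element, each carrying the headers of the input message. It is a sketch of the semantics, not of Spring Integration's implementation.

```scala
object SplitterSketch {
  // Simplified stand-in for a message: a header map plus a payload.
  final case class Msg[A](headers: Map[String, String], payload: A)

  // One output message per element; every output keeps the input's headers.
  def split[A](in: Msg[Seq[A]]): Seq[Msg[A]] =
    in.payload.map(element => Msg(in.headers, element))
}
```

Because the headers travel with every split message, the correlation id set by the sender is still available when the response for an individual frame comes back.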

When we receive a response from the native code at the other end of the RabbitMQ queue, we have an Array[Byte] that needs to be converted to a String. Finally, we execute the onCoinResponse method of the recogServiceActivator bean. We represent the chain in XML as:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
  xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  xsi:schemaLocation="...">

  <int:chain input-channel="recogRequest">
      <int:service-activator method="decodeFrame" ref="chunkDecoder"/>
      <int:splitter apply-sequence="false"/>
      <int-amqp:outbound-gateway exchange-name="sogx.exchange" 
          routing-key="sogx.recog.key"
          reply-timeout="250"
          mapped-reply-headers="*" mapped-request-headers="*"
          amqp-template="amqpTemplate"/>
      <int:object-to-string-transformer/>
      <int:service-activator ref="recogServiceActivator" 
                                method="onCoinResponse"/>
  </int:chain>

  <rabbit:connection-factory id="connectionFactory" 
    host="localhost" channel-cache-size="10" />
  <rabbit:template id="amqpTemplate"
    connection-factory="connectionFactory" />

  <rabbit:admin id="rabbitAdmin" 
    connection-factory="connectionFactory"/>

  <rabbit:queue name="sogx.recog.queue" declared-by="rabbitAdmin"/>

  <rabbit:direct-exchange name="sogx.exchange" declared-by="rabbitAdmin">
      <rabbit:bindings>
          <rabbit:binding queue="sogx.recog.queue" key="sogx.recog.key" />
      </rabbit:bindings>
  </rabbit:direct-exchange>
</beans>

I have included the minimal RabbitMQ configuration, assuming that the RabbitMQ server runs on localhost and does not require authentication.
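
Returning to the last two steps of the chain, the sketch below models what happens to a reply: the bytes are turned into a String and handed to an activator together with the correlation id. Everything here is an assumption made for illustration. I assume the native code replies with UTF-8 text, RecordingActivator is a hypothetical class, and the two-argument onCoinResponse signature is my guess rather than the one used in the application.

```scala
import java.nio.charset.StandardCharsets

object ReplyHandlingSketch {
  // Assumption: the reply body is UTF-8 encoded text.
  def replyToString(body: Array[Byte]): String =
    new String(body, StandardCharsets.UTF_8)

  // Hypothetical activator that simply collects the responses per correlation id.
  class RecordingActivator {
    private val responses =
      scala.collection.mutable.Map.empty[String, Vector[String]]

    def onCoinResponse(correlationId: String, response: String): Unit =
      responses.update(
        correlationId,
        responses.getOrElse(correlationId, Vector.empty) :+ response)

    def responsesFor(correlationId: String): Vector[String] =
      responses.getOrElse(correlationId, Vector.empty)
  }
}
```

In the real application the activator would push each response on to the client identified by the correlation id instead of storing it.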


Published at DZone with permission of Jan Machacek, author and DZone MVB. (source)
