Jean-Francois Arcand works for Ning.com. Previously he worked for Sun Microsystems, where he created Grizzly (NIO Framework) and Atmosphere and was a significant contributor to the GlassFish Application Server. Jean-Francois is a DZone MVB, is not an employee of DZone, and has posted 23 posts at DZone.

Real Time Twitter Search via Websocket or Comet

10.17.2010

Currently Twitter supports a streaming API described as follows:

"The Twitter Streaming API allows high-throughput near-realtime access to various subsets of public and protected Twitter data."

Unfortunately, no such API is available for real time search. Thanks to the freshly released Atmosphere Framework 0.6.2, let's create such an API in less than 25 lines. As an extra, let's use the Atmosphere jQuery Plugin to select the best transport for communicating with this API: WebSocket, http-streaming or long-polling.

The Server Side

What we need to accomplish:

  1. Provide a URI that can be used to send search requests (hashtag)
  2. Based on a keyword/hashtag, connect and poll the Twitter search API every second (or more) to collect the JSON result. Since we want to support thousands of clients, let's make sure we never block waiting for the result, and instead stay fully asynchronous.
  3. Broadcast/Push back the JSON object to our set of suspended responses. By suspended, I mean a connection that uses long-polling, http-streaming or WebSocket. As you might be aware, Atmosphere hides such details from your application, so you can focus on the application itself rather than on how the transport works.
  4. Provide a URI for stopping the real time search.

Now let's build it. If you can't wait, you can read the entire code here and here. For our application, let's use the atmosphere-annotations and atmosphere-jersey modules:

@Path("/search/{tagid}")
@Singleton
public class TwitterFeed {

    // One shared, non-blocking HTTP client for every search
    private final AsyncHttpClient asyncClient = new AsyncHttpClient();
    // Scheduled polling task for each hashtag, kept so it can be cancelled later
    private final ConcurrentHashMap<String, Future<?>> futures
                 = new ConcurrentHashMap<String, Future<?>>();

    @GET
    public SuspendResponse<String>
             search(final @PathParam("tagid") Broadcaster feed,
                    final @PathParam("tagid") String tagid) {

We want our class to be invoked when the request takes the form of /search/{hashtag}, where the hashtag can be anything you want to search on. Next we define our first method by asking Jersey to inject a Broadcaster and the hashtag (tagid) from the URL. For those of you who aren't familiar with Atmosphere's concepts, a Broadcaster is an object that can be used to broadcast/push back events to the browser. A Broadcaster contains the list of connections that have been suspended, independently of the transport used: long-polling, streaming or WebSocket. Hence a Broadcaster can be used to broadcast real time events. For our current application, we will create one Broadcaster per hashtag. The next step is to configure our Broadcaster to poll the Twitter Search API by simply doing:

if (feed.getAtmosphereResources().size() == 0) {
    Future<?> future = feed.scheduleFixedBroadcast(new Callable<String>() {

        // Last refresh_url returned by Twitter; empty until the first response
        private final AtomicReference<String> refreshUrl = new AtomicReference<String>("");

        public String call() throws Exception {
            String query;
            if (!refreshUrl.get().isEmpty()) {
                query = refreshUrl.get();
            } else {
                // URL-encode the hashtag so characters such as '#' survive
                query = "?q=" + URLEncoder.encode(tagid, "UTF-8");
            }
            asyncClient.prepareGet(
                "http://search.twitter.com/search.json" + query)
                    .execute(new AsyncCompletionHandler<Integer>() {

                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            String s = response.getResponseBody();
                            JSONObject json = new JSONObject(s);
                            refreshUrl.set(json.getString("refresh_url"));
                            feed.broadcast(s).get();
                            return response.getStatusCode();
                        }
                    });
            // Returns at once; the handler above runs when Twitter answers
            return "OK";
        }
    }, 1, TimeUnit.SECONDS);

    futures.put(tagid, future);
}

First, we query our Broadcaster (feed) to see if there is already a connection that asked for the real time search. If there is none, we invoke Broadcaster.scheduleFixedBroadcast(..) with a Callable. That Callable will be executed every second. Inside the Callable we use my other active open source project, AsyncHttpClient, which allows the Callable to be executed asynchronously, i.e. we send the request but we don't block waiting for the response.

The AsyncHttpClient will take care of calling back the AsyncCompletionHandler once the Twitter API has sent us the entire response. We could have streamed the response, but to keep the sample simple we just use an AsyncCompletionHandler that buffers the entire JSON response.
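To make the "send the request but don't block" idea concrete, here is a minimal sketch that uses only the JDK (Java 8 or later assumed). It is not Atmosphere or AsyncHttpClient code: NonBlockingPoller, pollOnce and fakeFetch are hypothetical names, and a CompletableFuture stands in for the HTTP call and its completion handler.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class NonBlockingPoller {

    // Collected responses; stands in for feed.broadcast(...) in the article
    final List<String> received = new CopyOnWriteArrayList<>();

    /**
     * Starts the request and returns at once. The callback runs later, on
     * whichever thread completes the future, so the caller is never parked.
     */
    String pollOnce(Function<String, CompletableFuture<String>> fetch, String query) {
        fetch.apply(query).thenAccept(received::add);
        return "OK";
    }

    public static void main(String[] args) throws Exception {
        NonBlockingPoller poller = new NonBlockingPoller();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        // Hypothetical fetcher: simulates a remote call answered on another thread
        Function<String, CompletableFuture<String>> fakeFetch = q ->
                CompletableFuture.supplyAsync(() -> "{\"query\":\"" + q + "\"}");

        // Fire a poll every second; each tick only starts a request
        timer.scheduleAtFixedRate(
                () -> poller.pollOnce(fakeFetch, "?q=java"), 0, 1, TimeUnit.SECONDS);

        Thread.sleep(2500);
        timer.shutdownNow();
        System.out.println(poller.received.size() + " responses collected");
    }
}
```

The point of the shape is that the timer thread only ever starts requests, so one thread can drive polling for many hashtags.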

From the JSON object we get the refresh_url value, which we will use the next time we query the Twitter Search API so that we receive only the new results instead of the entire set. Next we just need to tell Atmosphere to suspend the connection and use this Broadcaster:

return new SuspendResponse.SuspendResponseBuilder<String>()
                .broadcaster(feed)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
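Coming back to the refresh_url handling for a moment: the choice between the first query and the follow-up queries can be isolated in a small function. This is only a sketch; SearchQuery and next are hypothetical names, and the refresh_url value used in main is an illustrative string rather than real Twitter output.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class SearchQuery {

    /**
     * Returns the query string for the next poll: the refresh_url from the
     * previous response when there is one, otherwise a fresh "?q=" query
     * with the hashtag URL-encoded.
     */
    static String next(String refreshUrl, String tag) {
        if (refreshUrl != null && !refreshUrl.isEmpty()) {
            return refreshUrl;
        }
        try {
            return "?q=" + URLEncoder.encode(tag, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is required on every JVM, so this should not happen
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // First poll: no refresh_url yet, so the hashtag itself is encoded
        System.out.println(next("", "#Nordiques")); // prints ?q=%23Nordiques
        // Later polls: reuse whatever the previous response handed back
        System.out.println(next("?since_id=42&q=%23Nordiques", "#Nordiques"));
    }
}
```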

Finally, the JSON result is broadcast as is, so the client can parse the response as JSON. We store the Future returned by scheduleFixedBroadcast so that we can later stop the real time broadcast:

    @GET
    @Path("stop")
    public String stopSearch(
           final @PathParam("tagid") Broadcaster feed,
           final @PathParam("tagid") String tagid) {

        // Resume all connections associated with a hashtag
        feed.resumeAll();
        // Cancel the polling task, if one was started for this hashtag
        Future<?> future = futures.remove(tagid);
        if (future != null) {
            future.cancel(true);
        }
        return "DONE";
    }
}

To stop a real time search, we just issue a GET on /search/{hashtag}/stop. That's all we need to do on the server side. Our server side application is now able to receive WebSocket, long-polling or http-streaming requests.
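One thing to watch with the start/stop bookkeeping: a stop request for a hashtag that has no running search finds nothing in the map, and two clients asking for the same hashtag at the same time should start only one poller. A minimal JDK-only sketch of that bookkeeping might look like the following; SearchRegistry and its methods are hypothetical names, not part of Atmosphere.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class SearchRegistry {

    // Daemon thread so a forgotten registry never keeps the JVM alive
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "search-poller");
                t.setDaemon(true);
                return t;
            });

    private final ConcurrentHashMap<String, ScheduledFuture<?>> futures =
            new ConcurrentHashMap<>();

    /** Starts polling for a tag unless it is already running; true if started. */
    boolean start(String tag, Runnable poll) {
        boolean[] started = {false};
        // computeIfAbsent is atomic: concurrent callers schedule one task only
        futures.computeIfAbsent(tag, t -> {
            started[0] = true;
            return timer.scheduleAtFixedRate(poll, 0, 1, TimeUnit.SECONDS);
        });
        return started[0];
    }

    /** Cancels polling for a tag; false if nothing was running for it. */
    boolean stop(String tag) {
        ScheduledFuture<?> future = futures.remove(tag);
        if (future == null) {
            return false;
        }
        future.cancel(true);
        return true;
    }

    public static void main(String[] args) throws Exception {
        SearchRegistry registry = new SearchRegistry();
        registry.start("java", () -> System.out.println("polling #java"));
        Thread.sleep(1500);
        System.out.println("stopped: " + registry.stop("java"));
        System.out.println("stopped again: " + registry.stop("java"));
    }
}
```

Removing the entry on stop also lets a later request for the same hashtag start a fresh poller.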

The Client Side

On the client side, you can use any existing JavaScript library supporting WebSocket or Comet to query the real time API. It is as simple as:

/search/#hashtag         to subscribe to real time updates (try it with WebSocket!)

/search/#hashtag/stop    to stop the real time updates

But the easiest way is to use Atmosphere's jQuery Plugin (of course :-)), which supports WebSocket and Comet. More importantly, the Plugin is able to detect the best transport to use based on what the client and the server support. As an example, if the application is deployed in Tomcat and you use Chrome, you can let the Plugin find the best transport or specify the one you want to use. This is as simple as:

   $.atmosphere.subscribe(document.location.toString() + 'search/' + hashtag,
              callback,
              $.atmosphere.request = {transport: 'Websocket'});

Above, we invoke the subscribe method, passing the URL containing the hashtag, a callback and some request properties. The Plugin will invoke the callback as soon as the real time search starts on the server. The callback looks like:

// Holds a partial JSON payload until the rest of it arrives
var incompleteMessage = "";

function callback(response) {
   if (response.transport != 'polling' &&
           response.state != 'connected' &&
           response.state != 'closed') {

           if (response.status == 200) {
               var data = response.responseBody;

               try {
                    var result = $.parseJSON(incompleteMessage + data);
                    incompleteMessage = "";

                    // Walk the results backwards so that results[0] ends up at the top of the list
                    for (var i = result.results.length - 1; i > -1; i--) {
                          $('ul').prepend($('<li></li>').text("["
                               + response.transport + "] "
                               + result.results[i].from_user + " "
                               + result.results[i].text));
                    }
               } catch (err) {
                    // Not a complete JSON document yet: keep everything received so far
                    incompleteMessage += data;
               }
           }
   }
}

The critical piece of code above is the parseJSON(incompleteMessage + data) call. The size of the data sent back by the server may vary between servers, so we may not get the entire JSON object in one invocation of the callback. So we need to wrap that call inside a try/catch (since parseJSON will fail on a partial object) and make sure that the next time the callback gets invoked we append the new data to what was previously received. That scenario will happen only if you search for a popular hashtag and you get a large number of responses (try #Nordiques !!!).
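The same buffering idea can be sketched outside the browser. The Java version below is an illustration only: JsonChunkBuffer is a hypothetical class, and because the JDK has no built-in JSON parser it swaps the parseJSON attempt for a simple bracket-balance check. It assumes each message is a single JSON object or array and that a chunk never carries the start of the next message.

```java
final class JsonChunkBuffer {

    // Everything received so far for the message currently being assembled
    private final StringBuilder pending = new StringBuilder();

    /** Appends a chunk; returns the whole message once it is complete, else null. */
    String offer(String chunk) {
        pending.append(chunk);
        if (!isBalanced(pending)) {
            return null;
        }
        String message = pending.toString();
        pending.setLength(0);
        return message;
    }

    /** True when every bracket opened outside a string literal has been closed. */
    static boolean isBalanced(CharSequence text) {
        int depth = 0;
        boolean inString = false;
        boolean escaped = false;
        boolean sawBracket = false;
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;          // character after a backslash
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{' || c == '[') {
                depth++;
                sawBracket = true;
            } else if (c == '}' || c == ']') {
                depth--;
            }
        }
        return sawBracket && depth == 0 && !inString;
    }

    public static void main(String[] args) {
        JsonChunkBuffer buffer = new JsonChunkBuffer();
        // A message split into three chunks, with a brace inside a string
        System.out.println(buffer.offer("{\"results\":[{\"text\":\"a }"));
        System.out.println(buffer.offer(" b\"}"));
        System.out.println(buffer.offer("]}"));
    }
}
```

Note that the buffer keeps accumulating across any number of chunks, which matters when a large response arrives in more than two pieces.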

That's it for the client side. You can download the sample here and deploy it in any web server, whether it supports Comet, WebSocket, both, or neither of them!! The interface is simple and demonstrates the transport's auto detection.

Any cooler, better designed interface is welcome :-) Finally, you can get more information about this sample by reading my JavaOne talk.

For any questions or to download Atmosphere Client and Server Framework, go to our main site and use our Nabble forum (no subscription needed), or follow the team or myself and tweet your questions there! You can also check out the code on Github.

 Content from jfarcand.wordpress.com

Published at DZone with permission of Jean-Francois Arcand, author and DZone MVB.
