Gerard Davison is a Senior Principal Software Engineer working at Oracle in the UK on SOAP and REST tooling. He currently contributes to WADL generation and client generation in the Jersey project and maintains the Abbot Swing automation project. He also maintains a small holding of Hudson nodes to run all those tests. He graduated from the University of Reading with a degree in Human Cybernetics and can't help looking for feedback loops. Gerard is a DZone MVB, is not an employee of DZone, and has posted 32 posts at DZone.

Lambda'ery WebSocket code (from UKTECH13 presentation)

12.21.2013

At UKTECH13 I was asked to post the source code for the WebSocket endpoint used in my presentation, primarily because it used JDK 8 constructs that were unfamiliar to many. One of the very nice things about the changes to the language and its supporting libraries is the lack of if statements in the code.

I would note that in a real-world chat application it is unlikely that different rooms would have different resource URIs, but for the purposes of the presentation this made sense.

package websocket;

import static java.util.Collections.emptySet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/chat/{room}")
public class ChatService {

  private static final Set<Session> EMPTY_ROOM = emptySet();

  static final ConcurrentMap<String, Set<Session>> rooms =
    new ConcurrentHashMap<>(); 


  @OnOpen
  public void onOpen(Session peer, @PathParam("room") String room) {
    rooms.computeIfAbsent(room,
                          s -> new CopyOnWriteArraySet<Session>()).add(peer);
  }

  @OnClose
  public void onClose(Session peer, @PathParam("room") String room) {

    rooms.getOrDefault(room, EMPTY_ROOM).remove(peer);
    
  }

  @OnError
  public void onError(Session peer, Throwable th,
                      @PathParam("room") String room) {
    System.out.println("Peer error " + room + " " + th);
  }


  @OnMessage
  public void message(String message, Session peer,
                      @PathParam("room") String room) {

    // Send a message to all peers in a room who are not the current
    // peer and are still open. Send the message asynchronously to ensure
    // that the first client is not hung up. 

    rooms.getOrDefault(room, EMPTY_ROOM).parallelStream()
         .filter(s -> s != peer && s.isOpen())
         .forEach(s -> s.getAsyncRemote().sendObject(message));
  }

}
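
For anyone unfamiliar with the JDK 8 Map methods, here is a rough sketch of what the computeIfAbsent and getOrDefault calls in the endpoint above replace. It is only an illustration: the RoomRegistry class and its method names are made up for this comparison, and plain Strings stand in for javax.websocket.Session so that it runs on a bare JDK.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for the endpoint's room map. Strings take the
// place of Session objects (an assumption made for this sketch only).
class RoomRegistry {

  static final ConcurrentMap<String, Set<String>> rooms =
    new ConcurrentHashMap<>();

  // Pre-JDK 8 style: two null checks around putIfAbsent.
  static void joinOldStyle(String room, String peer) {
    Set<String> members = rooms.get(room);
    if (members == null) {
      Set<String> created = new CopyOnWriteArraySet<String>();
      members = rooms.putIfAbsent(room, created);
      if (members == null) {
        // Nobody else created the room first, so use our set.
        members = created;
      }
    }
    members.add(peer);
  }

  // JDK 8 style: the room is created on demand, no if statements.
  static void join(String room, String peer) {
    rooms.computeIfAbsent(room,
                          r -> new CopyOnWriteArraySet<String>()).add(peer);
  }

  // Pre-JDK 8 style: guard against a room that was never created.
  static boolean leaveOldStyle(String room, String peer) {
    Set<String> members = rooms.get(room);
    if (members != null) {
      return members.remove(peer);
    }
    return false;
  }

  // JDK 8 style: an empty set stands in for the missing room.
  static boolean leave(String room, String peer) {
    return rooms.getOrDefault(room, Collections.<String>emptySet())
                .remove(peer);
  }
}
```

Both styles leave the map in the same state. On a ConcurrentHashMap the computeIfAbsent call is also performed atomically, so the JDK 8 version does not need the second null check that the putIfAbsent version does.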

Whilst working on my presentation it became apparent that it was also possible to use the "getOpenSessions" and "getUserProperties" methods to store discrimination data against the Session. I don't have enough experience yet to say which is the better design for a particular case.

package websocket;


import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(value = "/chat/{room}")
public class ChatService {

  private static final String ROOM_PROPERTY = "ROOM";


  @OnOpen
  public void onOpen(Session peer, @PathParam("room") String room) {
    peer.getUserProperties().put(ROOM_PROPERTY, room);
  }

  @OnClose
  public void onClose(Session peer, @PathParam("room") String room) {
    
    // No need to tidy up, as the data is stored against the peer
  }

  @OnError
  public void onError(Session peer, Throwable th,
                      @PathParam("room") String room) {
    System.out.println("Peer error " + room + " " + th);
  }


  @OnMessage
  public void message(String message, Session peer,
                      @PathParam("room") String room) {

    peer.getOpenSessions().parallelStream()
         .filter(s -> room.equals(s.getUserProperties().get(ROOM_PROPERTY)))
         .filter(s -> s != peer && s.isOpen())
         .forEach(s -> s.getAsyncRemote().sendObject(message));
  }

}
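
To try the filtering logic of this second version without a WebSocket container, something like the following sketch may help. It is not part of the presentation code: Peer is a hypothetical stand-in for javax.websocket.Session that carries only a name, an open flag and a user-properties map, and RoomFilter.recipients is an invented helper that returns the names of the peers that would be sent the message.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for javax.websocket.Session (assumption for
// this sketch): just enough state to exercise the two filters.
class Peer {
  final String name;
  final boolean open;
  final Map<String, Object> userProperties = new HashMap<>();

  Peer(String name, String room, boolean open) {
    this.name = name;
    this.open = open;
    // Mirrors onOpen: the room is stored against the peer itself.
    userProperties.put("ROOM", room);
  }
}

class RoomFilter {

  // Names of the peers that would receive a message sent by 'sender'
  // to 'room': same room, not the sender, and still open.
  static List<String> recipients(List<Peer> openSessions,
                                 Peer sender, String room) {
    return openSessions.stream()
         .filter(s -> room.equals(s.userProperties.get("ROOM")))
         .filter(s -> s != sender && s.open)
         .map(s -> s.name)
         .sorted()
         .collect(Collectors.toList());
  }
}
```

One difference between the two designs is visible here: this version has to look at every open session on each message, whereas the map-based version goes straight to the set for the room. Whether that matters will depend on how many sessions the endpoint holds.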


Published at DZone with permission of Gerard Davison, author and DZone MVB.
