Using Server Sent Events with Vorpal

bridge

I came across the following 2 blogs (here and here) on a CDI framework for server sent event (SSE) several months ago. I downloaded the source then tried it with Vorpal running on Glassfish 3.1.x. The result was a really long exception stack.

4 months later.

After publishing part 4 of on conversations, I decided to see if I have any luck with the SSE framework. I wasn’t really expecting it to work. Well it did. What is even more surprising is that it worked with Vorpal. 

What I’m going to do in this blog is explain how the SSE framework works and more importantly how you can use it with Vorpal. Its really easy.

Setup the push event stream

Firstly create a notification handler. The notification handler will maintain the connection between the browser and the web application. The following is a simple notification handler

@ServerSentEvents(“/notifications”)

public class NotificationHandler extends ServerSentEventHandler {

   public void sendMessage(String msg) {

      try {

         connection.sendMessage(msg);

      } catch (IOException ex) {

      }

   }

A few points to note: the notification handler must extend ServerSentEventHandler; this class is from the framework. You also have to annotate the class with @ServerSentEvents and specify the URL of the event stream. In this case it is notifications. More on this later.

Now you need to write a HTML/JSP/JSF/whatever page to invoke this event stream. Here is a snippet of the HTML page along with a snippet of Javascript that loads loads the event stream

<html xmlns="http://www.w3.org/1999/xhtml"
      xmlns:h="http://java.sun.com/jsf/html">
    <h:head>
        <title>XMPP with SSE</title>
        <script src="resources/app.js"></script>
    </h:head>
    <h:body>
        <h1>Using Vorpal with SSE</h1>       
    </h:body>
</html>

The Javascript that creates the event stream is in app.js. Here is the code

var url = ‘http://’ + document.location.host

      + “/xmppsse/notifications”;

eventSource = new EventSource(url);

eventSource.onmessage = {

   document.body.innerHTML += msg + ’<br>’;

};

The important thing here is to get the event stream URL right. If our example, this is made up of the application context of our web application (in blue) and the value that you’ve specified in @ServerSentEvent annotation (in green).

 

Pushing data to the browser

We now hook the event handler to a Vorpal application. I did a screencast on developing a customer query service which allows you to use a Jabber client to query a database, which you can find here and the source here.

Lets extend this so that whenever someone performs a query, the result is also pushed to the browser using SSE.

@Message

public class MessageHandler {

   @EJB CustomerManager mgr;

   @Inject @ServerSentEventContext(“/notifications”)

        ServerSentEventHandlerContext<NotificationHandler> ctx;

   @Body(“{id}”)

   private String handle(@Named(“id”) int id) {

      //Assume we find every record and no NPE

      String record = mgr.find(id).toString();

      //Loop through all handlers

      for (NotificationHandler nh: ctx.getHandlers())

         try {

            nh.sendMessage(record);

         } catch (Exception ex) {

            nh.close();

         }

      return (record);

   }

After we got the record from CustomerManager, which uses JPA, we loop through all the registered handlers for an event stream channel and pushes the record to them. You have to get the correct handler to push to.

In our example, we are pushing to all browser registered on notifications. So we inject an instance of ServerSentEventHandlerContext with qualifying it with @ServerSentEventContext(“notifications”).

Example code

You can download the example source from here. I’ve bundled the SSE library into the download which is a NetBeans project.

Use the following steps to setup, deploy and run the example

  1. Configure an external component on your Jabber server. The example uses customer for subdomain and AlwaysRight for shared secret (Caps!). You can always change these settings. I use Openfire for the Jabber server
  2. Make sure that you’ve installed your Glassfish with Jabberwocky container.
  3. Deploy the project to Glassfish either from NetBeans or using asadmin command. It’s probably easier if you deploy if from NetBeans
  4. Open your browser and point to http://yourserver:port/xmppsse. I’ve tested with Firefox and Chrome, latest versions.
  5. Open your Jabber client, open a chat with query@customer.your_jabber_server. Send 1
  6. You should now see your reply display in your Jabber client and in your browser.

The source code for the SSE can be found here if you wish to look at it.

Part of the code (the app.js) is taken from Bhakti Mehta’s blog. Big thanks.

Till next time.

Art of Conversation: Miscellaneous Topics – Part 4

Programmatically looking up Beans

As you may know, you can use Vorpal client to create client side application. I talked about using client side Vorpal here. For the most part, Vorpal client has the same programming model as its server counterpart.

Vorpal relies heavily on CDI for resources; this is not a problem if you’re on the server side, but on the client side you have to lookup the CDI instances yourself. The trick is this is to get a reference to either BeanManager or WeldContainer.

Which of those 2 objects you get depends largely on how you bootstrap CDI in the Java SE environment. See this for the various ways. Since BeanManager is standard Java EE API, we’ll use BeanManager.

One way that I’ve found that works is to get CDI to inject an instance of BeanManager by listening to the container starting up event. Then you can cache a reference of BeanManager in a singleton. The following shows how we go about doing this

@ApplicationScoped

public class MySingletons {

   @Inject private BeanManager bm;

   @Inject ComponentContext ctx;

  

   private static MySingletons instance = null;

  

   @PostConstruct private void init() {

      instance = this;

   }

   private void containerInitialized(

         @ObservesContainerInitialized ciEvt) { }

   public BeanManager getBeanManager() {

      return (bm);

   }

   public ComponentContext getComponentContext() {

      return (ctx);

   }

   public static MySingletons getInstance() {

      return (instance);

   }

}

During container startup we use MySingletons to listen to the container initializing and use this opportunity to hold a reference to BeanManager. Same with ComponentContext. Now wherever you need to access either BeanManager or ComponentContext, you can use the static method MySingletons.getInstance() .

Note: Vorpal client is suppose to be CDI implementation agnostic, but I’ve only ever tested it with Weld.

Client side conversation setup

When you’re working with conversation on the client side, you have make sure that you are not working with conversation artifacts from a previous processing cycle.

On the server, before Vorpal calls your message handler, it initializes the component context and populate it with the appropriate bindings and meta data that your message handler will need to process a message correctly. Server side is mostly reactive processing to incoming messages and Vorpal uses this behaviour to perform a lot of mundane task under the hood.

However on the client side, message processing is mostly proactive viz. you click on a button and your application uses Vorpal client to send out a message. Since there is no initializing the component context; you may actually be picking up artifacts from the previous processing cycle. What you have to do is to clear the component context before you perform your processing by calling ComponentContext.startProcessingCycle().

The following code shows you how you clear the component context on every button click

public void actionPerformed(ActionEvent aEvt) {

   String cmd = aEvt.getActionCommand();

   switch (cmd) {

      case “New Conversation”:

         ComponentContext compCtx = MySingletons

               .getInstance().getComponentContext();

         compCtx.startProcessingCycle();

         Conversation conv = lookup(Conversation.class

               , new NamedQualifier(“__conversation__”));

         ConversationContext convCtx = lookup(

               ConversationContext.class);

         conv.begin();

         ResponseContext respCtx =

               new ResponseContext(…);

                    

         compCtx.send(respCtx);

         break;

      …

   }

}

private <T> T lookup(Class<T> type

      , Annotation… qualifiers) {

   T instance = null;

   BeanManager bm = MySingletons.getInstance();

   Bean<T> bean = (Bean<T>)bm.resolve(

         bm.getBeans(type, qualifiers));

   if (null != bean) {

      CreationalContext<T> ctx =

            bm.createCreationalContext(bean);

      instance = bean.create(ctx);

   }

   return (instance);

}

The above shows how you can lookup an instance of Conversation and ConversationContext programmatically. I’ll leave the NamedQualifier class as an exercise for the reader. Hint: use AnnotationLiteral

If you’re just interested in just creating a new conversation but not about associating any states with it then you can manually add a ThreadID to ResponseContext like so:

case “ New Conversation”:

   ComponentContext compCtx = MySingletons

         .getInstance().getComponentContext();

   compCtx.startProcessingCycle();

   ResponseContext respCtx = new ResponseContext(…);

   respCtx.add(ThreadID.generate());

       …

   compCtx.send(respCtx);

   break;

which is a lot simpler. I’ve blog about this in my previously.

Warning: you should not use ComponentContext.startProcessingCycle() when you are developing server side components or bad things will happen.

Turning off conversation inheritance

Vorpal allows conversation states to be inherited viz. if you create a new conversation in the context of an existing conversation, then that new conversation will inherit all the states from the existing ConversationContext.

If you want to suppress the new conversation from inheriting the states, add a property call com.kenai.jabberwocky.framework.property.inheritConversationContext to the existing conversation’s ComponentContext and set the value to false. This will prevent new conversation from inheriting the existing conversation’s state.

//ConversationContext from an existing conversation

convCtx.setAttribute(

      FrameworkProperty.INHERIT_CONVERSATION_CONTEXT

      , false);

//Outgoing packet will not inherit conversation states

ResponseContext respCtx = new ResponseContext(…);

respCtx.add(ThreadID.generate());

   …

compCtx.send(respCtx);

The com.kenai.jabberwocky.framework.property.inheritConversationContext is not a permanent attribute as it is cleared after every processing cycle. So you will have to set it again on the next processing cycle if you wish to further suppress conversation inheritance.

The September 2012 bundles supports the latest conversation features. You can download them here. I’ve just uploaded a new bundle with bug fixes.

Till next time.

Art of Conversation: Explicit Conversation – Part 3

In my previous blog entry, I talked about implicit conversations. A quick recap: implicit conversations are conversations that are created for you by the Vorpal framework whenever you send or receive an IQ get or set. These conversation are terminated when you send or receive an IQ result or error.

Conversation gives you the ability to store states either in a ConversationContext object or in the message handler class annotated with @ConversationScoped. For example, you can associate some state with an outgoing IQ get packet; so when the reply returns, you can look at the state know what to do with it. When Vorpal creates another implicit conversation in the context of an ongoing conversation, the newly created conversation will inherit the states from the ongoing conversation. This is known as conversation inheritance.

In this blog, we will introduce explicit conversation, viz conversation that are directly under the control of the application; explicit conversation share lots of similar properties as implicit conversations but build on these concepts.

What are explicit conversations?

In XMPP, conversation between 2 Jabber entities are marked with <thread>; see this document. You will typically find <thread> element in message. This allows both parties to track the packets between the 2 conversing entities. Below is an example of a pair of message exchange between a patient and his psychiatrist.

<message from=”normanb@arkham” to=”eliza@arkham” type=”chat”>
<body>I want my mother</body>
<thread>1234567890</thread>
</message>

<message from=”eliza@arkham” to=”normanb@arkham” type=”chat”>
<body>Why do you need your mother?</body>
<thread>1234567890</thread>
</message>

To see how to use explicit conversation, lets write s service, call Eliza (which is a bot), that provides psychiatric help.

@ConversationScoped
@Message
@To(“eliza@{__subdomain__}”)
public class ElizaService {
   @Inject @Named(“__conversation__”) Conversation conv;
   private Eliza eliza = null;

   @PostConstruct private void init() {
      conv.begin();
      //Initialize Eliza
  
   eliza = new Eliza();
        
   }

   @Body(“{body}”)
   private String handleMessage(@Named(“body”) String body) {
      if (“bye”.equals(body))
         conv.end();
      return (eliza.process(body));
   }
}

Assume we receive a new message with a thread id of 1234567890; also assume that this is the first time that Eliza service is seeing this new thread id.

So what we want to do is associate an instance of Eliza object with this thread. We will continue to use the same Eliza instance for messages with the same thread id until the sender decides to terminate the conversation.

To do that we annotate the message hander with @com.kenai.jabberwocky.framework.ConversationScoped. Do note that the @ConversationScoped is from com.kenai.jabberwocky.framework package and not from javax.enterprise.context. Any class that is annotated with this will be associated with the same conversation for the life time of that conversation.

We also inject an instance of Conversation object into the handler. The Conversation object is use to start and terminate a conversation. For explicit conversation, if you do not explicitly start them, the conversation will not begin. It is also important that we qualify the conversation with @Named(“__conversation__”) or else the appserver will try to inject a Conversation instance from JSF instead of from Vorpal.

This is how the code works: after the handler has been instantiated; the @PostConstruct method will start the conversation by calling conv.begin(). Since ElizaService class is annotated with @ConversationScoped, this particular instance will be associated with this thread id for the life time of the conversation.

After invoking @PostConstruct, normal method processing occurs viz. handleMessage() will be called. When we receive an ‘bye‘ in the message body, we terminate the conversation by calling conv.end(). After that the ElizaService instance will be discarded. A new instance will be created even when the next message contains the same thread id.

Once you have started a conversation, any handlers annotated with @ConversationScoped that process messages from this open conversation will also be automatically associated with the conversation. This is known as a conversation group.

@ConversationScoped
@Message
@To(“eliza@{__subdomain__}”)
@DataForm
   public class Configure {

For example if the following is the one of the handler that is invoked after the patient has started normanb@arkham has started the conversation, then Configure object will be associated with the conversation. When the conversation terminates, all objects in the conversation group will be released.

Using ConversationContext

The above can be rewritten using ConversationContext. A CovnersationContext is a map and behaves like HttpSession. You can also use this to store the conversation state. The following shows ElizaService rewritten using ConversationContext

@Message
@To(“eliza@{__subdomain__}”)
public class ElizaService {
   @Inject @Named(“__conversation__”) Conversation conv;
   @Inject ConversationContext convCtx;

   @PostConstruct private void init() {
      conv.begin();
      Eliza eliza = new Eliza();
      convCtx.setAttribute(“elizaInstance”, eliza);
   }

@Body(“{body}”)
private String handleMessage(@Named(“body”) String body) {
   Eliza eliza = (Eliza)convCtx.getAttribute(“elizaInstance”);
   if (“bye”.equals(body))
      conv.end();
   return (eliza.process(body));
   }
}

You can use a combination of @ConversationScoped objects and ConversationContext to hold your conversation state.

How do you decide which to use? If you have a really complex object then @ConversationScoped objects are the way to go; but if you decide to share conversation states with all handlers (including those that are not annotated with @ConversationScoped) then you should consider ConversationContext.

Creating new conversation threads

Vorpal does not support nested conversations; furthermore a message handler can only deal with 1 conversation at a time. Let say we have the following use case; in the course of normanb@arkham‘s conversation with Eliza, Eliza would like to start a new conversation thread on a new topic with normanb. In other words, normanb is have 2 open conversation with Eliza (2 unique thread id). To do this, you need to manually insert a new thread id into an outgoing message packet. The following code shows how this is done

@Message
@To(“eliza@{__subdomain__}”)
public class ElizaService {
   @Inject @Named(“__conversation__”) Conversation conv;
   @Inject ConversationContext convCtx;

   @PostConstruct private void init() {
      conv.begin();
      Eliza eliza = new Eliza();
      convCtx.setAttribute(“elizaInstance”, eliza);
   }

   @Body(“{body}”)
   private ResponseContext handleMessage(@Named(“body”) String body) {
      Eliza eliza = (Eliza)convCtx.getAttribute(“elizaInstance”);
      ResponseContext reply = new ResponseContext(
      ResponseContext.Type.Message);

      if (“bye”.equals(body))
         conv.end();
      else if (body.toLowerCase().startsWith(“new topic”)) {
         body = body.substring(9).trim();
         //Insert a new ThreadID into the response
        
responseContext.add(ThreadID.generate());
      }           
      responseContext.add(eliza.process(body));
      return (responseContext);
   }
}

Assume that when the response from normanb starts with the phrase ‘new topic‘ then we want to start a new conversation thread. Under normal circumstances, if there are no open conversation, then what we do is inject an instance of Conversation object in, call begin(). The out going message will then be marked with a randomly generated thread id.

However in the situation above, we already are in the context of an open conversation; invoking begin() again will result in an IllegalStateException. In this situation, to let Vorpal know that the out going message is under a new conversation, you have to manually add the <thread> element into the message packet. You can either manipulate the Message packet or use ResponseContext as show above to add a thread id.

When Vorpal sends out the message, it’ll create a new a Conversation and a ConversationContext objects for this new conversation; it then
automatically starts the conversation for you viz. calls Conversation.begin(). This is the only time an explicit conversation behaves like an implicit conversation. You’ll still need to manually terminate it. The ConversationContext will inherit all the values from the existing open conversation.

This new way of creating conversation works in all handlers including IQ and Presence; for example, you can create a new conversation with another Jabber entity when you are handling a disco#info. The new conversation’s ConversationContext will inherit all the values from the ConversationContext of the currently opened implicit conversation.

The only restriction is that this works with messages (<message>) only.

Flash Conversations

Inheriting conversation only works when a new conversation is created and started in the context of an existing conversation; for example in the case of implicit conversation (eg. sending out disco#info get in a disco#item result handler), the out going IQ packet’s conversation is automatically started by Vorpal so it’ll inherit the conversation states of the existing conversation. Similar situation with inserting a new thread id into messages described above.

There are cases where you would like to inherit the conversation states from an existing conversation but you only wish to start the conversation in the next packet that you receive. Here is an example

  1. fred@myjabber sends a message to chess@playground.myjabber to create a chess game
  2. chess@playground.myjabber randomly generates a game room name abcde. Saves abcde@conference.myjabber and fred@myjabber in ConversationContext.
  3. sends a presence to abcde@conference.myjabber to create a chat room to host the game
  4. conference.myjabber returns after creating the room
  5. chess@playground.myjabber now sends a direct invitation to fred@myjabber. Also updates ConversationContext with any other data need for the game

Since the invitation to fred@myjabber is a message (<message>), no conversation will be started. What Vorpal does at this point is this; if there is an existing conversation, it’ll take all the states in the ConversationContext and save that in the flash conversation map. Vorpal constructs a pseudo thread id made up of the ‘to’ and ‘from’ attributes from the message.

Sometime in the future, when fred@myjabber confirms the chess game, Vorpal will now pickup the existing ConversationContext (and Conversation) object from flash and starts the conversation.

So flash conversations are conversations that are not started but have the potential to start sometime in the near future; its model after Ruby on Rails’ flash. If a flash conversation is not started within a certain time period (default is 3 mins), its discarded.

See Arkham in playground especially CreateChatroom.java and ConfirmChatroom.java.

You can find the latest bundle here.

%d bloggers like this: