Art of Conversation: Implicit Conversations – Part 2

You say it best, when you say nothing at all” – When You Say Nothing At All, Ronan Keating

In my last blog, I reintroduce conversation support in Vorpal. Conversation allows Vorpal application to keep states about ongoing covnersation. One of the side effect of conversation support is a notion call implicit conversation. 
What is an implicit conversation? It is when a Vorpal application
  • Either sends/receives an IQ get/set packet, and
  • Receives/sends an IQ result/error packet in response
So an implicit conversation is an exchange between a pair of corresponding IQ packets, having the same packet id; the conversation is started automatically when you sends or receives an IQ get or set and is terminated automatically when you receives or sends an IQ result or error packet.

Starting and Terminating Implicit Conversations

Note: the following examples makes heavy use of service discovery. If you’re not familiar with service discovry have a look at XEP-0030 and also this blog.
Lets look at how implicit conversation works with the following use case: assume that we are writing a service  that upon receiving a ‘chat’ message will send and disco#info to the server. When the reply comes back, the services will send a message back to the message sender with the number of features that the server supports.
Lets first look at the code to send the disco#info and receive the result

   @Body("{ignore}")

   private void handleMessage(@From JID from) { 

      ResponseContext iqCtx = new ResponseContext(ResponseContext.Type.IQ);   
      iqCtx.to(postStartEvt.getDomain()).add(IQ.Type.get)
            .add(new IQChildElement(“query”, “http://jabber.org/protoco/disco#info”);
      //Implicit conversation starts the moment we send the IQ packet  
      componentContext.send(iqCtx); 
   } 

   @Query(namespace=”http://jabber.org/protoco/disco#info”) 
   @IQType(type=IQ.Type.result) 
   private void discoInfoReply(@Feature List<FeatureSpecification> features) {
      //Do something 
   } //Implicit conversation ends the moment we exit the handle method 

In the example above, we have 2 different methods  handleMessage() and discoInfoReply()in 2 difference classes (one class annotated with @Message and the other with @IQ). On receiving a chat message, handleMessage() constructs a disco#info and sends it to the server. If every thing goes well, a reply will return to be handled by discoInfoReply() handler. 

An implicit conversation begins the moment we send an IQ get/set packet (the green lines); the implicit conversation resumes when we receive an IQ result/error reply and do not end until we exit the handler (the red lines). In the case where this is no handler to the IQ response, the implict conversation will resume and then terminates immediately. The only time that we have an incomplete implicit conversation is if we did not receive any reply.  More on this later. 
The reverse is also true viz. when our service receives a IQ get, the implicit conversation starts the moment the handler is invoked and ends when we exit the handler method.
Vorpal tracks IQ request/response packets by their packet id. In XMPP, IQ packets are required to have packet ids; furthermore an IQ request/response packet pair must have the same packet id. Vorpal uses this to track the implicit conversation.

Saving States in ConversationContext

Lets continue with our example. We have fulfilled the first part of our use case and that is by sending a disco#info to our server. However to send the result back to message sender we need to know the sender’s JID. But the sender’s JID is only available in handleMessage(). For this we will need to use ConversationContext

   @Message

   public class MessageHandler {
      @Inject ComponentContext componentContext;
      @Inject ConversationContext convCtx;
      @Body(“{ignore}”)
      private void handleMessage(@From JID from) { 
         //Save the send in the conversation context
         convCtx.setAttribute(“sender” from);

         ResponseContext iqCtx = new ResponseContext(ResponseContext.Type.IQ);   
         iqCtx.to(postStartEvt.getDomain()).add(IQ.Type.get)
               .add(new IQChildElement(“query”, “http://jabber.org/protoco/disco#info&#8221;);

         //Implicit conversation starts the moment we send the IQ packet

         componentContext.send(iqCtx); 

      }

   @IQ
   public class IQHandler {
      @Inject ComponentContext componentContext;

      @Inject ConversationContext convCtx;

      @Query(namespace=”http://jabber.org/protoco/disco#info&#8221;) 
      @IQType(type=IQ.Type.result) 
      private void discoInfoReply(@Feature List<FeatureSpecification> features) {
         JID from = (JID)convCtx.getAttribute(“sender”);
         ResponseContext msgCtx = new ResponseContext(ResponseContext.Type.Message);
         msgCtx.to(from)
               .add(“There are ” + features.size() + ” features”);
         componentContext.send(msgCtx);
      //Implicit conversation ends the moment we exit the handle method
When we receive the chat message, we save the send’s JID in a ConversationContext. A ConversationContext is like a HttpSession object. So when we send the disco#info, the ConversationContext will be associated with the implicit message. Upon receiving a reply, Vorpal will reinject the same ConversationContext object based on the id of the IQ packet.
Note: Observant reader may recall that in handleMessage() the implicit conversation doesn’t start until we send the IQ packet (the green line). So the ConversationContext object in this case is said to be an unbounded context as oppose to a bounded one in discoInfoReply().

Inheriting Conversations

ComponentContext.send() allows you to send more than 1 packet. What if in handleMessage() method, we send more than one disco#info packet to different JID entities eg. conference.mydomain and pubsub.mydomain like the following code snippet

   componentContext.send(toConference, toPubsub);

where toConference and toPubsub are disco#info packets. Firstly, Vorpal will start two separate implicit conversation since the two IQ packets will have different ids. Secondly both the implicit conversations will have their own separate ConversationContext but these two ConversationContexts will inherit all the attributes from the existing convCtx. Conversation inheritance applies to all implicit conversations irregardless of whether a ConversationContext is bound or unbound.
Lets consider another scenario. Assume that instead of sending back the number of features that a XMPP server supports, we want to return a list of all Jabber entities discoverable through disco#items. The algorithm to do that is roughly like this:
  1. Send a disco#info to the Jabber entity. If this is the start, then it’ll be the JID of the XMPP server
  2. Check if the replies contains disco#items
  3. Send a get disco#item to the Jabber entity. Again the first JID will be the XMPP server
  4. Save a list of all the item when the disco#item reply returns. 
  5. For each JID in <item>, repeat from step 1 until there are no more disco#items to be found
The following code kicks off this process. when we receives a message

   @Message

   public class MessageHandler {

      @Inject ConversationContext convCtx;

      @Body(“{ignore}”)
      private void handleMessage(@From JID from) { 

         ResponseContext iqCtx = new ResponseContext(ResponseContext.Type.IQ);
         iqCtx.add(IQ.Type.get).to(postStartEvt.getDomain())
              .add(new IQChildElement(“query”, “http://jabber.org/protocol/disco#info&#8221;);
         //Setup the conversation context
         convCtx.setAttribute(“requestor”, from);
         convCtx.setAttribute(“jids”, new HashSet<JID>());
         convCtx.setAttribute(“disco#info”, new AtomicInteger(1));
         convCtx.setAttribute(“disco#items”, new AtomicInteger(0));
         postStartEvt.getComponentContext().send(iqCtx);
   }

The only important thing here is that we setup a collection to hold our discovered JIDs. Also we create 2 AtomicInteger to count the number of disco#info and disco#items that we have sent.  This is how the counters work: every time we send a get disco#info we increment the count. When we receive a reply for disco#info, we decrement the counter. When the counter reaches zero, we know that we have completed our search.  The same goes for the disco#items counter. We also keep the client’s JID for the reply.
The following class shows the real work of collating the Jabber entities.

   @IQ

   public class CollateJID {


      @Inject ConversationContext convCtx;

      @Query(namespace=”http://jabber.org/protocol/disco#info&#8221;)
      public ResponseContext discoInfoResult(@From JID from) {
         //Decrement the disco#info counter cause we have just got a reply
         discoInfo(-1);
         ResponseContext iqCtx = new ResponseContext(ResponseContext.Type.IQ);
         iqCtx.add(IQ.Type.get).to(from)
               .add(new IQChildElement(“query”, “http://jabber.org/protocol/disco#items&#8221;);

         //Decrement the disco#items counter cause we have just got a reply
         discoItems(1);

         return (iqCtx);
      }

      @Query(namespace=”http://jabber.org/protocol/disco#info&#8221;)
      public ResponseContext discoInfoResult() {
         //Decrement the disco#info counter cause we have just got a reply
         discoInfo(-1);
         //Check to see if 
         if (shouldEnd())
            return (createReply());
         return (null);        
      }
      @Query(namespace=”http://jabber.org/protocol/disco#items&#8221;)
      public ResponseContext discoItemsResult(@Item List<ItemSpecification> jids) {
         //Decrement the disco#items counter cause we have just got a reply
         discoItems(-1);
         if (jids.isEmpty() && shouldEnd())
            return (createReply());
         
         Set<JID> jidSet = (Set<JID>)convCtx.getAttribute(“jids”);
         ResponseContext iqCtx = ResponseContext(ResponseContext.Type.IQ);
         iqCtx.add(IQ.Type.get)
               .add(new IQChildElement(“query”, “http://jabber.protocol.disco#info&#8221;));
         for (ItemSpecification j: jids) {
            iqCtx.to(j.getJid());
            jidSet.add(j.getJid);
         }
         //Increment the disco#info counter. We are about to send a bunch of these
         discoInfo(jids.size());
         return (iqCtx);
      }
      //Helper methods
      private int discoInfo(int i) {
         return (((AtomicInteger)convCtx.getAttribute(“disco#info”)).addAndGet(i));
      }
      private int discoItems(int i) {
         return (((AtomicInteger)convCtx.getAttribute(“disco#items”)).addAndGet(i));
      }
      private boolean shouldEnd() {
         return ((discoInfo(0) <= 0) && (discoItems(0) <= 0));
      }
      private ResponseContext createReply() {
         Set<JID> jidSet = (Set<JID>)convCtx.getAttribute(“jids”);
         ResponseContext reply = new ResponseContext(ResponseContext.Type.Message);
         reply.to((JID)convCtx.getAttribute(“requestor”))
               .add(jidSet.toString());
         return (reply);
      }
   }

The code is long but its acutally quite simple. After sending the initial disco#info to the server (in handleMessage()), the returns the result. This is handle by our IQ handler class CollateJID above. The disco#info reply can produce 2 different response; if the reply does not contain disco#items feature (meaning the server does not have any Jabber entities), then this will be handled by discoInfoResult() method (in blue). We decrement the disco#info counter in the conversation context and checks if we should end. 
If the disco#info reply do contain disco#items feature, then we send a disco#items to the sender of the reply packet. At the same time we decrement disco#info and increment disco#items counter. This is handled by discoInfoResult() (in red).
When we receive the disco#items reply, for every JIDs in the reply we send need to further send a disco#info packet. This is handle by discoItemsResult() (in green). After that, the entire service discovery process repeats itself.
What is interesting to note is the conversation inheretance happening here. Once we have initialized the initial conversation context object with the sender’s JID and the counters, these values are inherited in all subsequent implicit conversations. For example, when we are in discoInfoResult() (the red one) in response to a disco#info reply,  Vorpal will inject the previously created conversation context into the handler. Now when we now send out disco#items, Vorpal will create a new conversation context for this implicit conversation. But since there is an existing conversation context, the new conversation context for the outgoing disco#items packet will inherit all the values.
What happens if there is no response to a previously sent disco#items or disco#info packet? Wouldn’t Vorpal be holding all these objects in conversation context? Memory leak? All conversations have timeouts unless you turn them off. When timeout occurs for a particular conversation, the conversation context object for that conversation will be destroyed. We will look at timeout and handling them in a future blog.
If you’re testing implicit conversation and find a bug or have issues or questions, please post them to the forum.
Advertisements
%d bloggers like this: