Friday, September 10, 2010

Create a TIBCO BW Adapter for Oracle AQ using XA

Advanced Queuing (AQ) is the queuing mechanism provided by the Oracle database. AQ queues can be accessed by PL/SQL or by java, using the AQAPI or AQJMS libraries. The payload on an Oracle AQ can be of type ADT (a custom data type defined in the database) or of type JMS_TEXT_MESSAGE (a provided data type). In this article, the example shows how to deal with JMS_TEXT_MESSAGE, but it can easily be applied for ADT.

The goal is to create an adapter in TIBCO BusinessWorks for AQ. The adapter needs to be able to both dequeue and enqueue, and the entire transaction must be XA compliant, to avoid the risk of losing messages. The enterprise service bus should not become owner of the dequeued message before the entire transaction (parsing etc.) has successfully been executed. Therefore, creating a simple bridge from AQ to (in this case) EMS was not an option.

There are a couple of approaches that could be thought of to solve this. For instance, the AQJMS libraries could be integrated in the BW project and invoke functionality from a Java Code activity. However, to be able to use a transaction group in BW you need to have a connection configured in BW that is XA compliant. In the Java Code activity you can fetch this connection, but this isn't an OracleConnection, which makes it useless in combination with the AQ libraries. Invoking the DBMS_AQ packages on the Oracle database directly also doesn't work, because the record type isn't supported in BW. Then there's the option of calling custom SQL from TIBCO BW, using a DECLARE - BEGIN - END block in which the dequeue procedure could be called for instance. However, to get the data, BW needs a result set which is not available outside this block of code.

In Java, you can create a CallableStatement to call SQL on the Oracle database. To call the dequeue procedure on the database, this has to be a DECLARE - BEGIN - END block. Now, the trick is to get a result set from this block of code, which is possible using a cursor. Opening this cursor with a bind variable, it's possible to execute the CallableStatement and retrieve the value of the cursor in Java as if it were a result set. In the code below, you can see how this looks like in the Java Activity in BW. This code is rather ugly because of the entire string being built within the source code, but it shows the full logic within the invoke() method of the Java Activity.

/* Available Variables: DO NOT MODIFY
  In  : Object Connection
  In  : String Schema
  In  : String Queue
  In  : String Consumer
  In  : int Timeout
  In  : String Navigation
  Out : String ToParty
  Out : String ToRole
  Out : String FromParty
  Out : String FromRole
  Out : String Action
  Out : String Service
  Out : String MessageID
  Out : String ReftoMessageID
  Out : String ConversationID
  Out : String BusinessMessage
* Available Variables: DO NOT MODIFY *****/
 
  Connection connection=((JavaConnectionAccessor)getConnection()).getDBConnection();

// Use a PL/SQL block to open the cursor
  CallableStatement cstmt;
  ResultSet cursor = null;
  String consumerString =  "dequeue_options.consumer_name := '" + Consumer + "'; ";
  
  if (Consumer != null && Consumer.length() > 0){
    consumerString = "dequeue_options.consumer_name := '"    + Consumer    + "'; ";
  }

  // Use a PL/SQL block to open the cursor
  cstmt = connection.prepareCall(
    "DECLARE "
    + "dequeue_options              DBMS_AQ.dequeue_options_t; "
    + "message_properties           DBMS_AQ.message_properties_t; "
    + "message                      SYS.AQ$_JMS_TEXT_MESSAGE; "
    + "message_header               SYS.AQ$_JMS_HEADER; "
    + "dequeue_msgid                RAW(16); "
    + "msg_text                     VARCHAR2(4000);"
    + "msg_lob                      CLOB;"
    + "BEGIN "
    + "dequeue_options.wait :=  " + Timeout + "; "
    + consumerString
    + "dequeue_options.navigation := DBMS_AQ." + Navigation + "; "
    
    + "DBMS_AQ.DEQUEUE(queue_name   => '" + Schema + "." 
                                          + Queue + "' "
    + "           ,dequeue_options    => dequeue_options "
    + "           ,message_properties => message_properties "
    + "           ,payload            => message "
    + "           ,msgid              => dequeue_msgid); "
    
    + "message_header := message.header; "
    
    + "IF message.text_len > 4000 THEN "
    + "     msg_lob := message.text_lob; "
    + "ELSE "
    + "     msg_text := message.text_vc; "
    + "END IF; "
    
    + "open ? for SELECT "
    + "message_header.get_string_property('ToParty')  as ToParty, "
    + "message_header.get_string_property('ToRole')            as ToRole, "
    + "message_header.get_string_property('FromParty')         as FromParty, "
    + "message_header.get_string_property('FromRole')          as FromRole, "
    + "message_header.get_string_property('Action')            as Action, "
    + "message_header.get_string_property('Service')           as Service, "
    + "message_header.get_string_property('MessageID')         as MessageID, "
    + "message_header.get_string_property('ReftoMessageID')    as ReftoMessageID, "
    + "message_header.get_string_property('ConversationID')    as ConversationID, "
    + "msg_lob as BusinessMessageLob, "
    + "msg_text as BusinessMessageText FROM dual; "
    + "END;"
  );
  try {
    cstmt.registerOutParameter(1, OracleTypes.CURSOR);
    cstmt.execute();
    cursor = ((OracleCallableStatement) cstmt).getCursor(1);

    // Use the cursor like a normal ResultSet
    cursor.next();
    ToParty = cursor.getString("ToParty");
    ToRole = cursor.getString("ToRole");
    FromParty = cursor.getString("FromParty");
    FromRole = cursor.getString("FromRole");
    Action = cursor.getString("Action");
    Service = cursor.getString("Service");
    MessageID = cursor.getString("MessageID");
    ReftoMessageID = cursor.getString("ReftoMessageID");
    ConversationID = cursor.getString("ConversationID");
    BusinessMessage = cursor.getString("BusinessMessageText");
  
    if (BusinessMessage == null || BusinessMessage.length() == 0) {
      Clob businessMessageClob = cursor.getClob("BusinessMessage");
      BusinessMessage = businessMessageClob.getSubString(1L,
      (int) businessMessageClob.length() );
    }
    cleanup(cstmt, cursor)
} catch(Exception  e){
  cleanup(cstmt, cursor);
  if (e.getMessage().contains("ORA-25228")) {
    //end of fetch has occured, queue seems to be empty
    BusinessMessage = null;
    Action = null;      
  } else {
    throw e;      
  }
}


Now the code above is the implementation of the invoke() method in the Java Activity. It describes how to dequeue a JMS_TEXT_MESSAGE. You can specify its IN and OUT parameters and adapt this to your specific needs. You'll also have to write the cleanup() method that closes the result set and the callable statement (in that order!). This code works is suitable for our JDBC XA Connection because prepareCall() can be called on a regular Connection and does not need an OracleConnection.

You may have noticed that there is something about "Navigation" in the code. Oracle AQ has two ways of determining the message to be dequeued: Navigation.FIRST_MESSAGE and Navigation.NEXT_MESSAGE. FIRST_MESSAGE navigation makes a snapshot of the queue table and checks what is the first message to be dequeued. NEXT_MESSAGE navigation does not make a snapshot of the queue table, but uses a snapshot that was created earlier by FIRST_MESSAGE navigation. Next message navigation performs better and Oracle recommends dequeuing one message by first message navigation and then a batch by next message navigation. This is what I programmed in the adapter. I created a process variable that is initialized with the string "FIRST_MESSAGE". When a message has been dequeued, it's replaced by "NEXT_MESSAGE". When no message has been dequeued, it's replaced by it's initial value "FIRST_MESSAGE". In this way, optimal navigation through AQ is implemented.














The most left assign activity is the one where "FIRST_MESSAGE" is assigned to the process variable Navigation. After this assign, we enter an infinite while loop. Within this loop, we create another group that is a Transaction group. It's configured as seen in the picture below.













The JDBC connection is configured as below, so it supports XA transactions on the Oracle database. To make this connection work, you need to add ojdbc14.jar to the tpcl/<version>/jdbc directory.












For transaction management, I took the JOTM transaction manager. I copied all the .jar files of JOTM to the tpcl/<version>/lib directory.