The goal is to create an adapter in TIBCO BusinessWorks for AQ. The adapter needs to be able to both dequeue and enqueue, and the entire transaction must be XA compliant, to avoid the risk of losing messages. The enterprise service bus should not become owner of the dequeued message before the entire transaction (parsing etc.) has successfully been executed. Therefore, creating a simple bridge from AQ to (in this case) EMS was not an option.
Several approaches come to mind. For instance, the AQJMS libraries could be included in the BW project and invoked from a Java Code activity. However, to use a transaction group in BW you need a connection configured in BW that is XA compliant. In the Java Code activity you can fetch this connection, but it isn't an OracleConnection, which makes it useless in combination with the AQ libraries. Invoking the DBMS_AQ packages on the Oracle database directly doesn't work either, because the record type isn't supported in BW. Then there's the option of calling custom SQL from TIBCO BW, using a DECLARE - BEGIN - END block in which the dequeue procedure is called. However, to get the data, BW needs a result set, which is not available outside this block of code.
In Java, you can create a CallableStatement to call SQL on the Oracle database. To call the dequeue procedure on the database, this has to be a DECLARE - BEGIN - END block. Now, the trick is to get a result set out of this block of code, which is possible using a cursor. By opening this cursor on a bind variable, you can execute the CallableStatement and retrieve the cursor in Java as if it were a result set. The code below shows what this looks like in the Java Activity in BW. This code is rather ugly because the entire string is built within the source code, but it shows the full logic within the invoke() method of the Java Activity.
/* Available Variables: DO NOT MODIFY
    In  : Object Connection
    In  : String Schema
    In  : String Queue
    In  : String Consumer
    In  : int Timeout
    In  : String Navigation
    Out : String ToParty
    Out : String ToRole
    Out : String FromParty
    Out : String FromRole
    Out : String Action
    Out : String Service
    Out : String MessageID
    Out : String ReftoMessageID
    Out : String ConversationID
    Out : String BusinessMessage
* Available Variables: DO NOT MODIFY *****/
Connection connection = ((JavaConnectionAccessor) getConnection()).getDBConnection();
CallableStatement cstmt;
ResultSet cursor = null;

// Only set consumer_name when a consumer was supplied
String consumerString = "";
if (Consumer != null && Consumer.length() > 0) {
    consumerString = "dequeue_options.consumer_name := '" + Consumer + "'; ";
}

// Use a PL/SQL block to dequeue the message and open the cursor
cstmt = connection.prepareCall(
    "DECLARE " +
    "dequeue_options DBMS_AQ.dequeue_options_t; " +
    "message_properties DBMS_AQ.message_properties_t; " +
    "message SYS.AQ$_JMS_TEXT_MESSAGE; " +
    "message_header SYS.AQ$_JMS_HEADER; " +
    "dequeue_msgid RAW(16); " +
    "msg_text VARCHAR2(4000);" +
    "msg_lob CLOB;" +
    "BEGIN " +
    "dequeue_options.wait := " + Timeout + "; " +
    consumerString +
    "dequeue_options.navigation := DBMS_AQ." + Navigation + "; " +
    "DBMS_AQ.DEQUEUE(queue_name => '" + Schema + "." + Queue + "' " +
    " ,dequeue_options => dequeue_options " +
    " ,message_properties => message_properties " +
    " ,payload => message " +
    " ,msgid => dequeue_msgid); " +
    "message_header := message.header; " +
    "IF message.text_len > 4000 THEN " +
    " msg_lob := message.text_lob; " +
    "ELSE " +
    " msg_text := message.text_vc; " +
    "END IF; " +
    "open ? for SELECT " +
    "message_header.get_string_property('ToParty') as ToParty, " +
    "message_header.get_string_property('ToRole') as ToRole, " +
    "message_header.get_string_property('FromParty') as FromParty, " +
    "message_header.get_string_property('FromRole') as FromRole, " +
    "message_header.get_string_property('Action') as Action, " +
    "message_header.get_string_property('Service') as Service, " +
    "message_header.get_string_property('MessageID') as MessageID, " +
    "message_header.get_string_property('ReftoMessageID') as ReftoMessageID, " +
    "message_header.get_string_property('ConversationID') as ConversationID, " +
    "msg_lob as BusinessMessageLob, " +
    "msg_text as BusinessMessageText FROM dual; " +
    "END;"
);

try {
    cstmt.registerOutParameter(1, OracleTypes.CURSOR);
    cstmt.execute();
    cursor = ((OracleCallableStatement) cstmt).getCursor(1);

    // Use the cursor like a normal ResultSet
    cursor.next();
    ToParty = cursor.getString("ToParty");
    ToRole = cursor.getString("ToRole");
    FromParty = cursor.getString("FromParty");
    FromRole = cursor.getString("FromRole");
    Action = cursor.getString("Action");
    Service = cursor.getString("Service");
    MessageID = cursor.getString("MessageID");
    ReftoMessageID = cursor.getString("ReftoMessageID");
    ConversationID = cursor.getString("ConversationID");

    // Payloads over 4000 characters arrive in the CLOB column instead of the VARCHAR2 column
    BusinessMessage = cursor.getString("BusinessMessageText");
    if (BusinessMessage == null || BusinessMessage.length() == 0) {
        Clob businessMessageClob = cursor.getClob("BusinessMessageLob");
        if (businessMessageClob != null) {
            BusinessMessage = businessMessageClob.getSubString(1L, (int) businessMessageClob.length());
        }
    }
    cleanup(cstmt, cursor);
} catch (Exception e) {
    cleanup(cstmt, cursor);
    if (e.getMessage() != null && e.getMessage().contains("ORA-25228")) {
        // Dequeue timed out or end of fetch was reached: the queue seems to be empty
        BusinessMessage = null;
        Action = null;
    } else {
        throw e;
    }
}
The code above is the implementation of the invoke() method in the Java Activity. It shows how to dequeue a JMS_TEXT_MESSAGE. You can specify its IN and OUT parameters and adapt them to your specific needs. You'll also have to write the cleanup() method that closes the result set and the callable statement (in that order!). This code is suitable for our JDBC XA connection because prepareCall() can be called on a regular Connection and does not need an OracleConnection.
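The cleanup() method itself is not part of the listing. A minimal sketch of what it could look like is given here, assuming the only requirements are the ones stated above: close the result set first, then the callable statement. The class name AqCleanup, the static modifier, the null checks and the returned count are assumptions made for illustration; inside a BW Java Code activity this would more likely be a private instance method of the generated class.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper class; the name is an assumption, not part of the adapter.
public class AqCleanup {

    /**
     * Closes the cursor first and the statement second.
     * Null arguments are skipped, and a failure to close one resource
     * does not prevent the attempt to close the other.
     * Returns the number of resources that were closed without error.
     */
    public static int cleanup(Statement stmt, ResultSet cursor) {
        int closed = 0;
        if (cursor != null) {
            try {
                cursor.close();
                closed++;
            } catch (SQLException ignored) {
                // Nothing useful can be done here; continue with the statement.
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
                closed++;
            } catch (SQLException ignored) {
                // Swallowed so that cleanup never masks the original error.
            }
        }
        return closed;
    }
}
```

Swallowing the SQLException on close is a design choice of this sketch: the catch block in invoke() calls cleanup() before inspecting the original exception, so cleanup() should not throw one of its own.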
You may have noticed that there is something about "Navigation" in the code. Oracle AQ has two ways of determining the message to be dequeued: Navigation.FIRST_MESSAGE and Navigation.NEXT_MESSAGE. FIRST_MESSAGE navigation makes a snapshot of the queue table and determines the first message to be dequeued. NEXT_MESSAGE navigation does not make a snapshot of the queue table, but uses the snapshot that was created earlier by FIRST_MESSAGE navigation. NEXT_MESSAGE navigation performs better, and Oracle recommends dequeuing one message with FIRST_MESSAGE navigation and then a batch with NEXT_MESSAGE navigation. This is what I programmed in the adapter. I created a process variable that is initialized with the string "FIRST_MESSAGE". When a message has been dequeued, it's replaced by "NEXT_MESSAGE". When no message has been dequeued, it's reset to its initial value "FIRST_MESSAGE". In this way, optimal navigation through AQ is implemented.
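In the adapter this switching is done with a process variable and Assign activities, not in Java. Purely as an illustration of the same rule, here is a small sketch in Java. The class NavigationState and its methods are hypothetical names; the two string values are the ones passed into the dequeue code as the Navigation input.

```java
// Hypothetical illustration of the navigation rule; not part of the BW process itself.
public class NavigationState {

    public static final String FIRST_MESSAGE = "FIRST_MESSAGE";
    public static final String NEXT_MESSAGE = "NEXT_MESSAGE";

    // Starts with FIRST_MESSAGE so the first dequeue takes a fresh snapshot.
    private String navigation = FIRST_MESSAGE;

    /** Value to pass as the Navigation input of the next dequeue call. */
    public String current() {
        return navigation;
    }

    /**
     * Call after every dequeue attempt.
     * A successful dequeue switches to NEXT_MESSAGE; an empty queue
     * falls back to FIRST_MESSAGE so a new snapshot is taken next time.
     */
    public void update(boolean messageDequeued) {
        navigation = messageDequeued ? NEXT_MESSAGE : FIRST_MESSAGE;
    }
}
```

With the dequeue code above, "no message dequeued" corresponds to BusinessMessage coming back as null after an ORA-25228, so a caller could pass BusinessMessage != null as the argument to update().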
The leftmost Assign activity is the one where "FIRST_MESSAGE" is assigned to the process variable Navigation. After this Assign, we enter an infinite while loop. Within this loop, we create another group, which is a Transaction group. It's configured as seen in the picture below.
The JDBC connection is configured as below, so it supports XA transactions on the Oracle database. To make this connection work, you need to add ojdbc14.jar to the tpcl/<version>/jdbc directory.
For transaction management, I used the JOTM transaction manager. I copied all the .jar files of JOTM to the tpcl/<version>/lib directory.