Edit | Attach | New | Raw | Delete | History | Print | Tools

Two Phase Commit With Tomcat Spring JMS And JDBC

The configuration of two phase commit (2PC) has to be planned carefully. The environment (defined by the installation instructions) is a lightweight application server. This environment leaves the proper configuration in your hand. No fancy defaulting known by full blown JEE servers. This document describes the special configuration properties for 2PC. The basic configuration is describes in Tomcat Spring ActiveMQ MySQL JMX Integration. The frameworks used have changed by version. But this document should be true for older versions too.

Atomikos 3.7

Tomcat 6.0.32

Spring Framework 3.0.5

Spring Integration 2.0.4

Installation

Follow the installation instructions from Tomcat Spring ActiveMQ MySQL JMX Integration.

SaveMessage

This bean stores the incoming message onto a database. This class has no transaction logic. The transaction handling is defined within the spring flow.
import java.io.StringWriter;
import java.util.List;

import javax.sql.DataSource;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.util.Assert;

import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.transformer.Transformer;

public class SaveMessage implements Transformer, InitializingBean
{
   private JdbcTemplate jdbcTemplate = null;

   // Start Bean Section

   public void setDataSource(DataSource dataSource)
   {
      this.jdbcTemplate = new JdbcTemplate(dataSource);
   }

   // End Bean Section

   @Override
   public final Message<?> transform(Message<?> message) throws MessagingException
   {
      try {
         Object payload = message.getPayload();
         if (payload instanceof String) {
            String xmlMessage = (String) payload;
            insert(xmlString);
         }
         return (message);
      } catch (Exception e) {
         throw new MessagingException(message, e.getMessage() + " " + e.getCause(), e);
      }
   }

   public void insert(String xmlString)
   {
      StringWriter writer = new StringWriter();

      try {
         writer.append("INSERT INTO TRADE (Message) values ('");
         writer.append(xmlString);
         writer.append("')");

         jdbcTemplate.update(writer.toString());
      } catch (Exception e) {
      }
   }

   @Override
   public void afterPropertiesSet() throws Exception
   {
      Assert.notNull(jdbcTemplate, "dataSource object must not be null");
   }
}

2pc.xml

Here is a sample application that demonstrates how you can run TransactionsEssentials in a web application after it has been globally installed. The global installation already configured TransactionsEssentials. Therefore the J2ee version of manager and transactions can be used.

The application starts with an JMS listener. The property sessionTransacted has to be set to "true" to start an transaction for every incoming JMS message. This differs to a "real" J2EE server because a J2EE server sets session transacted to true by default. The property sessionAcknowledgeMode has to be set to 0. Now only the transaction can commit the incoming message for shure. Spring uses auto commit as the default for sessionAcknowledgeMode even if sessionTransacted is set to true. Spring documentation assumes your JMS provider does not use sessionAcknowledgeMode if sessionTransacted is set to true. But this is not true for every JMS provider.

The simple database bean stores the JMS message payload onto a database. This bean demonstrates an database statement under transaction control where the transaction is started by an JMS listener.

The last activity is to send an incoming message onto the output queue. The property sessionTransacted has to be set to "true" to participate in an existing transaction (from an incoming message). The property sessionAcknowledgeMode has to be set to 0. Now only the transaction can commit the outgoing message for sure.

<beans   xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsd="http://www.w3.org/2001/XMLSchema"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:spi="http://www.springframework.org/schema/integration"
      xmlns:jee="http://www.springframework.org/schema/jee"
      xmlns:util="http://www.springframework.org/schema/util"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:spi-jms="http://www.springframework.org/schema/integration/jms"
      xmlns:spi-xml="http://www.springframework.org/schema/integration/xml"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 

http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd 

http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd 

http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd http://www.springframework.org/schema/integration/xml 

http://www.springframework.org/schema/integration/xml/spring-integration-xml-2.0.xsd http://www.springframework.org/schema/jee 

http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/context 

http://www.springframework.org/schema/context/spring-context-3.0.xsd">
   <spi:channel id="inputChannel"/>
   <spi:channel id="outputChannel"/>

   <!-- Configure JTA transaction manager -->
   <bean id="AtomikosTransactionManager" class="com.atomikos.icatch.jta.J2eeTransactionManager"/>
   <bean id="AtomikosUserTransaction" class="com.atomikos.icatch.jta.J2eeUserTransaction"/>
   <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
      <property name="transactionManager" ref="AtomikosTransactionManager"/>
      <property name="userTransaction" ref="AtomikosUserTransaction"/>
   </bean>

   <!-- JMS Input Endpoint -->
   <bean id="defaultListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="receiveTimeout" value="10000"/>
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="destination" ref="inputQueueDestination"/>
      <property name="transactionManager" ref="transactionManager"/>
      <property name="sessionTransacted" value="true"/>
      <property name="sessionAcknowledgeMode" value="0"/>
   </bean>
   <bean id="jmsInput" class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
      <constructor-arg ref="defaultListenerContainer"/>
      <constructor-arg ref="messageListener"/>
   </bean>
   <bean id="messageListener" class="com.westlb.traderouter.jms.listener.ChannelPublishingJmsMessageListener">
      <property name="requestChannel" ref="inputChannel"/>
      <property name="expectReply" value="false"/>
   </bean>

   <!-- JDBC Operation -->
   <bean id="saveMessage" class="SaveMessage">
      <property name="dataSource" ref="trdrDataSource"/>
   </bean>
   <spi:transformer id="saveMessageTransformer" ref="saveMessage" input-channel="inputChannel" output-channel="outputChannel"/>

   <!-- JMS Output Endpoint -->
   <spi-jms:outbound-channel-adapter id="outboundChannelAdapter" channel="outputChannel" jms-template="jmsProducerTemplate"/>

   <bean id="jmsOutputTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="defaultDestination" ref="outputQueueDestination"/>
      <property name="deliveryPersistent" value="true"/>
      <property name="sessionTransacted" value="true"/>
      <property name="sessionAcknowledgeMode" value="0"/>
   </bean>

   <!-- JDBC Configuration -->
   <!-- Get database driver XA properties from file -->
   <util:properties id="jdbcConfiguration" location="classpath:jdbcconfiguration.properties"/>
   <!-- XA Pooled DataSource -->
   <bean id="trdrDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close">
      <property name="uniqueResourceName" value="DokaDataSourceTransaction"/>
      <property name="xaDataSourceClassName" value="org.apache.derby.jdbc.EmbeddedXADataSource40"/>
      <property name="xaProperties" ref="jdbcConfiguration"/>
      <property name="testQuery" value="values(1)"/>
   </bean>

   <!-- JMS Configuration -->
   <!-- Connection Pool -->
   <bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close">
      <property name="uniqueResourceName" value="ConnectionTransaction"/>
      <property name="xaConnectionFactory" ref="xaQueueConnectionFactory"/>
   </bean>
   <!-- Queue Manager Resources -->
    <jee:jndi-lookup id="xaQueueConnectionFactory" jndi-name="java:comp/env/jms/XAQueueConnectionFactory" resource-ref="true"/>
    <!-- Queue Resources -->
    <jee:jndi-lookup id="inputQueueDestination" jndi-name="java:comp/env/jms/InputQueue" resource-ref="true"/>
    <jee:jndi-lookup id="outputQueueDestination" jndi-name="java:comp/env/jms/OutputQueue" resource-ref="true"/>
</beans>

Pitfalls

Be sure every needed property is set. Do not rely on documented default values! Your infrastructure could be different to the one from the documentation you have read. The outgoing message could be auto committed if sessionAcknowledgeMode is not 0. This could lead to memory leaks if the transaction tries to commit the message a second time.

The java file and spring flow are obfuscated from the original files. So there may be typos in there. I will fix them if I am notified. spacer

Copyright © 2014 Atomikos BVBA. Transaction Management for Extreme Transaction Processing and SOA Environments serving ISV, Commercial, OEM and Open Source Markets
Site map RSS ATOM