Atomikos 3.7
Tomcat 6.0.32
Spring Framework 3.0.5
Spring Integration 2.0.4
import java.io.StringWriter; import java.util.List; import javax.sql.DataSource; import org.springframework.beans.factory.InitializingBean; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.util.Assert; import org.springframework.integration.Message; import org.springframework.integration.MessagingException; import org.springframework.integration.transformer.Transformer; public class SaveMessage implements Transformer, InitializingBean { private JdbcTemplate jdbcTemplate = null; // Start Bean Section public void setDataSource(DataSource dataSource) { this.jdbcTemplate = new JdbcTemplate(dataSource); } // End Bean Section @Override public final Message<?> transform(Message<?> message) throws MessagingException { try { Object payload = message.getPayload(); if (payload instanceof String) { String xmlMessage = (String) payload; insert(xmlString); } return (message); } catch (Exception e) { throw new MessagingException(message, e.getMessage() + " " + e.getCause(), e); } } public void insert(String xmlString) { StringWriter writer = new StringWriter(); try { writer.append("INSERT INTO TRADE (Message) values ('"); writer.append(xmlString); writer.append("')"); jdbcTemplate.update(writer.toString()); } catch (Exception e) { } } @Override public void afterPropertiesSet() throws Exception { Assert.notNull(jdbcTemplate, "dataSource object must not be null"); } }
Here is a sample application that demonstrates how you can run TransactionsEssentials in a web application after it has been globally installed. The global installation has already configured TransactionsEssentials. Therefore the J2EE versions of the transaction manager and user transaction (J2eeTransactionManager and J2eeUserTransaction) can be used.
The application starts with a JMS listener. The property sessionTransacted has to be set to "true" to start a transaction for every incoming JMS message. This differs from a "real" J2EE server because a J2EE server sets sessionTransacted to true by default. The property sessionAcknowledgeMode has to be set to 0. Now only the transaction can commit the incoming message for sure. Spring uses auto commit as the default for sessionAcknowledgeMode even if sessionTransacted is set to true. The Spring documentation assumes your JMS provider does not use sessionAcknowledgeMode if sessionTransacted is set to true, but this is not true for every JMS provider.
The simple database bean stores the JMS message payload in a database. This bean demonstrates a database statement under transaction control where the transaction is started by a JMS listener.
The last activity is to send an incoming message onto the output queue. The property sessionTransacted has to be set to "true" to participate in an existing transaction (from an incoming message). The property sessionAcknowledgeMode has to be set to 0. Now only the transaction can commit the outgoing message for sure.
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsd="http://www.w3.org/2001/XMLSchema"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:spi="http://www.springframework.org/schema/integration"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:spi-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:spi-xml="http://www.springframework.org/schema/integration/xml"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
                           http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd
                           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
                           http://www.springframework.org/schema/integration/xml http://www.springframework.org/schema/integration/xml/spring-integration-xml-2.0.xsd
                           http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <!-- Message flow: inputChannel -> saveMessageTransformer -> outputChannel -->
    <spi:channel id="inputChannel"/>
    <spi:channel id="outputChannel"/>

    <!-- Configure JTA transaction manager -->
    <bean id="AtomikosTransactionManager" class="com.atomikos.icatch.jta.J2eeTransactionManager"/>
    <bean id="AtomikosUserTransaction" class="com.atomikos.icatch.jta.J2eeUserTransaction"/>
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="AtomikosTransactionManager"/>
        <property name="userTransaction" ref="AtomikosUserTransaction"/>
    </bean>

    <!-- JMS Input Endpoint -->
    <!-- sessionTransacted and sessionAcknowledgeMode are set explicitly instead of relying on defaults -->
    <bean id="defaultListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="receiveTimeout" value="10000"/>
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="inputQueueDestination"/>
        <property name="transactionManager" ref="transactionManager"/>
        <property name="sessionTransacted" value="true"/>
        <property name="sessionAcknowledgeMode" value="0"/>
    </bean>
    <bean id="jmsInput" class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
        <constructor-arg ref="defaultListenerContainer"/>
        <constructor-arg ref="messageListener"/>
    </bean>
    <!-- NOTE(review): the listener class lives in a non-Spring package; confirm it is on the classpath -->
    <bean id="messageListener" class="com.westlb.traderouter.jms.listener.ChannelPublishingJmsMessageListener">
        <property name="requestChannel" ref="inputChannel"/>
        <property name="expectReply" value="false"/>
    </bean>

    <!-- JDBC Operation -->
    <bean id="saveMessage" class="SaveMessage">
        <property name="dataSource" ref="trdrDataSource"/>
    </bean>
    <spi:transformer id="saveMessageTransformer" ref="saveMessage"
                     input-channel="inputChannel" output-channel="outputChannel"/>

    <!-- JMS Output Endpoint -->
    <!-- jms-template must name the JmsTemplate bean defined below (jmsOutputTemplate) -->
    <spi-jms:outbound-channel-adapter id="outboundChannelAdapter" channel="outputChannel"
                                      jms-template="jmsOutputTemplate"/>
    <bean id="jmsOutputTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="outputQueueDestination"/>
        <property name="deliveryPersistent" value="true"/>
        <property name="sessionTransacted" value="true"/>
        <property name="sessionAcknowledgeMode" value="0"/>
    </bean>

    <!-- JDBC Configuration -->
    <!-- Get database driver XA properties from file -->
    <util:properties id="jdbcConfiguration" location="classpath:jdbcconfiguration.properties"/>

    <!-- XA Pooled DataSource -->
    <bean id="trdrDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="DokaDataSourceTransaction"/>
        <property name="xaDataSourceClassName" value="org.apache.derby.jdbc.EmbeddedXADataSource40"/>
        <property name="xaProperties" ref="jdbcConfiguration"/>
        <property name="testQuery" value="values(1)"/>
    </bean>

    <!-- JMS Configuration -->
    <!-- Connection Pool -->
    <bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
          init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="ConnectionTransaction"/>
        <property name="xaConnectionFactory" ref="xaQueueConnectionFactory"/>
    </bean>

    <!-- Queue Manager Resources -->
    <jee:jndi-lookup id="xaQueueConnectionFactory" jndi-name="java:comp/env/jms/XAQueueConnectionFactory" resource-ref="true"/>

    <!-- Queue Resources -->
    <jee:jndi-lookup id="inputQueueDestination" jndi-name="java:comp/env/jms/InputQueue" resource-ref="true"/>
    <jee:jndi-lookup id="outputQueueDestination" jndi-name="java:comp/env/jms/OutputQueue" resource-ref="true"/>

</beans>
Be sure every needed property is set. Do not rely on documented default values! Your infrastructure could be different from the one described in the documentation you have read. The outgoing message could be auto committed if sessionAcknowledgeMode is not 0. This could lead to memory leaks if the transaction tries to commit the message a second time.
The java file and spring flow are obfuscated from the original files. So there may be typos in there. I will fix them if I am notified.
Atomikos now offers pre-configured Tomcat integration so you are certain that everything is correct for recovery and restart. It is part of our commercial offering. Interested? Start your free trial today:
Free Trial