Chapter 1. Inbound communcations with JCA

1. Introduction

//TODO

2. Configuring the inbound communications

2.1. Resources

Before configuring the JCA container, we need first to configure the resource adapter and some resources needed by inbound communications.

The resource adapter must be configured as bean in the context of Spring as described below:

<beans>
  <bean id="activeMQResourceAdapter" class="org.activemq.ra.ActiveMQResourceAdapter">
    <property name="config" value="/broker.xml"/>
  </bean>

  <bean id="joramResourceAdapter" class="org.objectweb.joram.client.connector.JoramAdapter">
    <property name="serverId" value="0"/>
    <property name="serverName" value="localhost"/>
    <property name="serverPort" value="16010"/>
  </bean>
  (...)
</beans>

In order to initialize the JCA resources, an instance of the interface BootstrapContext must be configured as described below with the support of Jencks:

<beans>
  (...)
  <bean id="jencksBootstrapContext" class="org.jencks.factory.BootstrapContextFactoryBean">
    <property name="threadPoolSize" value="15"/>
  </bean>
  (...)
</beans>

2.2. JCA container

The JCA container can be now configured with the resources previously configured:

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="activeMQResourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="jencksBootstrapContext"/>
</beans>

Moreover, the support providers an abstraction in order to specify how to create the activation spec for a JMS provider. The abstraction corresponds to the ActivationSpecCreator interface:

public interface ActivationSpecCreator {
  ActivationSpec createActivationSpec(
           MessageEndpointDescriptor descriptor) throws Exception;
}

The support provides two build-in implementations for ActiveMQ and Joram, respectively ActiveMQActivationSpecCreator and JoramActivationSpecCreator. The following code shows how to configure the feature:

<beans>
  (...)
  <bean id="activeMQActivationSpecCreator"
        class="org.springframework.jca.inbound.endpoint.activemq.ActiveMQActivationSpecCreator"/>

  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    (...)
    <property name="activationSpecCreator" ref="activeMQActivationSpecCreator"/>
    (...)
</beans>

We will see then more advanced configured of this container and the ways to configure message listeners and how to manage transactions.

2.3. Autodetection

The support offers the possibility to configure automatically different.

Firstly, you can use the autodetectProvider in order to configure automatically the MessageEndpointsConfigurer class for a JMS provider. At the moment, only the configuration of the creation of the ActivationSpec for the provider detected is supported. In this case, you don't need anymore to set the property activationSpecCreator.

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="autodetectProvider" value="true"/>
    (...)
  </bean>
</beans>

Endly, the autodetect property must be used with an implementation of the MessageEndpointInfoAssembler interface which must support autodetection.

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="autodetect" value="true"/>
    (...)
  </bean>
</beans>

3. Configuration of JMS listeners

The JCA support for inbound communications provides several strategies in order to configure JMS listeners on the JCA container. You can both configure it explicitly or leave the container detect the metadatas of the JMS listeners available in the context of Spring.

3.1. in the configuration of Spring

Once the configuration of the JCA container done, you can specify the destinations to listen. You can define their names and theirs types. The types javax.jms.Queue and javax.jms.Topic are supported.

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="endpoints">
      <map>
        <entry key="javax.jms.Queue:test.queue"><ref local="myMessageListener"/></entry>
      </map>
    </property>
  </bean>

  <bean id="myMessageListener"
        class="org.springframework.jca.inbound.SimpleMessageListener"/>
</beans>

3.2. with meta datas

As you have seen before, the support offers the possibilities to autodetect the JMS listeners available in an application context of Spring. The JCA container uses the metadatas provided by each listener in order to configure them. The support defines the interface AutodetectCapableMessageEndpointInfoAssembler which indicates the beans for these. The autodectection listeners must be activate in order to allow the support to find the JMS message listeners with metadatas.

The support provides four implementations of this interface for different scenarios:

Table 1.1. Different implementations of the interface AutodetectCapableMessageEndpointInfoAssembler

ImplementationUsage
BeanNameMessageEndpointAssemblernot set
InterfaceMessageEndpointAssemblernot set
AnnotationMessageEndpointAssemblernot set
EJB3MessageEndpointAssemblernot set

The first implementation, the class BeanNameMessageEndpointAssembler, uses the names of message listeners in the Spring context in order to configure the destinations and their types. The following code shows how to configure this strategy:

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="autodetect" value="true"/>
    <property name="assembler">
        <bean class="org.springframework.jca.inbound.assembler.BeanNameMessageEndpointAssembler"/>
    </property>
  </bean>

  <bean id="javax.jms.Queue:test.queue"
        class="org.springframework.jca.inbound.SimpleMessageListener"/>
</beans>

With the second implementation, the class InterfaceMessageEndpointAssembler, the message listeners needs to implement the MessageEndpointDescriptorProvider interface. This interface provides the getDescriptor method which gets the metadatas for the listener. So this approach is intrusive because the listener is aware of Spring. The following code shows how to configure this strategy:

public class InterfaceMessageListener
          implements MessageListener,MessageEndpointDescriptorProvider {

  public void onMessage(Message message) { (...) }

  public MessageEndpointDescriptor getDescriptor() {
    MessageEndpointDescriptor descriptor=new MessageEndpointDescriptor();
    descriptor.setDestinationName("test.queue1");
    descriptor.setDestinationType("javax.jms.Queue");
    return descriptor;
  }
}

and its configuration:

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="autodetect" value="true"/>
    <property name="assembler">
      <bean class="org.springframework.jca.inbound.assembler.InterfaceMessageEndpointAssembler"/>
    </property>
  </bean>
</beans>

The third implementation, the class AnnotationMessageEndpointAssembler, the message specifies its metadatas with an annotation and its properties. So this approach is also intrusive because the listener is aware of Spring. The following code shows how to configure this strategy:

import org.springframework.jca.inbound.assembler.MessageEndpoint;

@MessageEndpoint(destinationName="test.queue2",destinationType="javax.jms.Queue")
public class AnnotedMessageListener extends CallDetectorMessageListener {
  public void onMessage(Message message) { (...) }
}

and its configuration:

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="autodetect" value="true"/>
    <property name="assembler">
      <bean class="org.springframework.jca.inbound.assembler.AnnotationMessageEndpointAssembler"/>
    </property>
  </bean>
</beans>

In the next section, we will describe the use of the last implementation which allows to configure EJB3 message driven beans.

3.3. use Message Driven Beans

The version 3.0 of the EJB specification allows to use annotations in order to configure Message Driven Bean. The support of inbound communications of Spring is able to write these informations in order to register a JMS message listener for a destination.

Here is a sample of MDB based on annotations:

@MessageDriven(activateConfig =
{
  @ActivationConfigProperty(propertyName="destinationType",
    propertyValue="javax.jms.Queue"),
  @ActivationConfigProperty(propertyName="destination",
    propertyValue="test.queue")
})
public class EJB3MessageListener implements MessageListener {
  public void onMessage(Message message) { (...) }
}

The support offers a dedicated implementation of the AutodetectCapableMessageEndpointInfoAssembler class in order to use the informations of the MessageDriven annotation. Here is the configuration of this approach:

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="autodetect" value="true"/>
    <property name="assembler">
      <bean class="org.springframework.jca.inbound.assembler.EJB3MessageEndpointAssembler"/>
    </property>
  </bean>
</beans>

4. Interceptions

The version 1.5 of the JCA specification offers two possibility to intercept during the delivery of a message as described in the section 12.5.6 and with the figures 12.7 and 12.8. With the option A, the JCA container have the responsability to control the execution of the message delivery using a proxy in front of the message endpoint. On the other hand, with the option B, the resource adapter can do some treatments thanks to the beforeDelivery / afterDelivery / release methods on the MessageEndpoint interface.

These two features are very useful in order to manage transactions, either with the transaction support of Spring or in a custom way.

4.1. AOP interception

This kind of interception corresponds to the option A. In this case, the JCA container puts a proxy in front of the message endpoint in order to make some interceptions based on the AOP paradigm. Note that these interceptions only occur on the method of the MessageEndpoint which delivers the message.

In order to configure the strategy, you can use the interceptors property of the MessageEndpointsConfigurer class as described below.

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="interceptors">
      <list>
        <ref local="myInterceptor"/>
      </list>
    </property>
  </bean>
</beans>

Note that, if no interceptor is configured, the JCA container puts no proxy in front of the message endpoint.

4.2. Message endpoint interception

This kind of interception corresponds to the option B but it is not supported by all the connectors. As a matter of fact, they are responsible to call the methods beforeDelivery, afterDelivery, realease during their treatments according to the JCA specification. The internal endpoint used by the conteneur delagates these calls to the configured interceptors.

public interface MessageEndpointInterceptor {
  void onBeforeDelivery(Method method,XAResource xaResource,
                                             MessageEndpointContext context);
  void onAfterDelivery(XAResource xaResource,MessageEndpointContext context);
  void onRelease(XAResource xaResource,MessageEndpointContext context);
}

In order to configure the strategy, you can use the messageEndpointInterceptors property of the MessageEndpointsConfigurer class as described below.

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="messageEndpointInterceptors">
      <list>
        <ref local="myMessageEndpointInterceptor"/>
      </list>
    </property>
  </bean>
</beans>

Here is the code of the configured interceptor:

public class MyMessageEndpointInterceptor implements MessageEndpointInterceptor {
  public void onBeforeDelivery(Method method,XAResource xaResource,
                                             MessageEndpointContext context) {
    System.out.println("onBeforeDelivery called");
  }

  public void onAfterDelivery(XAResource xaResource,MessageEndpointContext context) {
    System.out.println("onAfterDelivery called");
  }

  public void onRelease(XAResource xaResource,MessageEndpointContext context) {
    System.out.println("onRelease called");
  }
}

5. Transactions

The support of inbound communications of Spring supports both local and global transactions based. It is based on the support of transactions in Spring using the PlatformTransactionManager interface.

5.1. Local transactions

In order to configure this strategy, you can either use the transactional interceptor of Spring in front of the message endpoint or the message listener. The following code shows the configuration in front of the message listener:

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="endpoints">
      <map>
        <entry key="javax.jms.Queue:test.queue"><ref local="myMessageListener"/></entry>
      </map>
    </property>
  </bean>

  <bean id="transactionManager" class="...">
    (...)
  </bean>

  <bean id="myMessageListener"
        class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
    <property name="transactionManager" ref="transactionManager"/>
    <property name="target" ref="myMessageListenerTarget"/>
    <property name="transactionAttributes">
        <props>
            <prop key="onMessage">PROPAGATION_REQUIRED</prop>
        </props>
    </property>
  </bean>

  <bean id="myMessageListenerTarget"
        class="org.springframework.jca.inbound.SimpleMessageListener"/>
</beans>

Note that, if you use the autodectection, this strategy can not be used. To proxy the message listener, it is mandatory to use the auto proxy creator support.

The following code shows the configuration in front of the message endpoint:

<beans>
  (...)
  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="endpoints">
      <map>
        <entry key="javax.jms.Queue:test.queue"><ref local="myMessageListener"/></entry>
      </map>
    </property>
    <property name="interceptors">
      <list>
        <ref local="transactionInterceptor"/>
      </list>
    </property>
  </bean>

  <bean id="transactionManager" class="...">
    (...)
  </bean>

  <bean id="transactionInterceptor"
        class="org.springframework.transaction.interceptor.TransactionInterceptor">
    <property name="transactionManager" ref="transactionManager"/>
    <property name="transactionAttributes">
        <props>
            <prop key="onMessage">PROPAGATION_REQUIRED</prop>
        </props>
    </property>
  </bean>

  <bean id="myMessageListener"
        class="org.springframework.jca.inbound.SimpleMessageListener"/>
</beans>

5.2. Global transactions

The message listener configurer must be configured with the transaction manager used in order to be able to enlist and delist the xa resource associated with the JMS session. In this case, the resource adapter gives an instance of the xa resource. The following code shows how to configure this:

<beans>
  (...)

  <bean id="transactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="..."/>
    <property name="userTransaction" ref="..."/>
  </bean>

  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    (...)
    <property name="transactionManager" ref="transactionManager"/>
    (...)
</beans>

Once the bean configured, you can set a transaction interceptor of Spring dedicated to JTA in order to demark the transaction. The interceptor must be configured in front of the endpoint and not in front of the message listener. Otherwise, the message will not be included in the JTA transaction. The following code shows how to configure this:

<beans>
  (...)
  <bean id="jtaTransactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="transactionManager"/>
    <property name="userTransaction" ref="userTransaction"/>
  </bean>

  <bean id="transactionInterceptor"
        class="org.springframework.transaction.interceptor.TransactionInterceptor">
    <property name="transactionManager" ref="jtaTransactionManager"/>
    <property name="transactionAttributes">
        <props>
            <prop key="onMessage">PROPAGATION_REQUIRED</prop>
        </props>
    </property>
  </bean>

  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="transactionManager" ref="transactionManager"/>
    <property name="endpoints">
      <map>
        <entry key="javax.jms.Queue:test.queue"><ref local="myMessageListener"/></entry>
      </map>
    </property>
    <property name="interceptors">
      <list>
        <ref local="transactionInterceptor"/>
      </list>
    </property>
  </bean>

  <bean id="myMessageListener"
        class="org.springframework.jca.inbound.SimpleMessageListener"/>
</beans>

You can see that you have two places where specify the JTA transaction manager. If you set the checkTransactionManager to true, the support allow you to specify either the transactionManager or the JtaTransactionManager. It populates the other property automatically. The following code shows the configuration of this mechanism:

<beans>
  (...)
  <bean id="transactionManager"
        class="TransactionManagerImpl">
    (...)
  </bean>

  <bean id="configurer" class="org.springframework.jca.inbound.MessageEndpointsConfigurer">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecCreator" ref="activationSpecCreator"/>
    <property name="bootstrapContext" ref="bootstrapContext"/>
    <property name="checkTransactionManager" value="true"/>
    <property name="transactionManager" ref="transactionManager"/>
    <property name="endpoints">
      <map>
        <entry key="javax.jms.Queue:test.queue"><ref local="myMessageListener"/></entry>
      </map>
    </property>
  </bean>

  <bean id="myMessageListener"
        class="org.springframework.jca.inbound.SimpleMessageListener"/>
</beans>