Tuesday 18 August 2015

Spring Cloud for Amazon Web Services - Listen to SQS (Java Config)

While using the spring-cloud-aws project to listen to SQS, I read this in the documentation:
"The Spring Cloud AWS configuration is currently done using custom elements provided by Spring Cloud AWS namespaces. JavaConfig will be supported soon."
Soon!! Huh?! Well, this was a problem, since I did not want to use XML config in my project.
However, the documentation adds:
"It is recommended to use the XML messaging namespace to create QueueMessagingTemplate as it will set a more sophisticated MessageConverter that converts objects into JSON when Jackson is on the classpath."
I needed a custom JSON converter for my messages, and I was not going to use XML config, so I decided to give it a go and write the Java config I needed. I read the spring-cloud-aws source code and some examples online, then borrowed some code from here and there to accomplish my task. Here is the code; hopefully it will make someone's life a little bit easier.
package com.coffeebeans.config;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.aws.context.annotation.ConditionalOnMissingAmazonClient;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by muhamadto on 8/18/15.
 */

@Configuration
@ComponentScan("com.coffeebeans")
public class SqsConfig {

    @Value("${queue.endpoint}")
    private String queueEndPoint;

    @Value("${aws.region}")
    private String awsRegion;

    @Autowired
    MessageConverter messageConverter;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer messageListenerContainer = simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();
        messageListenerContainer.setMessageHandler(queueMessageHandler());
        return messageListenerContainer;
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(){
        SimpleMessageListenerContainerFactory messageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
        messageListenerContainerFactory.setAmazonSqs(amazonSQS());
        messageListenerContainerFactory.setDeleteMessageOnException(false);
        return messageListenerContainerFactory;
    }

    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQS());
        QueueMessageHandler messageHandler = queueMessageHandlerFactory.createQueueMessageHandler();

        List<HandlerMethodArgumentResolver> list = new ArrayList<>();
        HandlerMethodArgumentResolver resolver = new PayloadArgumentResolver(this.messageConverter);
        list.add(resolver);
        messageHandler.setArgumentResolvers(list);
        return messageHandler;
    }


    @Lazy
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingAmazonClient(AmazonSQS.class)
    public AmazonSQSAsync amazonSQS() {
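        AmazonSQSAsyncClient amazonSQSAsyncClient = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain());
        amazonSQSAsyncClient.setEndpoint(this.queueEndPoint);
        // Caution: in the AWS SDK for Java 1.x, setRegion() also resets the endpoint to the
        // region's default, so a non-default queue.endpoint would need to be set after this call.
        amazonSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName(this.awsRegion)));
        // Wrap the async client in the buffered client, which batches requests on the client side.
        return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient);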
    }
}
Now annotate a method of any class in the com.coffeebeans package with @MessageMapping("queue_name"), where MessageMapping is org.springframework.messaging.handler.annotation.MessageMapping.
For example:

package com.coffeebeans.listener;

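import com.coffeebeans.message.Message;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Component;

/**
 * Created by muhamadto on 8/18/15.
 */
// Registered as a bean so that the component scan in SqsConfig finds it and
// QueueMessageHandler can detect the @MessageMapping method.
@Component
public class QueueListener {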
    @MessageMapping("queue_name")
    public void onMessage(Message message) throws Exception{
        System.out.println(message);
    }
}
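
The Message class imported by the listener is not shown in this post. Here is a minimal sketch of what such a payload class could look like, assuming a JSON body with two made-up fields, id and body. Both field names are hypothetical and only there for illustration.

```java
// Hypothetical payload class. In the project it would be declared public in its own
// file under com.coffeebeans.message; the field names are assumptions.
class Message {
    private String id;
    private String body;

    // By default Jackson creates the object through a no-argument constructor
    // and then fills it in through the setters.
    public Message() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }

    // Readable output for the System.out.println(message) call in the listener.
    @Override
    public String toString() {
        return "Message{id='" + id + "', body='" + body + "'}";
    }
}
```

With a class shaped like this, a queue message whose body is a JSON object with matching property names should map onto the listener's method argument.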

As mentioned in the documentation quoted above, you will need Jackson on your classpath.
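
SqsConfig autowires a MessageConverter, but the post does not show where that bean comes from. A minimal sketch of one way to provide it, assuming Spring's MappingJackson2MessageConverter is good enough for your payloads, follows. The class name MessageConverterConfig is hypothetical, and this is not necessarily the converter I used in my project.

```java
package com.coffeebeans.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;

// Hypothetical class name; any @Configuration class found by the component scan would do.
@Configuration
public class MessageConverterConfig {

    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        // SQS message bodies are text, so serialize to String instead of the default byte[].
        converter.setSerializedPayloadClass(String.class);
        return converter;
    }
}
```

Keeping the converter in its own configuration class avoids SqsConfig autowiring a bean that it defines itself. If you need custom JSON handling, setObjectMapper(...) on the converter accepts a preconfigured Jackson ObjectMapper.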

3 comments:

  1. Really interested in how you have done this. I'm using Spring Boot and want to access both SNS and SQS. Unfortunately, I can't seem to get beyond some very basic problems initializing the environment ... initialization that I had hoped Spring Boot would have done for me. For example, I get the following startup error:

    org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.cloud.aws.context.support.io.ResourceLoaderBeanPostProcessor#0': Cannot resolve reference to bean 'amazonS3' while setting constructor argument; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'amazonS3': Invocation of init method failed; nested exception is java.lang.IllegalStateException: There is not EC2 meta data available, because the application is not running in the EC2 environment. Region detection is only possible if the application is running on a EC2 instance

    Replies
    1. I'm not sure the problem is in Spring Boot. I think it's in the way the region is being defined. Did this error happen on your machine or while running on an EC2 instance? Is your code trying to auto-detect the region?

      The code above uses @Value("${aws.region}") private String awsRegion;
      to read the region name from a configuration file.

  2. Hi!
    I've just encountered this exact problem!!! :O
    "JavaConfig will be supported soon" - I guess that soon is a VERY relative concept...

    I was wondering if you had anything new to share, as a long time has passed since the time of this post.
