Tuesday 18 August 2015

Spring Cloud for Amazon Web Services - Listen to SQS (JAVA Config)

While using spring-cloud-aws project to listen to SQS, I read this 
The Spring Cloud AWS configuration is currently done using custom elements provided by Spring Cloud AWS namespaces. JavaConfig will be supported soon.  
Soon!! huh?!! Well, this is a problem since I did not want to use XML config in my project.
However, the documentation adds 
It is recommended to use the XML messaging namespace to create QueueMessagingTemplate as it will set a more sophisticated MessageConverter that converts objects into JSON when Jackson is on the classpath.
I needed to use custom JSON convertor to convert my message and I was not going to use xml config. So I decided to give it a go and create the java config I need. I went on to read the spring-cloud-aws project code and some of the examples online. Then, borrowed some code from here and there that helped me accomplish my task. Here is the code, hopefully, it will make someone's life a little bit easier.
package com.coffeebeans.config;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.aws.context.annotation.ConditionalOnMissingAmazonClient;
import org.springframework.cloud.aws.messaging.config.QueueMessageHandlerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by muhamadto on 8/18/15.
 */

@Configuration
@ComponentScan("com.coffeebeans")
public class SqsConfig {

    @Value("${queue.endpoint}")
    private String queueEndPoint;

    @Value("${aws.region}")
    private String awsRegion;

    @Autowired
    MessageConverter messageConverter;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer messageListenerContainer = simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();
        messageListenerContainer.setMessageHandler(queueMessageHandler());
        return messageListenerContainer;
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(){
        SimpleMessageListenerContainerFactory messageListenerContainerFactory = new SimpleMessageListenerContainerFactory();
        messageListenerContainerFactory.setAmazonSqs(amazonSQS());
        messageListenerContainerFactory.setDeleteMessageOnException(false);
        return messageListenerContainerFactory;
    }

    @Bean
    public QueueMessageHandler queueMessageHandler() {
        QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
        queueMessageHandlerFactory.setAmazonSqs(amazonSQS());
        QueueMessageHandler messageHandler = queueMessageHandlerFactory.createQueueMessageHandler();

        List<HandlerMethodArgumentResolver> list = new ArrayList<>();
        HandlerMethodArgumentResolver resolver = new PayloadArgumentResolver(this.messageConverter);
        list.add(resolver);
        messageHandler.setArgumentResolvers(list);
        return messageHandler;
    }


    @Lazy
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingAmazonClient(AmazonSQS.class)
    public AmazonSQSAsync amazonSQS() {
        AmazonSQSAsyncClient amazonSQSAsyncClient  = new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain());
        amazonSQSAsyncClient.setEndpoint(this.queueEndPoint);
        amazonSQSAsyncClient.setRegion(Region.getRegion(Regions.fromName(this.awsRegion)));
        return new AmazonSQSBufferedAsyncClient(amazonSQSAsyncClient);
    }
}
Now annotate a method that belongs to any class in com.coffeebeans package with @MessageMapping("queue_name") where MessageMapping is org.springframework.messaging.handler.annotation.MessageMapping
For example

package com.coffeebeans.listener;

import com.coffeebeans.message.Message;
import org.springframework.messaging.handler.annotation.MessageMapping;

/**
 * Created by muhamadto on 8/18/15.
 */
public class QueueListener {
    @MessageMapping("queue_name")
    public void onMessage(Message message) throws Exception{
        System.out.println(message);
    }
}

As mentioned in the documentation and quoted above you will need Jackson in your class path.

Sunday 9 August 2015

PART 4: Spring MVC web-services (Spring Caching Abstraction)

This post explores how Spring framework supports caching. Like many features in Spring, caching is just abstraction. This allows applications to choose between the different caching implementation available, e.g. EHcache, Redis, Guava cache, ... etc. In Spring 4.1, the cache abstraction added support of JSR-107 annotations.


Dependancy

Add the following snippet to services/pom.xml
    

    org.springframework.data
    spring-data-redis
    1.4.3.RELEASE


    redis.clients
    jedis
    2.4.2
    jar
    compile

For the caching abstraction to work with an application developers need to do two things:
  • Configure cache implementation. In this project we will use Redis implementation.
  • Declare caching, that is which methods to be cached.


Configurations

Caching configuration in this project is being done on two steps, configuration file per technology (Redis, etc.) and a general configuration file.


Configure the Redis caching Manager

Add the following configuration class, com.coffeebeans.services.config.RedisCachingConfig
package com.coffeebeans.services.config;

import com.coffeebeans.utilities.generic.Constants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.*;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * Created by muhamadto on 26/07/2015.
 */
@Configuration
public class RedisCachingConfig {
    @Autowired
    Environment env;

    @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        JedisConnectionFactory factory = new JedisConnectionFactory();
        factory.setHostName(System.getProperty(Constants.REDIS_HOST_NAME, Constants.LOCALHOST));
        factory.setPort(Integer.parseInt(System.getProperty(Constants.REDIS_PORT)));
        factory.setUsePool(true);
        return factory;
    }

    @Bean
    RedisTemplate<Object, Object> redisTemplate() {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(jedisConnectionFactory());
        return redisTemplate;
    }

    @Bean
    CacheManager redisCacheManager() {
        RedisCacheManager redisCacheManager = new RedisCacheManager(redisTemplate());
        redisCacheManager.setUsePrefix(true);
        redisCacheManager.setDefaultExpiration(Constants.REDIS_DEFAULT_EXPIRY_TIME);
        return redisCacheManager;
    }
}

Notice: You will need to add JVM variables REDIS_HOST and REDIS_PORT which referred to by Constants.REDIS_HOST_NAME and Constants.REDIS_PORT of the new com.coffeebeans.utilities.generic.Constants class of Utilities module.
package com.coffeebeans.utilities.generic;

/**
 * Created by muhamadto on 2/08/2015.
 */
public class Constants {
    public static final String REDIS_HOST_NAME="REDIS_HOST";
    public static final String REDIS_PORT ="REDIS_PORT";
    public static final long REDIS_DEFAULT_EXPIRY_TIME=300;

    public static final String COFFEEBEANS_REPOSITORY_PACKAGE="repository.package";
    public static final String COFFEEBEANS_MODEL_PACKAGE="model.package";

    public static final String JDBC_DRIVER_CLASS="jdbc.driverClass";
    public static final String DB_URL="DB_URL";
    public static final String DB_USERNAME="DB_USERNAME";
    public static final String DB_PASSWORD="DB_PASSWORD";

    public static final String BONECP_IDLE_CONNECTION_TEST_PERIOD_IN_MINUTES="bonecp.idleConnectionTestPeriodInMinutes";
    public static final String BONECP_IDLE_MAX_AGE_IN_MINUTES="bonecp.idleMaxAgeInMinutes";
    public static final String BONECP_MAX_CONNECTIONS_PER_PARTITION="bonecp.maxConnectionsPerPartition";
    public static final String BONECP_MIN_CONNECTIONS_PER_PARTITION="bonecp.minConnectionsPerPartition";
    public static final String BONECP_PARTITION_COUNT="bonecp.partitionCount";
    public static final String BONECP_ACQUIRE_INCREMENT="bonecp.acquireIncrement";
    public static final String BONECP_STATEMENTS_CACHE_SIZE="bonecp.statementsCacheSize";

    public static final String HIBERNATE_CACHE_USE_SECOND_LEVEL_CACHE="hibernate.cache.use_second_level_cache";
    public static final String HIBERNATE_CACHE_REGION_FACTORY_CLASS="hibernate.cache.region.factory_class";
    public static final String HIBERNATE_CACHE_USE_QUERY_CACHE="hibernate.cache.use_query_cache";
    public static final String HIBERNATE_GENERATE_STATISTICS="hibernate.generate_statistics";
    public static final String HIBERNATE_DIALECT="hibernate.dialect";

    public static final String LOCALHOST="localhost";
}


Abstract caching manager

Add the following class, com.coffeebeans.services.config.CachingConfig
package com.coffeebeans.services.config;

import com.coffeebeans.utilities.env.Environment;
import com.coffeebeans.utilities.env.EnvironmentEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurer;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.CacheErrorHandler;
import org.springframework.cache.interceptor.CacheResolver;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.cache.interceptor.SimpleKeyGenerator;
import org.springframework.cache.support.CompositeCacheManager;
import org.springframework.cache.support.NoOpCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by muhamadto on 2/08/2015.
 */
@Configuration
@EnableCaching
@Import({RedisCachingConfig.class})
@ComponentScan(basePackages = {"com.coffeebeans.services.service"})
public class CachingConfig implements CachingConfigurer {

    @Qualifier("redisCacheManager")
    @Autowired
    private CacheManager redisCacheManager;


    @Bean
    @Override
    public CacheManager cacheManager() {
        List<Cachemanager> cacheManagers = new ArrayList<>();

        if(EnvironmentEnum.LOCAL == Environment.resoveEnv()){
            cacheManagers.add(new NoOpCacheManager());
        } else {
            cacheManagers.add(this.redisCacheManager);
        }

        CompositeCacheManager cacheManager = new CompositeCacheManager();
        cacheManager.setCacheManagers(cacheManagers);
        cacheManager.setFallbackToNoOpCache(true);
        return cacheManager;
    }

    @Bean
    @Override
    public KeyGenerator keyGenerator() {
        return new SimpleKeyGenerator();
    }

    @Override
    public CacheResolver cacheResolver() {
        return null;
    }

    @Override
    public CacheErrorHandler errorHandler() {
        return null;
    }
}
This last class provide us to disable caching for local machines using org.springframework.cache.support.NoOpCacheManager.

Now Import the Caching Config to AppConfig
package com.coffeebeans.services.config;

import com.coffeebeans.persistence.config.PersistenceConfig;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
 * Created by muhamadto on 26/07/2015.
 */

@Configuration
@Import({PersistenceConfig.class, ServicesConfig.class, CachingConfig.class})
public class AppConfig {
}

You might realised the usage of Environment.resoveEnv in CachingConfig. This method lives in class  com.coffeebeans.utilities.env.Environment in the newly created Utilities module.
package com.coffeebeans.utilities.env;

/**
 * Created by muhamadto on 2/08/2015.
 */
public class Environment {

    public static EnvironmentEnum resoveEnv() throws IllegalArgumentException, NullPointerException{
        return EnvironmentEnum.valueOf(System.getProperty("env").toUpperCase());

    }
}
And it uses the following enum which represents the three environments possible for this project. 
package com.coffeebeans.utilities.env;

/**
 * Created by muhamadto on 2/08/2015.
 */
public enum EnvironmentEnum {
    LOCAL("LOCAL"),
    STAGING("STAGING"),
    PROD("STAGING");

    private String env;

    EnvironmentEnum(String env){
        this.env = env;
    }

    public String toString() {
        return env;
    }
}
Now the configuration is done, let's declare a method as cachable and inspect the resulting behaviour.
  

Declare cachable Methods

In com.coffeebeans.services.service.user.UserServiceImpl, add @Cacheable(value = "users") before getUserByUsername(). So, it looks like the following:
@Override
@Cacheable(value = "users")
public UserResponse getUserByUsername(String username) {
    UserResponse userResponse;
    User user = this.userRepository.findByUsername(username);
    if(user != null){
        userResponse = UserResponse.convert(user);
    } else{
        LOGGER.error("Could not find a user for search criteria [{}]", username);
        throw new NotFoundException(String.format("Could not find a user for search criteria [username = %s]", username));
    }
    LOGGER.debug("User has been found [{}].", user);
    return userResponse;
} 


TESTING

Testing Locally

Use curl command to retrieve a use 
  • Request
  • curl  http://localhost:8080/services/1/user/mh
  • Response
  • {
       {  
       "location":"http://localhost:8080/services/1/user/mh",
       "enabled":false,
       "verified":false,
       "username":"mh",
       "email":"mh@example.com"
    }
However, every time you make the request the method will be executed, you can verify that from the log, as the following message will be printed once per request.
[SRV] 19:23:53.613 DEBUG com.coffeebeans.services.service.user.UserServiceImpl - User has been found [User(username=mh, email=mh@example.com)].

Testing On Staging

Making the same request in staging environment - given Redis server is working - will result in single execution for that method. All subsequent requests will retrieve the data from the cache. You can verify that by making sure that no message is being print in the log as a result of the request.


WHAT IS NEXT

Since in this post we started to make use for environments different form local machine. Next step in this project will be setting up a foundation for supporting multiple environment.