How can overall dataservices performance be improved when Hazelcast caching performs poorly?

ANALYSIS:
Debugging showed that Hazelcast stores cached values in Java's serialized object form (java.io.Serializable). Every time a model is requested for a document, it is deserialized from that form. For one of our models the serialized form takes up 18 KB (mainly because the model has some enumerations defined in it).
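
To make the per-lookup cost concrete, here is a minimal stdlib-only sketch; it is not the Hazelcast code path itself. It assumes a hypothetical `Model` class and a hypothetical `SerializedFormStore` that keeps only the serialized bytes, so every read has to deserialize a fresh copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical store that keeps only bytes and rebuilds the object on every read.
class SerializedFormStore {
    private final byte[] serialized;
    private int deserializations;

    SerializedFormStore(Model model) {
        this.serialized = serialize(model);
    }

    private static byte[] serialize(Model model) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(model);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // the stream is closed, so everything is flushed
    }

    // Each call pays the full deserialization cost and returns a new instance.
    Model read() {
        deserializations++;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            return (Model) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    int serializedSize() {
        return serialized.length;
    }

    int deserializations() {
        return deserializations;
    }

    public static void main(String[] args) {
        List<String> values = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            values.add("VALUE_" + i);
        }
        SerializedFormStore store = new SerializedFormStore(new Model("invoice", values));
        Model first = store.read();
        Model second = store.read();
        System.out.println("serialized size in bytes: " + store.serializedSize());
        System.out.println("same instance: " + (first == second)); // false: every read builds a new copy
    }
}

// Hypothetical stand-in for a document model; not a dataservices class.
class Model implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final List<String> enumValues;

    Model(String name, List<String> enumValues) {
        this.name = name;
        this.enumValues = new ArrayList<>(enumValues);
    }
}
```

The size printed here depends on the made-up model and says nothing about the 18 KB measured above; the point is only that two reads never share an instance.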

Debugging also showed that a given model is requested from the cache very frequently. Specifically, the RPC operation only loads and saves a copy of one document of a specific model, yet that model is already requested dozens of times from the cache. As a result, Hazelcast spends practically all of its time in Java object deserialization. No cache misses were reported during the measurements because the cache was prewarmed.
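
One way to obtain such per-key request counts (not necessarily how they were measured here) is to wrap the lookup in a counting decorator. The sketch below uses only the JDK; `CountingLookup` is a hypothetical helper and the lambda stands in for the real cache lookup.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

// Hypothetical helper: wraps any key -> value lookup and counts calls per key.
class CountingLookup<K, V> implements Function<K, V> {
    private final Function<K, V> delegate;
    private final Map<K, LongAdder> counts = new ConcurrentHashMap<>();

    CountingLookup(Function<K, V> delegate) {
        this.delegate = delegate;
    }

    @Override
    public V apply(K key) {
        // Count first, then forward the request unchanged.
        counts.computeIfAbsent(key, k -> new LongAdder()).increment();
        return delegate.apply(key);
    }

    long countFor(K key) {
        LongAdder adder = counts.get(key);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        CountingLookup<String, String> lookup = new CountingLookup<>(key -> "model:" + key);
        for (int i = 0; i < 24; i++) {
            lookup.apply("invoice");
        }
        System.out.println(lookup.countFor("invoice")); // 24
    }
}
```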

SOLUTION:
To work around this, we implemented a cache decorator for Hazelcast. Within a transaction, the decorator requests the cached instance from Hazelcast once and then reuses that instance for the remainder of the transaction.

Implementation is as follows:

The actual cache implementation:

import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

import org.springframework.cache.Cache;
import org.springframework.cache.support.SimpleValueWrapper;
import org.springframework.core.NamedThreadLocal;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import lombok.extern.slf4j.Slf4j;

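/**
 * Decorates a {@link Cache} so that, within a transaction, a value found in the delegate
 * is fetched once per key and then reused until the transaction completes.
 */
@Slf4j
public class TransactionScopedCache implements Cache {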

	private final Cache delegate;
	private final ThreadLocal<Map<Object, ValueWrapper>> transactionLocalCaches;

	public TransactionScopedCache(Cache delegate) {
		this.delegate = delegate;
		this.transactionLocalCaches = NamedThreadLocal.withInitial("transaction-scoped-cache " + delegate.getName(),
				this::createInitialValue);
	}

	private Map<Object, ValueWrapper> createInitialValue() {
		if (log.isTraceEnabled()) {
			log.trace("starting transaction scoped caching of {}.", TransactionScopedCache.this.getName());
		}

		if (!TransactionSynchronizationManager.isActualTransactionActive()) {
			throw new IllegalStateException(
					MessageFormat.format("Can only use transaction scoped cache {0} within a transaction.",
							TransactionScopedCache.this.getName()));
		}

		// Drop the thread-local map once the transaction has committed or rolled back.
		TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
			@Override
			public void afterCompletion(int status) {
				if (log.isTraceEnabled()) {
					log.trace("removing transaction scoped cache entries of {}.",
							TransactionScopedCache.this.getName());
				}
				transactionLocalCaches.remove();
			}
		});

		return new HashMap<>();
	}

	@Override
	public String getName() {
		return delegate.getName();
	}

	@Override
	public Object getNativeCache() {
		return delegate.getNativeCache();
	}

	@Override
	public ValueWrapper get(Object key) {
		if (TransactionSynchronizationManager.isActualTransactionActive()) {
			Map<Object, ValueWrapper> transactionLocalCache = transactionLocalCaches.get();
			ValueWrapper value = transactionLocalCache.get(key);
			if (value != null) {
				return value;
			}
			ValueWrapper valueWrapper = delegate.get(key);
			if (valueWrapper != null) {
				// Remember hits only; a miss must stay a miss for later lookups.
				transactionLocalCache.put(key, valueWrapper);
			}
			return valueWrapper;
		} else {
			return delegate.get(key);
		}
	}

	@Override
	public <T> T get(Object key, Class<T> type) {
		if (TransactionSynchronizationManager.isActualTransactionActive()) {
			Map<Object, ValueWrapper> transactionLocalCache = transactionLocalCaches.get();
			ValueWrapper wrappedValue = transactionLocalCache.get(key);
			if (wrappedValue != null) {
				return type.cast(wrappedValue.get());
			}
			T value = delegate.get(key, type);
			if (value != null) {
				// Wrapping null would turn a miss into an apparent hit for get(Object).
				transactionLocalCache.put(key, new SimpleValueWrapper(value));
			}
			return value;
		} else {
			return delegate.get(key, type);
		}
	}

	@SuppressWarnings("unchecked")
	@Override
	public <T> T get(Object key, Callable<T> valueLoader) {
		if (TransactionSynchronizationManager.isActualTransactionActive()) {
			Map<Object, ValueWrapper> transactionLocalCache = transactionLocalCaches.get();
			ValueWrapper wrapped = transactionLocalCache.get(key);
			if (wrapped != null) {
				return (T) wrapped.get();
			}
			T value = delegate.get(key, valueLoader);
			transactionLocalCache.put(key, new SimpleValueWrapper(value));
			return value;
		} else {
			return delegate.get(key, valueLoader);
		}
	}

	@Override
	public void put(Object key, Object value) {
		evictTransactionLocalKey(key);
		delegate.put(key, value);
	}

	@Override
	public void evict(Object key) {
		evictTransactionLocalKey(key);
		delegate.evict(key);
	}

	private void evictTransactionLocalKey(Object key) {
		if (TransactionSynchronizationManager.isActualTransactionActive()) {
			transactionLocalCaches.get().remove(key);
		}
	}

	@Override
	public void clear() {
		transactionLocalCaches.remove();
		delegate.clear();
	}
}

The cache manager:

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;

import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class TransactionScopedCacheManager implements CacheManager {

	private final CacheManager delegate;
	private final Map<String, Cache> cacheNameToDecoratedCache = new ConcurrentHashMap<>();

	@Override
	public Cache getCache(String name) {
		return cacheNameToDecoratedCache.computeIfAbsent(name, cacheName -> {
			Cache delegateCache = delegate.getCache(cacheName);
			// The delegate may not know the cache; returning null records no mapping.
			return delegateCache == null ? null : new TransactionScopedCache(delegateCache);
		});
	}

	@Override
	public Collection<String> getCacheNames() {
		return delegate.getCacheNames();
	}
}

The spring config to enable all of this:

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;

import lombok.extern.slf4j.Slf4j;

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Slf4j
public class TransactionScopedCacheConfiguration {

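	// Declared static so the post-processor can be created without early initialization of this configuration class.
	@Bean
	public static BeanPostProcessor cacheManagerDecorationPostProcessor() {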
		return new BeanPostProcessor() {
			@Override
			public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
				if (bean instanceof CacheManager delegate) {
					log.info("decorating cacheManager {} with transaction scope", delegate);
					return new TransactionScopedCacheManager(delegate);
				}
				return bean;
			}
		};
	}
}

A BeanPostProcessor is used to decorate the Hazelcast cache manager so that the default dataservices configuration, which enables Hazelcast, does not need to be modified.

CAVEATS:

  • Since the cache decorator is only effective during an ongoing transaction, there won’t be any benefit when working with documents and requesting their respective models outside of a transaction.
  • The implementation alters dataservices behaviour. Changes that Hazelcast cache synchronization propagates in the middle of a transaction are no longer noticed, because the Hazelcast cache is queried only once per key within a transaction; in effect this gives repeatable cache reads for transactions. This is closer to the behaviour one would get with Hazelcast caching disabled, because the JPA entity manager, for example, also caches loaded entities such as document models in the JPA session and does not necessarily notice that the underlying entity was changed by another transaction. This is probably a good thing for developers used to working with JPA.
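
The repeatable-read effect from the second caveat can be reproduced with a small stdlib-only sketch. `ScopedReader` is a hypothetical stand-in for a transaction, and a plain `Map` plays the role of the shared Hazelcast cache; neither is part of dataservices or Spring.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a transaction: values are memoized until close().
class ScopedReader<K, V> implements AutoCloseable {
    private final Map<K, V> shared;                  // plays the role of the shared cache
    private final Map<K, V> local = new HashMap<>(); // values already seen in this scope

    ScopedReader(Map<K, V> shared) {
        this.shared = shared;
    }

    V get(K key) {
        V cached = local.get(key);
        if (cached != null) {
            return cached; // reuse the instance seen earlier in this scope
        }
        V value = shared.get(key);
        if (value != null) {
            local.put(key, value); // remember hits only
        }
        return value;
    }

    @Override
    public void close() {
        local.clear();
    }

    public static void main(String[] args) {
        Map<String, String> shared = new HashMap<>();
        shared.put("invoice", "v1");
        try (ScopedReader<String, String> tx = new ScopedReader<>(shared)) {
            System.out.println(tx.get("invoice")); // v1
            shared.put("invoice", "v2");           // change made "by another transaction"
            System.out.println(tx.get("invoice")); // still v1 within this scope
        }
        try (ScopedReader<String, String> next = new ScopedReader<>(shared)) {
            System.out.println(next.get("invoice")); // v2
        }
    }
}
```

The update becomes visible only in the next scope, which mirrors what a new transaction would see.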

RESULTS:
These changes brought request processing time down from 3.0 - 3.3 s to 1.7 - 2.0 s, which recovers basically all of the 31% that was originally spent on Hazelcast cache lookups.

BONUS:
This change seems to have a very positive overall effect on dataservices performance. For example, SearchIndexLoader during dataservices startup is at least twice as fast.