To add a bit more information this is what code we used to implement all of the above:
The following class deals with the actual solr commit and when it is necessary to issue one.
@RequiredArgsConstructor
@Slf4j
public class SolrCommitCoordinator {
/*
* This only supports one transaction per request. This might in some cases not
* be enough. However there is currently no other possibility known to determine
* whether additional solr commits are needed.
*/
private final NamedThreadLocal<CompletableFuture<Void>> commitCoordination = new NamedThreadLocal<>(
"solr-commit-coordination");
private final SolrClient solrClient;
private final DataServicesCoreProperties dataServicesCoreProperties;
private final AsyncTaskExecutor taskExecutor;
public void triggerSolrCommit() {
Object existingCommitCoordination = commitCoordination.get();
if (existingCommitCoordination == null) {
log.debug("Triggering solr commit.");
CompletableFuture<Void> solrCommit = taskExecutor.submitCompletable(() -> {
return executeSolrCommit();
});
commitCoordination.set(solrCommit);
}
}
private Void executeSolrCommit() throws SolrServerException, IOException {
StopWatch commitDuration = null;
if (log.isDebugEnabled()) {
commitDuration = new StopWatch();
commitDuration.start();
}
try {
solrClient.commit(dataServicesCoreProperties.getSearch().getSolr().getCollection().getName(), false, true,
true);
} finally {
if (commitDuration != null) {
commitDuration.stop();
log.debug("Solr commit took {}ms.", commitDuration.getTotalTime(TimeUnit.MILLISECONDS));
}
}
return null;
}
public void blockUntilSolrCommitFinished() {
CompletableFuture<Void> existingCommitCoordination = commitCoordination.get();
if (existingCommitCoordination != null) {
commitCoordination.remove();
StopWatch blockedDuration = null;
if (log.isDebugEnabled()) {
blockedDuration = new StopWatch();
blockedDuration.start();
}
try {
existingCommitCoordination.join();
} catch (CancellationException | CompletionException e) {
throw new IllegalStateException("An error occured while waiting for the solr commit to finish.", e);
} finally {
if (blockedDuration != null) {
blockedDuration.stop();
log.debug("Waited {}ms for solr commit to finish.",
blockedDuration.getTotalTime(TimeUnit.MILLISECONDS));
}
}
}
}
public void cleanup() {
commitCoordination.remove();
}
}
This class is responsible to actually trigger a commit after a transaction completes. If a transaction completes the triggering may happen many times. That’s why SolrCommitCoordinator takes care not to issue any further commits if one has already been issued.
@RequiredArgsConstructor
public class SolrCommitListener {
private final SolrCommitCoordinator solrCommitCoordinator;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterDocumentCreate(DocumentAfterCreateEvent event) {
solrCommitCoordinator.triggerSolrCommit();
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterDocumentUpdate(DocumentAfterUpdateEvent event) {
solrCommitCoordinator.triggerSolrCommit();
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void afterDocumentDelete(DocumentAfterDeleteEvent event) {
solrCommitCoordinator.triggerSolrCommit();
}
}
And a servlet filter is used to keep the behaviour of clients seeing data in solr that was changed with the previous request.
/**
* Waits for pending solr commits to happen so that the client receives the
* response after the solr commit and is able to find the data in solr.
*/
@RequiredArgsConstructor
public class WaitForSolrCommitFilter extends OncePerRequestFilter {
private final SolrCommitCoordinator solrCommitCoordinator;
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
try {
filterChain.doFilter(request, response);
solrCommitCoordinator.blockUntilSolrCommitFinished();
} finally {
solrCommitCoordinator.cleanup();
}
}
}
The actual spring config used for this is then:
@Configuration
@ConditionalOnProperty(prefix = "mgmtp.a12.dataservices.search.solr.commitPerUpdate",
value = "enabled",
havingValue = "false",
matchIfMissing = false)
@RequiredArgsConstructor
public class SolrCommitAfterJdbcCommitConfiguration {
private final SolrClient solrClient;
private final DataServicesCoreProperties dataServicesCoreProperties;
private final AsyncTaskExecutor taskExecutor;
@Bean
public SolrCommitListener solrCommitListener() {
return new SolrCommitListener(solrCommitCoordinator());
}
@Bean
public SolrCommitCoordinator solrCommitCoordinator() {
return new SolrCommitCoordinator(solrClient, dataServicesCoreProperties, taskExecutor);
}
@Bean
public GenericFilterBean waitForSolrCommitFilter() {
return new WaitForSolrCommitFilter(solrCommitCoordinator());
}
}
Note that the following configuration properties must be set for this to be effective:
mgmtp.a12.dataservices.search.solr.initialization.commitBypass.enabled=true
mgmtp.a12.dataservices.search.solr.commitPerUpdate.enabled=false