TimeoutDispatcherMixin.java

/***************************************************************************
   Copyright 2013 Emily Estes

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
***************************************************************************/
package net.metanotion.util;


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This dispatcher attempts to implement a "time out" aspect, such that attempting to dispatch a
message against a receiver that takes "too long" may be timed out. This class provides this functionality via
usage of the {@link java.util.concurrent.Future} returned by the {@link java.util.concurrent.ExecutorService}
provided to this class. As such, the ability to successfully timeout tasks is limited by the capabilities of the
underlying executor service/future implementation. So within these bounds, this class should allow users to
limit execution time of message dispatched by the underlying dispatcher.
	@param <I> The type of reciever that this dispatcher decodes message for.
	@param <D> The type of the data that this dispatcher decodes into a message.
*/
public final class TimeoutDispatcherMixin<I,D> implements Dispatcher<I,D> {
	private static final Logger logger = LoggerFactory.getLogger(TimeoutDispatcherMixin.class);

	private final Dispatcher<I,D> child;
	private final ExecutorService es;
	private final TimeUnit unit;
	private final long timeout;

	/** Create a Dispatcher instance that limits the execution of messages created by the child dispatcher.
		@param es The ExecutorService instance in which the tasks will be asynchronously executed. However,
			the messages produced by this dispatcher will block on the completion of the forcing.
		@param unit The unit of time for the timeout.
		@param timeout How long to wait before interrupting the message evaluation in the units specificed.
		@param child The dispatcher to add the timeout aspect to.
	*/
	public TimeoutDispatcherMixin(final ExecutorService es,
			final TimeUnit unit,
			final long timeout,
			final Dispatcher<I,D> child) {
		this.child = child;
		this.es = es;
		this.unit = unit;
		this.timeout = timeout;
	}

	private final class Msg<I> implements Message<I> {
		private final Message<I> message;
		public Msg(final Message<I> child) { this.message = child; }

		@Override public Class<I> receiverType() { return message.receiverType(); }
		@Override public Object call(final I i) {
			final Future result = es.submit(new Callable() {
				@Override public Object call() throws Exception {
					return message.call(i);
				}
			});
			try {
				return result.get(timeout, unit);
			} catch (TimeoutException ex) {
			} catch (InterruptedException ie) {
			} catch (ExecutionException ee) {
				result.cancel(true);
				throw new RuntimeException(ee.getCause());
			}
			result.cancel(true);
			throw new RuntimeException("Timeout on dispatch on message to " + message.receiverType());
		}
	}

	@Override public Message<I> dispatch(final D data) {
		return new Msg<I>(child.dispatch(data));
	}
}