MailQueue.java

/***************************************************************************
   Copyright 2012 Emily Estes

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
***************************************************************************/
package net.metanotion.emailqueue;


import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.metanotion.functor.Block;
import net.metanotion.util.JDBCTransaction;

/** This class polls a database table containing email messages queued to be sent. When it wakes up, if it detects no
changes in the table, it goes back to sleep for a fixed amount of time. If there are messages to be sent (with fewer
attempts than the configurable retry limit), it connects via the MailSender implementation provided and sends up to
"burstRate" messages at once. If a message fails to be accepted by the SMTP server, the retry count on that message
is increased.
	@param <C> The connection type for the mail sender interface.
*/
public final class MailQueue<C extends AutoCloseable> implements Runnable {
	private static final Logger logger = LoggerFactory.getLogger(MailQueue.class);
	/** SQL queries for manipulating the email queue. */
	private static final Queries q = new Queries();

	/** The data source for normal database operations. */
	private final DataSource ds;
	/** The executor service to queue background tasks in. */
	private final ScheduledExecutorService es;
	/** The MailSender implementation to use for sending emails. */
	private final MailSender<C> mailSender;
	/** Maximum number of emails to retrieve from the database in one query. */
	private final int burstRate;
	/** The maximum number of attempts to send a message. */
	private final int maxRetries;
	/** The amount of time to sleep (in milliseconds) between polls of the database message queue. */
	private final int timeoutInMS;

	/** Create an instance of a MailQueue service to poll the database and send queued emails.
		@param ds A data source to use for regular database operations.
		@param es The executor service to run our background tasks.
		@param mSender The MailSender implementation to use for sending emails.
		@param burstRate Maximum number of emails to retrieve from the database in one query.
		@param maxRetries The maximum number of attempts to send a message.
		@param timeoutInMS The amount of time to sleep (in milliseconds) between polls of the database message queue.
	*/
	public MailQueue(final DataSource ds,
							final ScheduledExecutorService es,
							final MailSender<C> mSender,
							final int burstRate,
							final int maxRetries,
							final int timeoutInMS) {
		this.ds = ds;
		this.es = es;
		this.mailSender = mSender;
		this.burstRate = burstRate;
		this.maxRetries = maxRetries;
		this.timeoutInMS = timeoutInMS;
	}

	/** This is the mail sending thread process. When it is executed, it processes the mail queue for as long as each
	pass clears at least one message, then it goes to sleep for a while by rescheduling itself with the provided
	executor service. Any exception raised while processing is logged and does not prevent the reschedule. */
	@Override public void run() {
		logger.debug("Processing Mail Queue");
		try {
			// Keep pulling bursts until a pass clears nothing (queue drained, or nothing could be sent).
			while(processFrontOfQueue(mailSender, burstRate) > 0) { }
		} catch (final Exception ex) {
			// No "{}" placeholder: a trailing Throwable is logged by SLF4J as the exception (with stack trace).
			logger.error("Mail Queue Processing Error", ex);
		}
		logger.debug("Sleeping");
		es.schedule(this, timeoutInMS, java.util.concurrent.TimeUnit.MILLISECONDS);
	}

	/** Retrieve a set of messages from the database and attempt to send them, clearing the emails
		that are successfully sent. When the query yields no messages, this returns immediately without
		asking the sender for a connection.
		@param sender An instance of MailSender to use for this attempt.
		@param length The maximum number of messages to grab from the queue at once.
		@return The number of messages successfully sent and cleared.
		@throws Exception if the list of pending messages can not be retrieved.
	*/
	public int processFrontOfQueue(final MailSender<C> sender, final int length) throws Exception {
		final List<MessageStruct> msgList;
		try (final Connection conn = ds.getConnection()) {
			msgList = q.getMessages(conn, length, this.maxRetries);
		}
		// Nothing pending: skip the mail connection and the delete pass entirely.
		if((msgList == null) || msgList.isEmpty()) {
			logger.debug("Found 0 message(s).");
			return 0;
		}
		return this.clearSent(this.sendMessages(sender, msgList));
	}

	/** Take a list of emails that have been successfully sent and remove them from the database.
		The deletes are issued through a single JDBCTransaction.doTX call.
		@param sent A list of emails that have been successfully sent.
		@return The number of emails cleared from the database.
	*/
	public int clearSent(final Iterable<MessageStruct> sent) {
		return JDBCTransaction.doTX(ds, new Block<Connection,Integer>() {
			public Integer eval(final Connection conn) throws Exception {
				int counter = 0;
				for(final MessageStruct m: sent) {
					counter += q.deleteMessage(conn, m.MailId);
				}
				logger.debug("Sent/Deleted {} message(s)", counter);
				return counter;
			}
		});
	}

	/** Take a list of messages and send them using a MailSender instance.
		If the sender provides a null connection, nothing is sent and no retry counters are touched. A message the
		sender declines (send returns false) has its retry counter incremented. If send throws, that message's
		retry counter is incremented, the remainder of the list is abandoned for this pass, and the error is
		logged rather than propagated. A RuntimeException raised while updating the retry counters is also logged
		rather than propagated.
		@param sender The MailSender implementation to send the messages.
		@param messages The list of messages to send.
		@return A list of messages that were successfully sent.
	*/
	public Iterable<MessageStruct> sendMessages(final MailSender<C> sender, final List<MessageStruct> messages) {
		logger.debug("Found {} message(s).", messages.size());
		final List<MessageStruct> sent = new ArrayList<>();
		// Ids of messages whose send attempt failed and whose retry counter must be bumped.
		final List<Long> incList = new ArrayList<>();
		try (final C conn = sender.getConnection()) {
			if(conn == null) { return sent; }
			for(final MessageStruct m: messages) {
				logger.trace("Sending message");
				try {
					if(sender.send(conn, m)) {
						sent.add(m);
					} else {
						incList.add(m.MailId);
					}
				} catch (final Exception e) {
					incList.add(m.MailId);
					// Rethrow to leave the loop: the connection is considered unusable for the remaining messages.
					throw new RuntimeException("Broken connection", e);
				}
			}
		} catch (final Exception e) {
			logger.error("Error sending messages", e);
		}
		if(!incList.isEmpty()) {
			try {
				JDBCTransaction.doTX(ds, new Block<Connection,Boolean>() {
					public Boolean eval(final Connection conn) throws Exception {
						for(final Long mid: incList) {
							q.incrementRetryCounter(conn, mid);
						}
						return true;
					}
				});
			} catch (final RuntimeException e) {
				logger.error("Error incrementing retry counters", e);
			}
		}
		return sent;
	}
}