// BasicStore.java

/***************************************************************************
   Copyright 2013 Emily Estes

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
***************************************************************************/
package net.metanotion.contentstore;


import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.metanotion.functor.Block;
import net.metanotion.io.FileSource;
import net.metanotion.io.FileSystem;
import net.metanotion.io.ReaderPump;
import net.metanotion.io.StreamPump;
import net.metanotion.io.TraverseableFile;
import net.metanotion.io.WriteableFile;
import net.metanotion.sql.SchemaGenerator;
import net.metanotion.sql.SequenceExecutor;
import net.metanotion.util.JDBCTransaction;

/** A basic implementation of a ContentStore backed by a SQL database with a FileSystem to store binary blobs/files.
See src/main/sql/BasicStore.schema.sql for the schema DDL expected.
	@param <F> The type of the file objects this store works with.
*/
public final class BasicStore<F extends WriteableFile<F> & TraverseableFile<F>> implements ContentStore {
	/** Shared SLF4J logger, also used by the nested helper classes of this store. */
	private static final Logger logger = LoggerFactory.getLogger(BasicStore.class);

	/** The single SchemaGenerator instance handed out by {@link #schemaFactory()}.
		It delegates to SequenceExecutor.openSchema for this class, passing ";--" as the second argument.
	*/
	private static final SchemaGenerator schemaGenerator = new SchemaGenerator() {
		@Override
		public Iterator<String> openSchema() {
			return SequenceExecutor.openSchema(BasicStore.class, ";--");
		}
	};

	/** Derive the storage path of a file from its database id.
		Files are spread over a two level directory tree. The top level directory is the id shifted right by
		20 bits (a "mebi count") and the second level directory is the id shifted right by 10 bits modulo 1024
		(a "kibi count"), so for non-negative ids every second level directory holds at most 1024 files.
		@param fid The database id of the file record.
		@return a string of the form "/mega/kilo/fid" representing the path to the file.
	*/
	private static String filename(final long fid) {
		final long mega = fid >> MEGA_SHIFT_CONSTANT;
		final long kilo = (fid >> KILO_SHIFT_CONSTANT) % KILO_MODULO_CONSTANT;
		final StringBuilder path = new StringBuilder();
		path.append('/').append(mega);
		path.append('/').append(kilo);
		path.append('/').append(fid);
		return path.toString();
	}
	/** Right shift applied to a file id by filename() before the modulo that yields the second level directory. */
	private static final int KILO_SHIFT_CONSTANT = 10;
	/** Modulus applied by filename() to the shifted file id when computing the second level directory. */
	private static final int KILO_MODULO_CONSTANT = 1024;
	/** Right shift applied to a file id by filename() to obtain the top level directory. */
	private static final int MEGA_SHIFT_CONSTANT = 20;

	/** Shared query helper used by the store, its collections and entries, and the deleted file GC. */
	private static final Queries queries = new Queries();

	/** If you are using the default scheduled executor service, this is the default thread pool size. */
	public static final int DEFAULT_POOL_SIZE = 2;
	/** If you are using the default GC period parameters, this is the default period between GC collections. */
	public static final long DEFAULT_GC_PERIOD = 73L;
	/** If you are using the default GC period parameters, this is the default time unit. */
	public static final TimeUnit DEFAULT_UNIT = TimeUnit.HOURS;

	/** Obtain the SchemaGenerator for the BasicStore implementation of ContentStore.
		The same shared instance is returned on every call; its iterator enumerates the statements required to
		build the schema this store expects.
		@return an instance of SchemaGenerator that will enumerate the SQL DDL required to build the basic store database tables.
	*/
	public static SchemaGenerator schemaFactory() {
		return schemaGenerator;
	}

	private static final class GC<F extends WriteableFile<F> & TraverseableFile<F>> implements Runnable {
		private static final int MAX_FILES = 100;

		private final DataSource ds;
		private final FileSystem<F> fs;
		private final ScheduledExecutorService es;
		private final long period;
		private final TimeUnit unit;

		public GC(final DataSource ds,
				final FileSystem<F> fs,
				final ScheduledExecutorService es,
				final long period,
				final TimeUnit unit) {
			this.ds = ds;
			this.fs = fs;
			this.es = es;
			this.period = period;
			this.unit = unit;
		}

		@Override public void run() {
			try (final Connection conn = this.ds.getConnection()) {
				long startFileId = 0;
				List<Long> files;
				do {
					files = queries.readDeleteQueue(conn, startFileId, MAX_FILES);
					for(final Long fileId: files) {
						startFileId = fileId;
						final F f = fs.get(filename(fileId));
						try {
							if((!f.exists()) || (f.exists() && f.delete()))  {
								queries.finalizeFile(conn, fileId);
							}
						} catch (final Exception ex) { }
					}
				} while(files.size() > 0);
			} catch (final Exception ex) {
				logger.error("Failure in file GC pass, {}", ex);
			}
			es.schedule(this, period, unit);
		}
	}

	/** Needed so some anonymous inner classes can grab a reference to the parent content store.
		It is also the store reference handed to the collections and entries this store creates. */
	private final ContentStore that = this;
	/** The database connection pool. */
	private final DataSource ds;
	/** The file system storing large objects. */
	private final FileSystem<F> fs;

	/** Create a basic ContentStore with default GC parameters.
		Delegates to the five argument constructor with DEFAULT_GC_PERIOD and DEFAULT_UNIT.
		@param ds The connection pool for the SQL database.
		@param fs The FileSystem to back the file/binary blobs.
		@param es The scheduled executor service to use for scheduling deleted file garbage collections.
	*/
	public BasicStore(final DataSource ds, final FileSystem<F> fs, final ScheduledExecutorService es) {
		this(ds, fs, es, DEFAULT_GC_PERIOD, DEFAULT_UNIT);
	}

	/** Create a basic ContentStore.
		The first deleted file garbage collection pass runs synchronously on the calling thread before this
		constructor returns; at the end of that pass the GC task schedules the next one on the executor service.
		@param ds The connection pool for the SQL database.
		@param fs The FileSystem to back the file/binary blobs.
		@param es The scheduled executor service to use for scheduling deleted file garbage collections.
		@param period The time between deleted file garbage collection cycles.
		@param unit The time unit for the period.
	*/
	public BasicStore(final DataSource ds,
			final FileSystem<F> fs,
			final ScheduledExecutorService es,
			final long period,
			final TimeUnit unit) {
		this.ds = ds;
		this.fs = fs;
		// Parameterized (not raw) so the compiler checks that the GC uses the same file type as the store.
		new GC<F>(ds, fs, es, period, unit).run();
	}

	/** Unchecked exception wrapping the failure of a lookup by id or by name. */
	private static final class BadCollectionException extends RuntimeException {
		/** Report a failed lookup by database id.
			@param id The id that was looked up.
			@param t The underlying failure.
		*/
		public BadCollectionException(final long id, final Throwable t) {
			super(describe("id", String.valueOf(id)), t);
		}

		/** Report a failed lookup by name.
			@param name The name that was looked up.
			@param t The underlying failure.
		*/
		public BadCollectionException(final String name, final Throwable t) {
			super(describe("name", name), t);
		}

		/** Build the message shared by both constructors.
			@param kind The kind of key used for the lookup ("id" or "name").
			@param key The textual form of the key.
			@return the exception message.
		*/
		private static String describe(final String kind, final String key) {
			return "Collection with " + kind + " '" + key + "' doesn't exist.";
		}
	}

	/** Look up a collection by its database id.
		@param id The database id of the collection.
		@return a collection object carrying the value returned by queries.getCollection as its title.
		@throws RuntimeException if the lookup fails for any reason; the underlying failure is attached as the cause.
	*/
	@Override public Collection getCollection(final long id) {
		try (final Connection conn = this.ds.getConnection()) {
			final String name = queries.getCollection(conn, id);
			// Parameterized (not raw) so the file system type is checked by the compiler.
			return new BasicCollection<F>(that, ds, fs, name, id);
		} catch (final Exception e) { throw new BadCollectionException(id, e); }
	}

	/** Look up a collection by its name.
		@param name The name of the collection.
		@return a collection object carrying the value returned by queries.lookupCollection as its id.
		@throws RuntimeException if the lookup fails for any reason; the underlying failure is attached as the cause.
	*/
	@Override public Collection getCollection(final String name) {
		try (final Connection conn = this.ds.getConnection()) {
			final long id = queries.lookupCollection(conn, name);
			// Parameterized (not raw) so the file system type is checked by the compiler.
			return new BasicCollection<F>(that, ds, fs, name, id);
		} catch (final Exception e) { throw new BadCollectionException(name, e); }
	}

	/** Create a new collection with the given name inside a single call to JDBCTransaction.doTX.
		@param name The name for the new collection.
		@return the newly created collection.
		@throws RuntimeException if queries.addCollection reports that nothing was added, which this method
			attributes to a collection with the same name already existing.
	*/
	@Override public Collection makeCollection(final String name) {
		return JDBCTransaction.doTX(this.ds, new Block<Connection, Collection>() {
			public Collection eval(final Connection conn) throws Exception {
				final long cid = queries.reserveCID(conn);
				if(queries.addCollection(conn, name, cid) <= 0) {
					throw new RuntimeException("Attempted to create a collection with the same name '" + name + "'");
				}
				return new BasicCollection<F>(that, ds, fs, name, cid);
			}
		});
	}

	/** Look up an entry by its database id.
		@param id The database id of the entry.
		@return the entry with that id.
		@throws RuntimeException if the entry or its owning collection cannot be loaded; the underlying failure
			is attached as the cause.
	*/
	@Override public Entry getEntry(final long id) {
		try {
			final SQLEntry row;
			try (final Connection conn = this.ds.getConnection()) {
				row = queries.getEntry(conn, id);
			}
			// The connection is released before building the entry: this BasicEntry constructor looks up the
			// owning collection through the store, which borrows a connection of its own. Holding both at the
			// same time can starve a small connection pool.
			return new BasicEntry<F>(that, ds, fs, id, row);
		} catch (final Exception e) {
			// Report the failure as a missing entry rather than a missing collection.
			throw new RuntimeException("Entry with id '" + id + "' doesn't exist.", e);
		}
	}

	/** A private implementation of a Collection for use with the BasicStore implementation of ContentStore.
		All fields are final: an instance is a snapshot of the collection's id and title, and setTitle returns
		a new instance instead of modifying this one. Read operations borrow a connection from the pool for the
		duration of the call; write operations go through JDBCTransaction.doTX.
		@param <F> The type of the file objects the file system works with.
	*/
	private static final class BasicCollection<F extends WriteableFile<F> & TraverseableFile<F>> implements Collection {
		/** The database id number for this collection. */
		private final long id;
		/** The title of the collection. */
		private final String name;
		/** The content store associated with this collection. */
		private final ContentStore store;
		/** The database connection pool. */
		private final DataSource ds;
		/** The file system for storing large objects. */
		private final FileSystem<F> fs;
		/** Needed so some anonymous inner classes can grab a reference to the parent collection. */
		private final BasicCollection<F> that = this;

		/** Create a Collection instance backed with information from the db and file system.
			@param store The ContentStore instance this collection is associated with.
			@param ds The database this collection came from.
			@param fs The file system this collection is stored in.
			@param name The ContentStore unique name that is assigned to this collection.
			@param id The OID from the database that is allocated to this collection.
		*/
		public BasicCollection(final ContentStore store,
				final DataSource ds,
				final FileSystem<F> fs,
				final String name,
				final long id) {
			this.id = id;
			this.name = name;
			this.store = store;
			this.ds = ds;
			this.fs = fs;
		}

		/** List one page of the entries of this collection.
			@param pageSize The page size passed to queries.listEntries.
			@param offset The offset passed to queries.listEntries.
			@return a new list with one Entry per row returned by the query.
			@throws RuntimeException wrapping any failure of the query.
		*/
		@Override public List<Entry> elements(final int pageSize, final int offset) {
			try (final Connection conn = this.ds.getConnection()) {
				final List<Entry> ret = new ArrayList<>();
				for(final SQLEntry e: queries.listEntries(conn, id, pageSize, offset)) {
					// This collection is passed as the owner, so no extra collection lookup is needed per row.
					ret.add(new BasicEntry<F>(store, that, ds, fs, e.id, e));
				}
				return ret;
			} catch (final Exception ex) { throw new RuntimeException(ex); }
		}

		/** List one page of headers for this collection.
			@param pageSize The page size passed to queries.listCollectionHeaders.
			@param offset The offset passed to queries.listCollectionHeaders.
			@return the headers returned by the query.
			@throws RuntimeException wrapping any failure of the query.
		*/
		@Override public List<Header> header(final int pageSize, final int offset) {
			try (final Connection conn = this.ds.getConnection()) {
				return queries.listCollectionHeaders(conn, id, pageSize, offset);
			} catch (final Exception ex) { throw new RuntimeException(ex); }
		}

		/** Add a new entry to this collection.
			@param title The title of the new entry; null is stored as the empty string.
			@return the new entry, re-read from the database inside the same transaction.
		*/
		@Override public Entry append(final String title) {
			return JDBCTransaction.doTX(this.ds, new Block<Connection, Entry>() {
				public Entry eval(final Connection conn) throws Exception {
					final long eid = queries.reserveEID(conn);
					queries.addEntry(conn, id, eid, (title == null) ? "" : title);
					return new BasicEntry<F>(store, that, ds, fs, eid, queries.getEntry(conn, eid));
				}
			});
		}

		/** Change the title of this collection.
			@param title The new title.
			@return a new collection object with the same id and the new title; this object is left unchanged.
		*/
		@Override public Collection setTitle(final String title) {
			return JDBCTransaction.doTX(this.ds, new Block<Connection, Collection>() {
				public Collection eval(final Connection conn) throws Exception {
					queries.updateCollectionTitle(conn, id, title);
					return new BasicCollection<F>(store, ds, fs, title, id);
				}
			});
		}

		/** @return the title this object was created with. */
		@Override public String getTitle() { return name; }
		/** @return the database id of this collection. */
		@Override public long oid() { return id; }

		/** Run queries.deleteCollection for this collection's id in a transaction. */
		@Override public void delete() {
			JDBCTransaction.doTX(this.ds, new Block<Connection, Integer>() {
				public Integer eval(final Connection conn) throws Exception {
					return queries.deleteCollection(conn, id);
				}
			});
		}

		/** Run queries.attachCollection for the given entry and this collection in a transaction.
			@param e The entry to attach.
		*/
		@Override public void attach(final Entry e) {
			JDBCTransaction.doTX(this.ds, new Block<Connection, Integer>() {
				public Integer eval(final Connection conn) throws Exception {
					return queries.attachCollection(conn, e.oid(), id);
				}
			});
		}

		/** Run queries.detachCollection for the given entry and this collection in a transaction.
			@param e The entry to detach.
		*/
		@Override public void detach(final Entry e) {
			JDBCTransaction.doTX(this.ds, new Block<Connection, Integer>() {
				public Integer eval(final Connection conn) throws Exception {
					return queries.detachCollection(conn, e.oid(), id);
				}
			});
		}
	}

	/** A private implementation of an Entry for use with the BasicStore implementation of ContentStore. */
	private static final class BasicEntry<F extends WriteableFile<F> & TraverseableFile<F>> implements Entry {
		/** The collection this entry belongs to. */
		private final Collection coll;
		/** The content store ths entry belongs to. */
		private final ContentStore store;
		/** The database id of this entry. */
		private final long id;
		/** The database connection pool. */
		private final DataSource ds;
		/** The file system for storing large objects. */
		private final FileSystem<F> fs;
		/** The row describing this entry returned from the database. */
		private final SQLEntry e;

		/** Create a new instance of a entry.
			@param store The content store backing this entry.
			@param ds The database connection pool.
			@param fs The file system for large objects.
			@param id The database id of this entry.
			@param e The row record for this entry from the database.
		*/
		public BasicEntry(final ContentStore store,
				final DataSource ds,
				final FileSystem<F> fs,
				final long id,
				final SQLEntry e) {
			this(store, store.getCollection(e.cid), ds, fs, id, e);
		}

		/** Create an Entry instance backed with information from the db and file system.
			@param store The ContentStore instance this Entry is associated with.
			@param coll The collection this entry is a member of.
			@param ds The database this entry came from.
			@param fs The file system this entry is stored in.
			@param id The OID from the database that is allocated to this entry.
			@param e The data structure returned from the database representing this entry.
		*/
		public BasicEntry(final ContentStore store,
				final Collection coll,
				final DataSource ds,
				final FileSystem<F> fs,
				final long id,
				final SQLEntry e) {
			this.coll = coll;
			this.store = store;
			this.id = id;
			this.ds = ds;
			this.fs = fs;
			this.e = e;
		}

		@Override public List<Collection> elements(final int pageSize, final int offset) {
			try (final Connection conn = this.ds.getConnection()) {
				final ArrayList<Collection> result = new ArrayList<>();
				for(Header h: queries.listEntryHeaders(conn, id, pageSize, offset)) {
					result.add(new BasicCollection(store, ds, fs, h.Title, h.id));
				}
				return result;
			} catch (final Exception ex) { throw new RuntimeException(ex); }
		}

		@Override public List<Header> header(final int pageSize, final int offset) {
			try (final Connection conn = this.ds.getConnection()) {
				return queries.listEntryHeaders(conn, id, pageSize, offset);
			} catch (final Exception ex) { throw new RuntimeException(ex); }
		}

		@Override public Collection append(final String title) {
			return JDBCTransaction.doTX(this.ds, new Block<Connection, Collection>() {
				public Collection eval(final Connection conn) throws Exception {
					final Collection c = store.makeCollection(title);
					queries.attachCollection(conn, id, c.oid());
					return c;
				}
			});
		}

		@Override public Entry setTitle(final String title) {
			return JDBCTransaction.doTX(this.ds, new Block<Connection, BasicEntry>() {
				public BasicEntry eval(final Connection conn) throws Exception {
					queries.updateEntryTitle(conn, id, title);
					return new BasicEntry(store, coll, ds, fs, id, queries.getEntry(conn, id));
				}
			});
		}

		@Override public Entry update(final String entity, final String contentType) {
			return JDBCTransaction.doTX(this.ds, new Block<Connection, BasicEntry>() {
				public BasicEntry eval(final Connection conn) throws Exception {
					queries.updateTextEntry(conn, id, entity, contentType);
					return new BasicEntry(store, coll, ds, fs, id, queries.getEntry(conn, id));
				}
			});
		}

		@Override public Entry update(final InputStream entity, final String fileName, final String contentType) {
			return JDBCTransaction.doTX(this.ds, new Block<Connection, BasicEntry>() {
				public BasicEntry eval(final Connection conn) throws Exception {
					final long fid = queries.reserveFID(conn);
					final F f = fs.get(filename(fid));
					f.getParent().mkdirs();
					f.createNewFile();
					try (final OutputStream out = f.openOutput()) {
						new StreamPump(entity, out).pumpAll();
					}
					queries.updateFileEntry(conn, id, fid, contentType, fileName);
					return new BasicEntry(store, coll, ds, fs, id, queries.getEntry(conn, id));
				}
			});
		}

		@Override public Entry update(final FileSource entity, final String fileName, final String contentType) {
			return JDBCTransaction.doTX(this.ds, new Block<Connection, BasicEntry>() {
				public BasicEntry eval(final Connection conn) throws Exception {
					final long fid = queries.reserveFID(conn);
					final F f = fs.get(filename(fid));
					f.getParent().mkdirs();
					f.createNewFile();
					f.copyFrom(entity);
					queries.updateFileEntry(conn, id, fid, contentType, fileName);
					return new BasicEntry(store, coll, ds, fs, id, queries.getEntry(conn, id));
				}
			});
		}

		@Override public Collection getCollection() { return this.coll; }

		@Override public Entry move(final Collection collection) {
			return JDBCTransaction.doTX(this.ds, new Block<Connection, BasicEntry>() {
				public BasicEntry eval(final Connection conn) throws Exception {
					if(queries.moveEntry(conn, id, coll.oid(), collection.oid()) > 0) {
						return new BasicEntry(store, collection, ds, fs, id, e);
					} else {
						throw new RuntimeException("Could not move collection");
					}
				}
			});
		}

		@Override public String getContentType() { return e.ContentType; }
		@Override public String getFilename() { return e.FileName; }
		@Override public boolean isFile() { return ((e.FileID != null) && (e.FileID.longValue() != 0)); }

		@Override public InputStream readFile() {
			if((e.FileID != null) && (e.FileID.longValue() != 0)) {
				final F f = fs.get(filename(e.FileID));
				return f.openInput();
			} else {
				if(e.TextValue == null) { return new ByteArrayInputStream(new byte[0]); }
				return new ByteArrayInputStream(e.TextValue.getBytes(StandardCharsets.UTF_8));
			}
		}

		@Override public String readContent() {
			if((e.FileID != null) && (e.FileID.longValue() != 0)) {
				try (final InputStream f = this.readFile()) {
					final StringWriter s = new StringWriter();
					new ReaderPump(new InputStreamReader(f, StandardCharsets.UTF_8), s).pumpAll();
					return s.toString();
				} catch (final Exception ex) { throw new RuntimeException(ex); }
			} else {
				return e.TextValue;
			}
		}

		@Override public String getTitle() { return e.Title; }
		@Override public long oid() { return id; }

		@Override public void delete() {
			JDBCTransaction.doTX(this.ds, new Block<Connection, Integer>() {
				public Integer eval(final Connection conn) throws Exception {
					return queries.deleteEntry(conn, id);
				}
			});
		}
	}
}