/* * ALMA - Atacama Large Millimiter Array (c) European Southern Observatory, 2006 * * This library is free software; you can redistribute it and/or modify it under * the terms of the GNU Lesser General Public License as published by the Free * Software Foundation; either version 2.1 of the License, or (at your option) * any later version. * * This library is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more * details. * * You should have received a copy of the GNU Lesser General Public License * along with this library; if not, write to the Free Software Foundation, Inc., * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ package alma.acs.util.stringqueue; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** * The queue of entries. *
* This class has been introduced to avoid keeping in memory a
* never ending queue of {@link QueueEntry} and reduce the
* chance to face an out of memory at run-time.
*
* This class is composed of two lists and a file.
* inMemoryQueue
is the list of the entries to get.
* When this list contains few items then some more items are read from the file.
*
* The other list, EntriesQueue
, is a buffer where the entries
* are stored ready to be flushed on disk.
* This is done to write more entries at once reducing the I/O and increasing
* the performances.
*
* Implementation note
* QueueEntry
items are read only with the get
* method and pushed with the put
.
*
* Adding entries:
* Getting entries:
* The I/O is paginated i.e. each read or write is done
* for a block of
* Note: a new page is always added at the end of the
* file while the reading happens in a different
* order.
*/
private volatile int nextPageToRead=0;
/**
* The random number generator
*/
private final Random randomNumberGenerator = new Random(System.currentTimeMillis());
/**
* Put an entry in Cache.
*
* If the cache is full the entry is added to the buffer.
*
* @param entry The not
* The vector contains the last added entries so if there are pages in
* the file they are flushed before the vector
*
* @throws IOException In case of error during I/O
*/
private void flushEntriesInQueue() throws IOException {
if (pagesOnFile==0) {
if (cachedEntries.size()!=0) {
inMemoryQueue.addAll(cachedEntries);
cachedEntries.clear();
}
} else {
// Get the next page from disk
readNextPageFromFile();
}
}
/**
* Read page from the file putting all the
* If there is enough room in inMemoryQueue
* (i.e. inMemoryQueue.size()
in the file
*
* @throws IOException In case of error creating a new temporary file
*/
private void writePageOnFile() throws IOException {
if (file==null) {
file=getNewFile();
try {
raFile=new RandomAccessFile(file,"rw");
} catch (FileNotFoundException e) {
// Ops an error creating the file
// print a message and exit: in this way it will try again
// at next iteration
file=null;
raFile=null;
IOException ioe = new IOException("Error creating the random file",e);
throw ioe;
}
}
if (cachedEntries.size()cachedEntries
ready to be written on file.
* If the size of cachedEntries
is greater then PAGE_LEN
,
* the size of a page, then a page is flushed on disk.
* Note that what is in the list, cacheEntries
is added at the end of the file.
*
* The entry to get is always in inMemoryQueue
.
* After getting an entry, it checks if the the size of the queue allows to get
* new entries from the file or from the cachedEntries
.
* Note that the right order is first the file and then cachedEntries
.
* In fact cachedEntries
, contains the last received entries,
* packed to be transferred on a new page on disk while the first entries to push
* in the queue are on a page disk (if any).
*
*
* @author acaproni
*
*/
public class EntriesQueue {
/**
* The entries to keep in memory.
*/
private final ListTHRESHOLD
then the
* entries in the buffer are flushed in the queue
*/
public static final int THRESHOLD=12500;
/**
* The buffer for each I/O
*/
private byte[] fileBuffer = new byte[PAGE_SIZE];
/**
* The buffer containing the hexadecimal string of a QueueEntry
*/
private byte[] entryBuffer =new byte[QueueEntry.ENTRY_LENGTH];
/**
*
* This Vector contains the entries that will be written on the file.
*
*/
private ListbufferFile
.
* PAGE_LEN
entries.
*/
private RandomAccessFile raFile=null;
/**
* The number of pages written on file and not yet read
*/
private volatile int pagesOnFile=0;
/**
* The number of the next page to read from file.
* null
{@link QueueEntry} to add to the queue
* @throws IOException In case of I/O error while flushing the cache on disk
*/
public synchronized void put(QueueEntry entry) throws IOException {
if (entry==null) {
throw new IllegalArgumentException("The queue do not contain null items!");
}
if (inMemoryQueue.size()null
if the
* queue is empty
*
* @throws IOException In case of error during I/O
*/
public synchronized QueueEntry get() throws IOException {
if (inMemoryQueue.isEmpty()) {
return null;
}
QueueEntry e = inMemoryQueue.remove(0);
if (e!=null && inMemoryQueue.size()true
if the queue is empty;
* false
otherwise.
*/
public synchronized boolean isEmpty() {
return size()==0;
}
/**
* Attempts to create the file for the strings in several places
* before giving up.
*
* @return A new temporary file
* null
if it was not possible to create a new file
* @throws IOException In case of error creating the temporary file
*/
private File getNewFile() throws IOException {
String name=null;
File f=null;
try {
// Try to create the file in $ACSDATA/tmp
String acsdata = System.getProperty("ACS.data");
acsdata=acsdata+File.separator+"tmp"+File.separator;
File dir = new File(acsdata);
f = File.createTempFile("entriesQueue",".tmp",dir);
name=f.getAbsolutePath();
} catch (IOException ioe) {
// Another error :-O
String homeDir = System.getProperty("user.dir");
File homeFileDir = new File(homeDir);
if (homeFileDir.isDirectory() && homeFileDir.canWrite()) {
do {
// Try to create the file in the home directory
int random = randomNumberGenerator.nextInt();
name = homeDir +File.separator + "entriesQueue"+random+".jlog";
f = new File(name);
} while (f.exists());
} else {
// last hope, try to get a system temp file
f = File.createTempFile("entriesQueue",".tmp");
name=f.getAbsolutePath();
}
}
if (f!=null) {
f.deleteOnExit();
}
return f;
}
/**
* Move the entries from the file or the vector into the queue
* QueueEntry
it contains
* in the queue.
*
* @throws IOException In case of error during I/O
*/
private void readNextPageFromFile() throws IOException {
if (pagesOnFile==0) {
throw new IllegalStateException("No pages available on file");
}
if (raFile==null || file==null) {
throw new IllegalStateException("The file (random or buffer) is null!");
}
if (inMemoryQueue.size()