/*
* ALMA - Atacama Large Millimiter Array
* (c) European Southern Observatory, 2004
* Copyright by ESO (in the framework of the ALMA collaboration),
* All rights reserved
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston,
* MA 02111-1307 USA
*/
package alma.acs.logging;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import alma.acs.concurrent.DaemonThreadFactory;
/**
* Queue for LogRecord
s which takes care of dispatching them to a remote log service,
* using the one provided in {@link #setRemoteLogDispatcher(RemoteLogDispatcher)}.
*
* Technically this class is not a singleton, but it is foreseen to be used as a single instance. * It is thread-safe, so multiple log handlers can submit records. *
* All log records to be sent remotely to the central log service must be submitted
* to the {@link #log(LogRecord)} method.
* If the remote log service is not available (e.g. during startup, or later temporarily due to network problems),
* the log records will be cached.
* The cache size is given by {@link #MAX_QUEUE_SIZE}.
* If the cache is more than 70% full, the log
method will only accept records with level INFO
or higher.
* If the cache is full, no records are accepted. The idea is to not jeopardize the running system, but rather stop remote logging.
*
* The queue sorts log records by their log levels and thus dispatches the most important records first,
* using {@link LogRecordComparator}.
* Threading note: it seems ok to make this method "synchronized". This avoids problems with stale queue size,
* even though the damage would be small since the treatment of queue size is somewhat arbitrary anyway.
* Synchronization should not block callers long at all because flushing is done in a separate thread
* and only gets triggered here.
*
* @param logRecord to be logged
* @return true if logRecord was added to the queue for logging. False if queue was too full for this record.
*/
synchronized boolean log(LogRecord logRecord) {
int oldSize = queue.size();
// if queue is full, then drop the record
if (oldSize >= maxQueueSize) {
// first time overflow? Then start periodic flushing attempts to drain the queue once the central logger comes up again
boolean firstTimeOverflow = !outOfCapacity.getAndSet(true);
if (firstTimeOverflow) {
preOverflowFlushPeriod = currentFlushPeriod;
setPeriodicFlushing(10000);
}
if (LOSSLESS) {
do {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// nada
}
} while (queue.size() >= maxQueueSize);
} else {
if (DEBUG || firstTimeOverflow) {
System.out.println("log queue overflow: log record with message '" + logRecord.getMessage()
+ "' and possibly future log records will not be sent to the remote logging service.");
}
return false;
}
}
// The fact that we get here means that queue.size() < maxQueueSize
if (outOfCapacity.getAndSet(false)) {
// queue was full before, but now is better again
setPeriodicFlushing(preOverflowFlushPeriod);
preOverflowFlushPeriod = 0;
System.out.println("log queue no longer overflowing.");
}
// drop less important messages (DEBUG and below) if queue space gets scarce
if (!LOSSLESS) {
final int filterThreshold = maxQueueSize * 7 / 10;
if (oldSize >= filterThreshold) {
boolean firstTimeScarce = !scarceCapacity.getAndSet(true);
if (logRecord.getLevel().intValue() < Level.INFO.intValue()) {
if (DEBUG || firstTimeScarce) {
System.out.println("looming log queue overflow (" + (oldSize+1) + "/" + maxQueueSize
+ "): low-level log record with message '" + logRecord.getMessage()
+ "' and possibly other log records below INFO level will not be sent to the remote logging service.");
}
return false;
}
}
else {
scarceCapacity.set(false);
}
}
queue.put(logRecord);
if (DEBUG) {
System.out.println("DispatchingLogQueue#log called with record msg = " + logRecord.getMessage());
}
flushIfEnoughRecords(true);
return true;
}
/////////////////////////////////////////////////////////////
// external flush methods
/////////////////////////////////////////////////////////////
/**
* Flushes all log records if the remote log service has been made available before.
* Returns only when the log queue contains no more records, so execution time may be long.
* Should better not be called from the main thread.
*/
void flushAllAndWait() {
if (DEBUG) {
System.out.println("DispatchingLogQueue#flushAllAndWait() called");
}
if (!hasRemoteDispatcher()) {
if (DEBUG) {
System.out.println("ignoring call to DispatchingLogQueue#flushAllAndWait because the remote log service has not been supplied.");
}
return;
}
boolean flushSuccess = true; // not yet used other than for debugging
// If logs are produced very fast, for example ORB debug logs that are produced while dispatching logs to the Log service,
// then the queue would never be seen as fully empty by our while loop.
// To avoid getting stuck in an endless loop, we only require to flush as many logs as the queue contains in the beginning.
int initialQueueSize = realQueueSize();
long targetFlushCount = flushedLogCount.get() + initialQueueSize;
while (flushedLogCount.get() < targetFlushCount) {
Future
* This method returns immediately, since flushing is done in a separate thread.
* The returned future object can be used to wait for termination of the log flush and get the result, or to cancel the flush.
* The result is a
* Since flushing happens in a separate thread with a certain time lag, this method can avoid creating too many flush requests
* by only generating them if the number of queued log records is a multiple of the send buffer
* as opposed to larger than the buffer. This is controlled by the
* Note that the most important records are sent first because the queue sorts by log level, and that a successful
* flush() will call this method again to submit another flush;
* thus even a large queue will be drained fairly fast if the remote log service works,
* even in the absence of periodic flushing.
* @param conservative
*/
private void flushIfEnoughRecords(boolean conservative) {
int numRecords = queue.size();
if (DEBUG) {
System.out.println("flushIfEnoughRecords(" + conservative + "): current queue size is " + numRecords + ", remote dispatcher available: " + hasRemoteDispatcher());
}
if (hasRemoteDispatcher() && numRecords > 0) {
if ( (numRecords % remoteLogDispatcher.getBufferSize() == 0) ||
(!conservative && (numRecords >= remoteLogDispatcher.getBufferSize())) ) {
flush();
}
}
}
/**
* Internal flush method which covers straight calls to flush as well as scheduled calls.
*
* Threading note: this method is thread safe because competing threads are blocked on a flush lock
* (and additionally on
* Note that not all of these requested flushes necessarily result in a flushing of log records,
* because some of them may find an empty log record queue when they get executed, and thus end w/o effect.
*/
int pendingFlushes() {
BlockingQueue
* Note that during flushing, the records are first taken out of the queue, but then get resubmitted if sending
* them to the central log service failed.
* Therefore, calling
* All control over periodic log flushing is confined in this method.
*
* The call returns without further action if flushing is already enabled
* with the same period as
*
* @author hsommer
* created Apr 19, 2005 1:48:27 PM
*/
public class DispatchingLogQueue {
private volatile PriorityBlockingQueuefalse
, because application performance is more important than logging.
* For performance testing etc, setting this property may be useful though.
*/
private static boolean LOSSLESS = Boolean.getBoolean("alma.acs.logging.lossless");
DispatchingLogQueue() {
outOfCapacity = new AtomicBoolean(false);
scarceCapacity = new AtomicBoolean(false);
preOverflowFlushPeriod = 0;
currentFlushPeriod = 0;
setMaxQueueSize(1000);
flushLock = new ReentrantLock();
queue = new PriorityBlockingQueue<LoggingConfig MaxLogQueueSize/>
in the CDB.
*/
synchronized void setMaxQueueSize(int maxQueueSize) {
this.maxQueueSize = maxQueueSize;
}
/**
* Adds a LogRecord
to the internal queue, so that it gets scheduled for logging.
* If a high-level log record should be sent out immediately, the caller of this method should
* subsequently call {@link #flush()}, as the log method itself does not trigger a flush based on levels.
* Boolean
which is true if all or at least 1 log record could be taken off the log queue.
*/
Futureconservative
flag.
* queue.drainTo(..)
),
* so that the second thread may unexpectedly not find any log records and thus will return immediately.
*
* @param isScheduled true if this method is called by a timer, as opposed to some more direct thread. Used only for debugging.
* @return Success indicator: true if all or at least some log records were permanently taken off the queue by this flush request.
*/
private boolean flush(boolean isScheduled) {
if (DEBUG) {
System.out.println("DispatchingLogQueue#flush(isScheduled=" + isScheduled + ") called in thread " + Thread.currentThread().getName());
}
boolean flushedSomeRecords = false;
if (!hasRemoteDispatcher()) {
System.out.println("failed to flush logging queue because remote logging service has not been made available.");
}
else {
flushLock.lock();
try {
//Listqueue.size()
w/o flush synchronization may yield too low a value.
*/
int realQueueSize() {
flushLock.lock();
try {
return queue.size();
} finally {
flushLock.unlock();
}
}
boolean flushesPeriodically() {
return (flushScheduleFuture != null);
}
/////////////////////////////////////////////////////////////
// adding features
/////////////////////////////////////////////////////////////
/**
* Triggers periodic calls to {@link #flush(boolean)},
* or terminates such automatic flushing if periodMillisec == 0
.
* periodMillisec
.
*
* @param periodMillisec the delay between end of last scheduled flush() and the next scheduled flush().
*/
void setPeriodicFlushing(final int periodMillisec) {
if (!hasRemoteDispatcher()) {
System.out.println("DispatchingLogQueue#setPeriodicFlushing is ignored until setRemoteLogDispatcher() has been called!");
return;
}
// Only re-set the flushing if the value has changed, because the operation is expensive (stop/start).
if (currentFlushPeriod == periodMillisec && flushesPeriodically()) {
return;
}
// store period
currentFlushPeriod = periodMillisec;
if (flushesPeriodically()) {
// we already have something scheduled periodically
flushScheduleFuture.cancel(false);
flushScheduleFuture = null;
if (DEBUG && periodMillisec == 0) {
System.out.println("Stopping periodic log flushing.");
}
}
if (periodMillisec > 0) {
Runnable cmd = new Runnable() {
public void run() {
if (queue.size() > 0) {
// skip if last non-scheduled flush was too recent
if (lastFlushFinished <= System.currentTimeMillis() - periodMillisec) {
try {
flush(true);
} catch (Throwable thr) {
System.out.println("Scheduled flushing of log buffer failed: " + thr.getMessage());
// we swallow the error because otherwise future executions would be suppressed
}
}
else {
if (DEBUG) {
System.out.println("Skipping a scheduled log flush because of another recent flush.");
}
}
}
else {
if (DEBUG) {
System.out.println("Skipping a scheduled log flush because log queue is empty.");
}
}
}
};
flushScheduleFuture = executor.scheduleWithFixedDelay(cmd, periodMillisec, periodMillisec, TimeUnit.MILLISECONDS);
}
}
/**
* Sets the remote log dispatcher. Should be called once the remote log service is available.
* Calling this method will not flush the log queue (need to call {@link #flushAllAndWait()} separately),
* nor will it automatically trigger periodic flushes (call {@link #setPeriodicFlushing(int)} for this).
*
* @param remoteLogDispatcher The remoteLogDispatcher to set.
*/
void setRemoteLogDispatcher(RemoteLogDispatcher remoteLogDispatcher) {
this.remoteLogDispatcher = remoteLogDispatcher;
}
void shutDown() {
if (DEBUG) {
System.out.println("DispatchingLogQueue#shutDown called");
}
if (!executor.isShutdown()) {
executor.shutdown();
}
}
}