/* * ALMA - Atacama Large Millimiter Array * (c) European Southern Observatory, 2005 * Copyright by ESO (in the framework of the ALMA collaboration), * All rights reserved * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, * MA 02111-1307 USA */ package alma.acs.logging.engine; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Date; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.omg.CORBA.ORB; import com.cosylab.logging.engine.LogEngineException; import com.cosylab.logging.engine.ACS.ACSLogConnectionListener; import com.cosylab.logging.engine.ACS.ACSRemoteLogListener; import com.cosylab.logging.engine.ACS.LCEngine; import com.cosylab.logging.engine.log.ILogEntry; import com.cosylab.logging.engine.log.LogField; import com.cosylab.logging.engine.log.LogTypeHelper; import si.ijs.maci.Manager; /** * Receives log messages from the ACS Log service * and sorts them by timestamp, even if the log messages arrive * in different order within a certain time window. *
* For a client of this class to consume these log messages, * method {@link #getLogQueue()} provides a {@link java.util.concurrent.BlockingQueue} * from which the client can extract the messages. *
* Before log messages can be received, {@link #initialize(ORB, Manager)} must be called. * To disconnect from the log stream, call {@link #stop()}. *
* As an alternative to
* This method attempts to wait for successful initialization for up to 10 seconds.
* If initialization did not happen within this time,
* If you call this method, make sure to subsequently call
* {@link #getLogQueue()} and drain the queue at your own responsibility,
* or to call {@link #startCaptureLogs(PrintWriter)} (or {@link #startCaptureLogs(PrintWriter, ThreadFactory)})
* which will drain the queue automatically.
*
* @return
* This method attempts to wait for successful initialization for up to
* The queue elements are of type {@link DelayedLogEntry},
* from which the log record can be extracted
* using the method {@link DelayedLogEntry#getLogEntry()}.
* @return
*/
public BlockingQueue
* The
*
* @param logWriter
* @throws IOException
*/
public void startCaptureLogs(final PrintWriter logWriter) throws IOException {
    // Delegates to the two-argument variant without supplying a ThreadFactory.
    // The typed null keeps the call bound to the (PrintWriter, ThreadFactory) overload.
    final ThreadFactory noThreadFactory = null;
    startCaptureLogs(logWriter, noThreadFactory);
}
/**
* Variant of {@link #startCaptureLogs(PrintWriter)} which takes an optional ThreadFactory
* which will be used to create the thread that reads out the log queue.
* This method could be used if getLogQueue
and stop
,
* this class also offers the method pair {@link #startCaptureLogs(PrintWriter)}
* and {@link #stopCaptureLogs()}. These methods can be used to directly
* print the received log messages to some writer, sparing the effort of listening on the LogQueue.
* Method {@link #stopCaptureLogs()} must be called in that case to stop capturing logs.
*
* @author hsommer
*/
public class LogReceiver {
// When true, progress messages are printed to System.out (see the checks in
// initialize and startCaptureLogs); off by default.
private boolean verbose = false;
// Log engine instance; presumably created during initialization, as the
// initialize Javadoc mentions instantiating the LCEngine -- its use lies
// outside this excerpt, TODO confirm.
protected LCEngine lct;
// Callback object of type MyRemoteResponseCallback; NOTE(review): the type and
// its wiring are declared outside this excerpt -- confirm its role there.
protected MyRemoteResponseCallback rrc;
// the queue into which all log records are stored
private DelayQueuefalse
is returned, otherwise true.
* true
if initialization was successful within at most 10 seconds
* @throws LogEngineException In case of error instantiating the {@link LCEngine}
* @see #initialize(ORB, Manager, int)
*
*/
public boolean initialize() throws LogEngineException {
    // Neither an ORB nor a manager reference is supplied (both null), and the
    // wait for successful initialization is limited to ten seconds.
    final int defaultTimeoutSeconds = 10;
    return initialize(null, null, defaultTimeoutSeconds);
}
/**
* Variant of {@link #initialize()} which takes an existing ORB and manager reference.
* This method attempts to wait for successful initialization for up to {@code timeoutSeconds} seconds.
* If initialization did not happen within this time, {@code false} is returned, otherwise {@code true}.
*
* @param theORB the fully functional ORB object to reuse, or {@code null} if an ORB should be created
* @param manager reference to the acs manager, or {@code null} if this reference should be created
* @param timeoutSeconds timeout for awaiting the successful initialization.
* @return true if initialization was successful within at most {@code timeoutSeconds} seconds.
* @throws LogEngineException In case of error instantiating the {@link LCEngine}
*/
public boolean initialize(ORB theORB, Manager manager, int timeoutSeconds) throws LogEngineException {
boolean ret = false;
if (verbose) {
System.out.println("Attempting to connect to Log channel...");
}
logDelayQueue = new DelayQueuedelayTimeMillis
parameter in the constructor sets the buffer time during which log entries
* are not yet available for the consumer, so that late arriving records get a chance
* to be sorted in according to timestamp.
* As of ACS 7.0.1 the issue of timestamps that lie in the future (e.g. logs from a different machine with unsync'd time)
* is addressed in the way that "future" log records will become available to the consumer before
* the local system time has reached the timestamp.
*
* @author hsommer
*/
public static class DelayedLogEntry implements Delayed {
    // Queue element wrapping one received log record. An entry becomes available
    // to the consumer once its trigger time (log timestamp + delayTimeMillis) has
    // passed; entries are ordered by trigger time, then by arrival order.

    /** Source of the arrival-order sequence numbers, shared by all entries. */
    private static final AtomicInteger logRecordCounter = new AtomicInteger();

    /** delay for sorting by timestamp */
    private long delayTimeMillis;

    /** Set only on the end-of-queue marker, see {@link #createQueuePoison(long)}. */
    private boolean isQueuePoison = false;

    /** Arrival-order sequence number, used to break ties between equal trigger times. */
    private int logRecordIndex;

    /** The wrapped log record; stays {@code null} for the queue poison. */
    private ILogEntry logEntry;

    /** Local wall-clock time (ms) from which on this entry may leave the queue. */
    private long triggerTimeMillis;

    /**
     * Wraps a received log record.
     *
     * @param logEntry the record as received; its {@link LogField#TIMESTAMP} field is read as a {@code Long}
     * @param delayTimeMillis buffer time added to the (possibly clipped) log timestamp
     */
    DelayedLogEntry(ILogEntry logEntry, long delayTimeMillis) {
        logRecordIndex = logRecordCounter.incrementAndGet();
        this.logEntry = logEntry;
        this.delayTimeMillis = delayTimeMillis;
        long logTimestamp = (Long) logEntry.getField(LogField.TIMESTAMP);
        // if log record has a time stamp in the future (according to local machine time),
        // then we clip it to the current time
        long adjustedLogTimestamp = Math.min(System.currentTimeMillis(), logTimestamp);
        triggerTimeMillis = adjustedLogTimestamp + delayTimeMillis;
    }

    /**
     * Ctor used for special queue poison instance.
     * The trigger time is the current time plus the given delay.
     *
     * @param delayTimeMillis delay after which the poison may leave the queue
     */
    private DelayedLogEntry(long delayTimeMillis) {
        logRecordIndex = logRecordCounter.incrementAndGet();
        this.delayTimeMillis = delayTimeMillis;
        triggerTimeMillis = System.currentTimeMillis() + delayTimeMillis;
    }

    /**
     * Creates the end-of-queue marker.
     *
     * @param delayTimeMillis delay after which the marker may leave the queue
     * @return an entry for which {@link #isQueuePoison()} returns {@code true}
     */
    public static DelayedLogEntry createQueuePoison(long delayTimeMillis) {
        DelayedLogEntry dle = new DelayedLogEntry(delayTimeMillis);
        dle.isQueuePoison = true;
        return dle;
    }

    /**
     * True if this entry designates the end of the queue.
     * According to {@link BlockingQueue}, this element is called the "poison".
     * @return true if this is the end-of-queue marker.
     * @see #createQueuePoison(long)
     */
    public boolean isQueuePoison() {
        return isQueuePoison;
    }

    /**
     * Returns the {@code ILogEntry} class that was wrapped for sorting inside the queue.
     * That class represents the log record as it was received from the logging service.
     * Beware that the log level you get from this ILogEntry is not an ACS log level, but
     * comes from some level representation internally used by the jlog application whose code got reused here!
     * To get an ACS level, you must convert it using {@link LogTypeHelper#getAcsCoreLevel(Integer)}
     * @deprecated use {@link #getLogRecord()} to avoid dealing with jlog-internal log level (=severity) number ranges.
     */
    @Deprecated
    public ILogEntry getLogEntry() {
        return logEntry;
    }

    /**
     * Returns the log record that was wrapped for sorting inside the queue.
     * The returned object represents the log record as it was received from the logging service.
     * A new wrapper object is created on every call.
     */
    public ReceivedLogRecord getLogRecord() {
        return new ReceivedLogRecord(logEntry);
    }

    /**
     * This method is used by the queue to determine whether the log record may
     * leave the queue already.
     *
     * @param unit the unit in which the remaining delay is reported
     * @return the remaining delay; zero or negative once the trigger time has passed
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long delay = triggerTimeMillis - System.currentTimeMillis();
        return unit.convert(delay, TimeUnit.MILLISECONDS);
    }

    /**
     * This method is used by the queue for sorting,
     * comparing timestamps and arrival order.
     *
     * @param other the entry to compare with; must be a {@code DelayedLogEntry}
     * @return -1, 0 or 1
     * @throws ClassCastException if {@code other} is not a {@code DelayedLogEntry}
     */
    @Override
    public int compareTo(Delayed other) {
        DelayedLogEntry otherDelayedLogEntry = (DelayedLogEntry) other;
        long thisTrigger = triggerTimeMillis;
        long otherTrigger = otherDelayedLogEntry.triggerTimeMillis;
        if (thisTrigger < otherTrigger) {
            return -1;
        }
        if (thisTrigger > otherTrigger) {
            return 1;
        }
        // If timestamps are equal we sort by arrival order. The sequence numbers come
        // from an int counter that eventually wraps around, so we look at the sign of
        // their difference rather than comparing them directly: the difference stays
        // correct across the wrap as long as both entries are less than 2^31 apart.
        int indexDistance = this.logRecordIndex - otherDelayedLogEntry.logRecordIndex;
        if (indexDistance < 0) {
            return -1;
        }
        if (indexDistance > 0) {
            return 1;
        }
        // this should never happen (only when comparing an entry with itself)
        return 0;
    }

    /**
     * Equals method, just to be consistent with {@code compareTo}:
     * two entries are equal if trigger time and arrival index match.
     */
    @Override
    public boolean equals(Object other) {
        // instanceof is false for null, so no separate null check is needed
        if (!(other instanceof DelayedLogEntry)) {
            return false;
        }
        DelayedLogEntry otherDelayedLogEntry = (DelayedLogEntry) other;
        return otherDelayedLogEntry.triggerTimeMillis == triggerTimeMillis
                && otherDelayedLogEntry.logRecordIndex == logRecordIndex;
    }

    /**
     * hashCode method, since we already have equals.
     * Built from the same two fields that {@link #equals(Object)} compares.
     */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + logRecordIndex;
        result = prime * result + (int) (triggerTimeMillis ^ (triggerTimeMillis >>> 32));
        return result;
    }

    /** Returns the delay that was given to the constructor or poison factory. */
    long getDelayTimeMillis() {
        return delayTimeMillis;
    }
}
/**
 * A jlog-independent representation of the log record we received.
 * Every getter reads the corresponding {@link LogField} from the wrapped
 * {@link ILogEntry} at call time and casts it to the type returned.
 */
public static class ReceivedLogRecord {

    private ILogEntry jlogRecord;

    /**
     * @param jlogRecord the jlog entry whose fields are exposed by the getters
     */
    public ReceivedLogRecord(ILogEntry jlogRecord) {
        this.jlogRecord = jlogRecord;
    }

    /** Returns the {@link LogField#TIMESTAMP} field (a {@code Long}) as a {@link Date}. */
    public Date getTimestamp() {
        long timestampValue = (Long) readField(LogField.TIMESTAMP);
        return new Date(timestampValue);
    }

    /** Returns the {@link LogField#ENTRYTYPE} field. */
    public LogTypeHelper getLevel() {
        return (LogTypeHelper) readField(LogField.ENTRYTYPE);
    }

    /** Returns the {@link LogField#SOURCEOBJECT} field. */
    public String getSourceObject() {
        return readText(LogField.SOURCEOBJECT);
    }

    /** Returns the {@link LogField#FILE} field. */
    public String getFile() {
        return readText(LogField.FILE);
    }

    /** Returns the {@link LogField#LINE} field, unboxed from its {@code Integer} value. */
    public int getLine() {
        Integer lineNumber = (Integer) readField(LogField.LINE);
        return lineNumber.intValue();
    }

    /** Returns the {@link LogField#ROUTINE} field. */
    public String getRoutine() {
        return readText(LogField.ROUTINE);
    }

    /** Returns the {@link LogField#HOST} field. */
    public String getHost() {
        return readText(LogField.HOST);
    }

    /** Returns the {@link LogField#PROCESS} field. */
    public String getProcess() {
        return readText(LogField.PROCESS);
    }

    /** Returns the {@link LogField#LOGID} field. */
    public String getLogID() {
        return readText(LogField.LOGID);
    }

    /** Returns the {@link LogField#THREAD} field. */
    public String getThread() {
        return readText(LogField.THREAD);
    }

    /** Returns the {@link LogField#LOGMESSAGE} field. */
    public String getMessage() {
        return readText(LogField.LOGMESSAGE);
    }

    /** Fetches the raw value of the given field from the wrapped jlog entry. */
    private Object readField(LogField field) {
        return jlogRecord.getField(field);
    }

    /** Fetches the given field and casts it to {@code String}. */
    private String readText(LogField field) {
        return (String) readField(field);
    }

    // @TODO add more getter methods if required. Fields without a getter so far:
    // CONTEXT("Context",String.class,"Context"),
    // PRIORITY("Priority",Integer.class,"Priority"),
    // URI("URI",String.class,"URI"),
    // STACKID("Stack ID",String.class,"StackId"),
    // STACKLEVEL("Stack Level",Integer.class,"StackLevel"),
    // AUDIENCE("Audience",String.class, "Audience"),
    // ARRAY("Array",String.class,"Array"),
    // ANTENNA("Antenna",String.class,"Antenna");
}
/**
* Convenience method to capture logs directly into a PrintWriter.
* Method {@link #initialize(ORB, Manager)} must be called as a precondition.
* Method {@link #stopCaptureLogs()} must be called to stop writing logs to {@code logWriter}.
* This variant with a ThreadFactory could be used if {@code LogReceiver} is run inside a container
* or as part of a ComponentClientTestCase.
*/
public void startCaptureLogs(final PrintWriter logWriter, ThreadFactory threadFactory) throws IOException {
if (!isInitialized()) {
throw new IllegalStateException("First call LogReceiver#initialize(ORB, Manager), then startCaptureLogs(PrintWriter)");
}
if (listenForLogs) {
if (verbose) {
System.out.println("Ignoring call to 'startCaptureLogs' while already capturing logs.");
}
return;
}
Runnable logRetriever = new Runnable() {
public void run() {
// System.out.println("logRetriever.run called...");
try {
BlockingQueue