/* * ALMA - Atacama Large Millimiter Array * (c) European Southern Observatory, 2009 * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ package alma.acs.nc; import static alma.acs.nc.sm.generated.EventSubscriberAction.createConnection; import static alma.acs.nc.sm.generated.EventSubscriberAction.createEnvironment; import static alma.acs.nc.sm.generated.EventSubscriberAction.destroyConnection; import static alma.acs.nc.sm.generated.EventSubscriberAction.destroyEnvironment; import static alma.acs.nc.sm.generated.EventSubscriberAction.resumeConnection; import static alma.acs.nc.sm.generated.EventSubscriberAction.suspendConnection; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; import org.apache.commons.scxml.ErrorReporter; import org.apache.commons.scxml.EventDispatcher; import org.apache.commons.scxml.SCInstance; import org.apache.commons.scxml.TriggerEvent; import alma.ACSErrTypeCommon.wrappers.AcsJBadParameterEx; import alma.ACSErrTypeCommon.wrappers.AcsJCORBAProblemEx; import 
alma.ACSErrTypeCommon.wrappers.AcsJCouldntPerformActionEx; import alma.ACSErrTypeCommon.wrappers.AcsJIllegalStateEventEx; import alma.ACSErrTypeCommon.wrappers.AcsJStateMachineActionEx; import alma.acs.container.ContainerServicesBase; import alma.acs.exceptions.AcsJException; import alma.acs.logging.MultipleRepeatGuard; import alma.acs.logging.RepeatGuard; import alma.acs.logging.RepeatGuard.Logic; import alma.acs.nc.sm.generated.EventSubscriberAction; import alma.acs.nc.sm.generated.EventSubscriberSignal; import alma.acs.nc.sm.generated.EventSubscriberSignalDispatcher; import alma.acs.nc.sm.generic.AcsScxmlActionDispatcher; import alma.acs.nc.sm.generic.AcsScxmlActionExecutor; import alma.acs.nc.sm.generic.AcsScxmlEngine; import alma.acs.util.StopWatch; import alma.acsErrTypeLifeCycle.wrappers.AcsJEventSubscriptionEx; import alma.acsnc.EventDescription; /** * Base class for an event subscriber, that can be used for both Corba NC and for in-memory test NC. *
* We will have to see if it can also be used for DDS-based events, or if the required modifications will be too heavy.
*
* @param
* Errors are logged, using also fields {@link #numEventsDiscarded} and {@link #receiverTooSlowLogRepeatGuard}.
*
* The difference of this mechanism compared to the logs controlled by {@link #processTimeLogRepeatGuard} is that
* here we take into account the actual event rate and check whether the receiver can handle it,
* whereas {@link #processTimeLogRepeatGuard} only compares the actual process times against pre-configured values.
*/
// Executor to which processEventAsync hands over the calls to processEvent.
// Its queue size is also inspected there to detect a receiver that cannot keep up with the event rate.
private ThreadPoolExecutor eventHandlingExecutor;
/**
 * Number of events that were dropped because {@link #eventHandlingExecutor} rejected them.
 * It gets incremented in {@link #processEventAsync(Object, EventDescription)}
 * and is reset to zero whenever the count is reported in a log.
 * @see #eventHandlingExecutor
 */
private final AtomicLong numEventsDiscarded = new AtomicLong(0);
/**
 * Throttles logs from {@link #logEventProcessingTooSlowForEventRate}.
 * {@link #processEventAsync(Object, EventDescription)} synchronizes on this object
 * whenever it uses the guard.
 */
private final RepeatGuard receiverTooSlowLogRepeatGuard;
/**
 * Contains the generic receiver registered through the
 * {@link #addGenericSubscription(GenericCallback)} method,
 * or <code>null</code> while there is no generic subscription.
 * {@link #processEvent(Object, EventDescription)} falls back to it
 * when no type-specific receiver takes the event.
 */
protected AcsEventSubscriber.GenericCallback genericReceiver;
/**
* Contains a list of receiver functions to be invoked when an event
* of a particular type is received.
*
* key = the event type (Java class derived from IDL struct).
* Normally an ACS class such as container services will act as the factory for event subscriber objects,
* but for exceptional cases it is also possible to create one stand-alone,
* as long as the required parameters can be provided.
*
* @param services
* To get ACS logger, access to the CDB, etc.
* @param clientName
* A name that identifies the client of this NCSubscriber.
* TODO: Check if we still need this name to be specified separately from {@link services#getName()}.
* @throws AcsJException
* Thrown on any really bad error conditions encountered.
*/
public AcsEventSubscriberImplBase(ContainerServicesBase services, String clientName, Class
* The overhead of implementing this method is the price we pay for avoiding reflection
* and allowing flexible action implementation in the SM design.
* @see alma.acs.nc.sm.generic.AcsScxmlActionExecutor#execute(java.lang.Enum, org.apache.commons.scxml.EventDispatcher, org.apache.commons.scxml.ErrorReporter, org.apache.commons.scxml.SCInstance, java.util.Collection)
*/
@Override
public boolean execute(EventSubscriberAction action, EventDispatcher evtDispatcher, ErrorReporter errRep,
SCInstance scInstance, Collection
* Shuts down the event queue.
* Queued events may still be processed by the receivers afterwards,
* but here we wait for up to 500 ms to log it if it is the case.
*
* Further events delivered to {@link #processEventAsync(Object, EventDescription)}
* will cause an exception there.
*
* Subclass may override, but must call super.destroyConnectionAction().
*/
protected void destroyConnectionAction(EventDispatcher evtDispatcher, ErrorReporter errRep, SCInstance scInstance,
Collection
* The subclass must know whether such a condition is expected or not,
* e.g. because event filtering is set up outside of the subscriber
* and only subscribed event types are expected to arrive.
*/
// Implemented by the subclass. Called from processEvent with the class name of the event data
// when neither a type-specific receiver nor the generic receiver is available for that event.
protected abstract void logNoEventReceiver(String eventName);
/**
 * Logs the error that the local event buffer could not be emptied before shutting down the subscriber.
 *
 * @param timeoutMillis presumably the time in milliseconds that was waited for the queue to drain -- TODO confirm against the caller
 * @param remainingEvents presumably the number of events still queued after that wait -- TODO confirm against the caller
 */
protected abstract void logQueueShutdownError(int timeoutMillis, int remainingEvents);
/**
* Asynchronously calls {@link #processEvent(Object, EventDescription)},
* using {@link #eventHandlingExecutor}.
*
* This method should be called from the subclass-specific method that receives the event,
* for example
* This method is thread-safe.
*
* @param eventData (defined as
* No exception is allowed to be thrown by this method, even if the receiver implementation throws a RuntimeException.
* @param eventData (defined as
* If in addition to this generic subscription we also have specific subscriptions via
* {@link #addSubscription(Class, Callback)},
* then those more specific subscriptions will take precedence in receiving an event.
*
* Notice though that any server-side filters previously created for the event type specific
* subscriptions get deleted when calling this method, so that even after removing a generic
* subscription the network performance gain of server-side filtering is lost.
* @TODO: Couldn't this be fixed by creating the server-side filters on demand (see also javadoc class comment about lifecycle)?
*/
@Override
public final void addGenericSubscription(GenericCallback receiver) throws AcsJEventSubscriptionEx {
	// A null receiver would trigger notifyFirstSubscription(null) while leaving genericReceiver == null,
	// so that removeGenericSubscription() could never undo the subscription and the next call here
	// would notify the subclass once more. Reject it up front, in the style of the other subscription errors.
	if (receiver == null) {
		AcsJEventSubscriptionEx ex = new AcsJEventSubscriptionEx();
		ex.setContext("Failed to add generic subscription because the given receiver is null.");
		ex.setEventType("generic");
		throw ex;
	}
	// First time we let the subclass create the filter
	if (genericReceiver == null) {
		notifyFirstSubscription(null);
	}
	// After the filter is created, we just replace the receiver; the latest one displaces the previous one
	genericReceiver = receiver;
}
/**
 * Cancels the generic subscription.
 * The subclass gets informed via {@link #notifySubscriptionRemoved(Class)} (called with <code>null</code>)
 * before the reference to the generic receiver is cleared.
 *
 * @throws AcsJEventSubscriptionEx if no generic receiver is currently registered,
 *         or if {@link #notifySubscriptionRemoved(Class)} throws it.
 */
@Override
public final void removeGenericSubscription() throws AcsJEventSubscriptionEx {
	if (genericReceiver != null) {
		notifySubscriptionRemoved(null);
		genericReceiver = null;
		return;
	}
	// nothing to remove
	AcsJEventSubscriptionEx notSubscribedEx = new AcsJEventSubscriptionEx();
	notSubscribedEx.setContext("Failed to remove generic subscription when not actually subscribed.");
	notSubscribedEx.setEventType("generic");
	throw notSubscribedEx;
}
@Override
public final void addSubscription(Callback receiver)
		throws AcsJEventSubscriptionEx {
	// A null receiver used to fail with a bare NullPointerException in the getEventType() call below.
	// Report it as a subscription error instead, like the null / invalid event type case.
	if (receiver == null) {
		AcsJEventSubscriptionEx ex = new AcsJEventSubscriptionEx();
		ex.setContext("Receiver is null. Cannot add a subscription without an event handler.");
		ex.setEventType("null");
		throw ex;
	}

	Class subscribedEventType = receiver.getEventType();

	// The receiver must name an event type that is compatible with this subscriber's base event type
	if (subscribedEventType == null || !(eventType.isAssignableFrom(subscribedEventType)) ) {
		AcsJEventSubscriptionEx ex = new AcsJEventSubscriptionEx();
		ex.setContext("Receiver is returning a null or invalid event type. " +
				"Check the getEventType() method implementation and try again.");
		ex.setEventType(subscribedEventType == null ? "null" : subscribedEventType.getName());
		throw ex;
	}

	// First time we create the filter and set the receiver
	if (!receivers.containsKey(subscribedEventType)) {
		notifyFirstSubscription(subscribedEventType);
	}
	// After the filter is created, we just replace the corresponding receivers
	receivers.put(subscribedEventType, receiver);
}
@Override
public final void removeSubscription(Class structClass) throws AcsJEventSubscriptionEx {
	// No event type given: drop the receivers for every type of event
	if (structClass == null) {
		receivers.clear();
		notifyNoSubscription();
		return;
	}

	// A specific event type can only be removed if a receiver is registered for it
	if (!receivers.containsKey(structClass)) {
		AcsJEventSubscriptionEx notSubscribedEx = new AcsJEventSubscriptionEx();
		notSubscribedEx.setContext("Trying to unsubscribe from an event type not being subscribed to.");
		notSubscribedEx.setEventType(structClass.getName());
		throw notSubscribedEx;
	}

	// Remove the receiver first, then tell the subclass about it
	receivers.remove(structClass);
	notifySubscriptionRemoved(structClass);
}
/**
 * Sends the "startReceivingEvents" signal to the state machine.
 *
 * @throws AcsJIllegalStateEventEx not caught here, thus passed on if the signal dispatcher throws it.
 * @throws AcsJCouldntPerformActionEx wraps an AcsJStateMachineActionEx thrown by the signal dispatcher.
 */
@Override
public final void startReceivingEvents() throws AcsJIllegalStateEventEx, AcsJCouldntPerformActionEx {
	try {
		stateMachineSignalDispatcher.startReceivingEvents();
	}
	catch (AcsJStateMachineActionEx actionEx) {
		throw new AcsJCouldntPerformActionEx(actionEx);
	}
}
/**
 * Disconnects in two steps: first the "stopReceivingEvents" signal, then the "cleanUpEnvironment" signal.
 * The second step runs in a <code>finally</code> block and is therefore always attempted.
 * An exception thrown by the second step supersedes one raised by the first step.
 *
 * @throws AcsJIllegalStateEventEx only if thrown by the clean-up step; from the first step it is ignored.
 * @throws AcsJCouldntPerformActionEx wraps an AcsJStateMachineActionEx thrown by either step.
 * @see alma.acs.nc.AcsEventSubscriber#disconnect()
 */
@Override
public final void disconnect() throws AcsJIllegalStateEventEx, AcsJCouldntPerformActionEx {
	try {
		stateMachineSignalDispatcher.stopReceivingEvents();
	}
	catch (AcsJIllegalStateEventEx ignored) {
		// Deliberately ignored: if the state is totally wrong,
		// then cleanUpEnvironment below will throw AcsJIllegalStateEventEx as well.
	}
	catch (AcsJStateMachineActionEx stopEx) {
		throw new AcsJCouldntPerformActionEx(stopEx);
	}
	finally {
		// Second clean-up step, wanted even after a failure of stopReceivingEvents.
		try {
			stateMachineSignalDispatcher.cleanUpEnvironment();
		}
		catch (AcsJStateMachineActionEx cleanUpEx) {
			throw new AcsJCouldntPerformActionEx(cleanUpEx);
		}
	}
}
/**
 * Sends the "suspend" signal to the state machine.
 *
 * @throws AcsJIllegalStateEventEx not caught here, thus passed on if the signal dispatcher throws it.
 * @throws AcsJCouldntPerformActionEx wraps an AcsJStateMachineActionEx thrown by the signal dispatcher.
 */
@Override
public final void suspend() throws AcsJIllegalStateEventEx, AcsJCouldntPerformActionEx {
	try {
		stateMachineSignalDispatcher.suspend();
	}
	catch (AcsJStateMachineActionEx actionEx) {
		throw new AcsJCouldntPerformActionEx(actionEx);
	}
}
/**
 * Sends the "resume" signal to the state machine.
 *
 * @throws AcsJIllegalStateEventEx not caught here, thus passed on if the signal dispatcher throws it.
 * @throws AcsJCouldntPerformActionEx wraps an AcsJStateMachineActionEx thrown by the signal dispatcher.
 */
@Override
public final void resume() throws AcsJIllegalStateEventEx, AcsJCouldntPerformActionEx {
	try {
		stateMachineSignalDispatcher.resume();
	}
	catch (AcsJStateMachineActionEx actionEx) {
		throw new AcsJCouldntPerformActionEx(actionEx);
	}
}
////////////////////////////////////////////////////////////////////////////////////////
///////////////////////// Subscription helper methods //////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////
/**
* @param structClass Can be Object
or IDLEntity
.
*/
public abstract class AcsEventSubscriberImplBase
* value = the matching event handler.
*/
protected final Maptrue
if events should be logged when they are received.
*/
// Implemented by the subclass; expected to return true if events should be logged when they are received.
protected abstract boolean isTraceEventsEnabled();
/**
* Base class constructor, to be called from subclass ctor.
* IMPORTANT: Subclasses MUST call "stateMachine.setUpEnvironment()" at the end of their constructors.
* This is because Java does not support template method design for constructors
* (see for example http://stackoverflow.com/questions/2906958/running-a-method-after-the-constructor-of-any-derived-class).
* Since the subclasses are all meant to be produced within ACS I did not want to go for dirty tricks to work around this.
* We also don't want to call a public init() method after constructing the subscriber.
* push_structured_event
in case of Corba NC.
* Object
instead of T
to include data for generic subscription).
* @param eventDesc event meta data
*/
protected void processEventAsync(final Object eventData, final EventDescription eventDesc) {

	// Look at the backlog before submitting this event. To avoid unnecessary scary logs,
	// previous events are tolerated up to half the queue size.
	final int backlogBeforeSubmit = eventHandlingExecutor.getQueue().size();
	final boolean backlogTooLarge = ( backlogBeforeSubmit > EVENT_QUEUE_CAPACITY / 2 );

	// The task that calls processEvent from the worker thread
	final Runnable handlerTask = new Runnable() {
		@Override
		public void run() {
			processEvent(eventData, eventDesc);
		}
	};

	boolean discarded;
	try {
		eventHandlingExecutor.execute(handlerTask);
		discarded = false;
	}
	catch (RejectedExecutionException rejectedEx) {
		// Receivers have been too slow, the queue is actually full, this event is dropped.
		discarded = true;
		numEventsDiscarded.incrementAndGet();
	}

	if (!discarded && !backlogTooLarge) {
		// all fine, nothing to report
		return;
	}

	// receiverTooSlowLogRepeatGuard is currently not thread-safe, therefore synchronize
	synchronized (receiverTooSlowLogRepeatGuard) {
		if (receiverTooSlowLogRepeatGuard.checkAndIncrement()) {
			// Other threads may have incremented numEventsDiscarded since the rejection above,
			// while they are still blocked here and have not yet incremented the repeat guard.
			// This can make the log frequency slightly irregular, which is harmless:
			// what matters is that the number of discarded events gets reported correctly.
			logEventProcessingTooSlowForEventRate(
					numEventsDiscarded.getAndSet(0),
					eventData.getClass().getName());
		}
	}
}
/**
* This method should be called from the subclass-specific method that receives the event,
* for example push_structured_event
in case of Corba NC,
* or preferably via {@link #processEventAsync(Object, EventDescription)}.
* Object
instead of T
to include data for generic subscription).
* @param eventDesc
*/
protected void processEvent(Object eventData, EventDescription eventDesc) {
Class> incomingEventType = eventData.getClass();
String eventName = incomingEventType.getName();
// figure out how much time this event has to be processed (according to configuration)
double maxProcessTimeSeconds = getMaxProcessTimeSeconds(eventName);
StopWatch profiler = new StopWatch();
// we give preference to a receiver that has registered for this event type T or a subtype
if (eventType.isAssignableFrom(incomingEventType) && receivers.containsKey(incomingEventType)) {
@SuppressWarnings("unchecked")
T typedEventData = (T) eventData;
Callback extends T> receiver = receivers.get(incomingEventType);
profiler.reset();
try {
_process(receiver, typedEventData, eventDesc);
}
catch (Throwable thr) {
logEventReceiveHandlerException(eventName, receiver.getClass().getName(), thr);
}
double usedSecondsToProcess = (profiler.getLapTimeMillis() / 1000.0);
// warn the end-user if the receiver is taking too long, using a repeat guard
if (usedSecondsToProcess > maxProcessTimeSeconds && processTimeLogRepeatGuard.checkAndIncrement(eventName)) {
logEventProcessingTimeExceeded(eventName, processTimeLogRepeatGuard.counterAtLastExecution(eventName));
}
}
// fallback to generic receive method
else if (genericReceiver != null) {
profiler.reset();
genericReceiver.receiveGeneric(eventData, eventDesc);
double usedSecondsToProcess = (profiler.getLapTimeMillis() / 1000.0);
// warn the end-user if the receiver is taking too long
if (usedSecondsToProcess > maxProcessTimeSeconds && processTimeLogRepeatGuard.checkAndIncrement(eventName)) {
logEventProcessingTimeExceeded(eventName, processTimeLogRepeatGuard.counterAtLastExecution(eventName));
}
}
// No receiver found.
// This may be OK or not, depending on whether the subclass sets up filtering in the underlying notification framework
// that ensures that only subscribed event types reach the subscriber. Subclass should decide if and how to report this.
else {
logNoEventReceiver(eventName);
}
}
/**
 * "Generic helper method" to enforce type argument inference by the compiler,
 * see http://www.angelikalanger.com/GenericsFAQ/FAQSections/ProgrammingIdioms.html#FAQ207
 * <p>
 * Casts the event data to the event type declared by the receiver and hands it over.
 * If the data is not compatible with that type, a warning is logged and the receiver is not called.
 * Exceptions thrown by the receiver itself are not caught here.
 *
 * @param receiver the event handler to be called.
 * @param eventData the event data, expected to be an instance of the receiver's event type.
 * @param eventDescrip event meta data, passed on to the receiver.
 */
private <U extends T> void _process(Callback<U> receiver, T eventData, EventDescription eventDescrip) {
	U castCorbaData;
	try {
		castCorbaData = receiver.getEventType().cast(eventData);
	}
	catch (ClassCastException ex) {
		// This should never happen and would be an ACS error
		logger.warning("Failed to deliver incompatible data '" + eventData.getClass().getName() +
				"' to subscriber '" + receiver.getEventType().getName() + "'. Fix data subscription handling in " + getClass().getName() + "!");
		// There is nothing sensible to deliver. Without this return the receiver would get called with null data.
		return;
	}
	// user code errors (runtime ex etc) we let fly up
	receiver.receive(castCorbaData, eventDescrip);
}
/**
 * Reports the lifecycle state of this subscriber, which is the current state of its state machine.
 */
@Override
public String getLifecycleState() {
	final String currentState = stateMachine.getCurrentState();
	return currentState;
}
/**
 * Tells whether a generic receiver is currently registered.
 * Use only for unit testing!
 */
public boolean hasGenericReceiver() {
	if (genericReceiver == null) {
		return false;
	}
	return true;
}
/**
 * Gives the number of type-specific receivers currently registered.
 * Use only for unit testing!
 */
public int getNumberOfReceivers() {
	final int receiverCount = receivers.size();
	return receiverCount;
}
////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////// AcsEventSubscriber impl //////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////
/**
* Subscribes to all events. The latest generic receiver displaces the previous one.
* null
in case of generic subscription.
*/
protected abstract void notifyFirstSubscription(Class> structClass) throws AcsJEventSubscriptionEx;
/**
* @param structClass
* @throws AcsJEventSubscriptionEx
*/
protected abstract void notifySubscriptionRemoved(Class> structClass) throws AcsJEventSubscriptionEx;
/**
 * Called from {@link #removeSubscription(Class)} when it was invoked with <code>null</code>,
 * after all type-specific receivers have been removed.
 */
protected abstract void notifyNoSubscription();
/**
 * Tells whether the state machine is currently in the "Suspended" sub-state of "Connected".
 */
@Override
public boolean isSuspended() {
	final String suspendedStateName = "EnvironmentCreated::Connected::Suspended";
	return stateMachine.isStateActive(suspendedStateName);
}
/**
 * Tells whether the state machine is currently in the "Disconnected" state
 * or in the "EnvironmentUnknown" state.
 */
public final boolean isDisconnected() {
	if (stateMachine.isStateActive("EnvironmentCreated::Disconnected")) {
		return true;
	}
	return stateMachine.isStateActive("EnvironmentUnknown");
}
}