/*******************************************************************************
* ALMA - Atacama Large Millimeter Array
* Copyright (c) ESO - European Southern Observatory, 2011
* (in the framework of the ALMA collaboration).
* All rights reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*******************************************************************************/
package alma.acs.nsstatistics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang.StringUtils;
import org.omg.CORBA.OBJECT_NOT_EXIST;
import org.omg.CORBA.ORB;
import org.omg.CosNaming.Binding;
import org.omg.CosNaming.BindingIteratorHolder;
import org.omg.CosNaming.BindingListHolder;
import org.omg.CosNaming.NameComponent;
import org.omg.CosNaming.NamingContext;
import org.omg.CosNaming.NamingContextHelper;
import org.omg.CosNaming.NamingContextPackage.CannotProceed;
import org.omg.CosNaming.NamingContextPackage.NotFound;
import org.omg.CosNotifyChannelAdmin.AdminNotFound;
import org.omg.CosNotifyChannelAdmin.ChannelNotFound;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.EventChannel;
import org.omg.CosNotifyChannelAdmin.EventChannelFactory;
import org.omg.CosNotifyChannelAdmin.EventChannelFactoryHelper;
import org.omg.CosNotifyChannelAdmin.EventChannelHelper;
import org.omg.CosNotifyChannelAdmin.ProxyNotFound;
import org.omg.CosNotifyChannelAdmin.ProxySupplier;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynAnyFactoryHelper;
import gov.sandia.CosNotification.NotificationServiceMonitorControl;
import gov.sandia.CosNotification.NotificationServiceMonitorControlHelper;
import alma.ACSErrTypeCommon.wrappers.AcsJIllegalArgumentEx;
import alma.ACSErrTypeCommon.wrappers.AcsJUnexpectedExceptionEx;
import alma.acs.component.client.AdvancedComponentClient;
import alma.acs.concurrent.ThreadLoopRunner;
import alma.acs.concurrent.ThreadLoopRunner.CancelableRunnable;
import alma.acs.concurrent.ThreadLoopRunner.ScheduleDelayMode;
import alma.acs.container.ContainerServices;
import alma.acs.exceptions.AcsJException;
import alma.acs.logging.AcsLogLevel;
import alma.acs.logging.ClientLogManager;
import alma.acs.nc.ArchiveConsumer;
import alma.acs.nc.Helper;
import alma.acs.nc.NCSubscriber;
import alma.acs.util.AcsLocations;
import alma.acscommon.ACS_NC_DOMAIN_ARCHIVING;
import alma.acscommon.ACS_NC_DOMAIN_LOGGING;
import alma.acscommon.NOTIFICATION_FACTORY_NAME;
/**
* @author jschwarz, hsommer
*
* $Id: EventModel.java,v 1.39 2013/02/22 15:36:44 hsommer Exp $
*/
public class EventModel {
private final static String nsStatsId = "nsStatistics";
/**
* Singleton instance, used by GUI layer classes to access the model.
*/
private static EventModel modelInstance;
/**
* Key = service ID (e.g. "AlarmNotifyEventChannelFactory")
* Value = NotifyServiceData object
*
* This is the root of our data model. From here we can go to NCs and their attributes.
*/
private final Map
* Todo: we could keep the consumer reference in the matching ChannelData object
* and eliminate this map.
*/
private final HashMap
* TODO: move the detection of NotifyService out of the ctor, to make the application start faster.
* This would require a first-time service-detection-independently-of-NCs mechanism.
*
* We declare
* This method is broken out from {@link #discoverNotifyServicesAndChannels()} to make it more readable.
* It should be called only from there, to keep services and NCs aligned.
*
* @param bindingMap Name service bindings in the form key = bindingName, value = bindingKind
*/
private synchronized void discoverNotifyServices(Map
* This method is broken out from {@link #discoverNotifyServicesAndChannels(boolean)} to make it more readable.
* It should be called only from there, to keep services and NCs aligned.
*
* @param bindingMap Name service bindings in the form key = bindingName, value = bindingKind
*/
private synchronized void discoverChannels(Map
* This method may get called when the notify service is not reachable.
* Therefore we do not return the narrowed object, since the narrow operation makes a remote "is_a" call
* that in this case would take very long.
*/
private org.omg.CORBA.Object resolveNotifyService(String notifyBindingName)
throws CannotProceed, org.omg.CosNaming.NamingContextPackage.InvalidName, NotFound {
NameComponent[] ncomp = new NameComponent[1];
ncomp[0] = new NameComponent(notifyBindingName, "");
return nctx.resolve(ncomp);
}
/**
* Resolves the TAO monitor-control object that is bound in the naming service with the given name.
*/
private NotificationServiceMonitorControl resolveMonitorControl(String notifyBindingName)
throws CannotProceed, org.omg.CosNaming.NamingContextPackage.InvalidName, NotFound {
String name = "MC_" + notifyBindingName;
NameComponent[] ncomp = new NameComponent[1];
ncomp[0] = new NameComponent(name, "");
NotificationServiceMonitorControl nsmc = NotificationServiceMonitorControlHelper.narrow(nctx.resolve(ncomp));
return nsmc;
}
/**
* Simplifies the notify service ID by cutting off the trailing "NotifyEventChannelFactory".
* For the default NC service, whose ID is only this suffix, it returns "DefaultNotifyService".
* @param id
* @return "Logging", "Alarm", "DefaultNotifyService", "MyRealtimeNotifyService", ...
*/
private String simplifyNotifyServiceName(String id) {
String displayName = id.substring(0, id.indexOf(NOTIFICATION_FACTORY_NAME.value));
if (displayName.isEmpty()) {
displayName = "DefaultNotifyService";
}
return displayName;
}
public NotifyServices getNotifyServicesRoot() {
return new NotifyServices(notifyServices);
}
/**
* Called by NotifyServiceUpdateJob (single/periodic refresh of service summary / channel tree).
*/
public synchronized boolean getChannelStatistics() {
if(false == discoverNotifyServicesAndChannels())
{
return false;
}
for (NotifyServiceData nsData : notifyServices.values()) {
if (!nsData.isReachable()) {
// we skip services that were unreachable already in the above discoverNotifyServicesAndChannels call
continue;
}
// iterate over NCs
// TODO: Try to rely only on MC data and skip these 'get_all_consumeradmins' etc calls,
// especially if we don't want to display the admin objects as tree nodes to show consumer allocation to the shared admins.
for (ChannelData channelData : nsData.getChannels()) {
String channelName = channelData.getQualifiedName();
EventChannel ec = channelData.getCorbaRef();
int[] consAndSupp = {0,0}; // initial or previous count of consumers / suppliers
if (channelData.isNewNc()) {
lastConsumerAndSupplierCount.put(channelName, consAndSupp);
}
else if (lastConsumerAndSupplierCount.containsKey(channelName)) {
consAndSupp = lastConsumerAndSupplierCount.get(channelName);
}
// for consumers we must count the proxies, cannot just deduce their number from the consumer admins
int consumerCount = 0;
for (int consumerAdminId : ec.get_all_consumeradmins()) {
try {
ConsumerAdmin consumerAdmin = ec.get_consumeradmin(consumerAdminId);
int[] push_suppliers_ids = consumerAdmin.push_suppliers();
for(int proxyID: push_suppliers_ids) {
try {
ProxySupplier proxy = consumerAdmin.get_proxy_supplier(proxyID);
if(!NCSubscriber.AdminReuseCompatibilityHack.isDummyProxy(proxy)) {
consumerCount++;
}
} catch(ProxyNotFound ex) {
m_logger.log(AcsLogLevel.NOTICE, "Proxy with ID='" + proxyID + "' not found for consumer admin with ID='" + consumerAdminId + "', " +
"even though this Id got listed a moment ago.", ex);
}
}
} catch (AdminNotFound ex) {
ex.printStackTrace();
}
}
final String[] roleNames = {"consumer", "supplier"};
int [] proxyCounts = new int[2];
int [] proxyDeltas= new int[2];
proxyCounts[0] = consumerCount;
proxyCounts[1] = ec.get_all_supplieradmins().length; // currently for suppliers we have 1 admin object per supplier
// same code for consumer and supplier
for (int i = 0; i < proxyCounts.length; i++) {
String cstr = channelName;
int cdiff = proxyCounts[i] - consAndSupp[i];
if (cdiff != 0) {
if (cdiff > 0) {
cstr += " has added " + cdiff + " " + roleNames[i];
}
else if (cdiff < 0) {
cstr += " has removed " + (-cdiff) + " " + roleNames[i];
}
cstr += (Math.abs(cdiff)!=1 ? "s." : ".");
m_logger.info(cstr);
}
proxyDeltas[i] = cdiff;
}
lastConsumerAndSupplierCount.put(channelName, proxyCounts);
//m_logger.info("Channel: " + channelName + " has " + adminCounts[0] + " consumers and " + adminCounts[1] + " suppliers.");
channelData.setNumberConsumers(proxyCounts[0]);
channelData.setNumberSuppliers(proxyCounts[1]);
channelData.setDeltaConsumers(proxyDeltas[0]);
channelData.setDeltaSuppliers(proxyDeltas[1]);
}
}
return true;
}
/**
* Resolves a notification channel in the naming service.
*
* @return Reference to the event channel specified by channelName.
* @param bindingName
* Name of the event channel and trailing domain name, as the NC is registered with the CORBA Naming Service
* @throws AcsJException
* Standard ACS Java exception.
*/
protected EventChannel resolveNotificationChannel(String bindingName) throws AcsJException {
EventChannel retValue = null;
String nameServiceKind = alma.acscommon.NC_KIND.value;
//m_logger.info("Will call 'nctx.resolve' for binding='" + bindingName + "', kind='" + nameServiceKind + "'.");
try {
NameComponent[] t_NameSequence = { new NameComponent(bindingName, nameServiceKind) };
retValue = EventChannelHelper.narrow(nctx.resolve(t_NameSequence));
}
catch (OBJECT_NOT_EXIST ex) {
m_logger.severe("The NC '" + bindingName + "' no longer exists, probably because its notify service was restarted. The naming service still lists this NC.");
throw new AcsJUnexpectedExceptionEx(ex);
}
catch (org.omg.CosNaming.NamingContextPackage.NotFound e) {
// No other suppliers have created the channel yet
m_logger.info("The '" + bindingName + "' channel has not been created yet.");
throw new AcsJUnexpectedExceptionEx(e);
}
catch (org.omg.CosNaming.NamingContextPackage.CannotProceed e) {
// Think there is virtually no chance of this every happening but...
throw new AcsJUnexpectedExceptionEx(e);
}
catch (org.omg.CosNaming.NamingContextPackage.InvalidName e) {
// Think there is virtually no chance of this every happening but...
throw new AcsJUnexpectedExceptionEx(e);
}
return retValue;
}
public ContainerServices getContainerServices() {
return cs;
}
private AdminConsumer getAdminConsumer(String channelName) throws AcsJException {
synchronized (consumerMap) {
if (!consumerMap.containsKey(channelName)) {
AdminConsumer adm = new AdminConsumer(channelName, cs, nctx, equeue);
adm.setPrintDetails(true);
consumerMap.put(channelName, adm);
return adm;
}
return consumerMap.get(channelName);
}
}
/**
* Called from SubscribeNCHandler.
* @throws AcsJException
*/
public void addChannelSubscription(ChannelData channelData ) throws AcsJException {
String channelName = channelData.getName();
AdminConsumer consumer = null;
synchronized (consumerMap) {
// The subscription flag and the existence of a consumer for that NC must be in sync
if (channelData.isSubscribed() != consumerMap.containsKey(channelName)) {
m_logger.warning("Inconsistent state between channel flag and consumer map. Will not subscribe to " + channelName);
return;
}
else if (channelData.isSubscribed()) {
m_logger.warning("Already subscribed to " + channelName + ". Ignoring subscription request.");
return;
}
channelData.setSubscribed(true);
consumer = getAdminConsumer(channelName);
}
consumer.startReceivingEvents();
}
/**
* TODO: Call this from mouse menu of Archiving NC instead of in the beginning.
* (It used to be called once a minute from a thread of the event list / archiving list parts.)
*
* Creates on demand an ArchiveConsumer and stores its reference in field {@link #archiveConsumer}.
*/
private synchronized void getArchiveConsumer() {
if (archiveConsumer == null) {
try {
archiveConsumer = new ArchiveConsumer(new ArchiveReceiver(archQueue), cs, nctx);
archiveConsumer.startReceivingEvents();
m_logger.info("Subscribed to monitoring/archiving events.");
} catch (AcsJException ex) {
m_logger.log(Level.WARNING, "Failed to subcribe to monitoring/archiving events.", ex);
}
}
}
public DynAnyFactory getDynAnyFactory() {
return dynAnyFactory;
}
public void closeSelectedConsumer(ChannelData channelData) {
String channelName = channelData.getName();
synchronized (consumerMap) {
if (consumerMap.containsKey(channelName)) {
AdminConsumer consumer = consumerMap.get(channelName);
try {
consumer.disconnect();
} catch (Exception ex) {
m_logger.log(Level.WARNING, "Failed to close subscriber: ", ex);
}
consumerMap.remove(channelName);
}
}
channelData.setSubscribed(false);
}
public void closeAllConsumers() {
synchronized (consumerMap) {
for (ChannelData channelData : getNotifyServicesRoot().getAllChannels()) {
if (channelData.isSubscribed()) {
closeSelectedConsumer(channelData);
}
}
}
}
/**
* TODO: Call from mouse menu handler for the archiving NC
*/
public synchronized void closeArchiveConsumer() {
if (archiveConsumer != null) {
try {
archiveConsumer.disconnect();
m_logger.info("Closed subscriber for baci monitoring events");
} catch (Exception ex) {
m_logger.log(Level.WARNING, "Got an exception disconnecting the archive consumer.", ex);
}
archiveConsumer = null;
}
}
public void tearDown() {
try {
closeAllConsumers();
closeArchiveConsumer();
unreachableServiceChecker.shutdown(10, TimeUnit.SECONDS);
acc.tearDown();
} catch (Exception ex) {
m_logger.log(Level.WARNING, "Error in EventModel#tearDown: ", ex);
}
}
/**
* Note that this method may be called very frequently, to determine whether menu items are selected/enabled, thus it must be fast.
*/
public boolean isSubscribed(ChannelData channelData) {
return channelData.isSubscribed();
}
}
Throwable
so that this constructor can catch, print and re-throw even the nastiest errors,
* which in case of VerifyError etc are otherwise not well shown by the Eclipse container.
*/
private EventModel() throws Throwable {
try {
notifyServices = new HashMaptrue
if the notify service exists.
* @throws AcsJIllegalArgumentEx if notifyServiceObj is reachable but not a CosNotifyChannelAdmin/EventChannelFactory
,
* which should never happen because of the filtering we do on the name service bindings.
*/
private boolean isNotifyServiceReachable(org.omg.CORBA.Object notifyServiceObj, String notifyServiceBindingName) throws AcsJIllegalArgumentEx {
boolean ret = false;
if (notifyServiceObj != null) {
try {
if (notifyServiceObj._is_a(EventChannelFactoryHelper.id())) {
ret = true;
}
else {
AcsJIllegalArgumentEx ex = new AcsJIllegalArgumentEx();
ex.setVariable("notifyServiceObj");
ex.setValue("Type=" + notifyServiceObj.getClass().getName());
throw ex;
}
}
catch (Exception ex) {
// If the service has died, this will be a org.omg.CORBA.TRANSIENT
// TODO: Use retry/timeout config etc to not wait 10 seconds for this exception
m_logger.log(Level.SEVERE, "Notify service '" + notifyServiceBindingName + "' is not reachable.");
}
}
return ret;
}
/**
* Checks if the given NotifyServiceData references a reachable notify service,
* updates the MC reference if the service is reachable now but was not reachable last time,
* and updates the "isReachable" flag.
* @param notifyService The notify service "proxy" to check.
*/
private void updateReachability(NotifyServiceData notifyService) {
boolean wasReachableBefore = notifyService.isReachable();
boolean isReachableNow = false;
try {
org.omg.CORBA.Object serviceRef = notifyService.getEventChannelFactory();
if (serviceRef == null) {
// This happens if the notify service was unreachable when the eventGUI was started
serviceRef = resolveNotifyService(notifyService.getFactoryName());
}
isReachableNow = isNotifyServiceReachable(serviceRef, notifyService.getFactoryName());
if (isReachableNow && notifyService.getEventChannelFactory() == null) {
notifyService.updateEventChannelFactory(EventChannelFactoryHelper.narrow(serviceRef));
}
} catch (AcsJIllegalArgumentEx ex) {
// This ex cannot happen... the corba object wrapped by NotifyServiceData surely is a notify service.
} catch (Exception ex) {
}
if (isReachableNow && !wasReachableBefore) {
try {
notifyService.updateMC(resolveMonitorControl(notifyService.getFactoryName()));
} catch (Exception ex) {
// This is unexpected, because if the notify service itself is reachable, then we expect also the associated MC object to be reachable.
// We count this as "not reachable" in total.
isReachableNow = false;
}
}
notifyService.setReachable(isReachableNow);
}
/**
* Checks the naming service for NC instances and stores / updates them under the matching service from {@link #notifyServices}.
* It does not query the NCs for consumers etc. though.
*