/******************************************************************************* * ALMA - Atacama Large Millimiter Array * (c) European Southern Observatory, 2011 * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * "@(#) $Id: bulkDataNTReceiverStream.i,v 1.26 2012/09/13 10:59:14 bjeram Exp $" * * who when what * -------- -------- ---------------------------------------------- * bjeram 2011-04-19 created */ /************************************************************************ * *---------------------------------------------------------------------- */ #include // error definition ?? #include // we need it for TAO_Tokenizer ?? using namespace AcsBulkdata; using namespace ACS_BD_Errors; template BulkDataNTReceiverStream::BulkDataNTReceiverStream(const char* streamName, const ReceiverStreamConfiguration &cfg, bool enabledCallingCBforAllFlows) : BulkDataNTReceiverStreamBase(streamName, cfg), notRemoveFromMap_m(false), enabledCallingCBforAllFlows_m(enabledCallingCBforAllFlows) { AUTO_TRACE(__PRETTY_FUNCTION__); ACS_LOG(LM_RUNTIME_CONTEXT, __FUNCTION__, (LM_INFO, "Receiver Stream: %s has been created.", streamName)); } template BulkDataNTReceiverStream::BulkDataNTReceiverStream(const char* receiverName, const char* streamName, const ReceiverStreamConfiguration &cfg, bool enabledCallingCBforAllFlows) : BulkDataNTReceiverStreamBase(receiverName, streamName, cfg), notRemoveFromMap_m(false), enabledCallingCBforAllFlows_m(enabledCallingCBforAllFlows) { AUTO_TRACE(__PRETTY_FUNCTION__); ACS_LOG(LM_RUNTIME_CONTEXT, __FUNCTION__, (LM_INFO, "Receiver Stream: %s with receiver name: %s has been created.", streamName, receiverName)); } template BulkDataNTReceiverStream::~BulkDataNTReceiverStream() { AUTO_TRACE(__PRETTY_FUNCTION__); notRemoveFromMap_m = true; //elements should not be removed from the map ReceiverFlowMap::iterator i = receiverFlows_m.begin(); for(;i!=receiverFlows_m.end(); i++) delete (i->second); receiverFlows_m.clear(); ACS_LOG(LM_RUNTIME_CONTEXT, __FUNCTION__, (LM_INFO, "Receiver Stream: %s has been destroyed.", streamName_m.c_str())); }//~BulkDataNTReceiverStream template BulkDataNTReceiverFlow* BulkDataNTReceiverStream::createFlow(const char *flowName, const ReceiverFlowConfiguration &cfg, BulkDataNTCallback *cb, bool releaseCB) { AUTO_TRACE(__PRETTY_FUNCTION__); BulkDataNTCallback *callback=0; BulkDataNTReceiverFlow* flow; if (this->existFlow(flowName)) { FlowAlreadyExistsExImpl ex(__FILE__, __LINE__, __PRETTY_FUNCTION__); ex.setStreamName(streamName_m.c_str()); ex.setFlowName(flowName); throw ex; }//if try{ callback = (cb==0) ? new TReceiverCallback() : cb; flow = new BulkDataNTReceiverFlow(this, flowName, cfg, callback, (cb==0)||releaseCB); receiverFlows_m.insert(std::pair(flowName, flow)); if (enabledCallingCBforAllFlows_m==false) flow->disableCallingCB(); return flow; }catch(const ACSErr::ACSbaseExImpl &acsEx) { FlowCreateProblemExImpl ex(acsEx, __FILE__, __LINE__, __PRETTY_FUNCTION__); ex.setStreamName(streamName_m.c_str()); ex.setFlowName(flowName); throw ex; } catch(const std::exception &stdEx) { ACSErrTypeCommon::StdExceptionExImpl stdExWrapper(__FILE__, __LINE__, __PRETTY_FUNCTION__); stdExWrapper.setWhat(stdEx.what()); FlowCreateProblemExImpl ex(stdExWrapper, __FILE__, __LINE__, __PRETTY_FUNCTION__); ex.setStreamName(streamName_m.c_str()); ex.setFlowName(flowName); throw ex; } catch(...) { ACSErrTypeCommon::UnexpectedExceptionExImpl uex(__FILE__, __LINE__,__PRETTY_FUNCTION__); FlowCreateProblemExImpl ex(uex, __FILE__, __LINE__, __PRETTY_FUNCTION__); ex.setStreamName(streamName_m.c_str()); ex.setFlowName(flowName); throw ex; }//try-catch }//createFlow template BulkDataNTReceiverFlow* BulkDataNTReceiverStream::getFlow(const char *flowName) { AUTO_TRACE(__PRETTY_FUNCTION__); ReceiverFlowMap::iterator iter = receiverFlows_m.find(flowName); if ( iter != receiverFlows_m.end() ) { return iter->second; } else { FlowNotExistExImpl ex(__FILE__, __LINE__, __PRETTY_FUNCTION__); ex.setStreamName(streamName_m.c_str()); ex.setFlowName(flowName); throw ex; } }//getFlow template bool BulkDataNTReceiverStream::existFlow(const char* flowName) { AUTO_TRACE(__PRETTY_FUNCTION__); return ( receiverFlows_m.find(flowName) != receiverFlows_m.end() ); }//existFlow template void BulkDataNTReceiverStream::removeFlowFromMap(const char* flowName) { if (notRemoveFromMap_m) return; AUTO_TRACE(__PRETTY_FUNCTION__); // could we jsut use receiverFlows_m.erase(flowname); ?? ReceiverFlowMap::iterator iter = receiverFlows_m.find(flowName); if ( iter != receiverFlows_m.end() ) { receiverFlows_m.erase(iter); } else { //TBD: error handling } }//removeFlowFromMap template void BulkDataNTReceiverStream::createMultipleFlowsFromConfig(const char *config) { try { if(ACE_OS::strcmp(config, "") == 0) { ReceiverFlowConfiguration cfg; //just temporary createFlow("00", cfg); return; } TAO_Tokenizer addressToken(config, '/'); int numOtherFeps = addressToken.num_tokens(); if(numOtherFeps > 19) { ACS_SHORT_LOG((LM_ERROR,"BulkDataReceiver<>::createMultipleFlows too many flows specified - maximum 19")); ACSBulkDataError::AVInvalidFlowNumberExImpl err = ACSBulkDataError::AVInvalidFlowNumberExImpl(__FILE__,__LINE__,"BulkDataReceiver::createMultipleFlows"); throw err; } char strFlowNumber[2]; for (int i=0; icreateFlow(strFlowNumber, cfg); }//for }catch(const ACSErr::ACSbaseExImpl &ex) { ex.log(); } catch(...) { printf("... ERROR in createMultipleFlows using fepsConfig\n"); }//try-catch } template std::vector BulkDataNTReceiverStream::getFlowNames() { std::vector flowNames; ReceiverFlowMap::iterator i = receiverFlows_m.begin(); for(;i!=receiverFlows_m.end(); i++) flowNames.push_back(i->first); return flowNames; }//getFlowNames template unsigned int BulkDataNTReceiverStream::getFlowNumber() { return receiverFlows_m.size(); }//getFlowNumber template void BulkDataNTReceiverStream::enableCallingCBforAllFlows() { AUTO_TRACE(__PRETTY_FUNCTION__); ReceiverFlowMap::iterator i = receiverFlows_m.begin(); for(;i!=receiverFlows_m.end(); i++) (i->second)->enableCallingCB(); enabledCallingCBforAllFlows_m=true; }//enableCallingCBforAllFlows template void BulkDataNTReceiverStream::disableCallingCBforAllFlows() { AUTO_TRACE(__PRETTY_FUNCTION__); ReceiverFlowMap::iterator i = receiverFlows_m.begin(); for(;i!=receiverFlows_m.end(); i++) (i->second)->disableCallingCB(); enabledCallingCBforAllFlows_m=false; }//disableCallingCBforAllFlows