00001 #define DV_DLL
00002 #include "md_device.h"
00003 #include "../server/Listener.cpp"
00004 #include "../server/EventSender.cpp"
00005 #include <time.h>
00006 #include <boost/thread/mutex.hpp>
00007 #include <boost/foreach.hpp>
00008 #include "telemetry.h"
00009
00010 using namespace std;
00011 using boost::asio::ip::udp;
00012
00013 void dvWQ();
00014 void hbCallback(const boost::system::error_code&);
00015 void stream();
00016 void trCallback(const boost::system::error_code& error);
00017
00018
00019 boost::mutex mutAlive;
00020 boost::mutex mdQuery;
00021 boost::condition_variable deviceRunning;
00022 boost::condition_variable mdObsQPending;
00023 boost::condition_variable mdODEQPending;
00024 boost::condition_variable mdCmdQPending;
00025 bool deviceIsRunning=false;
00026
00027 void mdEmbedded::dispatch(mdWQitem *next) {
00028
00029 bool gc = true;
00030 dvQueryMD *thisQ = (dvQueryMD *)next->what;
00031 dvTelemetryFrame *thisF = (dvTelemetryFrame *)next->what;
00032
00033 switch(next->kind) {
00034 case CD_FRAME:
00035 cd->send_to(thisF->frame.dg, thisF->dest );
00036 break;
00037 case DV_MDQUERY:
00038 gc = false;
00039 md->send_to(thisQ->mdg.dg, MDDEV_MD );
00040 break;
00041 default:;
00042 break;
00043 }
00044 if (gc) delete next;
00045
00046 }
00047 void mdEmbedded::processEvent( const dvIncoming &dgEvent )
00048 {
00049 const char *name;
00050 assert(EventSender<dvIncoming>::isSending());
00051
00052 switch(dgEvent.mdg.dg.hdr.msgType) {
00053 case MDDG_MDQUERY:
00054 name = &dgEvent.mdg.dg.payLoad[dgEvent.mdg.dg.hdr.primeOffset];
00055 theseLogs.logN(1,"Got response to query re: '%s' from MD",name);
00056 switch(dgEvent.mdg.dg.hdr.dgSubType) {
00057 case MDDG_REGSCPI:
00058 thisCmdQry.response = (dvIncoming *)&dgEvent;
00059 mdCmdQPending.notify_one();
00060 break;
00061 case MDDG_REGODE:
00062 thisODEQry.response = (dvIncoming *)&dgEvent;
00063 mdODEQPending.notify_one();
00064 break;
00065 case MDDG_REGOBS:
00066 thisObsQry.response = (dvIncoming *)&dgEvent;
00067 mdObsQPending.notify_one();
00068 break;
00069 }
00070 break;
00071 case MDDG_NEWBORN:
00072 switch(dgEvent.mdg.dg.hdr.clientType) {
00073 case MDDEV_MD:
00074 if (dgEvent.mdg.dg.hdr.sinkHandle) { const char *rawCD = dgEvent.mdg.dg.payLoad;
00075 const unsigned short *rawCDport = (unsigned short *)(&dgEvent.mdg.dg.payLoad[0] + strlen(rawCD) + 1);
00076 myHandle = dgEvent.mdg.dg.hdr.sinkHandle;
00077 theseLogs.logN(3,"Got handle (%d) from MD, device will use telemetry CD at %s:%d.",myHandle,rawCD,*rawCDport);
00078 thisConfig->cdAddress = std::string(rawCD);
00079 thisConfig->cdPort = *rawCDport;
00080 }
00081 else if (!myHandle)
00082 theseLogs.logN(0,"MD rejected natal sequence. Please kill me and call tech. support.");
00083 break;
00084 }
00085 }
00086
00087 }
00088 void mdEmbedded::processEvent( const dvHeartbeat &thisPulse )
00089 {
00090 assert(EventSender<dvHeartbeat>::isSending());
00091
00092 myPulse.mdg.dg.hdr.msgSN = sentBeats++;
00093 myPulse.mdg.dg.hdr.sourceHandle = myHandle;
00094 strcpy(myPulse.mdg.dg.payLoad,thisConfig->telemetryPortStr.c_str());
00095 myPulse.mdg.dg.hdr.primeOffset = strlen(thisConfig->telemetryPortStr.c_str()) + 1;
00096 strcpy((char *)(&myPulse.mdg.dg.payLoad[myPulse.mdg.dg.hdr.primeOffset]),thisConfig->deviceName.c_str());
00097 myPulse.mdg.dg.hdr.payloadSize = strlen(thisConfig->telemetryPortStr.c_str()) + thisConfig->deviceName.length() + 2;
00098 md->send_to(myPulse.mdg.dg, MDDEV_MD );
00099 theseLogs.logNdebug(MAX_DEBUG,1,"Heartbeat (%d)",sentBeats);
00100
00101 }
00102 void mdEmbedded::processEvent( const dvShutdown &bye )
00103 {
00104 assert(EventSender<dvShutdown>::isSending());
00105 shuttingDown = true;
00106 theseLogs.logN(0,"Shutting down: draining work for immediate exit.");
00107
00108 }
00109 void mdEmbedded::processEvent( const dvQueryMD &thisQuery )
00110 {
00111 const void *queued = &thisQuery;
00112
00113 assert(EventSender<dvQueryMD>::isSending());
00114 queue(new mdWQitem( queued, DV_MDQUERY, 0 ));
00115
00116 }
00117 void mdEmbedded::processEvent( const dvResponse &thisReply )
00118 {
00119 const void *queued = &thisReply;
00120
00121 assert(EventSender<dvResponse>::isSending());
00122 queue(new mdWQitem( queued, MD_NEWBORN, 0 ));
00123
00124 }
00125 void mdEmbedded::processEvent( const dvTelemetryFrame &frameEv )
00126 {
00127 assert(EventSender<dvTelemetryFrame>::isSending());
00128
00129 const void *queued = &frameEv;
00130 std::pair <std::string,mdObservable *> obsPair;
00131 std::pair <std::string,mdOperationalDataElement *> odePair;
00132 mdTelemetryFrame tFrame(frameEv.frame.dg.payLoad,sizeof(frameEv.frame.dg.payLoad));
00133
00134 tFrame.newOut();
00135
00136 BOOST_FOREACH( obsPair, myObs )
00137 { tFrame.pack( obsPair.second, obsPair.first ); }
00138
00139 BOOST_FOREACH( odePair, myODEs )
00140 { tFrame.pack( odePair.second, odePair.first ); }
00141
00142 queue(new mdWQitem( queued, CD_FRAME, 0 ));
00143
00144 }
00145 void mdEmbedded::run() {
00146
00147 boost::thread work(dvWQ);
00148 assert(work.joinable());
00149 theseLogs.logNdebug(MAX_DEBUG,0,"MD Embedded premptible worker started.");
00150 work.join();
00151
00152 }
00153 void mdDGChannel::handle_receive_from(const boost::system::error_code& error,
00154 size_t bytes_recvd)
00155 {
00156 if (!error && bytes_recvd > 0)
00157 {
00158 dvIncoming incoming( *(thisDevice->cd) );
00159
00160 if (incoming.mdg.dg.hdr.clientType >= 0 && incoming.mdg.dg.hdr.clientType < N_MDDEV_TYPES)
00161 {theseLogs.logNdebug(MAX_DEBUG,2,"msgtype %d received from a '%s'.",incoming.mdg.dg.hdr.msgType,clientTypes[incoming.mdg.dg.hdr.clientType]);
00162 incoming.ip = p_endpoint_;
00163 incoming.send();
00164 } else
00165 theseLogs.logNdebug(1,1,"msgtype %d received from unknown MD client type, ignored.",incoming.mdg.dg.hdr.msgType);
00166
00167 }
00168 passive_.async_receive_from(
00169 boost::asio::buffer(data_, MD_MAX_DATAGRAM), p_endpoint_,
00170 boost::bind(&mdDGChannel::handle_receive_from, this,
00171 boost::asio::placeholders::error,
00172 boost::asio::placeholders::bytes_transferred));
00173
00174 }
00175 void mdDVHeartbeat::nextBeat(const boost::system::error_code& error) { thisDevice->myPulse.send(); }
00176 void mdDVHeartbeat::run() {
00177
00178 int i;
00179
00180 while (!thisDevice->shuttingDown)
00181 {t0->async_wait(hbCallback);
00182 i = (thisDevice->myHandle) ? 60 : 10;
00183 boost::system_time const alarum=boost::get_system_time() + boost::posix_time::seconds(i*MD_HEARTBEAT);
00184 boost::this_thread::sleep(alarum);
00185 }
00186
00187 }
00188
00189
00190
00191 void hbCallback(const boost::system::error_code& error) { if (thisDevice->alive) thisDevice->pulse->nextBeat(error); }
00192 void pulse() { if ((thisDevice->connected=thisDevice->md->connect_to( thisConfig->mdAddress, thisConfig->telemetryPortStr )))
00193 theseLogs.logNdebug(NORMAL_DEBUG,2,"device connect_to: MD @ %s port %s.", thisConfig->mdAddress.c_str(),thisConfig->telemetryPortStr.c_str());
00194 else
00195 theseLogs.logNdebug(NORMAL_DEBUG*2,2,"device connect_to: MD @ %s port %s failed.", thisConfig->mdAddress.c_str(),thisConfig->telemetryPortStr.c_str());
00196 if (thisDevice->connected)
00197 thisDevice->pulse->run();
00198 }
00199 void runEmbedded() {
00200
00201 try {
00202
00203 thisDevice = new mdEmbedded(thisConfig);
00204 cb[thisDevice->mdStdDevIdx] = new mdCB();
00205
00206 EventSender<dvHeartbeat>::add(*thisDevice);
00207 assert(EventSender<dvHeartbeat>::getNumListeners() == 1);
00208 EventSender<dvIncoming>::add(*thisDevice);
00209 assert(EventSender<dvIncoming>::getNumListeners() == 1);
00210 EventSender<dvQueryMD>::add(*thisDevice);
00211 assert(EventSender<dvQueryMD>::getNumListeners() == 1);
00212 EventSender<dvResponse>::add(*thisDevice);
00213 assert(EventSender<dvResponse>::getNumListeners() == 1);
00214 EventSender<dvShutdown>::add(*thisDevice);
00215 assert(EventSender<dvShutdown>::getNumListeners() == 1);
00216 EventSender<dvTelemetryFrame>::add(*thisDevice);
00217 assert(EventSender<dvTelemetryFrame>::getNumListeners() == 1);
00218
00219 thisDevice->cd = new mdDGChannel( io_, 0 , 1 );
00220 thisDevice->md = new mdDGChannel( io_, 0 , 0 );
00221
00222 } catch(...) {
00223 theseLogs.logNdebug(-1,0,"Unknown error in device initialization block.");
00224 }
00225
00226 theseLogs.logNdebug(NORMAL_DEBUG*2,0,"Device instantiated, starting MD embedded foreground.");
00227
00228 deviceIsRunning = true;
00229 deviceRunning.notify_all();
00230
00231 io_.run();
00232 theseLogs.logNdebug(0,0,"runEmbedded asio ended");
00233
00234 }
00235 void runDevice() { thisDevice->run(); }
00236 void runDataLayer() {
00237
00238 boost::unique_lock<boost::mutex> liveLock(mutAlive);
00239
00240 if (!deviceIsRunning) {
00241 theseLogs.logN(1,"Waiting for device to initialize MD background on port %d ...",thisConfig->telemetryPort);
00242 deviceRunning.wait(liveLock);
00243 }
00244 theseLogs.logN(1,"Initializing MD background on port %d ...",thisConfig->telemetryPort);
00245
00246 try {
00247
00248 thisDevice->cd = new mdDGChannel(io_, thisConfig->telemetryPort );
00249 theseLogs.logN(0,"... main bus background running.");
00250 thisDevice->pulse = new mdDVHeartbeat();
00251 thisDevice->pulse->init();
00252 boost::thread myPulse(pulse);
00253 mdDDAPI->data_layer = new boost::thread(runDevice);
00254 assert(myPulse.joinable());
00255 assert(mdDDAPI->data_layer->joinable());
00256 mdDDAPI->telemetry = new boost::thread(stream);
00257 assert(mdDDAPI->telemetry->joinable());
00258 thisDevice->alive = true;
00259 io_.run();
00260 myPulse.join();
00261 mdDDAPI->telemetry->join();
00262
00263 }
00264 catch (std::exception& e)
00265 {
00266 theseLogs.logN(1,"Fatal error in data layer: %s .",e.what());
00267 }
00268 catch (...)
00269 {
00270 theseLogs.logN(0,"Unknown failure in datalayer.");
00271 }
00272
00273 theseLogs.logNdebug(1,0,"asio background io service ended.");
00274
00275 }
00276 void setStrMsg(mdDG &mdg,const char *str,
00277 mdDGtype md_DG_type,mdDGtype md_DG_subtype,
00278 const char *extraString="" ) {
00279
00280 mdg.dg.hdr.msgType = md_DG_type;
00281 mdg.dg.hdr.dgSubType = md_DG_subtype;
00282 mdg.dg.hdr.clientType = thisConfig->thisDeviceType;
00283 strcpy(&mdg.dg.payLoad[0],(char *)str);
00284 mdg.dg.hdr.primeOffset = strlen((char *)str) + 1;
00285 strcpy((char *)(&mdg.dg.payLoad[mdg.dg.hdr.primeOffset]),extraString);
00286 mdg.dg.hdr.payloadSize = mdg.dg.hdr.primeOffset + strlen(extraString) + 1;
00287
00288 }
00289 void shutdown() {
00290
00291 dvShutdown bye = dvShutdown();
00292 bye.send();
00293
00294 }
00295 void stream() {
00296
00297 boost::asio::deadline_timer t1(io_, boost::posix_time::seconds(MD_REFRESH));
00298 while (!thisDevice->shuttingDown)
00299 {t1.async_wait(trCallback);
00300 boost::system_time const alarum=boost::get_system_time() + boost::posix_time::seconds(2*MD_REFRESH);
00301 boost::this_thread::sleep(alarum);
00302 }
00303
00304 }
00305 void trCallback(const boost::system::error_code& error) {
00306
00307 if (!thisConfig->cdConnected) {
00308
00309 char portStr[10];
00310
00311 if (thisConfig->cdAddress.empty()) return;
00312 sprintf(portStr,"%d",thisConfig->cdPort);
00313 std::string cdPort(portStr);
00314
00315 if (!thisDevice->cd->connect_to(thisConfig->cdAddress,cdPort,1)) {
00316 theseLogs.logNdebug(NORMAL_DEBUG,0,"attempt to connect to CD failed.");
00317 return;
00318 } else
00319 {theseLogs.logNdebug(NORMAL_DEBUG,0,"connected to CD.");
00320 *mdDDAPI->cdConnected = true;
00321 }
00322
00323 }
00324
00325 dvTelemetryFrame thisFrame;
00326 thisFrame.send();
00327
00328 }
00329 void dvWQ() {
00330
00331 while(!thisConfig->terminateRequest) {
00332 if (!thisConfig->yield && thisDevice->q.size())
00333 { thisDevice->dispatch(thisDevice->q.top()); thisDevice->q.pop(); }
00334 else
00335 boost::this_thread::yield();
00336 }
00337
00338 }