00001 #include "auc-cd.h"
00002 #include "../server/Listener.cpp"
00003 #include "../server/EventSender.cpp"
00004 #include <time.h>
00005
00006 using namespace std;
00007 using boost::asio::ip::udp;
00008
00009 void hbCallback(const boost::system::error_code&);
00010 void stream();
00011 void trCallback(const boost::system::error_code& error);
00012
00013 void mdCliever::processEvent( const cdIncoming &dgEvent )
00014 {
00015 assert(EventSender<cdIncoming>::isSending());
00016
00017 switch(dgEvent.dg.hdr.msgType) {
00018 case MDDG_NEWBORN:
00019 switch(dgEvent.dg.hdr.clientType) {
00020 case MDDEV_MD:
00021 if (dgEvent.dg.hdr.sinkHandle) {
00022 myHandle = dgEvent.dg.hdr.sinkHandle;
00023 theseLogs.logN(1,"Got handle (%d) from MD. End of natal sequence for this cliever.",myHandle);
00024 }
00025 else if (!myHandle)
00026 theseLogs.logN(0,"MD rejected natal sequence. Please kill me and call tech. support.");
00027 break;
00028 case MACHINE:
00029 break;
00030 case MDDEV_INSTRUMENT:
00031 break;
00032 }
00033 break;
00034 }
00035
00036 }
00037 void mdCliever::processEvent( const cdHeartbeat &thisPulse )
00038 {
00039 assert(EventSender<cdHeartbeat>::isSending());
00040
00041 myPulse.dg.hdr.msgSN = sentMsgCount[MDDG_HEARTBEAT][0]++;
00042 myPulse.dg.hdr.sourceHandle = myHandle;
00043 strcpy(myPulse.dg.payLoad,thisConfig->telemetryPortStr.c_str());
00044 myPulse.dg.hdr.primeOffset = strlen(thisConfig->telemetryPortStr.c_str()) + 1;
00045 strcpy((char *)(&myPulse.dg.payLoad[strlen(myPulse.dg.payLoad)+1]),"CLIEVER");
00046 myPulse.dg.hdr.payloadSize = myPulse.dg.hdr.primeOffset + strlen("CLIEVER") + 1;
00047 fg->send_to(myPulse.dg, myStdDevIdx );
00048 theseLogs.logNdebug(MAX_DEBUG,1,"Heartbeat (%d)",sentMsgCount[MDDG_HEARTBEAT][0]);
00049
00050 }
00051 void mdCliever::processEvent( const cdInteractiveCommand &cmdEvent )
00052 {
00053 assert(EventSender<cdInteractiveCommand>::isSending());
00054
00055 }
00056 void mdCliever::processEvent( const cdShutdown &bye )
00057 {
00058 assert(EventSender<cdShutdown>::isSending());
00059 shuttingDown = true;
00060 theseLogs.logN(0,"Shutting down: draining work for immediate exit.");
00061
00062 }
00063 void mdCliever::processEvent( const cdTelemetryFrame &thisFrame )
00064 {
00065 assert(EventSender<cdTelemetryFrame>::isSending());
00066
00067 }
00068 void mdCliever::processEvent( const cdResponse &thisReply )
00069 {
00070 const void *queued = &thisReply;
00071
00072 assert(EventSender<cdResponse>::isSending());
00073 queue(new mdWQitem( queued, MD_NEWBORN, 0 ));
00074
00075 }
00076 void mdDGChannel::handle_receive_from(const boost::system::error_code& error,
00077 size_t bytes_recvd)
00078 {
00079 if (!error && bytes_recvd > 0)
00080 {
00081 cdIncoming incoming( *(thisCliever->bg) );
00082
00083 if (incoming.dg.hdr.clientType >= 0 && incoming.dg.hdr.clientType < N_MDDEV_TYPES)
00084 {theseLogs.logNdebug(MAX_DEBUG,2,"msgtype %d received from a '%s'.",incoming.dg.hdr.msgType,clientTypes[incoming.dg.hdr.clientType]);
00085 incoming.ip = p_endpoint_;
00086 incoming.send();
00087
00088 } else
00089 theseLogs.logNdebug(1,1,"msgtype %d received from unknown MD client type, ignored.",incoming.dg.hdr.msgType);
00090
00091 }
00092 passive_.async_receive_from(
00093 boost::asio::buffer(data_, MD_MAX_DATAGRAM), p_endpoint_,
00094 boost::bind(&mdDGChannel::handle_receive_from, this,
00095 boost::asio::placeholders::error,
00096 boost::asio::placeholders::bytes_transferred));
00097
00098 }
00099 void mdCliever::dispatch(mdWQitem *next) {
00100
00101 switch(next->kind) {
00102 case MD_NEWBORN:
00103 break;
00104 }
00105 delete next;
00106
00107 }
00108 void mdCDHeartbeat::nextBeat(const boost::system::error_code& error) { thisCliever->myPulse.send(); }
00109 void mdCDHeartbeat::run() {
00110
00111 while (!thisCliever->shuttingDown)
00112 {t0->async_wait(hbCallback);
00113 if (thisCliever->myHandle)
00114 sleep(60*MD_HEARTBEAT);
00115 else
00116 sleep(10*MD_HEARTBEAT);
00117 }
00118
00119 }
00120 void hbCallback(const boost::system::error_code& error) { if (thisCliever->alive) thisCliever->pulse->nextBeat(error); }
00121 void pulse() {if ((thisCliever->connected=thisCliever->fg->connect_to( thisConfig->mdAddress, thisConfig->telemetryPortStr )))
00122 theseLogs.logNdebug(NORMAL_DEBUG,2,"Cliever connected to: MD @ %s port %s.", thisConfig->mdAddress.c_str(),thisConfig->telemetryPortStr.c_str());
00123 else
00124 theseLogs.logNdebug(NORMAL_DEBUG*2,2,"Cliever connect_to: MD @ %s port %s failed.", thisConfig->mdAddress.c_str(),thisConfig->telemetryPortStr.c_str());
00125 if (thisCliever->connected)
00126 thisCliever->pulse->run();
00127 }
00128 void runCliever() {
00129
00130 try { cb[0] = new mdCB();
00131
00132 thisCliever = new mdCliever(thisConfig);
00133
00134 EventSender<cdHeartbeat>::add(*thisCliever);
00135 assert(EventSender<cdHeartbeat>::getNumListeners() == 1);
00136 EventSender<cdIncoming>::add(*thisCliever);
00137 assert(EventSender<cdIncoming>::getNumListeners() == 1);
00138 EventSender<cdShutdown>::add(*thisCliever);
00139 assert(EventSender<cdShutdown>::getNumListeners() == 1);
00140 EventSender<cdTelemetryFrame>::add(*thisCliever);
00141 assert(EventSender<cdTelemetryFrame>::getNumListeners() == 1);
00142 EventSender<cdInteractiveCommand>::add(*thisCliever);
00143 assert(EventSender<cdInteractiveCommand>::getNumListeners() == 1);
00144
00145 thisCliever->fg = new mdDGChannel( io_fg, 0 );
00146
00147 } catch(...) {
00148 theseLogs.logNdebug(-1,0,"Unknown error in Cliever initialization block.");
00149 }
00150
00151 theseLogs.logNdebug(NORMAL_DEBUG*2,0,"Cliever instantiated, starting heartbeat and telemetry stream.");
00152
00153 thisCliever->alive = true;
00154 io_fg.run();
00155 theseLogs.logNdebug(0,0,"runCliever asio ended");
00156
00157 }
00158 void runDataLayer() {
00159
00160 int assertCliever;
00161
00162 theseLogs.logN(1,"Spin to attach MD datalayer background on port %d ...",thisConfig->telemetryPort);
00163
00164 for(assertCliever=0;!thisCliever && assertCliever < MAX_DEBUG;assertCliever++);
00165 for(assertCliever=0;!thisCliever->alive && assertCliever < MAX_DEBUG;assertCliever++);
00166
00167 try {
00168
00169 thisCliever->bg = new mdDGChannel(io_bg, thisConfig->telemetryPort );
00170 theseLogs.logN(0,"... main bus background running.");
00171 thisCliever->pulse = new mdCDHeartbeat();
00172 thisCliever->pulse->init();
00173 boost::thread myPulse(pulse);
00174 assert(myPulse.joinable());
00175 boost::thread telemetryStream(stream);
00176 assert(telemetryStream.joinable());
00177 io_bg.run();
00178 myPulse.join();
00179 telemetryStream.join();
00180
00181 }
00182 catch (std::exception& e)
00183 {
00184 theseLogs.logN(1,"Fatal error in data layer bg: %s .",e.what());
00185 }
00186 catch (...)
00187 {
00188 theseLogs.logN(0,"Unknown failure in datalayer bg.");
00189 }
00190
00191 theseLogs.logNdebug(1,0,"asio background io service ended.");
00192
00193 }
00194 void shutdown() {
00195
00196 cdShutdown bye = cdShutdown();
00197 bye.send();
00198
00199 }
00200 void stream() {
00201
00202 boost::asio::deadline_timer t1(io_bg, boost::posix_time::seconds(MD_REFRESH));
00203 while (!thisCliever->shuttingDown)
00204 {t1.async_wait(trCallback);
00205 sleep(2*MD_REFRESH);
00206 }
00207
00208 }
00209 void trCallback(const boost::system::error_code& error) {
00210
00211 cdTelemetryFrame thisFrame;
00212 thisFrame.send();
00213
00214 }