00001 #define MD_CORE
00002 #include "auc-md.h"
00003 #include "masterDaemon.h"
00004 #include "coreapi.h"
00005 #include "../server/Listener.cpp"
00006 #include "../server/EventSender2.cpp"
00007
00008 void attention();
00009 void arCallback(const boost::system::error_code& error);
00010 void mdWQ();
00011
00012 void masterDaemon::dispatch(mdWQitem *next) {
00013
00014 bool success;
00015 boost::system::error_code ec;
00016 const char *failure;
00017 int sentBytes,step=1;
00018 mdResponse *ackOrNak = (mdResponse *)next->what;
00019
00020 switch(next->kind) {
00021 case DV_MDQUERY:
00022 failure = (const char *)&ackOrNak->reply.dg.payLoad[0];
00023 success = ackOrNak->reply.dg.hdr.dgType.value == 1;
00024 goto commonReply;
00025 case MD_NEWBORN:
00026 ackOrNak->reply.dg.hdr.msgType = MDDG_NEWBORN;
00027 failure = "Stillbirth";
00028 success = ackOrNak->mdStdDevIdx >= 0;
00029 commonReply:
00030 assert(cb.find(ackOrNak->mdStdDevIdx) != cb.end());
00031 ackOrNak->reply.dg.hdr.clientType = MDDEV_MD;
00032 if (success) {
00033 if (!cb[ackOrNak->mdStdDevIdx]->connection.open) {
00034 ackOrNak->bus->connect_to(ackOrNak->ip,ec,step,ackOrNak->mdStdDevIdx);
00035 if (cb[ackOrNak->mdStdDevIdx]->connection.open) {
00036 if ((sentBytes=ackOrNak->bus->send(ackOrNak->reply,ec)) != sizeof(mdDGReply))
00037 theseLogs->logNdebug(1,2,"incomplete blocking send: %d: %s",sentBytes,ec.message().c_str());
00038 }
00039 else
00040 theseLogs->logNdebug(1,2,"Couldn't get back channel to client: %s (in step %d).",ec.message().c_str(),step);
00041 }
00042 else if ((sentBytes=ackOrNak->bus->send(ackOrNak->reply,ec)) != sizeof(mdDGReply))
00043 theseLogs->logNdebug(1,2,"incomplete blocking send: %d: %s",sentBytes,ec.message().c_str());
00044
00045 } else {
00046 theseLogs->logNdebug(1,0,failure);
00047 }
00048 delete ackOrNak;
00049 break;
00050 }
00051 delete next;
00052
00053 }
00054
00055
00056
00057
00058
00059 void masterDaemon::dispatch(const mdIncoming &what) {
00060
00061 bool isObservation;
00062 const char *name,*xStr;
00063 int about=what.dg.hdr.handle;
00064 md_device thisKind;
00065 mdInstrument *d1;
00066 mdMachine *d2;
00067
00068 map<int,mdLiveClient*>::iterator iter = thisConfig->allClients.find(about);
00069
00070 switch(what.dg.hdr.msgType) {
00071 case MDDG_CDRESET:
00072 theseLogs->logN(0,"Shutdown request received from a Cliever");
00073 thisConfig->halt = true;
00074 break;
00075 case MDDG_MDQUERY:
00076 if( iter == thisConfig->allClients.end() )
00077 theseLogs->logN(1,"Query for device whose handle (%d) has disappeared, ignored.", about );
00078 else {
00079 thisKind = thisConfig->allClients[about]->devType;
00080 name = &what.dg.payLoad[0];
00081 xStr = &what.dg.payLoad[what.dg.hdr.primeOffset];
00082 switch(what.dg.hdr.dgSubType) {
00083 case MDDG_REGSCPI:
00084 theseLogs->logNdebug(NORMAL_DEBUG,4,"Src SCPI: '%s' from type: %d ('%s'), handle %d.", name, what.dg.hdr.clientType,xStr,about);
00085 if (thisKind == MACHINE) theMachine->registerCmd(name,what);
00086 else thisConfig->instruments[about]->registerCmd(name,what);
00087 break;
00088 case MDDG_REGODE:
00089 theseLogs->logNdebug(NORMAL_DEBUG,4,"Src ODE: '%s' from type: %d ('%s'), handle %d.", name, what.dg.hdr.clientType,xStr,about);
00090 goto regName;
00091 case MDDG_REGOBS:
00092 theseLogs->logNdebug(NORMAL_DEBUG,4,"Src Obs: '%s' from type: %d ('%s'), handle %d.", name, what.dg.hdr.clientType,xStr,about);
00093 regName:
00094 if (thisKind == MACHINE) theMachine->state.registerData(name,what);
00095 else thisConfig->instruments[about]->state.registerData(name,what);
00096 break;
00097 }}
00098 break;
00099 case MDDG_HEARTBEAT:
00100 if (!what.dg.hdr.sourceHandle) {
00101 if (what.dg.hdr.clientType < MDDEV_CD || what.dg.hdr.clientType > MDDEV_DATACLIENT) {
00102 theseLogs->logN(1,"Heartbeat from unknown client type: %d, ignored.", what.dg.hdr.clientType);
00103 break;
00104 }
00105 theseLogs->logNdebug(NORMAL_DEBUG*4,1,"Heartbeat from new %s ...",clientTypes[what.dg.hdr.clientType]);
00106 if (what.dg.hdr.primeOffset >= 5) {
00107 mdClientBirth itsAWhat;
00108 name = (char *)(&what.dg.payLoad[what.dg.hdr.primeOffset]);
00109 theseLogs->logNdebug(NORMAL_DEBUG*4,1," ... its telemetry port is %s.",what.dg.payLoad);
00110 theseLogs->logNdebug(NORMAL_DEBUG*4,1," ... '%s' will be its _deviceName.",name);
00111 itsAWhat.dg = what.dg;
00112 itsAWhat.ip = what.ip;
00113 itsAWhat.ip.port((unsigned short)atoi(what.dg.payLoad));
00114 itsAWhat.dgDetermined = true;
00115 itsAWhat.send();
00116 }
00117 else
00118 theseLogs->logN(1,"Heartbeat didn't appear to say what port to use, ignored.");
00119 }
00120 else {
00121 theseLogs->logNdebug(MAX_DEBUG,2,"Heartbeat from client with handle: %d.",what.dg.hdr.sourceHandle);
00122 }
00123 break;
00124 }
00125
00126 }
00127 int masterDaemon::initBaseAPI(void) {
00128
00129 int rc=OK;
00130
00131 try {
00132
00133 theseLogs->logN(0,"Create Generic Core API");
00134
00135 xmlrpc_c::methodPtr const registerDeviceP(new registerDevice(thisService->cfg));
00136 xmlrpc_c::methodPtr const getMDversionP(new getMDversion);
00137 xmlrpc_c::methodPtr const getP(new getter);
00138 xmlrpc_c::methodPtr const setP(new setter);
00139 xmlrpc_c::methodPtr const getCmdListP(new cmdListFetch);
00140 xmlrpc_c::methodPtr const getCmdP(new cmd);
00141 xmlrpc_c::methodPtr const createP(new create);
00142
00143 thisConfig->api_registry.addMethod("device.registeR", registerDeviceP );
00144 thisConfig->api_registry.addMethod("state.getMDversion", getMDversionP );
00145 thisConfig->api_registry.addMethod("state.create", createP);
00146 thisConfig->api_registry.addMethod("state.get", getP );
00147 thisConfig->api_registry.addMethod("state.set", setP );
00148 thisConfig->api_registry.addMethod("behavior.getCommandList", getCmdListP );
00149 thisConfig->api_registry.addMethod("behavior.command", getCmdP );
00150
00151 }
00152 catch(...)
00153 { rc = NOT_OK; }
00154
00155 return rc;
00156
00157 }
00158 void masterDaemon::listen() {
00159
00160 EventSender<mdAttention>::add(*this);
00161 assert(EventSender<mdAttention>::getNumListeners() == 1);
00162 EventSender<mdCDPulse>::add(*this);
00163 assert(EventSender<mdCDPulse>::getNumListeners() == 1);
00164 EventSender<mdClientBirth>::add(*this);
00165 assert(EventSender<mdClientBirth>::getNumListeners() == 1);
00166 EventSender<mdClientDeath>::add(*this);
00167 assert(EventSender<mdClientDeath>::getNumListeners() == 1);
00168 EventSender<mdDeviceCommand>::add(*this);
00169 assert(EventSender<mdDeviceCommand>::getNumListeners() == 1);
00170 EventSender<mdIncoming>::add(*this);
00171 assert(EventSender<mdIncoming>::getNumListeners() == 1);
00172 EventSender<mdResponse>::add(*this);
00173 assert(EventSender<mdResponse>::getNumListeners() == 1);
00174 EventSender<mdTelemetryFrame>::add(*this);
00175 assert(EventSender<mdTelemetryFrame>::getNumListeners() == 1);
00176
00177 boost::thread mdAr(attention);
00178
00179 }
00180 void masterDaemon::processEvent( const mdAttention &thisAR )
00181 {
00182 assert(EventSender<mdAttention>::isSending());
00183 }
00184 void masterDaemon::processEvent( const mdCDPulse &thisPulse )
00185 {
00186 assert(EventSender<mdCDPulse>::isSending());
00187 }
00188 void masterDaemon::processEvent( const mdClientBirth &thisWhat )
00189 {
00190 assert(EventSender<mdClientBirth>::isSending());
00191 if (thisWhat.dgDetermined) {
00192 deviceFactory->newFromHeartbeat(thisWhat);
00193 }
00194 else {
00195 deviceFactory->newFromAPI(
00196 thisWhat.clientType,thisWhat.signature);
00197 }
00198 }
00199 void masterDaemon::processEvent( const mdClientDeath &thisWas )
00200 {
00201 assert(EventSender<mdClientDeath>::isSending());
00202 }
00203 void masterDaemon::processEvent( const mdDeviceCommand &thisCmd )
00204 {
00205 assert(EventSender<mdDeviceCommand>::isSending());
00206 }
00207 void masterDaemon::processEvent( const mdIncoming &thisDatagram )
00208 {
00209 assert(EventSender<mdIncoming>::isSending());
00210 thisService->dispatch(thisDatagram);
00211 }
00212 void masterDaemon::processEvent( const mdResponse &thisReply )
00213 {
00214 const void *queued = &thisReply;
00215
00216 assert(EventSender<mdResponse>::isSending());
00217 queue(new mdWQitem( queued, thisReply.dCat, 0 ));
00218
00219 }
00220 void masterDaemon::processEvent( const mdTelemetryFrame &thisFrame )
00221 {
00222 assert(EventSender<mdTelemetryFrame>::isSending());
00223 }
00224 void masterDaemon::run() {
00225
00226 deviceFactory = new mdDeviceFabrik();
00227 fg = new mdDGChannel( thisService->io_, 0 );
00228
00229 if (initBaseAPI()) return;
00230 listen();
00231 boost::thread work(mdWQ);
00232 assert(work.joinable());
00233 theseLogs->logNdebug(MAX_DEBUG,0,"Master Daemon worker started, foreground async i/o service joins MD worker.");
00234 io_.run();
00235 work.join();
00236
00237 }
00238
00239 void mdDGChannel::handle_receive_from(const boost::system::error_code& error,
00240 size_t bytes_recvd)
00241 {
00242 const char *c1;
00243
00244 if (!error && bytes_recvd > 0)
00245 {
00246 mdIncoming incoming(thisService->bg);
00247
00248 incoming.ip = thisService->bg->p_endpoint_;
00249 c1 = thisService->bg->p_endpoint_.address().to_string().c_str();
00250
00251 if (incoming.dg.hdr.clientType > 0 && incoming.dg.hdr.clientType < N_MDDEV_TYPES)
00252 { theseLogs->logNdebug(MAX_DEBUG,3,"msgtype %d received from %s (a '%s').",incoming.dg.hdr.msgType,c1,clientTypes[incoming.dg.hdr.clientType]);
00253 incoming.send();
00254 } else
00255 theseLogs->logNdebug(1,2,"msgtype %d received from unknown MD client type at %s, ignored.",incoming.dg.hdr.msgType,c1);
00256 }
00257 passive_.async_receive_from(
00258 boost::asio::buffer(data_, MD_MAX_DATAGRAM), p_endpoint_,
00259 boost::bind(&mdDGChannel::handle_receive_from, this,
00260 boost::asio::placeholders::error,
00261 boost::asio::placeholders::bytes_transferred));
00262 }
00263
00264
00265
00266
00267 void attention() {
00268
00269 bool announced = false;
00270
00271
00272 boost::asio::deadline_timer t0(thisService->io_, boost::posix_time::seconds(MD_HAUSHALT));
00273 while (!thisService->shuttingDown)
00274 {t0.async_wait(arCallback);
00275 sleep(2*MD_HAUSHALT);
00276 if (!announced) {announced = true;
00277 theseLogs->logNdebug(MAX_DEBUG,0,"First invocation of housekeeping.");}
00278 }
00279
00280 }
00281 void arCallback(const boost::system::error_code& error) {
00282
00283 mdAttention housekeep( thisService->arCycles++ );
00284 housekeep.send();
00285
00286 }
00287 void runMasterDaemon() {
00288
00289 cb[0] = new mdCB();
00290 thisService = new masterDaemon( thisConfig );
00291 thisService->run();
00292
00293 }
00294 void mdWQ() {
00295
00296 while(!thisConfig->shutdown) {
00297 if (!thisConfig->halt && thisService->q.size())
00298 { thisService->dispatch(thisService->q.top()); thisService->q.pop(); }
00299 else
00300 boost::this_thread::yield();
00301 }
00302
00303 }
00304 void runDataLayer() {
00305
00306 boost::asio::io_service io_;
00307
00308 theseLogs->logN(1,"Background dgram service thread starting on port %d.",thisConfig->telemetryPort);
00309
00310 try {
00311
00312 thisService->bg = new mdDGChannel(io_, thisConfig->telemetryPort);
00313 io_.run();
00314
00315 }
00316 catch (std::exception& e)
00317 {
00318 theseLogs->logN(1,"Fatal error on main bus background: %s .",e.what());
00319 }
00320 catch (...)
00321 {
00322 theseLogs->logN(0,"Unknown failure in background datalayer.");
00323 }
00324
00325 theseLogs->logNdebug(1,0,"mainbus background thread exited.");
00326
00327 }