/* * Remote Laboratory Protocol Terminal Part * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. * * (c) 2014 - 2019 Timothy Pearson * Raptor Engineering * http://www.raptorengineeringinc.com */ /* This part illustrates the correct method of transmitting and receiving * data in a dedicated thread, using two separate message queues to enable * fully non-blocking, event-driven execution. * * NOTE * inboundQueue is filled by the GUI thread with data inbound to the worker thread * outboundQueue is filled by the worker thread with data outbound to the GUI thread */ #include "define.h" #include "part.h" #include //::createAboutData() #include #include #include //::start() #include #include #include #include //encodeName() #include #include #include #include #include #include #include #include #include #include //access() #include #include #include "layout.h" #define NETWORK_COMM_TIMEOUT_MS 15000 /* exception handling */ struct exit_exception { int c; exit_exception(int c):c(c) { } }; namespace RemoteLab { typedef KParts::GenericFactory Factory; #define CLIENT_LIBRARY "libremotelab_prototerminal" K_EXPORT_COMPONENT_FACTORY( libremotelab_prototerminal, RemoteLab::Factory ) ProtoTerminalWorker::ProtoTerminalWorker() : TQObject() { m_networkDataMutex = new TQMutex(false); m_outboundQueueMutex = new TQMutex(false); m_inboundQueueMutex = new TQMutex(false); m_newData = false; } ProtoTerminalWorker::~ProtoTerminalWorker() { delete m_networkDataMutex; m_networkDataMutex = NULL; delete m_inboundQueueMutex; m_inboundQueueMutex = NULL; delete m_outboundQueueMutex; m_outboundQueueMutex = NULL; } void ProtoTerminalWorker::run() { TQEventLoop* eventLoop = TQApplication::eventLoop(); if (!eventLoop) { return; } while (1) { m_instrumentMutex->lock(); // Handle inbound queue m_inboundQueueMutex->lock(); if (m_inboundQueue.count() > 0) { TQDataStream ds(m_socket); ds.setPrintableData(true); ProtoTerminalEventQueue::iterator it; for (it = m_inboundQueue.begin(); it != m_inboundQueue.end(); ++it) { if ((*it).first == TxRxSyncPoint) { break; } if ((*it).first == ConsoleTextSend) { ds << (*it).second.toString(); m_socket->writeEndOfFrame(); } it = m_inboundQueue.erase(it); } m_socket->flush(); } m_inboundQueueMutex->unlock(); // Handle outbound queue if (m_newData) { bool queue_modified = false; m_networkDataMutex->lock(); m_newData = false; // Receive data if (m_socket->canReadFrame()) { TQDataStream ds(m_socket); ds.setPrintableData(true); // Get command status TQString input; while (!ds.atEnd()) { ds >> input; m_outboundQueueMutex->lock(); m_outboundQueue.push_back(ProtoTerminalEvent(ConsoleTextReceive, TQVariant(input))); m_outboundQueueMutex->unlock(); queue_modified = true; } m_socket->clearFrameTail(); } m_networkDataMutex->unlock(); if (queue_modified) { m_inboundQueueMutex->lock(); ProtoTerminalEventQueue::iterator it = m_inboundQueue.begin(); if ((it) && (it != m_inboundQueue.end())) { // Remove sync point if ((*it).first == TxRxSyncPoint) { it = m_inboundQueue.erase(it); } } m_inboundQueueMutex->unlock(); emit(outboundQueueUpdated()); } } m_instrumentMutex->unlock(); // Wait for queue status change or new network activity if (!eventLoop->processEvents(TQEventLoop::ExcludeUserInput)) { eventLoop->processEvents(TQEventLoop::ExcludeUserInput | TQEventLoop::WaitForMore); } } eventLoop->exit(0); } void ProtoTerminalWorker::appendItemToInboundQueue(ProtoTerminalEvent item, bool syncPoint) { m_inboundQueueMutex->lock(); m_inboundQueue.push_back(item); if (syncPoint) { m_inboundQueue.push_back(ProtoTerminalEvent(TxRxSyncPoint, TQVariant())); } m_inboundQueueMutex->unlock(); } bool ProtoTerminalWorker::syncPointActive() { bool active = false; m_inboundQueueMutex->lock(); ProtoTerminalEventQueue::iterator it = m_inboundQueue.begin(); if ((it) && (it != m_inboundQueue.end())) { if ((*it).first == TxRxSyncPoint) { active = true; } } m_inboundQueueMutex->unlock(); return active; } void ProtoTerminalWorker::wake() { // Do nothing -- the main event loop will wake when this is called } void ProtoTerminalWorker::dataReceived() { m_networkDataMutex->lock(); m_newData = true; m_networkDataMutex->unlock(); } void ProtoTerminalWorker::lockOutboundQueue() { m_outboundQueueMutex->lock(); } void ProtoTerminalWorker::unlockOutboundQueue() { m_outboundQueueMutex->unlock(); } ProtoTerminalEventQueue* ProtoTerminalWorker::outboundQueue() { return &m_outboundQueue; } ProtoTerminalPart::ProtoTerminalPart( TQWidget *parentWidget, const char *widgetName, TQObject *parent, const char *name, const TQStringList& ) : RemoteInstrumentPart( parent, name ), m_commHandlerState(-1), m_commHandlerMode(0), m_commHandlerCommandState(0), m_connectionActiveAndValid(false), m_base(0) { // Initialize important base class variables m_clientLibraryName = CLIENT_LIBRARY; // Initialize mutex m_instrumentMutex = new TQMutex(false); // Initialize kpart setInstance(Factory::instance()); setWidget(new TQVBox(parentWidget, widgetName)); // Set up worker m_worker = new ProtoTerminalWorker(); m_workerThread = new TQEventLoopThread(); m_worker->moveToThread(m_workerThread); TQObject::connect(this, TQT_SIGNAL(wakeWorkerThread()), m_worker, TQT_SLOT(wake())); TQObject::connect(m_worker, TQT_SIGNAL(outboundQueueUpdated()), this, TQT_SLOT(processOutboundQueue())); // Create timers m_updateTimeoutTimer = new TQTimer(this); connect(m_updateTimeoutTimer, SIGNAL(timeout()), this, SLOT(networkTimeout())); // Create widgets m_base = new ProtoTerminalBase(widget()); // Initialize widgets m_base->setMinimumSize(500,350); connect(m_base->sendText, SIGNAL(clicked()), this, SLOT(sendTextClicked())); connect(m_base->textInput, SIGNAL(returnPressed()), m_base->sendText, SIGNAL(clicked())); TQTimer::singleShot(0, this, TQT_SLOT(postInit())); } ProtoTerminalPart::~ProtoTerminalPart() { if (m_instrumentMutex->locked()) { printf("[WARNING] Exiting when data transfer still in progress!\n\r"); fflush(stdout); } disconnectFromServer(); delete m_instrumentMutex; if (m_workerThread) { m_workerThread->terminate(); m_workerThread->wait(); delete m_workerThread; m_workerThread = NULL; delete m_worker; m_worker = NULL; } } void ProtoTerminalPart::postInit() { setUsingFixedSize(false); } bool ProtoTerminalPart::openURL(const KURL &url) { int ret; m_connectionActiveAndValid = false; ret = connectToServer(url.url()); processLockouts(); return (ret != 0); } bool ProtoTerminalPart::closeURL() { disconnectFromServer(); m_url = KURL(); return true; } void ProtoTerminalPart::processLockouts() { if (m_connectionActiveAndValid) { m_base->setEnabled(true); } else { m_base->setEnabled(false); } } void ProtoTerminalPart::disconnectFromServerCallback() { m_updateTimeoutTimer->stop(); m_connectionActiveAndValid = false; } void ProtoTerminalPart::connectionFinishedCallback() { // Finish worker setup m_worker->m_socket = m_socket; m_worker->m_instrumentMutex = m_instrumentMutex; m_socket->moveToThread(m_workerThread); connect(m_socket, SIGNAL(readyRead()), m_socket, SLOT(processPendingData())); m_socket->processPendingData(); connect(m_socket, SIGNAL(newDataReceived()), m_worker, SLOT(dataReceived())); m_tickerState = 0; m_commHandlerState = 0; m_commHandlerMode = 0; m_socket->setDataTimeout(NETWORK_COMM_TIMEOUT_MS); m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE); // Start worker m_workerThread->start(); TQTimer::singleShot(0, m_worker, SLOT(run())); processLockouts(); networkTick(); return; } void ProtoTerminalPart::connectionStatusChangedCallback() { processLockouts(); } void ProtoTerminalPart::setTickerMessage(TQString message) { m_connectionActiveAndValid = true; TQString tickerChar; switch (m_tickerState) { case 0: tickerChar = "-"; break; case 1: tickerChar = "\\"; break; case 2: tickerChar = "|"; break; case 3: tickerChar = "/"; break; } setStatusMessage(message + TQString("... %1").arg(tickerChar)); m_tickerState++; if (m_tickerState > 3) { m_tickerState = 0; } } void ProtoTerminalPart::processOutboundQueue() { bool had_events = false; m_worker->lockOutboundQueue(); ProtoTerminalEventQueue* eventQueue = m_worker->outboundQueue(); ProtoTerminalEventQueue::iterator it; for (it = eventQueue->begin(); it != eventQueue->end(); ++it) { if ((*it).first == ConsoleTextReceive) { TQString input = (*it).second.toString(); if (input != "") { input.replace("\r", "\n"); m_base->textOutput->append(">>>" + input); } } had_events = true; } if (had_events) { networkTick(); eventQueue->clear(); } m_worker->unlockOutboundQueue(); processLockouts(); } void ProtoTerminalPart::networkTick() { m_updateTimeoutTimer->stop(); setTickerMessage(i18n("Connected")); m_connectionActiveAndValid = true; processLockouts(); } void ProtoTerminalPart::networkTimeout() { m_updateTimeoutTimer->stop(); m_socket->clearIncomingData(); setStatusMessage(i18n("Server ping timeout. Please verify the status of your network connection.")); m_connectionActiveAndValid = false; processLockouts(); } void ProtoTerminalPart::sendTextClicked() { if (!m_worker->syncPointActive()) { m_TextToSend = m_TextToSend + m_base->textInput->text(); m_base->textInput->setText(""); m_worker->appendItemToInboundQueue(ProtoTerminalEvent(ConsoleTextSend, TQVariant(m_TextToSend)), true); m_base->textOutput->append("<<<" + m_TextToSend); m_TextToSend = ""; emit wakeWorkerThread(); m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE); } } TDEAboutData* ProtoTerminalPart::createAboutData() { return new TDEAboutData( APP_NAME, I18N_NOOP( APP_PRETTYNAME ), APP_VERSION ); } } //namespace RemoteLab #include "part.moc"