summaryrefslogtreecommitdiffstats
path: root/clients/tde/src/part
diff options
context:
space:
mode:
authorTimothy Pearson <kb9vqf@pearsoncomputing.net>2015-09-11 00:10:48 -0500
committerTimothy Pearson <kb9vqf@pearsoncomputing.net>2015-09-11 00:10:48 -0500
commitf6cc7c2a0aec01f7b607c63772e89b9127301a70 (patch)
tree9fec9424ddba4c9bdb421ec0e207b5f3b8633c06 /clients/tde/src/part
parentb32f713446eaf5ebb417281afe7740d149eb3f3b (diff)
downloadulab-f6cc7c2a0aec01f7b607c63772e89b9127301a70.tar.gz
ulab-f6cc7c2a0aec01f7b607c63772e89b9127301a70.zip
Update ProtoTerminal part to use threading and message queues
Diffstat (limited to 'clients/tde/src/part')
-rw-r--r--clients/tde/src/part/prototerminal/part.cpp327
-rw-r--r--clients/tde/src/part/prototerminal/part.h64
2 files changed, 281 insertions, 110 deletions
diff --git a/clients/tde/src/part/prototerminal/part.cpp b/clients/tde/src/part/prototerminal/part.cpp
index 7818988..b5e5a1d 100644
--- a/clients/tde/src/part/prototerminal/part.cpp
+++ b/clients/tde/src/part/prototerminal/part.cpp
@@ -15,11 +15,20 @@
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
- * (c) 2014 Timothy Pearson
+ * (c) 2014 - 2015 Timothy Pearson
* Raptor Engineering
* http://www.raptorengineeringinc.com
*/
+/* This part illustrates the correct method of transmitting and receiving
+ * data in a dedicated thread, using two separate message queues to enable
+ * fully non-blocking, event-driven execution.
+ *
+ * NOTE
+ * inboundQueue is filled by the GUI thread with data inbound to the worker thread
+ * outboundQueue is filled by the worker thread with data outbound to the GUI thread
+ */
+
#include "define.h"
#include "part.h"
@@ -60,6 +69,146 @@ typedef KParts::GenericFactory<RemoteLab::ProtoTerminalPart> Factory;
#define CLIENT_LIBRARY "libremotelab_prototerminal"
K_EXPORT_COMPONENT_FACTORY( libremotelab_prototerminal, RemoteLab::Factory )
+ProtoTerminalWorker::ProtoTerminalWorker() : TQObject() {
+ m_networkDataMutex = new TQMutex(false);
+ m_outboundQueueMutex = new TQMutex(false);
+ m_inboundQueueMutex = new TQMutex(false);
+ m_newData = false;
+}
+
+ProtoTerminalWorker::~ProtoTerminalWorker() {
+ delete m_networkDataMutex;
+ m_networkDataMutex = NULL;
+ delete m_inboundQueueMutex;
+ m_inboundQueueMutex = NULL;
+ delete m_outboundQueueMutex;
+ m_outboundQueueMutex = NULL;
+}
+
+void ProtoTerminalWorker::run() {
+ TQEventLoop* eventLoop = TQApplication::eventLoop();
+ if (!eventLoop) {
+ return;
+ }
+
+ while (1) {
+ m_instrumentMutex->lock();
+
+ // Handle inbound queue
+ m_inboundQueueMutex->lock();
+ if (m_inboundQueue.count() > 0) {
+ TQDataStream ds(m_socket);
+ ds.setPrintableData(true);
+
+ ProtoTerminalEventQueue::iterator it;
+ for (it = m_inboundQueue.begin(); it != m_inboundQueue.end(); ++it) {
+ if ((*it).first == TxRxSyncPoint) {
+ break;
+ }
+ if ((*it).first == ConsoleTextSend) {
+ ds << (*it).second.toString();
+ m_socket->writeEndOfFrame();
+ }
+ it = m_inboundQueue.erase(it);
+ }
+ m_socket->flush();
+ }
+ m_inboundQueueMutex->unlock();
+
+ // Handle outbound queue
+ if (m_newData) {
+ bool queue_modified = false;
+ m_networkDataMutex->lock();
+ m_newData = false;
+
+ // Receive data
+ if (m_socket->canReadFrame()) {
+ TQDataStream ds(m_socket);
+ ds.setPrintableData(true);
+
+ // Get command status
+ TQString input;
+ while (!ds.atEnd()) {
+ ds >> input;
+ m_outboundQueueMutex->lock();
+ m_outboundQueue.push_back(ProtoTerminalEvent(ConsoleTextReceive, TQVariant(input)));
+ m_outboundQueueMutex->unlock();
+ queue_modified = true;
+ }
+ m_socket->clearFrameTail();
+ }
+ m_networkDataMutex->unlock();
+
+ if (queue_modified) {
+ m_inboundQueueMutex->lock();
+ ProtoTerminalEventQueue::iterator it = m_inboundQueue.begin();
+ if ((it) && (it != m_inboundQueue.end())) {
+ // Remove sync point
+ if ((*it).first == TxRxSyncPoint) {
+ it = m_inboundQueue.erase(it);
+ }
+ }
+ m_inboundQueueMutex->unlock();
+ emit(outboundQueueUpdated());
+ }
+ }
+
+ m_instrumentMutex->unlock();
+
+ // Wait for queue status change or new network activity
+ if (!eventLoop->processEvents(TQEventLoop::ExcludeUserInput)) {
+ eventLoop->processEvents(TQEventLoop::ExcludeUserInput | TQEventLoop::WaitForMore);
+ }
+ }
+
+ eventLoop->exit(0);
+}
+
+void ProtoTerminalWorker::appendItemToInboundQueue(ProtoTerminalEvent item, bool syncPoint) {
+ m_inboundQueueMutex->lock();
+ m_inboundQueue.push_back(item);
+ if (syncPoint) {
+ m_inboundQueue.push_back(ProtoTerminalEvent(TxRxSyncPoint, TQVariant()));
+ }
+ m_inboundQueueMutex->unlock();
+}
+
+bool ProtoTerminalWorker::syncPointActive() {
+ bool active = false;
+
+ m_inboundQueueMutex->lock();
+ ProtoTerminalEventQueue::iterator it = m_inboundQueue.begin();
+ if ((it) && (it != m_inboundQueue.end())) {
+ if ((*it).first == TxRxSyncPoint) {
+ active = true;
+ }
+ }
+ m_inboundQueueMutex->unlock();
+
+ return active;
+}
+
+void ProtoTerminalWorker::wake() {
+ // Do nothing -- the main event loop will wake when this is called
+}
+
+void ProtoTerminalWorker::dataReceived() {
+ m_networkDataMutex->lock();
+ m_newData = true;
+ m_networkDataMutex->unlock();
+}
+
+void ProtoTerminalWorker::lockOutboundQueue() {
+ m_outboundQueueMutex->lock();
+}
+
+void ProtoTerminalWorker::unlockOutboundQueue() {
+ m_outboundQueueMutex->unlock();
+}
+
+ProtoTerminalEventQueue* ProtoTerminalWorker::outboundQueue() {
+ return &m_outboundQueue;
+}
ProtoTerminalPart::ProtoTerminalPart( TQWidget *parentWidget, const char *widgetName, TQObject *parent, const char *name, const TQStringList& )
: RemoteInstrumentPart( parent, name ), m_commHandlerState(-1), m_commHandlerMode(0), m_commHandlerCommandState(0), m_connectionActiveAndValid(false), m_base(0)
@@ -74,11 +223,16 @@ ProtoTerminalPart::ProtoTerminalPart( TQWidget *parentWidget, const char *widget
setInstance(Factory::instance());
setWidget(new TQVBox(parentWidget, widgetName));
+ // Set up worker
+ m_worker = new ProtoTerminalWorker();
+ m_workerThread = new TQEventLoopThread();
+ m_worker->moveToThread(m_workerThread);
+ TQObject::connect(this, TQT_SIGNAL(wakeWorkerThread()), m_worker, TQT_SLOT(wake()));
+ TQObject::connect(m_worker, TQT_SIGNAL(outboundQueueUpdated()), this, TQT_SLOT(processOutboundQueue()));
+
// Create timers
- m_forcedUpdateTimer = new TQTimer(this);
- connect(m_forcedUpdateTimer, SIGNAL(timeout()), this, SLOT(mainEventLoop()));
m_updateTimeoutTimer = new TQTimer(this);
- connect(m_updateTimeoutTimer, SIGNAL(timeout()), this, SLOT(mainEventLoop()));
+ connect(m_updateTimeoutTimer, SIGNAL(timeout()), this, SLOT(networkTimeout()));
// Create widgets
m_base = new ProtoTerminalBase(widget());
@@ -99,6 +253,15 @@ ProtoTerminalPart::~ProtoTerminalPart() {
disconnectFromServer();
delete m_instrumentMutex;
+
+ if (m_workerThread) {
+ m_workerThread->terminate();
+ m_workerThread->wait();
+ delete m_workerThread;
+ m_workerThread = NULL;
+ delete m_worker;
+ m_worker = NULL;
+ }
}
void ProtoTerminalPart::postInit() {
@@ -129,22 +292,31 @@ void ProtoTerminalPart::processLockouts() {
}
void ProtoTerminalPart::disconnectFromServerCallback() {
- m_forcedUpdateTimer->stop();
m_updateTimeoutTimer->stop();
m_connectionActiveAndValid = false;
}
void ProtoTerminalPart::connectionFinishedCallback() {
+ // Finish worker setup
+ m_worker->m_socket = m_socket;
+ m_worker->m_instrumentMutex = m_instrumentMutex;
+ m_socket->moveToThread(m_workerThread);
+
connect(m_socket, SIGNAL(readyRead()), m_socket, SLOT(processPendingData()));
m_socket->processPendingData();
- connect(m_socket, SIGNAL(newDataReceived()), this, SLOT(mainEventLoop()));
+ connect(m_socket, SIGNAL(newDataReceived()), m_worker, SLOT(dataReceived()));
m_tickerState = 0;
m_commHandlerState = 0;
m_commHandlerMode = 0;
m_socket->setDataTimeout(NETWORK_COMM_TIMEOUT_MS);
m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE);
+
+ // Start worker
+ m_workerThread->start();
+ TQTimer::singleShot(0, m_worker, SLOT(run()));
+
processLockouts();
- mainEventLoop();
+ networkTick();
return;
}
@@ -176,119 +348,60 @@ void ProtoTerminalPart::setTickerMessage(TQString message) {
}
}
-#define UPDATEDISPLAY_TIMEOUT m_connectionActiveAndValid = false; \
- m_tickerState = 0; \
- m_commHandlerState = 2; \
- m_commHandlerMode = 0; \
- m_socket->clearIncomingData(); \
- setStatusMessage(i18n("Server ping timeout. Please verify the status of your network connection.")); \
- m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE); \
- m_instrumentMutex->unlock(); \
- return;
-
-#define COMMUNICATIONS_FAILED m_connectionActiveAndValid = false; \
- m_tickerState = 0; \
- m_commHandlerState = 2; \
- m_commHandlerMode = 0; \
- m_socket->clearIncomingData(); \
- setStatusMessage(i18n("Instrument communication failure. Please verify the status of your network connection.")); \
- m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE); \
- m_instrumentMutex->unlock(); \
- return;
-
-#define SET_WATCHDOG_TIMER if (!m_updateTimeoutTimer->isActive()) m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE);
-#define PAT_WATCHDOG_TIMER m_updateTimeoutTimer->stop(); m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE); \
- setTickerMessage(i18n("Connected"));
-
-#define SET_NEXT_STATE(x) if (m_commHandlerMode == 0) { \
- m_commHandlerState = x; \
- } \
- else { \
- m_commHandlerState = 255; \
- }
-
-#define EXEC_NEXT_STATE_IMMEDIATELY m_forcedUpdateTimer->start(0, TRUE);
-
-void ProtoTerminalPart::mainEventLoop() {
- TQDataStream ds(m_socket);
- ds.setPrintableData(true);
-
- if (!m_instrumentMutex->tryLock()) {
- EXEC_NEXT_STATE_IMMEDIATELY
- return;
- }
-
- if (m_socket) {
- if ((m_commHandlerMode == 0) || (m_commHandlerMode == 1)) {
- if (m_commHandlerState == 0) {
- // Setup functions go here
- setTickerMessage("Connection established");
-
- m_commHandlerState = 1;
- EXEC_NEXT_STATE_IMMEDIATELY
- }
- else if (m_commHandlerState == 1) {
- // Receive data
- if (m_socket->canReadFrame()) {
- PAT_WATCHDOG_TIMER
-
- // Get command status
- TQString input;
- while (!ds.atEnd()) {
- ds >> input;
-
- if (input != "") {
- input.replace("\r", "\n");
- m_base->textOutput->append(">>>" + input);
- }
- }
- m_socket->clearFrameTail();
- EXEC_NEXT_STATE_IMMEDIATELY
- }
- else {
- if (!m_updateTimeoutTimer->isActive()) {
- UPDATEDISPLAY_TIMEOUT
- }
- }
-
- // Send data
- if (m_TextToSend != "") {
- ds << m_TextToSend;
- m_socket->writeEndOfFrame();
+void ProtoTerminalPart::processOutboundQueue() {
+ bool had_events = false;
- m_base->textOutput->append("<<<" + m_TextToSend);
- m_TextToSend = "";
+ m_worker->lockOutboundQueue();
- EXEC_NEXT_STATE_IMMEDIATELY
- }
-
- // Never time out
- PAT_WATCHDOG_TIMER
- }
- else if (m_commHandlerState == 2) {
- // Ignore timeouts
- m_commHandlerState = 1;
- EXEC_NEXT_STATE_IMMEDIATELY
+ ProtoTerminalEventQueue* eventQueue = m_worker->outboundQueue();
+ ProtoTerminalEventQueue::iterator it;
+ for (it = eventQueue->begin(); it != eventQueue->end(); ++it) {
+ if ((*it).first == ConsoleTextReceive) {
+ TQString input = (*it).second.toString();
+ if (input != "") {
+ input.replace("\r", "\n");
+ m_base->textOutput->append(">>>" + input);
}
- SET_WATCHDOG_TIMER
}
+ had_events = true;
}
- else {
- m_commHandlerState = 0;
- m_commHandlerCommandState = 0;
+ if (had_events) {
+ networkTick();
+ eventQueue->clear();
}
+ m_worker->unlockOutboundQueue();
+
processLockouts();
+}
- m_instrumentMutex->unlock();
+void ProtoTerminalPart::networkTick() {
+ m_updateTimeoutTimer->stop();
+ setTickerMessage(i18n("Connected"));
+ m_connectionActiveAndValid = true;
+ processLockouts();
+}
+
+void ProtoTerminalPart::networkTimeout() {
+ m_updateTimeoutTimer->stop();
+ m_socket->clearIncomingData();
+ setStatusMessage(i18n("Server ping timeout. Please verify the status of your network connection."));
+ m_connectionActiveAndValid = false;
+ processLockouts();
}
void ProtoTerminalPart::sendTextClicked() {
- m_TextToSend = m_TextToSend + m_base->textInput->text();
- m_base->textInput->setText("");
+ if (!m_worker->syncPointActive()) {
+ m_TextToSend = m_TextToSend + m_base->textInput->text();
+ m_base->textInput->setText("");
- // Transmit!
- EXEC_NEXT_STATE_IMMEDIATELY
+ m_worker->appendItemToInboundQueue(ProtoTerminalEvent(ConsoleTextSend, TQVariant(m_TextToSend)), true);
+ m_base->textOutput->append("<<<" + m_TextToSend);
+ m_TextToSend = "";
+
+ emit wakeWorkerThread();
+ m_updateTimeoutTimer->start(NETWORK_COMM_TIMEOUT_MS, TRUE);
+ }
}
TDEAboutData* ProtoTerminalPart::createAboutData() {
diff --git a/clients/tde/src/part/prototerminal/part.h b/clients/tde/src/part/prototerminal/part.h
index 5bc0a02..a7ad658 100644
--- a/clients/tde/src/part/prototerminal/part.h
+++ b/clients/tde/src/part/prototerminal/part.h
@@ -15,7 +15,7 @@
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
- * (c) 2014 Timothy Pearson
+ * (c) 2014 - 2015 Timothy Pearson
* Raptor Engineering
* http://www.raptorengineeringinc.com
*/
@@ -23,6 +23,12 @@
#ifndef REMOTELAB_PROTOTERMINALPART_H
#define REMOTELAB_PROTOTERMINALPART_H
+#include <ntqthread.h>
+#include <ntqeventloop.h>
+
+#include <tqvariant.h>
+#include <tqvaluevector.h>
+
#include <tdeparts/browserextension.h>
#include <tdeparts/statusbarextension.h>
#include <tdeparts/part.h>
@@ -41,6 +47,51 @@ class ProtoTerminalBase;
namespace RemoteLab
{
+ typedef enum ProtoTerminalEventType {
+ ConsoleTextReceive = 0,
+ ConsoleTextSend = 1,
+ TxRxSyncPoint = 2
+ } ProtoTerminalEventType;
+
+ typedef TQPair<ProtoTerminalEventType, TQVariant> ProtoTerminalEvent;
+ typedef TQValueVector<ProtoTerminalEvent> ProtoTerminalEventQueue;
+
+ class ProtoTerminalWorker : public TQObject
+ {
+ TQ_OBJECT
+
+ public:
+ ProtoTerminalWorker();
+ ~ProtoTerminalWorker();
+
+ signals:
+ void outboundQueueUpdated();
+
+ public slots:
+ void run();
+ void wake();
+ void dataReceived();
+
+ public:
+ void appendItemToInboundQueue(ProtoTerminalEvent item, bool syncPoint=false);
+ bool syncPointActive();
+ void lockOutboundQueue();
+ void unlockOutboundQueue();
+ ProtoTerminalEventQueue* outboundQueue();
+
+ public:
+ TDEKerberosClientSocket* m_socket;
+ TQMutex* m_instrumentMutex;
+
+ private:
+ ProtoTerminalEventQueue m_outboundQueue;
+ ProtoTerminalEventQueue m_inboundQueue;
+ TQMutex* m_outboundQueueMutex;
+ TQMutex* m_inboundQueueMutex;
+ TQMutex* m_networkDataMutex;
+ bool m_newData;
+ };
+
class ProtoTerminalPart : public KParts::RemoteInstrumentPart
{
Q_OBJECT
@@ -53,8 +104,12 @@ namespace RemoteLab
virtual bool closeURL();
static TDEAboutData *createAboutData();
+ signals:
+ void wakeWorkerThread();
+
public slots:
virtual bool openURL(const KURL &url);
+ void processOutboundQueue();
private slots:
void postInit();
@@ -63,20 +118,23 @@ namespace RemoteLab
void disconnectFromServerCallback();
void connectionStatusChangedCallback();
void setTickerMessage(TQString message);
- void mainEventLoop();
+ void networkTick();
+ void networkTimeout();
void sendTextClicked();
private:
int m_commHandlerState;
int m_commHandlerMode;
int m_commHandlerCommandState;
- TQTimer* m_forcedUpdateTimer;
TQTimer* m_updateTimeoutTimer;
bool m_connectionActiveAndValid;
unsigned char m_tickerState;
ProtoTerminalBase* m_base;
TQMutex* m_instrumentMutex;
TQString m_TextToSend;
+
+ ProtoTerminalWorker* m_worker;
+ TQEventLoopThread* m_workerThread;
};
}