/* This file implements the Weaver, Job and Thread classes. $ Author: Mirko Boehm $ $ Copyright: (C) 2004, Mirko Boehm $ $ Contact: mirko@kde.org http://www.kde.org http://www.hackerbuero.org $ $ License: LGPL with the following explicit clarification: This code may be linked against any version of the TQt toolkit from Troll Tech, Norway. $ */ extern "C" { #include } #include #include #include "weaver.h" namespace KPIM { namespace ThreadWeaver { bool Debug = true; int DebugLevel = 2; Job::Job (TQObject* parent, const char* name) : TQObject (parent, name), m_finished (false), m_mutex (new TQMutex (true) ), m_thread (0) { } Job::~Job() { } void Job::lock() { m_mutex->lock(); } void Job::unlock() { m_mutex->unlock(); } void Job::execute(Thread *th) { m_mutex->lock(); m_thread = th; m_mutex->unlock(); run (); m_mutex->lock(); setFinished (true); m_thread = 0; m_mutex->unlock(); } Thread *Job::thread () { TQMutexLocker l (m_mutex); return m_thread; } bool Job::isFinished() const { TQMutexLocker l (m_mutex); return m_finished; } void Job::setFinished(bool status) { TQMutexLocker l (m_mutex); m_finished = status; } void Job::processEvent (Event *e) { switch ( e->action() ) { case Event::JobStarted: emit ( started() ); break; case Event::JobFinished: emit ( done() ); break; case Event::JobSPR: emit ( SPR () ); m_wc->wakeOne (); break; case Event::JobAPR: emit ( APR () ); // no wake here ! break; default: break; } } void Job::triggerSPR () { m_mutex->lock (); m_wc = new TQWaitCondition; m_mutex->unlock (); thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this); m_wc->wait (); m_mutex->lock (); delete m_wc; m_wc = 0; m_mutex->unlock (); } void Job::triggerAPR () { m_mutex->lock (); m_wc = new TQWaitCondition; m_mutex->unlock (); thread()->post (KPIM::ThreadWeaver::Event::JobAPR, this); m_wc->wait (); } void Job::wakeAPR () { TQMutexLocker l(m_mutex); if ( m_wc!=0 ) { m_wc->wakeOne (); delete m_wc; m_wc = 0; } } const int Event::Type = TQEvent::User + 1000; Event::Event ( Action action, Thread *thread, Job *job) : TQCustomEvent ( type () ), m_action (action), m_thread (thread), m_job (job) { } int Event::type () { return Type; } Thread* Event::thread () const { if ( m_thread != 0) { return m_thread; } else { return 0; } } Job* Event::job () const { return m_job; } Event::Action Event::action () const { return m_action; } unsigned int Thread::sm_Id; Thread::Thread (Weaver *parent) : TQThread (), m_parent ( parent ), m_id ( makeId() ) { } Thread::~Thread() { } unsigned int Thread::makeId() { static TQMutex mutex; TQMutexLocker l (&mutex); return ++sm_Id; } unsigned int Thread::id() const { return m_id; } void Thread::run() { Job *job = 0; post ( Event::ThreadStarted ); while (true) { debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() ); job = m_parent->applyForWork ( this, job ); if (job == 0) { break; } else { post ( Event::JobStarted, job ); job->execute (this); post ( Event::JobFinished, job ); } } post ( Event::ThreadExiting ); } void Thread::post (Event::Action a, Job *j) { m_parent->post ( a, this, j); } void Thread::msleep(unsigned long msec) { TQThread::msleep(msec); } Weaver::Weaver(TQObject* parent, const char* name, int inventoryMin, int inventoryMax) : TQObject(parent, name), m_active(0), m_inventoryMin(inventoryMin), m_inventoryMax(inventoryMax), m_shuttingDown(false), m_running (false), m_suspend (false), m_mutex ( new TQMutex(true) ) { lock(); for ( int count = 0; count < m_inventoryMin; ++count) { Thread *th = new Thread(this); m_inventory.append(th); // this will idle the thread, waiting for a job th->start(); emit (threadCreated (th) ); } unlock(); } Weaver::~Weaver() { lock(); debug ( 1, "Weaver dtor: destroying inventory.\n" ); m_shuttingDown = true; unlock(); m_jobAvailable.wakeAll(); // problem: Some threads might not be asleep yet, just finding // out if a job is available. Those threads will suspend // waiting for their next job (a rare case, but not impossible). // Therefore, if we encounter a thread that has not exited, we // have to wake it again (which we do in the following for // loop). for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() ) { if ( !th->finished() ) { m_jobAvailable.wakeAll(); th->wait(); } emit (threadDestroyed (th) ); delete th; } m_inventory.clear(); delete m_mutex; debug ( 1, "Weaver dtor: done\n" ); } void Weaver::lock() { debug ( 3 , "Weaver::lock: lock (mutex is %s).\n", ( m_mutex->locked() ? "locked" : "not locked" ) ); m_mutex->lock(); } void Weaver::unlock() { m_mutex->unlock(); debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n", ( m_mutex->locked() ? "locked" : "not locked" ) ); } int Weaver::threads () const { TQMutexLocker l (m_mutex); return m_inventory.count (); } void Weaver::enqueue(Job* job) { lock(); m_assignments.append(job); m_running = true; unlock(); assignJobs(); } void Weaver::enqueue (TQPtrList jobs) { lock(); for ( Job * job = jobs.first(); job; job = jobs.next() ) { m_assignments.append (job); } unlock(); assignJobs(); } bool Weaver::dequeue ( Job* job ) { TQMutexLocker l (m_mutex); return m_assignments.remove (job); } void Weaver::dequeue () { TQMutexLocker l (m_mutex); m_assignments.clear(); } void Weaver::suspend (bool state) { lock(); if (state) { // no need to wake any threads here m_suspend = true; if ( m_active == 0 && isEmpty() ) { // instead of waking up threads: post (Event::Suspended); } } else { m_suspend = false; // make sure we emit suspended () even if all threads are sleeping: assignJobs (); debug (2, "Weaver::suspend: queueing resumed.\n" ); } unlock(); } void Weaver::assignJobs() { m_jobAvailable.wakeAll(); } bool Weaver::event (TQEvent *e ) { if ( e->type() >= TQEvent::User ) { if ( e->type() == Event::type() ) { Event *event = (Event*) e; switch (event->action() ) { case Event::JobFinished: if ( event->job() !=0 ) { emit (jobDone (event->job() ) ); } break; case Event::Finished: emit ( finished() ); break; case Event::Suspended: emit ( suspended() ); break; case Event::ThreadSuspended: if (!m_shuttingDown ) { emit (threadSuspended ( event->thread() ) ); } break; case Event::ThreadBusy: if (!m_shuttingDown ) { emit (threadBusy (event->thread() ) ); } break; default: break; } if ( event->job() !=0 ) { event->job()->processEvent (event); } } else { debug ( 0, "Weaver::event: Strange: received unknown user event.\n" ); } return true; } else { // others - please make sure we are a TQObject! return TQObject::event ( e ); } } void Weaver::post (Event::Action a, Thread* t, Job* j) { Event *e = new Event ( a, t, j); TQApplication::postEvent (this, e); } bool Weaver::isEmpty() const { TQMutexLocker l (m_mutex); return m_assignments.count()==0; } Job* Weaver::applyForWork(Thread *th, Job* previous) { Job *rc = 0; bool lastjob = false; bool suspended = false; while (true) { lock(); if (previous != 0) { // cleanup and send events: --m_active; debug ( 3, "Weaver::applyForWork: job done, %i jobs left, " "%i active jobs left.\n", queueLength(), m_active ); if ( m_active == 0 && isEmpty() ) { lastjob = true; m_running = false; post (Event::Finished); debug ( 3, "Weaver::applyForWork: last job.\n" ); } if (m_active == 0 && m_suspend == true) { suspended = true; post (Event::Suspended); debug ( 2, "Weaver::applyForWork: queueing suspended.\n" ); } m_jobFinished.wakeOne(); } previous = 0; if (m_shuttingDown == true) { unlock(); return 0; } else { if ( !isEmpty() && m_suspend == false ) { rc = m_assignments.getFirst(); m_assignments.removeFirst (); ++m_active; debug ( 3, "Weaver::applyForWork: job assigned, " "%i jobs in queue (%i active).\n", m_assignments.count(), m_active ); unlock(); post (Event::ThreadBusy, th); return rc; } else { unlock(); post (Event::ThreadSuspended, th); m_jobAvailable.wait(); } } } } int Weaver::queueLength() { TQMutexLocker l (m_mutex); return m_assignments.count(); } bool Weaver::isIdle () const { TQMutexLocker l (m_mutex); return isEmpty() && m_active == 0; } void Weaver::finish() { while ( !isIdle() ) { debug (2, "Weaver::finish: not done, waiting.\n" ); m_jobFinished.wait(); } debug (1, "Weaver::finish: done.\n\n\n" ); } } } #include "weaver.moc"