You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
arts/flow/gslschedule.cc

1196 lines
26 KiB

/*
Copyright (C) 2000-2002 Stefan Westerfeld
stefan@space.twc.de
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public License
along with this library; see the file COPYING.LIB. If not, write to
the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1301, USA.
*/
#include "config.h"
#include "virtualports.h"
#include "startupmanager.h"
#include "gslschedule.h"
#include "debug.h"
#include "asyncschedule.h"
#include "audiosubsys.h"
#include <gsl/gslcommon.h>
#include <gsl/gslengine.h>
#include <algorithm>
#include <stdio.h>
#include <iostream>
#include <stack>
/* HACK */
class GslMainLoop {
protected:
std::list<GslClass *> freeClassList;
public:
GslEngineLoop loop;
static bool waitOnTransNeedData;
static bool gslDataCalculated;
/* static check function */
static gboolean gslCheck(gpointer /* data */, guint /* n_values */,
glong* /* timeout_p */,
guint /* n_fds */, const GPollFD* /* fds */,
gboolean /* revents_filled */)
{
return waitOnTransNeedData;
}
/* mainloop integration: initialize (called to get initial loop setup) */
void initialize()
{
gsl_transact(gsl_job_add_poll (gslCheck, 0, 0, 0, 0), 0);
gsl_engine_prepare(&loop);
for(unsigned int i = 0; i != loop.n_fds; i++)
{
printf("TODO: engine fd %d\n",i);
}
}
/* mainloop integration: process (TODO - should be called by IOManager) */
void process()
{
printf("TODO: mainloop wrapper for fd watches\n");
if(gsl_engine_check(&loop))
gsl_engine_dispatch();
}
/* wait for a transaction */
void waitOnTrans()
{
arts_return_if_fail(waitOnTransNeedData == false);
gsl_engine_wait_on_trans();
}
/* make the engine calculate something */
void run()
{
waitOnTransNeedData = true;
gslDataCalculated = false;
while(!gslDataCalculated && gsl_engine_check(&loop))
gsl_engine_dispatch();
gslDataCalculated = false;
waitOnTransNeedData = false;
if(!freeClassList.empty())
{
/*
* make sure that all transactions that are still pending
* get finished (especially important in threaded case,
* since an entry in the free list doesn't necessarily
* mean that the module has entierly been freed)
*/
waitOnTrans();
std::list<GslClass *>::iterator fi;
for(fi = freeClassList.begin(); fi != freeClassList.end(); fi++)
free(*fi);
freeClassList.clear();
}
}
void freeGslClass(GslClass *klass)
{
freeClassList.push_back(klass);
}
} gslMainLoop;
bool GslMainLoop::waitOnTransNeedData = false;
bool GslMainLoop::gslDataCalculated = false;
namespace Arts { extern void *gslGlobalMutexTable; }
using namespace std;
using namespace Arts;
// ----------- Port -----------
Port::Port(const string& name, void *ptr, long flags, StdScheduleNode* parent)
: _name(name), _ptr(ptr), _flags((AttributeType)flags),
parent(parent), _dynamicPort(false)
{
_vport = new VPort(this);
}
Port::~Port()
{
if(_vport)
delete _vport;
}
AttributeType Port::flags()
{
return _flags;
}
string Port::name()
{
return _name;
}
ASyncPort *Port::asyncPort()
{
return 0;
}
AudioPort *Port::audioPort()
{
return 0;
}
void Port::addAutoDisconnect(Port *source)
{
autoDisconnect.push_back(source);
source->autoDisconnect.push_back(this);
}
void Port::removeAutoDisconnect(Port *source)
{
std::list<Port *>::iterator adi;
// remove our autodisconnection entry for source port
adi = find(autoDisconnect.begin(),autoDisconnect.end(),source);
assert(adi != autoDisconnect.end());
autoDisconnect.erase(adi);
// remove the source port autodisconnection entry to us
adi=find(source->autoDisconnect.begin(),source->autoDisconnect.end(),this);
assert(adi != source->autoDisconnect.end());
source->autoDisconnect.erase(adi);
}
void Port::disconnectAll()
{
if(_vport)
delete _vport;
_vport = 0;
assert(autoDisconnect.empty());
while(!autoDisconnect.empty())
{
Port *other = *autoDisconnect.begin();
// syntax is disconnect(source)
if(_flags & streamIn)
// if we're incoming, other port is source
vport()->disconnect(other->vport());
else
// if we're outgoing, we're the source
other->vport()->disconnect(this->vport());
}
}
void Port::setPtr(void *ptr)
{
_ptr = ptr;
}
// ------- AudioPort ---------
AudioPort::AudioPort(const string& name,
void *ptr, long flags,StdScheduleNode *parent)
: Port(name,ptr,flags,parent)
{
destcount = 0;
sourcemodule = 0;
source = 0;
gslIsConstant = false;
}
AudioPort::~AudioPort()
{
//
}
AudioPort *AudioPort::audioPort()
{
return this;
}
void AudioPort::setFloatValue(float f)
{
gslIsConstant = true;
gslConstantValue = f;
parent->_connectionCountChanged = true;
}
void AudioPort::connect(Port *psource)
{
if (source) return; // Error, should not happen (See BR70028)
source = psource->audioPort();
assert(source);
addAutoDisconnect(psource);
source->parent->_connectionCountChanged = parent->_connectionCountChanged = true;
source->destcount++;
sourcemodule = source->parent;
// GSL connect
GslTrans *trans = gsl_trans_open();
gsl_trans_add(trans, gsl_job_connect(source->parent->gslModule,
source->gslEngineChannel,
parent->gslModule,
gslEngineChannel));
gsl_trans_commit(trans);
}
void AudioPort::disconnect(Port *psource)
{
if (!source || source != psource->audioPort()) return; // Error, should not happen (See BR70028)
assert(source);
assert(source == psource->audioPort());
removeAutoDisconnect(psource);
assert(sourcemodule == source->parent);
sourcemodule = 0;
source->parent->_connectionCountChanged = parent->_connectionCountChanged = true;
source->destcount--;
source = 0;
// GSL disconnect
GslTrans *trans = gsl_trans_open();
gsl_trans_add(trans, gsl_job_disconnect(parent->gslModule,
gslEngineChannel));
gsl_trans_commit(trans);
}
// --------- MultiPort ----------
MultiPort::MultiPort(const string& name,
void *ptr, long flags,StdScheduleNode *parent)
: Port(name,ptr,flags,parent)
{
conns = 0;
nextID = 0;
initConns();
}
MultiPort::~MultiPort()
{
if(conns)
{
delete[] conns;
conns = 0;
}
}
void MultiPort::initConns()
{
if(conns != 0) delete[] conns;
conns = new float_ptr[parts.size() + 1];
conns[parts.size()] = (float *)0;
*(float ***)_ptr = conns;
long n = 0;
std::list<Part>::iterator i;
for(i = parts.begin();i != parts.end(); i++)
{
AudioPort *p = i->dest;
p->setPtr((void *)&conns[n++]);
}
}
void MultiPort::connect(Port *port)
{
AudioPort *dport;
char sid[20];
sprintf(sid,"%ld",nextID++);
addAutoDisconnect(port);
dport = new AudioPort("_"+_name+string(sid),0,streamIn,parent);
Part part;
part.src = (AudioPort *)port;
part.dest = dport;
parts.push_back(part);
initConns();
parent->addDynamicPort(dport);
dport->vport()->connect(port->vport());
}
void MultiPort::disconnect(Port *sport)
{
AudioPort *port = (AudioPort *)sport;
removeAutoDisconnect(sport);
std::list<Part>::iterator i;
for(i = parts.begin(); i != parts.end(); i++)
{
if(i->src == port)
{
AudioPort *dport = i->dest;
parts.erase(i);
initConns();
dport->vport()->disconnect(port->vport());
parent->removeDynamicPort(dport);
delete dport;
return;
}
}
}
// -------- StdScheduleNode ---------
void StdScheduleNode::freeConn()
{
if(inConn)
{
delete[] inConn;
inConn = 0;
}
if(outConn)
{
delete[] outConn;
outConn = 0;
}
inConnCount = outConnCount = 0;
if(gslModule)
{
gsl_transact(gsl_job_discard(gslModule),0);
gslModule = 0;
gslRunning = false;
}
}
void StdScheduleNode::gslProcess(GslModule *module, guint n_values)
{
StdScheduleNode *node = (StdScheduleNode *)module->user_data;
if(!node->running) /* FIXME: need reasonable suspend in the engine */
return;
arts_return_if_fail(node->module != 0);
GslMainLoop::gslDataCalculated = true;
unsigned long j;
for(j=0;j<node->inConnCount;j++)
{
if(node->inConn[j]->gslIsConstant)
*((float **)node->inConn[j]->_ptr) =
gsl_engine_const_values(node->inConn[j]->gslConstantValue);
else
*((float **)node->inConn[j]->_ptr) = const_cast<float *>(module->istreams[j].values);
}
for(j=0;j<node->outConnCount;j++)
*((float **)node->outConn[j]->_ptr) = module->ostreams[j].values;
node->module->calculateBlock(n_values);
}
static void gslModuleFree(gpointer /* data */, const GslClass *klass)
{
gslMainLoop.freeGslClass(const_cast<GslClass *>(klass));
}
void StdScheduleNode::rebuildConn()
{
std::list<Port *>::iterator i;
freeConn();
inConnCount = outConnCount = 0;
inConn = new AudioPort_ptr[ports.size()];
outConn = new AudioPort_ptr[ports.size()];
for(i=ports.begin();i != ports.end();i++)
{
AudioPort *p = (*i)->audioPort();
if(p)
{
if(p->flags() & streamIn)
{
p->gslEngineChannel = inConnCount;
inConn[inConnCount++] = p;
}
if(p->flags() & streamOut)
{
p->gslEngineChannel = outConnCount;
outConn[outConnCount++] = p;
}
}
}
/* create GSL node */
GslClass *gslClass = (GslClass *)calloc(sizeof(GslClass),1);
gslClass->n_istreams = inConnCount;
gslClass->n_ostreams = outConnCount;
gslClass->process = gslProcess;
gslClass->free = gslModuleFree;
gslModule = gsl_module_new (gslClass, (StdScheduleNode *)this);
GslTrans *trans = gsl_trans_open();
gsl_trans_add(trans,gsl_job_integrate(gslModule));
gsl_trans_add(trans,gsl_job_set_consumer(gslModule, running));
gslRunning = running;
/* since destroying the old module and creating a new one will destroy
* all the connections, we need to restore them here
*/
unsigned int c;
for(c = 0; c < inConnCount; c++)
{
if(inConn[c]->source)
{
gsl_trans_add(trans,
gsl_job_connect(inConn[c]->source->parent->gslModule,
inConn[c]->source->gslEngineChannel,
inConn[c]->parent->gslModule,
inConn[c]->gslEngineChannel));
}
}
for(c = 0; c < outConnCount; c++)
{
std::list<Port *>::iterator ci;
for(ci = outConn[c]->autoDisconnect.begin();
ci != outConn[c]->autoDisconnect.end(); ci++)
{
AudioPort *dest = (*ci)->audioPort();
if( dest )
{
gsl_trans_add(trans,
gsl_job_connect(outConn[c]->parent->gslModule,
outConn[c]->gslEngineChannel,
dest->parent->gslModule,
dest->gslEngineChannel));
}
else
{
arts_debug( "no audio port: %s for %s", ( *ci )->name().c_str(), _object->_interfaceName().c_str() );
}
}
}
gsl_trans_commit(trans);
}
Object_skel *StdScheduleNode::object()
{
return _object;
}
void *StdScheduleNode::cast(const string &target)
{
if(target == "StdScheduleNode") return (StdScheduleNode *)this;
return 0;
}
void StdScheduleNode::accessModule()
{
if(module) return;
module = (SynthModule_base *)_object->_cast(Arts::SynthModule_base::_IID);
if(!module)
arts_warning("Error using interface %s in the flowsystem: only objects"
" implementing Arts::SynthModule should carry streams.",
_object->_interfaceName().c_str());
}
StdScheduleNode::StdScheduleNode(Object_skel *object, StdFlowSystem *flowSystem) : ScheduleNode(object)
{
_object = object;
this->flowSystem = flowSystem;
running = false;
suspended = false;
module = 0;
gslModule = 0;
gslRunning = false;
queryInitStreamFunc = 0;
inConn = outConn = 0;
inConnCount = outConnCount = 0;
}
StdScheduleNode::~StdScheduleNode()
{
/* stop module if still running */
if(running) stop();
/* disconnect all ports */
stack<Port *> disconnect_stack;
/*
* we must be a bit careful here, as dynamic ports (which are created
* for connections by MultiPorts) will suddenly start disappearing, so
* we better make a copy of those ports that will stay, and disconnect
* them then
*/
std::list<Port *>::iterator i;
for(i=ports.begin();i != ports.end();i++)
{
if(!(*i)->dynamicPort()) disconnect_stack.push(*i);
}
while(!disconnect_stack.empty())
{
disconnect_stack.top()->disconnectAll();
disconnect_stack.pop();
}
/* free them */
for(i=ports.begin();i != ports.end();i++)
delete (*i);
ports.clear();
freeConn();
}
void StdScheduleNode::initStream(const string& name, void *ptr, long flags)
{
if(flags == -1)
{
queryInitStreamFunc = (QueryInitStreamFunc)ptr;
}
else if(flags & streamAsync)
{
ports.push_back(new ASyncPort(name,ptr,flags,this));
}
else if(flags & streamMulti)
{
ports.push_back(new MultiPort(name,ptr,flags,this));
}
else
{
ports.push_back(new AudioPort(name,ptr,flags,this));
}
// TODO: maybe initialize a bit later
rebuildConn();
}
void StdScheduleNode::addDynamicPort(Port *port)
{
port->setDynamicPort();
ports.push_back(port);
rebuildConn();
}
void StdScheduleNode::removeDynamicPort(Port *port)
{
std::list<Port *>::iterator i;
for(i=ports.begin();i!=ports.end();i++)
{
Port *p = *i;
if(p->name() == port->name())
{
ports.erase(i);
rebuildConn();
return;
}
}
}
void StdScheduleNode::start()
{
assert(!running);
running = true;
//cout << "start" << endl;
accessModule();
module->streamInit();
module->streamStart();
flowSystem->startedChanged();
}
void StdScheduleNode::stop()
{
assert(running);
running = false;
accessModule();
module->streamEnd();
flowSystem->startedChanged();
}
void StdScheduleNode::requireFlow()
{
// cout << "rf" << module->_interfaceName() << endl;
flowSystem->updateStarted();
gslMainLoop.run();
}
AutoSuspendState StdScheduleNode::suspendable()
{
if(running) {
accessModule();
return module->autoSuspend();
}
// if its not running, who cares?
return asSuspend;
}
void StdScheduleNode::suspend()
{
if(running) {
accessModule();
suspended = true;
if((module->autoSuspend() & asSuspendMask) == asSuspendStop) stop();
}
}
void StdScheduleNode::restart()
{
if(suspended) {
accessModule();
suspended = false;
if(!running && (module->autoSuspend() & asSuspendMask) == asSuspendStop) start();
}
}
Port *StdScheduleNode::findPort(const string& name)
{
std::list<Port *>::iterator i;
for(i=ports.begin();i!=ports.end();i++)
{
Port *p = *i;
if(p->name() == name) return p;
}
if(queryInitStreamFunc)
{
if(queryInitStreamFunc(_object,name))
{
for(i=ports.begin();i!=ports.end();i++)
{
Port *p = *i;
if(p->name() == name) return p;
}
}
}
return 0;
}
void StdScheduleNode::virtualize(const std::string& port,
ScheduleNode *implNode,
const std::string& implPort)
{
StdScheduleNode *impl=(StdScheduleNode *)implNode->cast("StdScheduleNode");
if(impl)
{
Port *p1 = findPort(port);
Port *p2 = impl->findPort(implPort);
assert(p1);
assert(p2);
p1->vport()->virtualize(p2->vport());
}
}
void StdScheduleNode::devirtualize(const std::string& port,
ScheduleNode *implNode,
const std::string& implPort)
{
StdScheduleNode *impl=(StdScheduleNode *)implNode->cast("StdScheduleNode");
if(impl)
{
Port *p1 = findPort(port);
Port *p2 = impl->findPort(implPort);
p1->vport()->devirtualize(p2->vport());
}
}
void StdScheduleNode::connect(const string& port, ScheduleNode *dest,
const string& destport)
{
RemoteScheduleNode *rsn = dest->remoteScheduleNode();
if(rsn)
{
// RemoteScheduleNodes know better how to connect remotely
rsn->connect(destport,this,port);
return;
}
flowSystem->restart();
Port *p1 = findPort(port);
Port *p2 = ((StdScheduleNode *)dest)->findPort(destport);
if(p1 && p2)
{
if((p1->flags() & streamIn) && (p2->flags() & streamOut))
{
p1->vport()->connect(p2->vport());
}
else if((p2->flags() & streamIn) && (p1->flags() & streamOut))
{
p2->vport()->connect(p1->vport());
}
}
}
void StdScheduleNode::disconnect(const string& port, ScheduleNode *dest,
const string& destport)
{
RemoteScheduleNode *rsn = dest->remoteScheduleNode();
if(rsn)
{
// RemoteScheduleNodes know better how to disconnect remotely
rsn->disconnect(destport,this,port);
return;
}
flowSystem->restart();
Port *p1 = findPort(port);
Port *p2 = ((StdScheduleNode *)dest)->findPort(destport);
if(p1 && p2)
{
if((p1->flags() & streamIn) && (p2->flags() & streamOut))
{
p1->vport()->disconnect(p2->vport());
}
else if((p2->flags() & streamIn) && (p1->flags() & streamOut))
{
p2->vport()->disconnect(p1->vport());
}
}
}
AttributeType StdScheduleNode::queryFlags(const std::string& port)
{
arts_debug("findPort(%s)", port.c_str());
arts_debug("have %ld ports", ports.size());
Port *p1 = findPort(port);
arts_debug("done");
if(p1)
{
arts_debug("result %d",(long)p1->flags());
return p1->flags();
}
arts_debug("failed");
return (AttributeType)0;
}
void StdScheduleNode::setFloatValue(const string& port, float value)
{
AudioPort *p = findPort(port)->audioPort();
if(p) {
p->vport()->setFloatValue(value);
} else {
assert(false);
}
}
unsigned long StdScheduleNode::inputConnectionCount(const string& port)
{
unsigned long result = 0;
unsigned int c;
for(c = 0; c < inConnCount; c++)
{
if(inConn[c]->name() == port)
{
if(inConn[c]->source || inConn[c]->gslIsConstant)
result++;
}
}
return result;
}
unsigned long StdScheduleNode::outputConnectionCount(const string& port)
{
unsigned long result = 0;
unsigned int c;
for(c = 0; c < outConnCount; c++)
{
if(outConn[c]->name() == port)
result += outConn[c]->destcount;
}
return result;
}
StdFlowSystem::StdFlowSystem()
{
_suspended = false;
needUpdateStarted = false;
/* TODO: correct parameters */
static bool gsl_is_initialized = false;
if(!gsl_is_initialized)
{
GslConfigValue values[3] = {
{ "wave_chunk_padding", 8 },
{ "dcache_block_size", 4000, },
{ 0, 0 }
};
gsl_is_initialized = true;
if (!g_thread_supported ())
g_thread_init(0);
gsl_init(values, (GslMutexTable *)gslGlobalMutexTable);
/*
* FIXME: both of these are really supposed to be tunable
* - the 512 because of low-latency apps, where calculating smaller
* block sizes might be necessary
* - the 44100 because of the obvious reason, that not every artsd
* is running at that rate
*/
gsl_engine_init(false, 512, 44100, /* subsamplemask */ 63);
if(gslGlobalMutexTable)
arts_debug("gsl: using Unix98 pthreads directly for mutexes and conditions");
/*gsl_engine_debug_enable(GslEngineDebugLevel(GSL_ENGINE_DEBUG_JOBS | GSL_ENGINE_DEBUG_SCHED));*/
}
gslMainLoop.initialize();
}
ScheduleNode *StdFlowSystem::addObject(Object_skel *object)
{
// do not add new modules when being in suspended state
restart();
StdScheduleNode *node = new StdScheduleNode(object,this);
nodes.push_back(node);
return node;
}
void StdFlowSystem::removeObject(ScheduleNode *node)
{
StdScheduleNode *xnode = (StdScheduleNode *)node->cast("StdScheduleNode");
assert(xnode);
nodes.remove(xnode);
delete xnode;
}
bool StdFlowSystem::suspended()
{
return _suspended;
}
bool StdFlowSystem::suspendable()
{
/*
* What it does:
* -------------
*
* The suspension algorithm will first divide the graph of modules into
* subgraphs of interconnected modules. A subgraph is suspendable if
* all of its modules are suspendable and the subgraph does not contain
* producer(s) and consumer(s) at the same time.
*
* Finally, our module graph is suspendable if all its subgraphs are.
*
* How it is implemented:
* ----------------------
*
* For efficiency reasons, both steps are merged together. First all
* modules will be marked as unseen. Then a module is picked and
* all modules that it connects to are recursively added to the
* subgraph.
*/
/*
* initialization: no nodes are seen
*/
std::list<StdScheduleNode *>::iterator i;
for (i = nodes.begin(); i != nodes.end(); i++)
{
StdScheduleNode *node = *i;
node->suspendTag = false;
}
stack<StdScheduleNode*> todo;
for(i = nodes.begin(); i != nodes.end(); i++)
{
bool haveConsumer = false;
bool haveProducer = false;
/*
* examine the subgraph consisting of all nodes connected to (*i)
* (only will do anything if suspendTag is not already set)
*/
todo.push(*i);
do
{
StdScheduleNode *node = todo.top();
todo.pop();
if(!node->suspendTag)
{
node->suspendTag = true; // never examine this node again
switch (node->suspendable())
{
case asNoSuspend|asProducer:
case asNoSuspend|asConsumer:
case asNoSuspend:
return false;
break;
case asSuspend:
case asSuspendStop:
/* nothing */
break;
case asSuspend|asProducer:
case asSuspendStop|asProducer:
if(haveConsumer)
return false;
else
haveProducer = true;
break;
case asSuspend|asConsumer:
case asSuspendStop|asConsumer:
if(haveProducer)
return false;
else
haveConsumer = true;
break;
default:
arts_fatal("bad suspend value %d", node->suspendable());
}
unsigned int c;
for(c = 0; c < node->inConnCount; c++)
{
if(node->inConn[c]->source)
todo.push(node->inConn[c]->source->parent);
}
for(c = 0; c < node->outConnCount; c++)
{
std::list<Port *>::iterator ci;
for(ci = node->outConn[c]->autoDisconnect.begin();
ci != node->outConn[c]->autoDisconnect.end(); ci++)
{
AudioPort *dest = (*ci)->audioPort();
if(dest)
todo.push(dest->parent);
}
}
}
} while(!todo.empty());
}
return true;
}
void StdFlowSystem::suspend()
{
if(!_suspended)
{
std::list<StdScheduleNode *>::iterator i;
for(i = nodes.begin();i != nodes.end();i++)
{
StdScheduleNode *node = *i;
node->suspend();
}
_suspended = true;
}
}
void StdFlowSystem::restart()
{
if(_suspended)
{
std::list<StdScheduleNode *>::iterator i;
for(i = nodes.begin();i != nodes.end();i++)
{
StdScheduleNode *node = *i;
node->restart();
}
_suspended = false;
}
}
/* remote accessibility */
void StdFlowSystem::startObject(Object node)
{
StdScheduleNode *sn =
(StdScheduleNode *)node._node()->cast("StdScheduleNode");
sn->start();
}
void StdFlowSystem::stopObject(Object node)
{
StdScheduleNode *sn =
(StdScheduleNode *)node._node()->cast("StdScheduleNode");
sn->stop();
}
void StdFlowSystem::connectObject(Object sourceObject,const string& sourcePort,
Object destObject, const std::string& destPort)
{
arts_debug("connect port %s to %s", sourcePort.c_str(), destPort.c_str());
StdScheduleNode *sn =
(StdScheduleNode *)sourceObject._node()->cast("StdScheduleNode");
assert(sn);
Port *port = sn->findPort(sourcePort);
assert(port);
StdScheduleNode *destsn =
(StdScheduleNode *)destObject._node()->cast("StdScheduleNode");
if(destsn)
{
sn->connect(sourcePort,destsn,destPort);
return;
}
ASyncPort *ap = port->asyncPort();
if(ap)
{
FlowSystemSender sender;
FlowSystemReceiver receiver;
FlowSystem remoteFs;
string dest = destObject.toString() + ":" + destPort;
ASyncNetSend *netsend = new ASyncNetSend(ap, dest);
sender = FlowSystemSender::_from_base(netsend); // don't release netsend
remoteFs = destObject._flowSystem();
receiver = remoteFs.createReceiver(destObject, destPort, sender);
netsend->setReceiver(receiver);
arts_debug("connected an asyncnetsend");
}
}
void StdFlowSystem::disconnectObject(Object sourceObject,
const string& sourcePort, Object destObject, const std::string& destPort)
{
arts_debug("disconnect port %s and %s",sourcePort.c_str(),destPort.c_str());
StdScheduleNode *sn =
(StdScheduleNode *)sourceObject._node()->cast("StdScheduleNode");
assert(sn);
Port *port = sn->findPort(sourcePort);
assert(port);
StdScheduleNode *destsn =
(StdScheduleNode *)destObject._node()->cast("StdScheduleNode");
if(destsn)
{
sn->disconnect(sourcePort,destsn,destPort);
return;
}
ASyncPort *ap = port->asyncPort();
if(ap)
{
string dest = destObject.toString() + ":" + destPort;
ap->disconnectRemote(dest);
arts_debug("disconnected an asyncnetsend");
}
}
AttributeType StdFlowSystem::queryFlags(Object node, const std::string& port)
{
StdScheduleNode *sn =
(StdScheduleNode *)node._node()->cast("StdScheduleNode");
assert(sn);
return sn->queryFlags(port);
}
void StdFlowSystem::setFloatValue(Object node, const std::string& port,
float value)
{
StdScheduleNode *sn =
(StdScheduleNode *)node._node()->cast("StdScheduleNode");
assert(sn);
sn->setFloatValue(port,value);
}
FlowSystemReceiver StdFlowSystem::createReceiver(Object object,
const string &port, FlowSystemSender sender)
{
StdScheduleNode *sn =
(StdScheduleNode *)object._node()->cast("StdScheduleNode");
Port *p = sn->findPort(port);
assert(p);
ASyncPort *ap = p->asyncPort();
if(ap)
{
arts_debug("creating packet receiver");
return FlowSystemReceiver::_from_base(new ASyncNetReceive(ap, sender));
}
return FlowSystemReceiver::null();
}
void StdFlowSystem::updateStarted()
{
if(!needUpdateStarted)
return;
needUpdateStarted = false;
std::list<StdScheduleNode*>::iterator ni;
GslTrans *trans = 0;
for(ni = nodes.begin(); ni != nodes.end(); ni++)
{
StdScheduleNode *node = *ni;
if(node->running != node->gslRunning)
{
if(!trans)
trans = gsl_trans_open();
gsl_trans_add(trans, gsl_job_set_consumer(node->gslModule, node->running));
node->gslRunning = node->running;
}
}
if(trans)
gsl_trans_commit(trans);
}
void StdFlowSystem::startedChanged()
{
needUpdateStarted = true;
}
// hacked initialization of Dispatcher::the()->flowSystem ;)
namespace Arts {
static class SetFlowSystem : public StartupClass {
FlowSystem_impl *fs;
public:
void startup()
{
fs = new StdFlowSystem;
Dispatcher::the()->setFlowSystem(fs);
}
void shutdown()
{
fs->_release();
}
} sfs;
}