You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
arts/flow/gsl/gslengine.c

754 lines
20 KiB

/* GSL Engine - Flow module operation engine
* Copyright (C) 2001 Tim Janik
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "gslengine.h"
#include "gslcommon.h"
#include "gslopnode.h"
#include "gslopmaster.h"
/* --- prototypes --- */
/* Forward declaration: gsl_trans_commit() calls wakeup_master() before its
 * definition appears further down in this file.
 */
static void wakeup_master (void);
/* --- UserThread --- */
/**
 * gsl_module_new
 * @klass:     Class describing the stream counts and process function
 * @user_data: Caller data stored in the module's user_data field
 * @Returns:   The module embedded in a freshly allocated engine node,
 *             or %NULL if @klass is rejected
 *
 * Allocate a zero-initialized engine node for @klass, allocate its
 * input, joint and output stream arrays according to the stream counts
 * of @klass and return a pointer to the node's module part.
 * Classes providing a process_defer function are refused with a warning.
 */
GslModule*
gsl_module_new (const GslClass *klass,
                gpointer        user_data)
{
  EngineNode *enode;
  guint       ostream;

  g_return_val_if_fail (klass != NULL, NULL);
  g_return_val_if_fail (klass->process != NULL || klass->process_defer != NULL, NULL);
  if (klass->process_defer)
    {
      g_warning ("%s: Delay cycle processing not yet implemented", G_STRLOC);
      return NULL;
    }

  enode = gsl_new_struct0 (EngineNode, 1);

  /* module part: class, user data and the stream arrays */
  enode->module.klass = klass;
  enode->module.user_data = user_data;
  if (klass->n_istreams)
    enode->module.istreams = gsl_new_struct0 (GslIStream, ENGINE_NODE_N_ISTREAMS (enode));
  else
    enode->module.istreams = NULL;
  if (klass->n_jstreams)
    enode->module.jstreams = gsl_new_struct0 (GslJStream, ENGINE_NODE_N_JSTREAMS (enode));
  else
    enode->module.jstreams = NULL;
  enode->module.ostreams = _engine_alloc_ostreams (ENGINE_NODE_N_OSTREAMS (enode));

  /* node part: connection bookkeeping, one slot per stream */
  if (ENGINE_NODE_N_ISTREAMS (enode))
    enode->inputs = gsl_new_struct0 (EngineInput, ENGINE_NODE_N_ISTREAMS (enode));
  else
    enode->inputs = NULL;
  if (ENGINE_NODE_N_JSTREAMS (enode))
    enode->jinputs = gsl_new_struct0 (EngineJInput*, ENGINE_NODE_N_JSTREAMS (enode));
  else
    enode->jinputs = NULL;
  if (ENGINE_NODE_N_OSTREAMS (enode))
    enode->outputs = gsl_new_struct0 (EngineOutput, ENGINE_NODE_N_OSTREAMS (enode));
  else
    enode->outputs = NULL;
  enode->output_nodes = NULL;
  enode->integrated = FALSE;
  gsl_rec_mutex_init (&enode->rec_mutex);

  /* every output starts out pointing at its ostream's own value block */
  ostream = 0;
  while (ostream < ENGINE_NODE_N_OSTREAMS (enode))
    {
      enode->outputs[ostream].buffer = enode->module.ostreams[ostream].values;
      enode->module.ostreams[ostream].sub_sample_pattern =
        gsl_engine_sub_sample_test (enode->module.ostreams[ostream].values);
      ostream++;
    }

  /* no flow jobs queued yet */
  enode->flow_jobs = NULL;
  enode->fjob_first = NULL;
  enode->fjob_last = NULL;

  return &enode->module;
}
/**
 * gsl_module_tick_stamp
 * @module:  a GSL engine module
 * @RETURNS: the module's tick stamp, indicating its process status
 *
 * Any thread may call this function on a valid engine module.
 * The module specific tick stamp is updated to gsl_tick_stamp() +
 * @n_values every time its GslProcessFunc() function was
 * called. See also gsl_tick_stamp().
 * If @module is %NULL, 0 is returned.
 */
guint64
gsl_module_tick_stamp (GslModule *module)
{
  EngineNode *enode;

  g_return_val_if_fail (module != NULL, 0);

  /* the stamp is kept in the node's counter field */
  enode = ENGINE_NODE (module);
  return enode->counter;
}
/**
 * gsl_job_integrate
 * @module:  The module to integrate
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job to integrate @module into the engine.
 * This function only builds the job description; returns %NULL if
 * @module is %NULL.
 */
GslJob*
gsl_job_integrate (GslModule *module)
{
  GslJob *integrate_job;

  g_return_val_if_fail (module != NULL, NULL);

  integrate_job = gsl_new_struct0 (GslJob, 1);
  integrate_job->data.node = ENGINE_NODE (module);
  integrate_job->job_id = ENGINE_JOB_INTEGRATE;

  return integrate_job;
}
/**
 * gsl_job_discard
 * @module:  The module to discard
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which removes @module from the
 * engine and destroys it. This function only builds the job
 * description; returns %NULL if @module is %NULL.
 */
GslJob*
gsl_job_discard (GslModule *module)
{
  GslJob *discard_job;

  g_return_val_if_fail (module != NULL, NULL);

  discard_job = gsl_new_struct0 (GslJob, 1);
  discard_job->data.node = ENGINE_NODE (module);
  discard_job->job_id = ENGINE_JOB_DISCARD;

  return discard_job;
}
/**
 * gsl_job_connect
 * @src_module:   Module with output stream
 * @src_ostream:  Index of output stream of @src_module
 * @dest_module:  Module with unconnected input stream
 * @dest_istream: Index of input stream of @dest_module
 * @Returns:      New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which connects the output stream @src_ostream
 * of module @src_module to the input stream @dest_istream of module @dest_module
 * (it is an error if the input stream is already connected by the time the job
 * is executed).
 * Both stream indices are checked against the stream counts of the
 * respective module class; %NULL is returned if a check fails.
 */
GslJob*
gsl_job_connect (GslModule *src_module,
                 guint      src_ostream,
                 GslModule *dest_module,
                 guint      dest_istream)
{
  GslJob *connect_job;

  g_return_val_if_fail (src_module != NULL, NULL);
  g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL);
  g_return_val_if_fail (dest_module != NULL, NULL);
  g_return_val_if_fail (dest_istream < dest_module->klass->n_istreams, NULL);

  connect_job = gsl_new_struct0 (GslJob, 1);
  connect_job->job_id = ENGINE_JOB_ICONNECT;

  /* source side */
  connect_job->data.connection.src_node = ENGINE_NODE (src_module);
  connect_job->data.connection.src_ostream = src_ostream;

  /* destination side */
  connect_job->data.connection.dest_node = ENGINE_NODE (dest_module);
  connect_job->data.connection.dest_ijstream = dest_istream;

  return connect_job;
}
/**
 * gsl_job_jconnect
 * @src_module:   Module with output stream
 * @src_ostream:  Index of output stream of @src_module
 * @dest_module:  Module owning the jstream
 * @dest_jstream: Index of the jstream of @dest_module
 * @Returns:      New job suitable for gsl_trans_add()
 *
 * Create a new transaction job (ENGINE_JOB_JCONNECT) describing a
 * connection from output stream @src_ostream of @src_module to the
 * jstream @dest_jstream of @dest_module.
 * Both stream indices are checked against the stream counts of the
 * respective module class; %NULL is returned if a check fails.
 */
GslJob*
gsl_job_jconnect (GslModule *src_module,
                  guint      src_ostream,
                  GslModule *dest_module,
                  guint      dest_jstream)
{
  GslJob *jconnect_job;

  g_return_val_if_fail (src_module != NULL, NULL);
  g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL);
  g_return_val_if_fail (dest_module != NULL, NULL);
  g_return_val_if_fail (dest_jstream < dest_module->klass->n_jstreams, NULL);

  jconnect_job = gsl_new_struct0 (GslJob, 1);
  jconnect_job->job_id = ENGINE_JOB_JCONNECT;

  /* source side */
  jconnect_job->data.connection.src_node = ENGINE_NODE (src_module);
  jconnect_job->data.connection.src_ostream = src_ostream;

  /* destination side */
  jconnect_job->data.connection.dest_node = ENGINE_NODE (dest_module);
  jconnect_job->data.connection.dest_ijstream = dest_jstream;

  return jconnect_job;
}
/**
 * gsl_job_disconnect
 * @dest_module:  Module with connected input stream
 * @dest_istream: Index of input stream of @dest_module
 * @Returns:      New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which causes the input stream @dest_istream
 * of @dest_module to be disconnected (it is an error if the input stream isn't
 * connected by the time the job is executed).
 * The source side of the job's connection record is left unspecified
 * (%NULL node, stream index ~0).
 */
GslJob*
gsl_job_disconnect (GslModule *dest_module,
                    guint      dest_istream)
{
  GslJob *disconnect_job;

  g_return_val_if_fail (dest_module != NULL, NULL);
  g_return_val_if_fail (dest_istream < dest_module->klass->n_istreams, NULL);

  disconnect_job = gsl_new_struct0 (GslJob, 1);
  disconnect_job->job_id = ENGINE_JOB_IDISCONNECT;

  /* no source is named for an input stream disconnect */
  disconnect_job->data.connection.src_node = NULL;
  disconnect_job->data.connection.src_ostream = ~0;

  disconnect_job->data.connection.dest_node = ENGINE_NODE (dest_module);
  disconnect_job->data.connection.dest_ijstream = dest_istream;

  return disconnect_job;
}
/**
 * gsl_job_jdisconnect
 * @dest_module:  Module owning the jstream
 * @dest_jstream: Index of the jstream of @dest_module
 * @src_module:   Module with output stream
 * @src_ostream:  Index of output stream of @src_module
 * @Returns:      New job suitable for gsl_trans_add()
 *
 * Create a new transaction job (ENGINE_JOB_JDISCONNECT) naming the
 * connection between output stream @src_ostream of @src_module and the
 * jstream @dest_jstream of @dest_module. Unlike gsl_job_disconnect(),
 * the source side is recorded in the job as well.
 * Both stream indices are checked against the stream counts of the
 * respective module class; %NULL is returned if a check fails.
 */
GslJob*
gsl_job_jdisconnect (GslModule *dest_module,
                     guint      dest_jstream,
                     GslModule *src_module,
                     guint      src_ostream)
{
  GslJob *jdisconnect_job;

  g_return_val_if_fail (dest_module != NULL, NULL);
  g_return_val_if_fail (dest_jstream < dest_module->klass->n_jstreams, NULL);
  g_return_val_if_fail (src_module != NULL, NULL);
  g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL);

  jdisconnect_job = gsl_new_struct0 (GslJob, 1);
  jdisconnect_job->job_id = ENGINE_JOB_JDISCONNECT;

  /* source side */
  jdisconnect_job->data.connection.src_node = ENGINE_NODE (src_module);
  jdisconnect_job->data.connection.src_ostream = src_ostream;

  /* destination side */
  jdisconnect_job->data.connection.dest_node = ENGINE_NODE (dest_module);
  jdisconnect_job->data.connection.dest_ijstream = dest_jstream;

  return jdisconnect_job;
}
/**
 * gsl_job_set_consumer
 * @module:               The module whose consumer state is to change
 * @is_toplevel_consumer: Whether to set (%TRUE) or unset (%FALSE) the state
 * @Returns:              New job suitable for gsl_trans_add()
 *
 * Create a new transaction job for @module whose id is
 * ENGINE_JOB_SET_CONSUMER if @is_toplevel_consumer is non-zero and
 * ENGINE_JOB_UNSET_CONSUMER otherwise.
 */
GslJob*
gsl_job_set_consumer (GslModule *module,
                      gboolean   is_toplevel_consumer)
{
  GslJob *consumer_job;

  g_return_val_if_fail (module != NULL, NULL);

  consumer_job = gsl_new_struct0 (GslJob, 1);
  if (is_toplevel_consumer)
    consumer_job->job_id = ENGINE_JOB_SET_CONSUMER;
  else
    consumer_job->job_id = ENGINE_JOB_UNSET_CONSUMER;
  consumer_job->data.node = ENGINE_NODE (module);

  return consumer_job;
}
/**
* GslAccessFunc
* @module: Module to operate on
* @data: Accessor data
*
* The GslAccessFunc is a user-supplied callback function which can access
* a module at times when it is not processing. Accessors are usually used to
* either read out a module's current state, or to modify its state. An
* accessor may only operate on the @data and the @module passed
* in to it.
*/
/**
 * gsl_job_access
 * @module:      The module to access
 * @access_func: The accessor function
 * @data:        Data passed in to the accessor
 * @free_func:   Function to free @data
 * @Returns:     New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which will invoke @access_func
 * on @module with @data when the transaction queue is processed
 * to modify the module's state.
 * @module and @access_func must not be %NULL; @data and @free_func
 * are stored in the job without further checks.
 */
GslJob*
gsl_job_access (GslModule    *module,
                GslAccessFunc access_func,
                gpointer      data,
                GslFreeFunc   free_func)
{
  GslJob *access_job;

  g_return_val_if_fail (module != NULL, NULL);
  g_return_val_if_fail (access_func != NULL, NULL);

  access_job = gsl_new_struct0 (GslJob, 1);
  access_job->job_id = ENGINE_JOB_ACCESS;

  /* target node and the callback to run on it */
  access_job->data.access.node = ENGINE_NODE (module);
  access_job->data.access.access_func = access_func;

  /* callback payload and its destructor */
  access_job->data.access.data = data;
  access_job->data.access.free_func = free_func;

  return access_job;
}
/**
 * gsl_flow_job_access
 * @module:      The module the flow job is attached to
 * @tick_stamp:  Tick stamp recorded in the flow job
 * @access_func: The accessor function
 * @data:        Data stored alongside the accessor
 * @free_func:   Function stored for freeing @data
 * @Returns:     New job suitable for gsl_trans_add()
 *
 * Allocate an access flow job (ENGINE_FLOW_JOB_ACCESS) carrying
 * @tick_stamp, @access_func, @data and @free_func, and wrap it in a
 * transaction job of type ENGINE_JOB_FLOW_JOB targeting @module.
 * Returns %NULL if @module or @access_func is %NULL.
 */
GslJob*
gsl_flow_job_access (GslModule    *module,
                     guint64       tick_stamp,
                     GslAccessFunc access_func,
                     gpointer      data,
                     GslFreeFunc   free_func)
{
  EngineFlowJob *flow_job;
  GslJob        *carrier;

  g_return_val_if_fail (module != NULL, NULL);
  g_return_val_if_fail (access_func != NULL, NULL);

  /* the flow job proper, sized for the access variant */
  flow_job = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAccess, 1);
  flow_job->fjob_id = ENGINE_FLOW_JOB_ACCESS;
  flow_job->any.tick_stamp = tick_stamp;
  flow_job->access.access_func = access_func;
  flow_job->access.data = data;
  flow_job->access.free_func = free_func;

  /* the transaction job that hands the flow job to the node */
  carrier = gsl_new_struct0 (GslJob, 1);
  carrier->job_id = ENGINE_JOB_FLOW_JOB;
  carrier->data.flow_job.node = ENGINE_NODE (module);
  carrier->data.flow_job.fjob = flow_job;

  return carrier;
}
/**
 * gsl_flow_job_suspend
 * @module:     The module the flow job is attached to
 * @tick_stamp: Tick stamp recorded in the flow job
 * @Returns:    New job suitable for gsl_trans_add()
 *
 * Allocate a flow job of type ENGINE_FLOW_JOB_SUSPEND carrying
 * @tick_stamp and wrap it in a transaction job of type
 * ENGINE_JOB_FLOW_JOB targeting @module.
 * Returns %NULL if @module is %NULL.
 */
GslJob*
gsl_flow_job_suspend (GslModule *module,
                      guint64    tick_stamp)
{
  EngineFlowJob *flow_job;
  GslJob        *carrier;

  g_return_val_if_fail (module != NULL, NULL);

  /* the flow job proper, sized for the generic variant */
  flow_job = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAny, 1);
  flow_job->fjob_id = ENGINE_FLOW_JOB_SUSPEND;
  flow_job->any.tick_stamp = tick_stamp;

  /* the transaction job that hands the flow job to the node */
  carrier = gsl_new_struct0 (GslJob, 1);
  carrier->job_id = ENGINE_JOB_FLOW_JOB;
  carrier->data.flow_job.node = ENGINE_NODE (module);
  carrier->data.flow_job.fjob = flow_job;

  return carrier;
}
/**
 * gsl_flow_job_resume
 * @module:     The module the flow job is attached to
 * @tick_stamp: Tick stamp recorded in the flow job
 * @Returns:    New job suitable for gsl_trans_add()
 *
 * Allocate a flow job of type ENGINE_FLOW_JOB_RESUME carrying
 * @tick_stamp and wrap it in a transaction job of type
 * ENGINE_JOB_FLOW_JOB targeting @module.
 * Returns %NULL if @module is %NULL.
 */
GslJob*
gsl_flow_job_resume (GslModule *module,
                     guint64    tick_stamp)
{
  EngineFlowJob *flow_job;
  GslJob        *carrier;

  g_return_val_if_fail (module != NULL, NULL);

  /* the flow job proper, sized for the generic variant */
  flow_job = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAny, 1);
  flow_job->fjob_id = ENGINE_FLOW_JOB_RESUME;
  flow_job->any.tick_stamp = tick_stamp;

  /* the transaction job that hands the flow job to the node */
  carrier = gsl_new_struct0 (GslJob, 1);
  carrier->job_id = ENGINE_JOB_FLOW_JOB;
  carrier->data.flow_job.node = ENGINE_NODE (module);
  carrier->data.flow_job.fjob = flow_job;

  return carrier;
}
/**
* GslPollFunc
* @data: Data of poll function
* @n_values: Minimum number of values the engine wants to process
* @timeout_p: Location of timeout value
* @n_fds: Number of file descriptors used for polling
* @fds: File descriptors to be used for polling
* @revents_filled: Indicates whether @fds actually have their ->revents field filled with valid data.
* @Returns: A boolean value indicating whether the engine should process data right now
*
* The GslPollFunc is a user supplied callback function which can be hooked into the
* GSL engine. The engine uses the poll functions to determine whether processing of
* @n_values in its module network is necessary.
* In order for the poll functions to react to external events, such as device driver
* status changes, the engine will poll(2) the @fds of the poll function and invoke
* the callback with @revents_filled==%TRUE if any of its @fds changed state.
* The callback may also be invoked at other random times with @revents_filled==%FALSE.
* It is supposed to return %TRUE if network processing is currently necessary, and
* %FALSE if not.
* If %FALSE is returned, @timeout_p may be filled with the number of milliseconds
* the engine should use for polling at maximum.
*/
/**
 * gsl_job_add_poll
 * @poll_func: Poll function to add
 * @data:      Data of poll function
 * @free_func: Function to free @data
 * @n_fds:     Number of poll file descriptors
 * @fds:       File descriptors to select(2) or poll(2) on
 * @Returns:   New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which adds a poll function
 * to the engine. The poll function is used by the engine to
 * determine whether processing is currently necessary.
 * The @fds array is copied into the job, so the caller keeps ownership
 * of its own array. @fds must not be %NULL when @n_fds is non-zero.
 */
GslJob*
gsl_job_add_poll (GslPollFunc    poll_func,
                  gpointer       data,
                  GslFreeFunc    free_func,
                  guint          n_fds,
                  const GPollFD *fds)
{
  GslJob *job;

  g_return_val_if_fail (poll_func != NULL, NULL);
  if (n_fds)
    {
      g_return_val_if_fail (fds != NULL, NULL);
    }

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_ADD_POLL;
  job->data.poll.poll_func = poll_func;
  job->data.poll.data = data;
  /* Record the destructor for @data. It was previously dropped, leaving
   * the field zeroed so that @data could never be released through it.
   */
  job->data.poll.free_func = free_func;
  job->data.poll.n_fds = n_fds;
  job->data.poll.fds = g_memdup (fds, sizeof (fds[0]) * n_fds);

  return job;
}
/**
 * gsl_job_remove_poll
 * @poll_func: Poll function to remove
 * @data:      Data of poll function
 * @Returns:   New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which removes a previously inserted poll
 * function from the engine. The job carries only @poll_func and @data;
 * its free function and fd array are set to %NULL.
 */
GslJob*
gsl_job_remove_poll (GslPollFunc poll_func,
                     gpointer    data)
{
  GslJob *remove_job;

  g_return_val_if_fail (poll_func != NULL, NULL);

  remove_job = gsl_new_struct0 (GslJob, 1);
  remove_job->job_id = ENGINE_JOB_REMOVE_POLL;

  /* the pair naming the poll function to remove */
  remove_job->data.poll.poll_func = poll_func;
  remove_job->data.poll.data = data;

  /* a removal job owns neither a destructor nor fds */
  remove_job->data.poll.fds = NULL;
  remove_job->data.poll.free_func = NULL;

  return remove_job;
}
/**
 * gsl_job_debug
 * @debug:   Debug message
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which issues @debug message when
 * the job is executed. This function is meant for debugging purposes
 * during development phase only and shouldn't be used in production code.
 * The message is duplicated with g_strdup(), so the caller may release
 * @debug as soon as this function returns.
 */
GslJob*
gsl_job_debug (const gchar *debug)
{
  GslJob *debug_job;

  g_return_val_if_fail (debug != NULL, NULL);

  debug_job = gsl_new_struct0 (GslJob, 1);
  debug_job->data.debug = g_strdup (debug);
  debug_job->job_id = ENGINE_JOB_DEBUG;

  return debug_job;
}
/**
 * gsl_trans_open
 * @Returns: Newly opened empty transaction
 *
 * Open up a new transaction to commit jobs to the GSL engine.
 * This function may cause garbage collection (see
 * gsl_engine_garbage_collect()).
 */
GslTrans*
gsl_trans_open (void)
{
  GslTrans *new_trans;

  /* collect garbage first, then allocate */
  gsl_engine_garbage_collect ();

  new_trans = gsl_new_struct0 (GslTrans, 1);
  new_trans->comitted = FALSE;
  new_trans->cqt_next = NULL;
  /* empty job list */
  new_trans->jobs_tail = NULL;
  new_trans->jobs_head = NULL;

  return new_trans;
}
/**
 * gsl_trans_add
 * @trans: Opened transaction
 * @job:   Job to add
 *
 * Append a job to an opened transaction.
 * The transaction must not have been committed yet and @job must not
 * already be linked to a following job.
 */
void
gsl_trans_add (GslTrans *trans,
               GslJob   *job)
{
  g_return_if_fail (trans != NULL);
  g_return_if_fail (trans->comitted == FALSE);
  g_return_if_fail (job != NULL);
  g_return_if_fail (job->next == NULL);

  if (trans->jobs_tail == NULL)
    {
      /* first job of this transaction */
      trans->jobs_head = job;
    }
  else
    {
      /* link behind the current last job */
      trans->jobs_tail->next = job;
    }
  trans->jobs_tail = job;
}
/**
 * gsl_trans_commit
 * @trans: Opened transaction
 *
 * Close the transaction and commit it to the engine. The engine
 * will execute the jobs contained in this transaction as soon as
 * it has completed its current processing cycle. The jobs will be
 * executed in the exact order they were added to the transaction.
 * A transaction without any jobs is dismissed instead of queued.
 */
void
gsl_trans_commit (GslTrans *trans)
{
  g_return_if_fail (trans != NULL);
  g_return_if_fail (trans->comitted == FALSE);
  g_return_if_fail (trans->cqt_next == NULL);

  if (!trans->jobs_head)
    {
      /* nothing to execute: throw the empty transaction away */
      gsl_trans_dismiss (trans);
      return;
    }

  trans->comitted = TRUE;
  _engine_enqueue_trans (trans);
  wakeup_master ();
}
/**
 * gsl_trans_dismiss
 * @trans: Opened transaction
 *
 * Close and discard the transaction, destroy all jobs currently
 * contained in it and do not execute them.
 * This function may cause garbage collection (see
 * gsl_engine_garbage_collect()).
 * Only transactions that were never committed may be dismissed.
 */
void
gsl_trans_dismiss (GslTrans *trans)
{
  g_return_if_fail (trans != NULL);
  g_return_if_fail (trans->comitted == FALSE);
  g_return_if_fail (trans->cqt_next == NULL);

  /* release the transaction, then collect garbage */
  _engine_free_trans (trans);

  gsl_engine_garbage_collect ();
}
/**
 * gsl_transact
 * @job: First job
 * @...: %NULL terminated job list
 *
 * Convenience function which opens up a new transaction,
 * collects the %NULL terminated job list passed to the function,
 * and commits the transaction.
 * If @job itself is %NULL, no variadic argument is read and the
 * empty transaction is handed to gsl_trans_commit() as is.
 */
void
gsl_transact (GslJob *job,
              ...)
{
  GslTrans *batch = gsl_trans_open ();
  va_list   rest;

  va_start (rest, job);
  /* walk the argument list up to the terminating NULL */
  for (; job; job = va_arg (rest, GslJob*))
    gsl_trans_add (batch, job);
  va_end (rest);

  gsl_trans_commit (batch);
}
/* --- initialization --- */
/* Test thread body: commits two small debug transactions every half
 * second. @data is not used. The loop flag is never cleared, so this
 * function does not return.
 */
static void
slave (gpointer data)
{
  gboolean keep_going = TRUE;

  while (keep_going)
    {
      GslTrans *batch;
      gchar    *idle_msg;

      /* first transaction: thread specific message plus a fixed one */
      batch = gsl_trans_open ();
      idle_msg = g_strdup_printf ("SLAVE(%p): idle", g_thread_self ());
      gsl_trans_add (batch, gsl_job_debug (idle_msg));
      g_free (idle_msg);        /* gsl_job_debug() kept its own copy */
      gsl_trans_add (batch, gsl_job_debug ("string2"));
      gsl_trans_commit (batch);

      /* second transaction: a single fixed message */
      batch = gsl_trans_open ();
      gsl_trans_add (batch, gsl_job_debug ("trans2"));
      gsl_trans_commit (batch);

      g_usleep (500 * 1000);
    }
}
/* --- setup & trigger --- */
/* Set to TRUE by gsl_engine_init(); engine entry points below check it. */
static gboolean gsl_engine_initialized = FALSE;
/* Copy of the run_threaded argument given to gsl_engine_init(). */
static gboolean gsl_engine_threaded = FALSE;
/* Thread handle assigned by gsl_engine_init() when running threaded;
 * within this file it is otherwise left at NULL.
 */
static GslThread *master_thread = NULL;
/* Engine parameters, all assigned by gsl_engine_init(): */
/* block_size as passed to gsl_engine_init() */
guint gsl_externvar_bsize = 0;
/* sample_freq as passed to gsl_engine_init() */
guint gsl_externvar_sample_freq = 0;
/* sub_sample_tqmask argument shifted left by 2 */
guint gsl_externvar_sub_sample_tqmask = 0;
/* sub_sample_tqmask argument plus 1 */
guint gsl_externvar_sub_sample_steps = 0;
/**
 * gsl_engine_init
 * @run_threaded:      whether to start a master thread for the engine
 * @block_size:        number of values to process block wise
 * @sample_freq:       sample frequency, must be greater than 0
 * @sub_sample_tqmask: sub sample mask, must be smaller than @block_size
 *                     and one less than a power of 2 (0, 1, 3, 7, ...)
 *
 * Initialize the GSL engine, this function must be called prior to
 * any other engine related function and can only be invoked once.
 * The @block_size determines the amount by which the global tick
 * stamp (see gsl_tick_stamp()) is updated every time the whole
 * module network completed processing @block_size values.
 * @block_size must lie within 1 and GSL_STREAM_MAX_VALUES.
 * If any argument check fails, the engine is left uninitialized.
 */
void
gsl_engine_init (gboolean run_threaded,
                 guint    block_size,
                 guint    sample_freq,
                 guint    sub_sample_tqmask)
{
  g_return_if_fail (gsl_engine_initialized == FALSE);
  g_return_if_fail (block_size > 0 && block_size <= GSL_STREAM_MAX_VALUES);
  g_return_if_fail (sample_freq > 0);
  g_return_if_fail (sub_sample_tqmask < block_size);
  g_return_if_fail ((sub_sample_tqmask & (sub_sample_tqmask + 1)) == 0); /* mask + 1 is a power of 2 */

  /* mode flags */
  gsl_engine_initialized = TRUE;
  gsl_engine_threaded = run_threaded;

  /* published engine parameters */
  gsl_externvar_bsize = block_size;
  gsl_externvar_sample_freq = sample_freq;
  /* shift out sizeof (float) tqalignment */
  gsl_externvar_sub_sample_tqmask = sub_sample_tqmask << 2;
  gsl_externvar_sub_sample_steps = sub_sample_tqmask + 1;
  _gsl_tick_stamp_set_leap (block_size);

  ENG_DEBUG ("initialization: threaded=%s", gsl_engine_threaded ? "TRUE" : "FALSE");

  if (!gsl_engine_threaded)
    return;

  /* threaded mode: make sure GLib threading is up, then start the master */
  if (!g_thread_supported ())
    g_thread_init (NULL);
  master_thread = gsl_thread_new (_engine_master_thread, NULL);

  /* disabled test thread, see slave() */
  if (0)
    gsl_thread_new (slave, NULL);
}
static void
wakeup_master (void)
{
if (master_thread)
gsl_thread_wakeup (master_thread);
}
/**
 * gsl_engine_prepare
 * @loop:    Loop state to fill in
 * @Returns: The result of _engine_master_prepare() in non-threaded
 *           mode, %FALSE in threaded mode
 *
 * In non-threaded mode, preparation is delegated to
 * _engine_master_prepare(). In threaded mode, @loop is filled with
 * an empty description (no fds, timeout of -1, nothing changed).
 * The engine must have been initialized.
 */
gboolean
gsl_engine_prepare (GslEngineLoop *loop)
{
  g_return_val_if_fail (loop != NULL, FALSE);
  g_return_val_if_fail (gsl_engine_initialized == TRUE, FALSE);

  if (gsl_engine_threaded)
    {
      /* threaded mode: hand back an empty loop description */
      loop->timeout = -1;
      loop->fds_changed = FALSE;
      loop->n_fds = 0;
      loop->revents_filled = FALSE;

      return FALSE;
    }

  return _engine_master_prepare (loop);
}
/**
 * gsl_engine_check
 * @loop:    Loop state, with revents filled in if it contains any fds
 * @Returns: The result of _engine_master_check() in non-threaded
 *           mode, %FALSE in threaded mode
 *
 * In non-threaded mode, the check is delegated to
 * _engine_master_check(); in threaded mode this function always
 * returns %FALSE.
 */
gboolean
gsl_engine_check (const GslEngineLoop *loop)
{
  g_return_val_if_fail (loop != NULL, FALSE);
  if (loop->n_fds)
    g_return_val_if_fail (loop->revents_filled == TRUE, FALSE);

  if (gsl_engine_threaded)
    return FALSE;

  return _engine_master_check (loop);
}
/**
 * gsl_engine_dispatch
 *
 * In non-threaded mode, run _engine_master_dispatch(); in threaded
 * mode this function does nothing. The engine must have been
 * initialized.
 */
void
gsl_engine_dispatch (void)
{
  g_return_if_fail (gsl_engine_initialized == TRUE);

  if (gsl_engine_threaded)
    return;

  _engine_master_dispatch ();
}
/**
* gsl_engine_wait_on_trans
*
* Wait until all pending transactions have been processed
* by the GSL Engine.
* This function may cause garbage collection (see
* gsl_engine_garbage_collect()).
*/
void
gsl_engine_wait_on_trans (void)
{
g_return_if_fail (gsl_engine_initialized == TRUE);
/* non-threaded */
if (!gsl_engine_threaded)
_engine_master_dispatch_jobs ();
/* threaded */
_engine_wait_on_trans ();
/* call all free() functions */
gsl_engine_garbage_collect ();
}
/* vim:set ts=8 sts=2 sw=2: */