/* GSL Engine - Flow module operation engine
 * Copyright (C) 2001 Tim Janik
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */
#include "gslengine.h"

#include "gslcommon.h"
#include "gslopnode.h"
#include "gslopmaster.h"


/* --- prototypes --- */
static void wakeup_master (void);


/* --- UserThread --- */
/**
 * gsl_module_new
 * @klass:     Class describing the module (stream counts, process function)
 * @user_data: User data stored in the new module
 * @Returns:   Newly allocated module, or %NULL if the arguments are invalid
 *
 * Create a new engine module of class @klass. The stream and
 * connection arrays are allocated according to the stream counts
 * of @klass, and the module starts out not integrated.
 * Classes that provide a process_defer() function are rejected with
 * a warning, since delay cycle processing is not implemented.
 */
GslModule*
gsl_module_new (const GslClass *klass,
                gpointer        user_data)
{
  EngineNode *node;
  guint i;

  g_return_val_if_fail (klass != NULL, NULL);
  g_return_val_if_fail (klass->process != NULL || klass->process_defer != NULL, NULL);
  if (klass->process_defer)
    {
      g_warning ("%s: Delay cycle processing not yet implemented", G_STRLOC);
      return NULL;
    }

  node = gsl_new_struct0 (EngineNode, 1);

  /* setup GslModule */
  node->module.klass = klass;
  node->module.user_data = user_data;
  node->module.istreams = klass->n_istreams ? gsl_new_struct0 (GslIStream, ENGINE_NODE_N_ISTREAMS (node)) : NULL;
  node->module.jstreams = klass->n_jstreams ? gsl_new_struct0 (GslJStream, ENGINE_NODE_N_JSTREAMS (node)) : NULL;
  node->module.ostreams = _engine_alloc_ostreams (ENGINE_NODE_N_OSTREAMS (node));

  /* setup EngineNode */
  node->inputs = ENGINE_NODE_N_ISTREAMS (node) ? gsl_new_struct0 (EngineInput, ENGINE_NODE_N_ISTREAMS (node)) : NULL;
  node->jinputs = ENGINE_NODE_N_JSTREAMS (node) ? gsl_new_struct0 (EngineJInput*, ENGINE_NODE_N_JSTREAMS (node)) : NULL;
  node->outputs = ENGINE_NODE_N_OSTREAMS (node) ? gsl_new_struct0 (EngineOutput, ENGINE_NODE_N_OSTREAMS (node)) : NULL;
  node->output_nodes = NULL;
  node->integrated = FALSE;
  gsl_rec_mutex_init (&node->rec_mutex);
  /* each engine output shares the value buffer of its module ostream */
  for (i = 0; i < ENGINE_NODE_N_OSTREAMS (node); i++)
    {
      node->outputs[i].buffer = node->module.ostreams[i].values;
      node->module.ostreams[i].sub_sample_pattern = gsl_engine_sub_sample_test (node->module.ostreams[i].values);
    }
  node->flow_jobs = NULL;
  node->fjob_first = NULL;
  node->fjob_last = NULL;

  return &node->module;
}

/**
 * gsl_module_tick_stamp
 * @module:  a GSL engine module
 * @RETURNS: the module's tick stamp, indicating its process status
 *
 * Any thread may call this function on a valid engine module.
 * The module specific tick stamp is updated to gsl_tick_stamp() +
 * @n_values every time its GslProcessFunc() function was
 * called. See also gsl_tick_stamp().
 */
guint64
gsl_module_tick_stamp (GslModule *module)
{
  g_return_val_if_fail (module != NULL, 0);

  return ENGINE_NODE (module)->counter;
}

/**
 * gsl_job_integrate
 * @module: The module to integrate
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job to integrate @module into the engine.
 */
GslJob*
gsl_job_integrate (GslModule *module)
{
  GslJob *job;

  g_return_val_if_fail (module != NULL, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_INTEGRATE;
  job->data.node = ENGINE_NODE (module);

  return job;
}

/**
 * gsl_job_discard
 * @module: The module to discard
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which removes @module from the
 * engine and destroys it.
 */
GslJob*
gsl_job_discard (GslModule *module)
{
  GslJob *job;

  g_return_val_if_fail (module != NULL, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_DISCARD;
  job->data.node = ENGINE_NODE (module);

  return job;
}

/**
 * gsl_job_connect
 * @src_module: Module with output stream
 * @src_ostream: Index of output stream of @src_module
 * @dest_module: Module with unconnected input stream
 * @dest_istream: Index of input stream of @dest_module
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which connects the output stream @src_ostream
 * of module @src_module to the input stream @dest_istream of module @dest_module
 * (it is an error if the input stream is already connected by the time the job
 * is executed).
 */
GslJob*
gsl_job_connect (GslModule *src_module,
                 guint      src_ostream,
                 GslModule *dest_module,
                 guint      dest_istream)
{
  GslJob *job;

  g_return_val_if_fail (src_module != NULL, NULL);
  g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL);
  g_return_val_if_fail (dest_module != NULL, NULL);
  g_return_val_if_fail (dest_istream < dest_module->klass->n_istreams, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_ICONNECT;
  job->data.connection.dest_node = ENGINE_NODE (dest_module);
  job->data.connection.dest_ijstream = dest_istream;
  job->data.connection.src_node = ENGINE_NODE (src_module);
  job->data.connection.src_ostream = src_ostream;

  return job;
}

/**
 * gsl_job_jconnect
 * @src_module: Module with output stream
 * @src_ostream: Index of output stream of @src_module
 * @dest_module: Module with joint input stream
 * @dest_jstream: Index of joint input stream of @dest_module
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which connects the output stream @src_ostream
 * of module @src_module to the joint input stream @dest_jstream of module
 * @dest_module.
 */
GslJob*
gsl_job_jconnect (GslModule *src_module,
                  guint      src_ostream,
                  GslModule *dest_module,
                  guint      dest_jstream)
{
  GslJob *job;

  g_return_val_if_fail (src_module != NULL, NULL);
  g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL);
  g_return_val_if_fail (dest_module != NULL, NULL);
  g_return_val_if_fail (dest_jstream < dest_module->klass->n_jstreams, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_JCONNECT;
  job->data.connection.dest_node = ENGINE_NODE (dest_module);
  job->data.connection.dest_ijstream = dest_jstream;
  job->data.connection.src_node = ENGINE_NODE (src_module);
  job->data.connection.src_ostream = src_ostream;

  return job;
}

/**
 * gsl_job_disconnect
 * @dest_module: Module with connected input stream
 * @dest_istream: Index of input stream of @dest_module
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which causes the input stream @dest_istream
 * of @dest_module to be disconnected (it is an error if the input stream isn't
 * connected by the time the job is executed).
 */
GslJob*
gsl_job_disconnect (GslModule *dest_module,
                    guint      dest_istream)
{
  GslJob *job;

  g_return_val_if_fail (dest_module != NULL, NULL);
  g_return_val_if_fail (dest_istream < dest_module->klass->n_istreams, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_IDISCONNECT;
  job->data.connection.dest_node = ENGINE_NODE (dest_module);
  job->data.connection.dest_ijstream = dest_istream;
  /* an input stream has a single source, so none needs to be named */
  job->data.connection.src_node = NULL;
  job->data.connection.src_ostream = ~0;

  return job;
}

/**
 * gsl_job_jdisconnect
 * @dest_module: Module with joint input stream
 * @dest_jstream: Index of joint input stream of @dest_module
 * @src_module: Module with output stream
 * @src_ostream: Index of output stream of @src_module
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which disconnects the output stream
 * @src_ostream of @src_module from the joint input stream @dest_jstream
 * of @dest_module. Unlike gsl_job_disconnect(), the source has to be
 * named explicitly.
 */
GslJob*
gsl_job_jdisconnect (GslModule *dest_module,
                     guint      dest_jstream,
                     GslModule *src_module,
                     guint      src_ostream)
{
  GslJob *job;

  g_return_val_if_fail (dest_module != NULL, NULL);
  g_return_val_if_fail (dest_jstream < dest_module->klass->n_jstreams, NULL);
  g_return_val_if_fail (src_module != NULL, NULL);
  g_return_val_if_fail (src_ostream < src_module->klass->n_ostreams, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_JDISCONNECT;
  job->data.connection.dest_node = ENGINE_NODE (dest_module);
  job->data.connection.dest_ijstream = dest_jstream;
  job->data.connection.src_node = ENGINE_NODE (src_module);
  job->data.connection.src_ostream = src_ostream;

  return job;
}

/**
 * gsl_job_set_consumer
 * @module: The module to change
 * @is_toplevel_consumer: Whether to set or unset the consumer state
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which marks @module as a toplevel
 * consumer if @is_toplevel_consumer is %TRUE, or removes that mark
 * otherwise.
 */
GslJob*
gsl_job_set_consumer (GslModule *module,
                      gboolean   is_toplevel_consumer)
{
  GslJob *job;

  g_return_val_if_fail (module != NULL, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = is_toplevel_consumer ? ENGINE_JOB_SET_CONSUMER : ENGINE_JOB_UNSET_CONSUMER;
  job->data.node = ENGINE_NODE (module);

  return job;
}

/**
 * GslAccessFunc
 * @module: Module to operate on
 * @data: Accessor data
 *
 * The GslAccessFunc is a user supplied callback function which can access
 * a module in times it is not processing. Accessors are usually used to
 * either read out a module's current state, or to modify its state. An
 * accessor may only operate on the @data and the @module passed
 * in to it.
 */
/**
 * gsl_job_access
 * @module: The module to access
 * @access_func: The accessor function
 * @data: Data passed in to the accessor
 * @free_func: Function to free @data
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which will invoke @access_func
 * on @module with @data when the transaction queue is processed
 * to modify the module's state.
 */
GslJob*
gsl_job_access (GslModule    *module,
                GslAccessFunc access_func,
                gpointer      data,
                GslFreeFunc   free_func)
{
  GslJob *job;

  g_return_val_if_fail (module != NULL, NULL);
  g_return_val_if_fail (access_func != NULL, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_ACCESS;
  job->data.access.node = ENGINE_NODE (module);
  job->data.access.access_func = access_func;
  job->data.access.data = data;
  job->data.access.free_func = free_func;

  return job;
}

/**
 * gsl_flow_job_access
 * @module: The module to access
 * @tick_stamp: Tick stamp recorded in the flow job
 * @access_func: The accessor function
 * @data: Data passed in to the accessor
 * @free_func: Function to free @data
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which hands an access flow job for
 * @module to the engine. The flow job records @tick_stamp together
 * with @access_func, @data and @free_func.
 * NOTE(review): presumably the engine invokes @access_func once the
 * module's processing reaches @tick_stamp - confirm against the master.
 */
GslJob*
gsl_flow_job_access (GslModule    *module,
                     guint64       tick_stamp,
                     GslAccessFunc access_func,
                     gpointer      data,
                     GslFreeFunc   free_func)
{
  GslJob *job;
  EngineFlowJob *fjob;

  g_return_val_if_fail (module != NULL, NULL);
  g_return_val_if_fail (access_func != NULL, NULL);

  fjob = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAccess, 1);
  fjob->fjob_id = ENGINE_FLOW_JOB_ACCESS;
  fjob->any.tick_stamp = tick_stamp;
  fjob->access.access_func = access_func;
  fjob->access.data = data;
  fjob->access.free_func = free_func;

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_FLOW_JOB;
  job->data.flow_job.node = ENGINE_NODE (module);
  job->data.flow_job.fjob = fjob;

  return job;
}

/**
 * gsl_flow_job_suspend
 * @module: The module to suspend
 * @tick_stamp: Tick stamp recorded in the flow job
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which hands a suspend flow job for
 * @module, stamped with @tick_stamp, to the engine.
 */
GslJob*
gsl_flow_job_suspend (GslModule *module,
                      guint64    tick_stamp)
{
  GslJob *job;
  EngineFlowJob *fjob;

  g_return_val_if_fail (module != NULL, NULL);

  fjob = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAny, 1);
  fjob->fjob_id = ENGINE_FLOW_JOB_SUSPEND;
  fjob->any.tick_stamp = tick_stamp;

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_FLOW_JOB;
  job->data.flow_job.node = ENGINE_NODE (module);
  job->data.flow_job.fjob = fjob;

  return job;
}

/**
 * gsl_flow_job_resume
 * @module: The module to resume
 * @tick_stamp: Tick stamp recorded in the flow job
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which hands a resume flow job for
 * @module, stamped with @tick_stamp, to the engine.
 */
GslJob*
gsl_flow_job_resume (GslModule *module,
                     guint64    tick_stamp)
{
  GslJob *job;
  EngineFlowJob *fjob;

  g_return_val_if_fail (module != NULL, NULL);

  fjob = (EngineFlowJob*) gsl_new_struct0 (EngineFlowJobAny, 1);
  fjob->fjob_id = ENGINE_FLOW_JOB_RESUME;
  fjob->any.tick_stamp = tick_stamp;

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_FLOW_JOB;
  job->data.flow_job.node = ENGINE_NODE (module);
  job->data.flow_job.fjob = fjob;

  return job;
}

/**
 * GslPollFunc
 * @data: Data of poll function
 * @n_values: Minimum number of values the engine wants to process
 * @timeout_p: Location of timeout value
 * @n_fds: Number of file descriptors used for polling
 * @fds: File descriptors to be used for polling
 * @revents_filled: Indicates whether @fds actually have their ->revents field filled with valid data.
 * @Returns: A boolean value indicating whether the engine should process data right now
 *
 * The GslPollFunc is a user supplied callback function which can be hooked into the
 * GSL engine. The engine uses the poll functions to determine whether processing of
 * @n_values in its module network is necessary.
 * In order for the poll functions to react to external events, such as device driver
 * status changes, the engine will poll(2) the @fds of the poll function and invoke
 * the callback with @revents_filled==%TRUE if any of its @fds changed state.
 * The callback may also be invoked at other random times with @revents_filled=%FALSE.
 * It is supposed to return %TRUE if network processing is currently necessary, and
 * %FALSE if not.
 * If %FALSE is returned, @timeout_p may be filled with the number of milliseconds
 * the engine should use for polling at maximum.
 */
/**
 * gsl_job_add_poll
 * @poll_func: Poll function to add
 * @data: Data of poll function
 * @free_func: Function to free @data
 * @n_fds: Number of poll file descriptors
 * @fds: File descriptors to select(2) or poll(2) on
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which adds a poll function
 * to the engine. The poll function is used by the engine to
 * determine whether processing is currently necessary.
 * The @fds array is copied, so the caller keeps ownership of it.
 */
GslJob*
gsl_job_add_poll (GslPollFunc    poll_func,
                  gpointer       data,
                  GslFreeFunc    free_func,
                  guint          n_fds,
                  const GPollFD *fds)
{
  GslJob *job;

  g_return_val_if_fail (poll_func != NULL, NULL);
  if (n_fds)
    g_return_val_if_fail (fds != NULL, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_ADD_POLL;
  job->data.poll.poll_func = poll_func;
  job->data.poll.data = data;
  /* hand the destructor on with the job; without this the zero-initialized
   * field stays NULL and @data could never be released through @free_func
   */
  job->data.poll.free_func = free_func;
  job->data.poll.n_fds = n_fds;
  job->data.poll.fds = g_memdup (fds, sizeof (fds[0]) * n_fds);

  return job;
}

/**
 * gsl_job_remove_poll
 * @poll_func: Poll function to remove
 * @data: Data of poll function
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which removes a previously inserted poll
 * function from the engine.
 */
GslJob*
gsl_job_remove_poll (GslPollFunc poll_func,
                     gpointer    data)
{
  GslJob *job;

  g_return_val_if_fail (poll_func != NULL, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_REMOVE_POLL;
  job->data.poll.poll_func = poll_func;
  job->data.poll.data = data;
  job->data.poll.free_func = NULL;
  job->data.poll.fds = NULL;

  return job;
}

/**
 * gsl_job_debug
 * @debug: Debug message
 * @Returns: New job suitable for gsl_trans_add()
 *
 * Create a new transaction job which issues @debug message when
 * the job is executed. This function is meant for debugging purposes
 * during development phase only and shouldn't be used in production code.
 * The message is copied, so @debug may be freed after this call.
 */
GslJob*
gsl_job_debug (const gchar *debug)
{
  GslJob *job;

  g_return_val_if_fail (debug != NULL, NULL);

  job = gsl_new_struct0 (GslJob, 1);
  job->job_id = ENGINE_JOB_DEBUG;
  job->data.debug = g_strdup (debug);

  return job;
}

/**
 * gsl_trans_open
 * @Returns: Newly opened empty transaction
 *
 * Open up a new transaction to commit jobs to the GSL engine.
 * This function may cause garbage collection (see
 * gsl_engine_garbage_collect()).
 */
GslTrans*
gsl_trans_open (void)
{
  GslTrans *trans;

  gsl_engine_garbage_collect ();

  trans = gsl_new_struct0 (GslTrans, 1);

  trans->jobs_head = NULL;
  trans->jobs_tail = NULL;
  trans->comitted = FALSE;
  trans->cqt_next = NULL;

  return trans;
}

/**
 * gsl_trans_add
 * @trans: Opened transaction
 * @job: Job to add
 *
 * Append a job to an opened transaction.
 */
void
gsl_trans_add (GslTrans *trans,
               GslJob   *job)
{
  g_return_if_fail (trans != NULL);
  g_return_if_fail (trans->comitted == FALSE);
  g_return_if_fail (job != NULL);
  g_return_if_fail (job->next == NULL);  /* job must not be part of another list */

  if (trans->jobs_tail)
    trans->jobs_tail->next = job;
  else
    trans->jobs_head = job;
  trans->jobs_tail = job;
}

/**
 * gsl_trans_commit
 * @trans: Opened transaction
 *
 * Close the transaction and commit it to the engine. The engine
 * will execute the jobs contained in this transaction as soon as
 * it has completed its current processing cycle. The jobs will be
 * executed in the exact order they were added to the transaction.
 * A transaction without any jobs is dismissed instead.
 */
void
gsl_trans_commit (GslTrans *trans)
{
  g_return_if_fail (trans != NULL);
  g_return_if_fail (trans->comitted == FALSE);
  g_return_if_fail (trans->cqt_next == NULL);

  if (trans->jobs_head)
    {
      trans->comitted = TRUE;
      _engine_enqueue_trans (trans);
      wakeup_master ();
    }
  else
    gsl_trans_dismiss (trans);
}

/**
 * gsl_trans_dismiss
 * @trans: Opened transaction
 *
 * Close and discard the transaction, destroy all jobs currently
 * contained in it and do not execute them.
 * This function may cause garbage collection (see
 * gsl_engine_garbage_collect()).
 */
void
gsl_trans_dismiss (GslTrans *trans)
{
  g_return_if_fail (trans != NULL);
  g_return_if_fail (trans->comitted == FALSE);
  g_return_if_fail (trans->cqt_next == NULL);

  _engine_free_trans (trans);

  gsl_engine_garbage_collect ();
}

/**
 * gsl_transact
 * @job: First job
 * @...: %NULL terminated job list
 *
 * Convenience function which opens up a new transaction,
 * collects the %NULL terminated job list passed to the function,
 * and commits the transaction.
 */
void
gsl_transact (GslJob *job,
              ...)
{
  GslTrans *trans = gsl_trans_open ();
  va_list var_args;

  va_start (var_args, job);
  while (job)
    {
      gsl_trans_add (trans, job);
      job = va_arg (var_args, GslJob*);
    }
  va_end (var_args);
  gsl_trans_commit (trans);
}


/* --- initialization --- */
/* Test thread body: commits debug transactions every half second, forever.
 * Only referenced from a disabled "if (0)" branch in gsl_engine_init().
 */
static void
slave (gpointer data)
{
  gboolean run = TRUE;

  while (run)
    {
      GslTrans *trans = gsl_trans_open ();
      gchar *str = g_strdup_printf ("SLAVE(%p): idle", g_thread_self ());

      gsl_trans_add (trans, gsl_job_debug (str));
      g_free (str);
      gsl_trans_add (trans, gsl_job_debug ("string2"));
      gsl_trans_commit (trans);

      trans = gsl_trans_open ();
      gsl_trans_add (trans, gsl_job_debug ("trans2"));
      gsl_trans_commit (trans);

      g_usleep (1000*500);
    }
}


/* --- setup & trigger --- */
static gboolean   gsl_engine_initialized = FALSE;
static gboolean   gsl_engine_threaded = FALSE;
static GslThread *master_thread = NULL;
guint             gsl_externvar_bsize = 0;
guint             gsl_externvar_sample_freq = 0;
guint             gsl_externvar_sub_sample_mask = 0;
guint             gsl_externvar_sub_sample_steps = 0;

/**
 * gsl_engine_init
 * @run_threaded:    whether to run the engine master in its own thread
 * @block_size:      number of values to process block wise
 * @sample_freq:     sample frequency, must be greater than 0
 * @sub_sample_mask: sub sample mask, must be smaller than @block_size
 *                   and one less than a power of 2
 *
 * Initialize the GSL engine, this function must be called prior to
 * any other engine related function and can only be invoked once.
 * The @block_size determines the amount by which the global tick
 * stamp (see gsl_tick_stamp()) is updated every time the whole
 * module network completed processing @block_size values.
 * If @run_threaded is %TRUE, a master thread is started; otherwise the
 * engine is driven through gsl_engine_prepare(), gsl_engine_check()
 * and gsl_engine_dispatch().
 */
void
gsl_engine_init (gboolean run_threaded,
                 guint    block_size,
                 guint    sample_freq,
                 guint    sub_sample_mask)
{
  g_return_if_fail (gsl_engine_initialized == FALSE);
  g_return_if_fail (block_size > 0 && block_size <= GSL_STREAM_MAX_VALUES);
  g_return_if_fail (sample_freq > 0);
  g_return_if_fail (sub_sample_mask < block_size);
  g_return_if_fail ((sub_sample_mask & (sub_sample_mask + 1)) == 0);    /* mask + 1 must be a power of 2 */

  gsl_engine_initialized = TRUE;
  gsl_engine_threaded = run_threaded;
  gsl_externvar_bsize = block_size;
  gsl_externvar_sample_freq = sample_freq;
  gsl_externvar_sub_sample_mask = sub_sample_mask << 2; /* shift out sizeof (float) alignment */
  gsl_externvar_sub_sample_steps = sub_sample_mask + 1;
  _gsl_tick_stamp_set_leap (block_size);

  ENG_DEBUG ("initialization: threaded=%s", gsl_engine_threaded ? "TRUE" : "FALSE");

  if (gsl_engine_threaded)
    {
      if (!g_thread_supported ())
        g_thread_init (NULL);
      master_thread = gsl_thread_new (_engine_master_thread, NULL);
      if (0)
        gsl_thread_new (slave, NULL);
    }
}

/* Wake up the master thread, if one was started by gsl_engine_init(). */
static void
wakeup_master (void)
{
  if (master_thread)
    gsl_thread_wakeup (master_thread);
}

/**
 * gsl_engine_prepare
 * @loop: Loop state to fill in
 * @Returns: Result of the master's prepare step; always %FALSE in threaded mode
 *
 * Prepare step for driving a non-threaded engine from a main loop.
 * In threaded mode the master thread does the processing, so @loop
 * is reset to an empty state (no fds, timeout of -1).
 */
gboolean
gsl_engine_prepare (GslEngineLoop *loop)
{
  g_return_val_if_fail (loop != NULL, FALSE);
  g_return_val_if_fail (gsl_engine_initialized == TRUE, FALSE);

  if (!gsl_engine_threaded)
    return _engine_master_prepare (loop);
  else
    {
      loop->timeout = -1;
      loop->fds_changed = FALSE;
      loop->n_fds = 0;
      loop->revents_filled = FALSE;
      return FALSE;
    }
}

/**
 * gsl_engine_check
 * @loop: Loop state as set up by gsl_engine_prepare()
 * @Returns: Result of the master's check step; always %FALSE in threaded mode
 *
 * Check step for driving a non-threaded engine from a main loop.
 * If @loop contains file descriptors, their revents fields must
 * have been filled in (@loop->revents_filled == %TRUE).
 */
gboolean
gsl_engine_check (const GslEngineLoop *loop)
{
  g_return_val_if_fail (loop != NULL, FALSE);
  /* same precondition as gsl_engine_prepare() and gsl_engine_dispatch() */
  g_return_val_if_fail (gsl_engine_initialized == TRUE, FALSE);
  if (loop->n_fds)
    g_return_val_if_fail (loop->revents_filled == TRUE, FALSE);

  if (!gsl_engine_threaded)
    return _engine_master_check (loop);
  else
    return FALSE;
}

/**
 * gsl_engine_dispatch
 *
 * Dispatch step for driving a non-threaded engine from a main loop.
 * This is a no-op in threaded mode.
 */
void
gsl_engine_dispatch (void)
{
  g_return_if_fail (gsl_engine_initialized == TRUE);

  if (!gsl_engine_threaded)
    _engine_master_dispatch ();
}

/**
 * gsl_engine_wait_on_trans
 *
 * Wait until all pending transactions have been processed
 * by the GSL Engine.
 * This function may cause garbage collection (see
 * gsl_engine_garbage_collect()).
 */
void
gsl_engine_wait_on_trans (void)
{
  g_return_if_fail (gsl_engine_initialized == TRUE);

  /* non-threaded */
  if (!gsl_engine_threaded)
    _engine_master_dispatch_jobs ();

  /* threaded */
  _engine_wait_on_trans ();

  /* call all free() functions */
  gsl_engine_garbage_collect ();
}

/* vim:set ts=8 sts=2 sw=2: */