You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
arts/flow/gsl/gsloputil.c

722 lines
18 KiB

/* GSL Engine - Flow module operation engine
* Copyright (C) 2001 Tim Janik
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include "gsloputil.h"
#include "gslcommon.h"
#include "gslopnode.h"
#include "gslopschedule.h"
#include "gslsignal.h"
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <math.h>
/* --- UserThread --- */
GslOStream*
_engine_alloc_ostreams (guint n)
{
if (n)
{
guint i = sizeof (GslOStream) * n + sizeof (gfloat) * gsl_engine_block_size () * n;
GslOStream *streams = gsl_alloc_memblock0 (i);
gfloat *buffers = (gfloat*) (streams + n);
for (i = 0; i < n; i++)
{
streams[i].values = buffers;
buffers += gsl_engine_block_size ();
}
return streams;
}
else
return NULL;
}
static void
free_node (EngineNode *node)
{
guint j;
g_return_if_fail (node != NULL);
g_return_if_fail (node->output_nodes == NULL);
g_return_if_fail (node->integrated == FALSE);
g_return_if_fail (node->sched_tag == FALSE);
g_return_if_fail (node->sched_router_tag == FALSE);
if (node->module.klass->free)
node->module.klass->free (node->module.user_data, node->module.klass);
gsl_rec_mutex_destroy (&node->rec_mutex);
if (node->module.ostreams)
{
guint n = ENGINE_NODE_N_OSTREAMS (node);
guint i = sizeof (GslOStream) * n + sizeof (gfloat) * gsl_engine_block_size () * n;
gsl_free_memblock (i, node->module.ostreams);
gsl_delete_structs (EngineOutput, ENGINE_NODE_N_OSTREAMS (node), node->outputs);
}
if (node->module.istreams)
{
gsl_delete_structs (GslIStream, ENGINE_NODE_N_ISTREAMS (node), node->module.istreams);
gsl_delete_structs (EngineInput, ENGINE_NODE_N_ISTREAMS (node), node->inputs);
}
for (j = 0; j < ENGINE_NODE_N_JSTREAMS (node); j++)
{
g_free (node->jinputs[j]);
g_free (node->module.jstreams[j].values);
}
if (node->module.jstreams)
{
gsl_delete_structs (GslJStream, ENGINE_NODE_N_JSTREAMS (node), node->module.jstreams);
gsl_delete_structs (EngineJInput*, ENGINE_NODE_N_JSTREAMS (node), node->jinputs);
}
gsl_delete_struct (EngineNode, node);
}
static void
free_job (GslJob *job)
{
g_return_if_fail (job != NULL);
switch (job->job_id)
{
case ENGINE_JOB_ACCESS:
if (job->data.access.free_func)
job->data.access.free_func (job->data.access.data);
break;
case ENGINE_JOB_DEBUG:
g_free (job->data.debug);
break;
case ENGINE_JOB_ADD_POLL:
case ENGINE_JOB_REMOVE_POLL:
g_free (job->data.poll.fds);
if (job->data.poll.free_func)
job->data.poll.free_func (job->data.poll.data);
break;
case ENGINE_JOB_DISCARD:
free_node (job->data.node);
break;
default: ;
}
gsl_delete_struct (GslJob, job);
}
static void
free_flow_job (EngineFlowJob *fjob)
{
switch (fjob->fjob_id)
{
case ENGINE_FLOW_JOB_SUSPEND:
case ENGINE_FLOW_JOB_RESUME:
gsl_delete_struct (EngineFlowJobAny, &fjob->any);
break;
case ENGINE_FLOW_JOB_ACCESS:
if (fjob->access.free_func)
fjob->access.free_func (fjob->access.data);
gsl_delete_struct (EngineFlowJobAccess, &fjob->access);
break;
default:
g_assert_not_reached ();
}
}
void
_engine_free_trans (GslTrans *trans)
{
GslJob *job;
g_return_if_fail (trans != NULL);
g_return_if_fail (trans->comitted == FALSE);
if (trans->jobs_tail)
g_return_if_fail (trans->jobs_tail->next == NULL); /* paranoid */
job = trans->jobs_head;
while (job)
{
GslJob *tmp = job->next;
free_job (job);
job = tmp;
}
gsl_delete_struct (GslTrans, trans);
}
/* -- master node list --- */
static EngineNode *master_node_list_head = NULL;
static EngineNode *master_node_list_tail = NULL;
EngineNode*
_engine_mnl_head (void)
{
return master_node_list_head;
}
void
_engine_mnl_remove (EngineNode *node)
{
g_return_if_fail (node->integrated == TRUE);
node->integrated = FALSE;
/* remove */
if (node->mnl_prev)
node->mnl_prev->mnl_next = node->mnl_next;
else
master_node_list_head = node->mnl_next;
if (node->mnl_next)
node->mnl_next->mnl_prev = node->mnl_prev;
else
master_node_list_tail = node->mnl_prev;
node->mnl_prev = NULL;
node->mnl_next = NULL;
}
void
_engine_mnl_integrate (EngineNode *node)
{
g_return_if_fail (node->integrated == FALSE);
g_return_if_fail (node->flow_jobs == NULL);
node->integrated = TRUE;
/* append */
if (master_node_list_tail)
master_node_list_tail->mnl_next = node;
node->mnl_prev = master_node_list_tail;
master_node_list_tail = node;
if (!master_node_list_head)
master_node_list_head = master_node_list_tail;
g_assert (node->mnl_next == NULL);
}
void
_engine_mnl_reorder (EngineNode *node)
{
EngineNode *sibling;
g_return_if_fail (node->integrated == TRUE);
/* the master node list is partially sorted. that is, all
* nodes which are not scheduled and have pending flow_jobs
* are agglomerated at the head.
*/
sibling = node->mnl_prev ? node->mnl_prev : node->mnl_next;
if (sibling && GSL_MNL_HEAD_NODE (node) != GSL_MNL_HEAD_NODE (sibling))
{
/* remove */
if (node->mnl_prev)
node->mnl_prev->mnl_next = node->mnl_next;
else
master_node_list_head = node->mnl_next;
if (node->mnl_next)
node->mnl_next->mnl_prev = node->mnl_prev;
else
master_node_list_tail = node->mnl_prev;
if (GSL_MNL_HEAD_NODE (node)) /* move towards head */
{
/* prepend to non-NULL list */
master_node_list_head->mnl_prev = node;
node->mnl_next = master_node_list_head;
master_node_list_head = node;
node->mnl_prev = NULL;
}
else /* move towards tail */
{
/* append to non-NULL list */
master_node_list_tail->mnl_next = node;
node->mnl_prev = master_node_list_tail;
master_node_list_tail = node;
node->mnl_next = NULL;
}
}
}
/* --- const value blocks --- */
typedef struct
{
guint n_nodes;
gfloat **nodes;
guint8 *nodes_used;
} ConstValuesArray;
static const guint8 CONST_VALUES_EXPIRE = 16; /* expire value after being unused for 16 times */
static inline gfloat**
const_values_lookup_nextmost (ConstValuesArray *array,
gfloat key_value)
{
guint n_nodes = array->n_nodes;
if (n_nodes > 0)
{
gfloat **nodes = array->nodes;
gfloat **check;
nodes -= 1;
do
{
guint i;
gfloat cmp;
i = (n_nodes + 1) >> 1;
check = nodes + i;
cmp = key_value - **check;
if (cmp > GSL_SIGNAL_EPSILON)
{
n_nodes -= i;
nodes = check;
}
else if (cmp < -GSL_SIGNAL_EPSILON)
n_nodes = i - 1;
else /* cmp ~==~ 0.0 */
return check; /* matched */
}
while (n_nodes);
return check; /* nextmost */
}
return NULL;
}
static inline guint
upper_power2 (guint number)
{
return gsl_alloc_upper_power2 (MAX (number, 8));
}
static inline void
const_values_insert (ConstValuesArray *array,
guint index,
gfloat *value_block)
{
if (array->n_nodes == 0)
{
guint new_size = upper_power2 (sizeof (gfloat*));
array->nodes = g_realloc (array->nodes, new_size);
array->nodes_used = g_realloc (array->nodes_used, new_size / sizeof (gfloat*));
array->n_nodes = 1;
g_assert (index == 0);
}
else
{
guint n_nodes = array->n_nodes++;
if (*array->nodes[index] < *value_block)
index++;
if (1)
{
guint new_size = upper_power2 (array->n_nodes * sizeof (gfloat*));
guint old_size = upper_power2 (n_nodes * sizeof (gfloat*));
if (new_size != old_size)
{
array->nodes = g_realloc (array->nodes, new_size);
array->nodes_used = g_realloc (array->nodes_used, new_size / sizeof(gfloat*));
}
}
g_memmove (array->nodes + index + 1, array->nodes + index, (n_nodes - index) * sizeof (array->nodes[0]));
g_memmove (array->nodes_used + index + 1, array->nodes_used + index, (n_nodes - index) * sizeof (array->nodes_used[0]));
}
array->nodes[index] = value_block;
array->nodes_used[index] = CONST_VALUES_EXPIRE;
}
static ConstValuesArray cvalue_array = { 0, NULL, NULL };
gfloat*
gsl_engine_const_values (gfloat value)
{
extern const gfloat gsl_engine_master_zero_block[];
gfloat **block;
if (fabs (value) < GSL_SIGNAL_EPSILON)
return (gfloat*) gsl_engine_master_zero_block;
block = const_values_lookup_nextmost (&cvalue_array, value);
/* found correct match? */
if (block && fabs (**block - value) < GSL_SIGNAL_EPSILON)
{
cvalue_array.nodes_used[block - cvalue_array.nodes] = CONST_VALUES_EXPIRE;
return *block;
}
else
{
/* create new value block */
gfloat *values = g_new (gfloat, gsl_engine_block_size ());
guint i;
for (i = 0; i < gsl_engine_block_size (); i++)
values[i] = value;
if (block)
const_values_insert (&cvalue_array, block - cvalue_array.nodes, values);
else
const_values_insert (&cvalue_array, 0, values);
return values;
}
}
void
_engine_recycle_const_values (void)
{
gfloat **nodes = cvalue_array.nodes;
guint8 *used = cvalue_array.nodes_used;
guint count = cvalue_array.n_nodes, e = 0, i;
for (i = 0; i < count; i++)
{
used[i]--; /* invariant: use counts are never 0 */
if (used[i] == 0)
g_free (nodes[i]);
else /* preserve node */
{
if (e < i)
{
nodes[e] = nodes[i];
used[e] = used[i];
}
e++;
}
}
cvalue_array.n_nodes = e;
}
/* --- job transactions --- */
static GslMutex cqueue_trans = { 0, };
static GslTrans *cqueue_trans_pending_head = NULL;
static GslTrans *cqueue_trans_pending_tail = NULL;
static GslCond cqueue_trans_cond = { 0, };
static GslTrans *cqueue_trans_trash = NULL;
static GslTrans *cqueue_trans_active_head = NULL;
static GslTrans *cqueue_trans_active_tail = NULL;
static EngineFlowJob *cqueue_trash_fjobs = NULL;
static GslJob *cqueue_trans_job = NULL;
void
_engine_enqueue_trans (GslTrans *trans)
{
g_return_if_fail (trans != NULL);
g_return_if_fail (trans->comitted == TRUE);
g_return_if_fail (trans->jobs_head != NULL);
g_return_if_fail (trans->cqt_next == NULL);
GSL_SPIN_LOCK (&cqueue_trans);
if (cqueue_trans_pending_tail)
{
cqueue_trans_pending_tail->cqt_next = trans;
cqueue_trans_pending_tail->jobs_tail->next = trans->jobs_head;
}
else
cqueue_trans_pending_head = trans;
cqueue_trans_pending_tail = trans;
GSL_SPIN_UNLOCK (&cqueue_trans);
gsl_cond_signal (&cqueue_trans_cond);
}
void
_engine_wait_on_trans (void)
{
GSL_SPIN_LOCK (&cqueue_trans);
while (cqueue_trans_pending_head || cqueue_trans_active_head)
gsl_cond_wait (&cqueue_trans_cond, &cqueue_trans);
GSL_SPIN_UNLOCK (&cqueue_trans);
}
gboolean
_engine_job_pending (void)
{
gboolean pending = cqueue_trans_job != NULL;
if (!pending)
{
GSL_SPIN_LOCK (&cqueue_trans);
pending = cqueue_trans_pending_head != NULL;
GSL_SPIN_UNLOCK (&cqueue_trans);
}
return pending;
}
GslJob*
_engine_pop_job (void) /* (glong max_useconds) */
{
/* clean up if necessary and try fetching new jobs */
if (!cqueue_trans_job)
{
if (cqueue_trans_active_head)
{
GSL_SPIN_LOCK (&cqueue_trans);
/* get rid of processed transaction and
* signal UserThread which might be in
* op_com_wait_on_trans()
*/
cqueue_trans_active_tail->cqt_next = cqueue_trans_trash;
cqueue_trans_trash = cqueue_trans_active_head;
/* fetch new transaction */
cqueue_trans_active_head = cqueue_trans_pending_head;
cqueue_trans_active_tail = cqueue_trans_pending_tail;
cqueue_trans_pending_head = NULL;
cqueue_trans_pending_tail = NULL;
GSL_SPIN_UNLOCK (&cqueue_trans);
gsl_cond_signal (&cqueue_trans_cond);
}
else
{
GSL_SPIN_LOCK (&cqueue_trans);
/* fetch new transaction */
cqueue_trans_active_head = cqueue_trans_pending_head;
cqueue_trans_active_tail = cqueue_trans_pending_tail;
cqueue_trans_pending_head = NULL;
cqueue_trans_pending_tail = NULL;
GSL_SPIN_UNLOCK (&cqueue_trans);
}
cqueue_trans_job = cqueue_trans_active_head ? cqueue_trans_active_head->jobs_head : NULL;
}
/* pick new job and out of here */
if (cqueue_trans_job)
{
GslJob *job = cqueue_trans_job;
cqueue_trans_job = job->next;
return job;
}
#if 0
/* wait until jobs are present */
if (max_useconds != 0)
{
GSL_SPIN_LOCK (&cqueue_trans);
if (!cqueue_trans_pending_head)
gsl_cond_wait_timed (&cqueue_trans_cond,
&cqueue_trans,
max_useconds);
GSL_SPIN_UNLOCK (&cqueue_trans);
/* there may be jobs now, start from scratch */
return op_com_pop_job_timed (max_useconds < 0 ? -1 : 0);
}
#endif
/* time expired, no jobs... */
return NULL;
}
/* --- user thread garbage collection --- */
/**
* gsl_engine_garbage_collect
*
* GSL Engine user thread function. Collects processed jobs
* and transactions from the engine and frees them, this
* involves callback invocation of GslFreeFunc() functions,
* e.g. from gsl_job_access() or gsl_flow_job_access()
* jobs.
* This function may only be called from the user thread,
* as GslFreeFunc() functions are guranteed to be executed
* in the user thread.
*/
void
gsl_engine_garbage_collect (void)
{
GslTrans *trans;
EngineFlowJob *fjobs;
GSL_SPIN_LOCK (&cqueue_trans);
trans = cqueue_trans_trash;
cqueue_trans_trash = NULL;
fjobs = cqueue_trash_fjobs;
cqueue_trash_fjobs = NULL;
GSL_SPIN_UNLOCK (&cqueue_trans);
while (trans)
{
GslTrans *t = trans;
trans = t->cqt_next;
t->cqt_next = NULL;
t->jobs_tail->next = NULL;
t->comitted = FALSE;
_engine_free_trans (t);
}
while (fjobs)
{
EngineFlowJob *j = fjobs;
fjobs = j->any.next;
j->any.next = NULL;
free_flow_job (j);
}
}
/* --- node processing queue --- */
static GslMutex pqueue_mutex = { 0, };
static EngineSchedule *pqueue_schedule = NULL;
static guint pqueue_n_nodes = 0;
static guint pqueue_n_cycles = 0;
static GslCond pqueue_done_cond = { 0, };
static EngineFlowJob *pqueue_trash_fjobs_first = NULL;
static EngineFlowJob *pqueue_trash_fjobs_last = NULL;
void
_engine_set_schedule (EngineSchedule *sched)
{
g_return_if_fail (sched != NULL);
g_return_if_fail (sched->secured == TRUE);
GSL_SPIN_LOCK (&pqueue_mutex);
if_reject (pqueue_schedule)
{
GSL_SPIN_UNLOCK (&pqueue_mutex);
g_warning (G_STRLOC ": schedule already set");
return;
}
pqueue_schedule = sched;
sched->in_pqueue = TRUE;
GSL_SPIN_UNLOCK (&pqueue_mutex);
}
void
_engine_unset_schedule (EngineSchedule *sched)
{
EngineFlowJob *trash_fjobs_first, *trash_fjobs_last;
g_return_if_fail (sched != NULL);
GSL_SPIN_LOCK (&pqueue_mutex);
if_reject (pqueue_schedule != sched)
{
GSL_SPIN_UNLOCK (&pqueue_mutex);
g_warning (G_STRLOC ": schedule(%p) not currently set", sched);
return;
}
if_reject (pqueue_n_nodes || pqueue_n_cycles)
g_warning (G_STRLOC ": schedule(%p) still busy", sched);
sched->in_pqueue = FALSE;
pqueue_schedule = NULL;
trash_fjobs_first = pqueue_trash_fjobs_first;
trash_fjobs_last = pqueue_trash_fjobs_last;
pqueue_trash_fjobs_first = NULL;
pqueue_trash_fjobs_last = NULL;
GSL_SPIN_UNLOCK (&pqueue_mutex);
if (trash_fjobs_first) /* move trash flow jobs */
{
GSL_SPIN_LOCK (&cqueue_trans);
trash_fjobs_last->any.next = cqueue_trash_fjobs;
cqueue_trash_fjobs = trash_fjobs_first;
GSL_SPIN_UNLOCK (&cqueue_trans);
}
}
EngineNode*
_engine_pop_unprocessed_node (void)
{
EngineNode *node;
GSL_SPIN_LOCK (&pqueue_mutex);
node = pqueue_schedule ? _engine_schedule_pop_node (pqueue_schedule) : NULL;
if (node)
pqueue_n_nodes += 1;
GSL_SPIN_UNLOCK (&pqueue_mutex);
if (node)
ENGINE_NODE_LOCK (node);
return node;
}
void
_engine_push_processed_node (EngineNode *node)
{
g_return_if_fail (node != NULL);
g_return_if_fail (pqueue_n_nodes > 0);
g_return_if_fail (ENGINE_NODE_IS_SCHEDULED (node));
GSL_SPIN_LOCK (&pqueue_mutex);
g_assert (pqueue_n_nodes > 0); /* paranoid */
if (node->fjob_first) /* collect trash flow jobs */
{
node->fjob_last->any.next = pqueue_trash_fjobs_first;
pqueue_trash_fjobs_first = node->fjob_first;
if (!pqueue_trash_fjobs_last)
pqueue_trash_fjobs_last = node->fjob_last;
node->fjob_first = NULL;
node->fjob_last = NULL;
}
pqueue_n_nodes -= 1;
ENGINE_NODE_UNLOCK (node);
if (!pqueue_n_nodes && !pqueue_n_cycles && GSL_SCHEDULE_NONPOPABLE (pqueue_schedule))
gsl_cond_signal (&pqueue_done_cond);
GSL_SPIN_UNLOCK (&pqueue_mutex);
}
GslRing*
_engine_pop_unprocessed_cycle (void)
{
return NULL;
}
void
_engine_push_processed_cycle (GslRing *cycle)
{
g_return_if_fail (cycle != NULL);
g_return_if_fail (pqueue_n_cycles > 0);
g_return_if_fail (ENGINE_NODE_IS_SCHEDULED (cycle->data));
}
void
_engine_wait_on_unprocessed (void)
{
GSL_SPIN_LOCK (&pqueue_mutex);
while (pqueue_n_nodes || pqueue_n_cycles || !GSL_SCHEDULE_NONPOPABLE (pqueue_schedule))
gsl_cond_wait (&pqueue_done_cond, &pqueue_mutex);
GSL_SPIN_UNLOCK (&pqueue_mutex);
}
/* --- initialization --- */
void
_gsl_init_engine_utils (void)
{
static gboolean initialized = FALSE;
g_assert (initialized == FALSE); /* single invocation */
initialized++;
gsl_mutex_init (&cqueue_trans);
gsl_cond_init (&cqueue_trans_cond);
gsl_mutex_init (&pqueue_mutex);
gsl_cond_init (&pqueue_done_cond);
}
/* vim:set ts=8 sts=2 sw=2: */