You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
arts/flow/gsl/gslopmaster.c

783 lines
25 KiB

/* GSL Engine - Flow module operation engine
* Copyright (C) 2001 Tim Janik
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <string.h>
#include <poll.h>
#include <sys/time.h>
#include <errno.h>
#include "gslopmaster.h"
#include "gslcommon.h"
#include "gslopnode.h"
#include "gsloputil.h"
#include "gslopschedule.h"
#include "gslieee754.h"
/* Flag a node so that its class' reconnect() callback is invoked before the
 * next process() call (see master_process_locked_node). The flag is only
 * raised if the module class actually provides a reconnect() callback.
 */
#define NODE_FLAG_RECONNECT(node) G_STMT_START { (node)->reconnected = (node)->module.klass->reconnect != NULL; } G_STMT_END
/* --- time stamping (debugging) --- */
/* a time stamp is a plain struct timeval */
#define ToyprofStamp struct timeval
/* human readable name of the clock source behind toyprof_stamp() */
#define toyprof_clock_name() ("Glibc gettimeofday(2)")
/* one-time clock setup; gettimeofday() needs none */
#define toyprof_stampinit() /* nothing */
/* store the current time in the ToyprofStamp lvalue st */
#define toyprof_stamp(st) gettimeofday (&(st), 0)
/* stamp resolution in ticks per second (tv_usec counts microseconds) */
#define toyprof_stamp_ticks() (1000000)
/* Return the number of ticks (microseconds, see toyprof_stamp_ticks())
 * that passed between the two stamps.
 *
 * fstamp: the earlier stamp
 * lstamp: the later stamp
 *
 * The result is computed in unsigned 64bit arithmetic, so it wraps around
 * if lstamp is earlier than fstamp.
 */
static inline guint64
toyprof_elapsed (ToyprofStamp fstamp,
                 ToyprofStamp lstamp)
{
  /* widen the seconds *before* multiplying: tv_sec may be a 32bit type,
   * in which case seconds * 1000000 overflows for any current date.
   */
  guint64 first = (guint64) fstamp.tv_sec * toyprof_stamp_ticks () + (guint64) fstamp.tv_usec;
  guint64 last = (guint64) lstamp.tv_sec * toyprof_stamp_ticks () + (guint64) lstamp.tv_usec;

  return last - first;
}
/* --- typedefs & structures --- */
/* Bookkeeping for one user supplied poll function (ENGINE_JOB_ADD_POLL).
 * All registered polls are kept in the singly linked master_poll_list.
 */
typedef struct _Poll Poll;
struct _Poll
{
/* next entry in master_poll_list */
Poll *next;
/* callback invoked from master_poll_check() */
GslPollFunc poll_func;
/* user data passed to poll_func (and compared on removal) */
gpointer data;
/* number of entries at fds */
guint n_fds;
/* this poll's slice of the shared master_pollfds[] array */
GPollFD *fds;
/* taken over from the ADD_POLL job, handed to the REMOVE_POLL job on removal */
GslFreeFunc free_func;
};
/* --- prototypes --- */
static void master_schedule_discard (void);
/* --- variables --- */
/* set by jobs that change the node network, cleared by master_reschedule_flow() */
static gboolean master_need_reflow = FALSE;
/* set by master_poll_check(), cleared by master_process_flow() */
static gboolean master_need_process = FALSE;
/* head of the consumer node list, linked through EngineNode.toplevel_next */
static EngineNode *master_consumer_list = NULL;
/* block of zeros, used as input values for unconnected input streams */
const gfloat gsl_engine_master_zero_block[GSL_STREAM_MAX_VALUES] = { 0, }; /* FIXME */
/* head of the list of registered poll functions */
static Poll *master_poll_list = NULL;
/* number of entries of master_pollfds[] currently in use */
static guint master_n_pollfds = 0;
/* boolean flag: master_pollfds[] changed since the last _engine_master_prepare() */
static guint master_pollfds_changed = FALSE;
/* poll fds of the master thread and of all registered poll functions */
static GPollFD master_pollfds[GSL_ENGINE_MAX_POLLFDS];
/* current processing schedule, NULL until the first reschedule or after a discard */
static EngineSchedule *master_schedule = NULL;
/* --- functions --- */
/* Prepend @node to the master's consumer list.
 * The node has to be an integrated consumer that is not linked into the
 * list yet, otherwise a warning is issued and nothing happens.
 */
static void
add_consumer (EngineNode *node)
{
  EngineNode *old_head;

  g_return_if_fail (ENGINE_NODE_IS_CONSUMER (node) && node->toplevel_next == NULL && node->integrated);

  old_head = master_consumer_list;
  master_consumer_list = node;
  node->toplevel_next = old_head;
}
/* Unlink @node from the master's consumer list.
 * By the time this is called, the node must either have stopped being a
 * consumer or must not be integrated anymore. A warning is issued (and the
 * list left alone) if the node is not part of the list.
 */
static void
remove_consumer (EngineNode *node)
{
  EngineNode **link, *tmp;

  g_return_if_fail (!ENGINE_NODE_IS_CONSUMER (node) || !node->integrated);

  /* find the link (list head or a toplevel_next field) that points at node */
  link = &master_consumer_list;
  while (*link != NULL && *link != node)
    link = &(*link)->toplevel_next;
  tmp = *link;
  g_return_if_fail (tmp != NULL);

  *link = node->toplevel_next;
  node->toplevel_next = NULL;
}
/* Cut the connection that feeds input stream @istream of @node.
 * Both ends are updated and flagged for reconnect(); if the source node
 * turns into a consumer by losing this listener, it is put onto the
 * consumer list.
 */
static void
master_idisconnect_node (EngineNode *node,
                         guint       istream)
{
  EngineNode *src_node = node->inputs[istream].src_node;
  guint ostream = node->inputs[istream].src_stream;
  gboolean was_consumer;

  g_assert (ostream < ENGINE_NODE_N_OSTREAMS (src_node) &&
            src_node->outputs[ostream].n_outputs > 0); /* these checks better pass */

  /* destination end: the input stream has no source anymore */
  node->module.istreams[istream].connected = FALSE;
  node->inputs[istream].src_stream = ~0;
  node->inputs[istream].src_node = NULL;

  /* source end: one listener less on the output stream */
  was_consumer = ENGINE_NODE_IS_CONSUMER (src_node);
  src_node->outputs[ostream].n_outputs--;
  src_node->module.ostreams[ostream].connected = src_node->outputs[ostream].n_outputs > 0;
  src_node->output_nodes = gsl_ring_remove (src_node->output_nodes, node);

  NODE_FLAG_RECONNECT (src_node);
  NODE_FLAG_RECONNECT (node);

  /* the source may have become a consumer just now */
  if (was_consumer)
    return;
  if (ENGINE_NODE_IS_CONSUMER (src_node))
    add_consumer (src_node);
}
/* Cut connection number @con of joint input stream @jstream of @node.
 * The hole in the connection array is closed by moving the last connection
 * into slot @con, so connection indices above @con are not stable across
 * this call. Both ends are flagged for reconnect(); a source node that
 * becomes a consumer is added to the consumer list.
 */
static void
master_jdisconnect_node (EngineNode *node,
                         guint       jstream,
                         guint       con)
{
  EngineNode *src_node = node->jinputs[jstream][con].src_node;
  guint ostream = node->jinputs[jstream][con].src_stream;
  guint last;
  gboolean was_consumer;

  g_assert (ostream < ENGINE_NODE_N_OSTREAMS (src_node) &&
            node->module.jstreams[jstream].n_connections > 0 &&
            src_node->outputs[ostream].n_outputs > 0); /* these checks better pass */

  /* destination end: shrink the array, fill the gap with the former last entry */
  node->module.jstreams[jstream].n_connections -= 1;
  last = node->module.jstreams[jstream].n_connections;
  node->jinputs[jstream][con] = node->jinputs[jstream][last];
  node->module.jstreams[jstream].values[last] = NULL; /* float**values 0-termination */

  /* source end: one listener less on the output stream */
  was_consumer = ENGINE_NODE_IS_CONSUMER (src_node);
  src_node->outputs[ostream].n_outputs--;
  src_node->module.ostreams[ostream].connected = src_node->outputs[ostream].n_outputs > 0;
  src_node->output_nodes = gsl_ring_remove (src_node->output_nodes, node);

  NODE_FLAG_RECONNECT (src_node);
  NODE_FLAG_RECONNECT (node);

  /* the source may have become a consumer just now */
  if (was_consumer)
    return;
  if (ENGINE_NODE_IS_CONSUMER (src_node))
    add_consumer (src_node);
}
/* Remove every connection that leads from @src_node into @dest_node,
 * on plain input streams as well as on joint input streams.
 */
static void
master_disconnect_node_outputs (EngineNode *src_node,
                                EngineNode *dest_node)
{
  gint istream, jstream;

  for (istream = 0; istream < ENGINE_NODE_N_ISTREAMS (dest_node); istream++)
    {
      if (dest_node->inputs[istream].src_node == src_node)
        master_idisconnect_node (dest_node, istream);
    }

  for (jstream = 0; jstream < ENGINE_NODE_N_JSTREAMS (dest_node); jstream++)
    {
      gint con = 0;

      while (con < dest_node->module.jstreams[jstream].n_connections)
        {
          /* removal moves the last connection into slot con, so the same
           * slot has to be examined again; only advance past kept entries
           */
          if (dest_node->jinputs[jstream][con].src_node == src_node)
            master_jdisconnect_node (dest_node, jstream, con);
          else
            con++;
        }
    }
}
/* Execute a single job (as popped by _engine_master_dispatch_jobs()) on the
 * master's data structures: node integration/removal, consumer toggling,
 * (dis-)connecting streams, node access, flow jobs and poll function
 * management.
 * Jobs that alter the node network set master_need_reflow, so the schedule
 * gets rebuilt before the next processing round.
 * Failing preconditions leave the function through g_return_if_fail(),
 * i.e. with a warning and without executing the job.
 */
static void
master_process_job (GslJob *job)
{
switch (job->job_id)
{
/* scratch variables shared by all cases; they sit ahead of the first
 * label, so none of them may rely on an initializer
 */
EngineNode *node, *src_node;
Poll *poll, *poll_last;
guint istream, jstream, ostream, con;
EngineFlowJob *fjob;
gboolean was_consumer;
/* add a new, so far unintegrated and unscheduled node to the engine */
case ENGINE_JOB_INTEGRATE:
node = job->data.node;
JOB_DEBUG ("integrate(%p)", node);
g_return_if_fail (node->integrated == FALSE);
g_return_if_fail (node->sched_tag == FALSE);
_engine_mnl_integrate (node);
if (ENGINE_NODE_IS_CONSUMER (node))
add_consumer (node);
node->counter = 0;
NODE_FLAG_RECONNECT (node);
master_need_reflow |= TRUE;
break;
/* take a node out of the engine, cutting all of its connections first */
case ENGINE_JOB_DISCARD:
/* FIXME: free pending flow jobs */
node = job->data.node;
JOB_DEBUG ("discard(%p)", node);
g_return_if_fail (node->integrated == TRUE);
/* disconnect inputs */
for (istream = 0; istream < ENGINE_NODE_N_ISTREAMS (node); istream++)
if (node->inputs[istream].src_node)
master_idisconnect_node (node, istream);
/* joint inputs are removed back to front until none is left */
for (jstream = 0; jstream < ENGINE_NODE_N_JSTREAMS (node); jstream++)
while (node->module.jstreams[jstream].n_connections)
master_jdisconnect_node (node, jstream, node->module.jstreams[jstream].n_connections - 1);
/* disconnect outputs */
while (node->output_nodes)
master_disconnect_node_outputs (node, node->output_nodes->data);
/* remove from consumer list */
/* NOTE(review): _engine_mnl_remove() has to run before remove_consumer(),
 * which insists on the node no longer being an integrated consumer;
 * presumably _engine_mnl_remove() clears node->integrated - confirm
 */
if (ENGINE_NODE_IS_CONSUMER (node))
{
_engine_mnl_remove (node);
remove_consumer (node);
}
else
_engine_mnl_remove (node);
node->counter = 0;
master_need_reflow |= TRUE;
master_schedule_discard (); /* discard schedule so node may be freed */
break;
/* both jobs share one implementation, the job id selects the new state */
case ENGINE_JOB_SET_CONSUMER:
case ENGINE_JOB_UNSET_CONSUMER:
node = job->data.node;
JOB_DEBUG ("toggle_consumer(%p)", node);
was_consumer = ENGINE_NODE_IS_CONSUMER (node);
node->is_consumer = job->job_id == ENGINE_JOB_SET_CONSUMER;
/* only touch the consumer list if the effective consumer state changed */
if (was_consumer != ENGINE_NODE_IS_CONSUMER (node))
{
if (ENGINE_NODE_IS_CONSUMER (node))
add_consumer (node);
else
remove_consumer (node);
master_need_reflow |= TRUE;
}
break;
/* connect output stream ostream of src_node to the (so far unconnected)
 * input stream istream of node
 */
case ENGINE_JOB_ICONNECT:
node = job->data.connection.dest_node;
src_node = job->data.connection.src_node;
istream = job->data.connection.dest_ijstream;
ostream = job->data.connection.src_ostream;
JOB_DEBUG ("connect(%p,%u,%p,%u)", node, istream, src_node, ostream);
g_return_if_fail (node->integrated == TRUE);
g_return_if_fail (src_node->integrated == TRUE);
g_return_if_fail (node->inputs[istream].src_node == NULL);
node->inputs[istream].src_node = src_node;
node->inputs[istream].src_stream = ostream;
node->module.istreams[istream].connected = TRUE;
/* remove from consumer list */
was_consumer = ENGINE_NODE_IS_CONSUMER (src_node);
src_node->outputs[ostream].n_outputs += 1;
src_node->module.ostreams[ostream].connected = TRUE;
src_node->output_nodes = gsl_ring_append (src_node->output_nodes, node);
NODE_FLAG_RECONNECT (node);
NODE_FLAG_RECONNECT (src_node);
src_node->counter = 0; /* FIXME: counter reset? */
if (was_consumer && !ENGINE_NODE_IS_CONSUMER (src_node))
remove_consumer (src_node);
master_need_reflow |= TRUE;
break;
/* append a connection from output stream ostream of src_node to the
 * joint input stream jstream of node
 */
case ENGINE_JOB_JCONNECT:
node = job->data.connection.dest_node;
src_node = job->data.connection.src_node;
jstream = job->data.connection.dest_ijstream;
ostream = job->data.connection.src_ostream;
JOB_DEBUG ("jconnect(%p,%u,%p,%u)", node, jstream, src_node, ostream);
g_return_if_fail (node->integrated == TRUE);
g_return_if_fail (src_node->integrated == TRUE);
/* con is the index of the new connection, n_connections already counts it */
con = node->module.jstreams[jstream].n_connections++;
node->jinputs[jstream] = g_renew (EngineJInput, node->jinputs[jstream], node->module.jstreams[jstream].n_connections);
/* the values array holds one extra slot for the terminating NULL */
node->module.jstreams[jstream].values = g_renew (const gfloat*, node->module.jstreams[jstream].values, node->module.jstreams[jstream].n_connections + 1);
node->module.jstreams[jstream].values[node->module.jstreams[jstream].n_connections] = NULL; /* float**values 0-termination */
node->jinputs[jstream][con].src_node = src_node;
node->jinputs[jstream][con].src_stream = ostream;
/* remove from consumer list */
was_consumer = ENGINE_NODE_IS_CONSUMER (src_node);
src_node->outputs[ostream].n_outputs += 1;
src_node->module.ostreams[ostream].connected = TRUE;
src_node->output_nodes = gsl_ring_append (src_node->output_nodes, node);
NODE_FLAG_RECONNECT (node);
NODE_FLAG_RECONNECT (src_node);
src_node->counter = 0; /* FIXME: counter reset? */
if (was_consumer && !ENGINE_NODE_IS_CONSUMER (src_node))
remove_consumer (src_node);
master_need_reflow |= TRUE;
break;
/* disconnect a connected input stream of node */
case ENGINE_JOB_IDISCONNECT:
node = job->data.connection.dest_node;
JOB_DEBUG ("idisconnect(%p,%u)", node, job->data.connection.dest_ijstream);
g_return_if_fail (node->integrated == TRUE);
g_return_if_fail (node->inputs[job->data.connection.dest_ijstream].src_node != NULL);
master_idisconnect_node (node, job->data.connection.dest_ijstream);
master_need_reflow |= TRUE;
break;
/* remove the first joint connection matching (src_node, ostream) */
case ENGINE_JOB_JDISCONNECT:
node = job->data.connection.dest_node;
jstream = job->data.connection.dest_ijstream;
src_node = job->data.connection.src_node;
ostream = job->data.connection.src_ostream;
JOB_DEBUG ("jdisconnect(%p,%u,%p,%u)", node, jstream, src_node, ostream);
g_return_if_fail (node->integrated == TRUE);
g_return_if_fail (node->module.jstreams[jstream].n_connections > 0);
for (con = 0; con < node->module.jstreams[jstream].n_connections; con++)
if (node->jinputs[jstream][con].src_node == src_node &&
node->jinputs[jstream][con].src_stream == ostream)
break;
/* con equals n_connections if the search found nothing */
if (con < node->module.jstreams[jstream].n_connections)
{
master_jdisconnect_node (node, jstream, con);
master_need_reflow |= TRUE;
}
else
g_warning ("jdisconnect(dest:%p,%u,src:%p,%u): no such connection", node, jstream, src_node, ostream);
break;
/* run a user callback on the node's module right away */
case ENGINE_JOB_ACCESS:
node = job->data.access.node;
JOB_DEBUG ("access node(%p): %p(%p)", node, job->data.access.access_func, job->data.access.data);
g_return_if_fail (node->integrated == TRUE);
job->data.access.access_func (&node->module, job->data.access.data);
break;
/* queue a flow job on the node, to be handled while the node is processed */
case ENGINE_JOB_FLOW_JOB:
node = job->data.flow_job.node;
fjob = job->data.flow_job.fjob;
JOB_DEBUG ("add flow_job(%p,%p)", node, fjob);
g_return_if_fail (node->integrated == TRUE);
job->data.flow_job.fjob = NULL; /* ownership taken over */
_engine_node_insert_flow_job (node, fjob);
_engine_mnl_reorder (node);
break;
/* print the job's debug string */
case ENGINE_JOB_DEBUG:
JOB_DEBUG ("debug");
g_printerr ("JOB-DEBUG: %s\n", job->data.debug);
break;
/* register a poll function, its fds are appended to master_pollfds[] */
case ENGINE_JOB_ADD_POLL:
JOB_DEBUG ("add poll %p(%p,%u)", job->data.poll.poll_func, job->data.poll.data, job->data.poll.n_fds);
/* exceeding the fixed size master_pollfds[] array is reported through g_error() */
if (job->data.poll.n_fds + master_n_pollfds > GSL_ENGINE_MAX_POLLFDS)
g_error ("adding poll job exceeds maximum number of poll-fds (%u > %u)",
job->data.poll.n_fds + master_n_pollfds, GSL_ENGINE_MAX_POLLFDS);
poll = gsl_new_struct0 (Poll, 1);
poll->poll_func = job->data.poll.poll_func;
poll->data = job->data.poll.data;
poll->free_func = job->data.poll.free_func;
job->data.poll.free_func = NULL; /* don't free data this round */
poll->n_fds = job->data.poll.n_fds;
/* polls without fds point at the array start, so they never compare
 * greater than another poll's fds during removal
 */
poll->fds = poll->n_fds ? master_pollfds + master_n_pollfds : master_pollfds;
master_n_pollfds += poll->n_fds;
if (poll->n_fds)
master_pollfds_changed = TRUE;
memcpy (poll->fds, job->data.poll.fds, sizeof (poll->fds[0]) * poll->n_fds);
poll->next = master_poll_list;
master_poll_list = poll;
break;
/* unregister the poll function matching (poll_func, data) */
case ENGINE_JOB_REMOVE_POLL:
JOB_DEBUG ("remove poll %p(%p)", job->data.poll.poll_func, job->data.poll.data);
/* search the entry and unlink it from master_poll_list */
for (poll = master_poll_list, poll_last = NULL; poll; poll_last = poll, poll = poll_last->next)
if (poll->poll_func == job->data.poll.poll_func && poll->data == job->data.poll.data)
{
if (poll_last)
poll_last->next = poll->next;
else
master_poll_list = poll->next;
break;
}
if (poll)
{
job->data.poll.free_func = poll->free_func; /* free data with job */
/* from here on poll_last is the removed entry, poll is reused as iterator */
poll_last = poll;
if (poll_last->n_fds)
{
/* polls whose fds live behind the removed slice move down by its length */
for (poll = master_poll_list; poll; poll = poll->next)
if (poll->fds > poll_last->fds)
poll->fds -= poll_last->n_fds;
/* close the gap: move everything behind the removed slice (size in bytes) */
g_memmove (poll_last->fds, poll_last->fds + poll_last->n_fds,
((guint8*) (master_pollfds + master_n_pollfds)) -
((guint8*) (poll_last->fds + poll_last->n_fds)));
master_n_pollfds -= poll_last->n_fds;
master_pollfds_changed = TRUE;
}
gsl_delete_struct (Poll, poll_last);
}
else
g_warning (G_STRLOC ": failed to remove unknown poll function %p(%p)",
job->data.poll.poll_func, job->data.poll.data);
break;
/* unknown job ids are a programming error */
default:
g_assert_not_reached ();
}
JOB_DEBUG ("done");
}
/* Ask the registered poll functions whether processing is required and
 * store the answer in master_need_process.
 *
 * timeout_p:          in/out poll timeout; 0 on entry means "process now",
 *                     negative means "no timeout yet". On return it holds 0
 *                     if processing is required, otherwise the smallest
 *                     positive timeout requested so far.
 * check_with_revents: passed on to the poll functions
 */
static void
master_poll_check (glong   *timeout_p,
                   gboolean check_with_revents)
{
  Poll *entry;

  /* nothing to query if processing is due anyway */
  if (master_need_process || *timeout_p == 0)
    {
      master_need_process = TRUE;
      return;
    }

  for (entry = master_poll_list; entry != NULL; entry = entry->next)
    {
      glong wanted = -1;
      GPollFD *fds = entry->n_fds ? entry->fds : NULL;

      if (entry->poll_func (entry->data, gsl_engine_block_size (), &wanted,
                            entry->n_fds, fds, check_with_revents)
          || wanted == 0)
        {
          /* first poll function that wants processing ends the query */
          *timeout_p = 0;
          master_need_process = TRUE;
          return;
        }

      /* otherwise keep track of the shortest timeout requested */
      if (wanted > 0 && (*timeout_p < 0 || wanted < *timeout_p))
        *timeout_p = wanted;
    }

  master_need_process = FALSE;
}
/* Execute all flow jobs of @node that _engine_node_pop_flow_job() hands out
 * for @max_tick.
 *
 * node:     node whose flow jobs are handled
 * max_tick: tick stamp passed to _engine_node_pop_flow_job()
 *
 * Returns the stamp reported by _engine_node_peek_flow_job_stamp() after
 * the due jobs have been handled.
 */
static inline guint64
master_handle_flow_jobs (EngineNode *node,
                         guint64     max_tick)
{
  EngineFlowJob *fjob = _engine_node_pop_flow_job (node, max_tick);

  if_reject (fjob)
    do
      {
        /* tick stamps are unsigned 64bit; cast explicitly so the arguments
         * match the conversion specifiers on every platform
         */
        g_printerr ("FJob: at:%llu from:%llu \n",
                    (unsigned long long) node->counter,
                    (unsigned long long) fjob->any.tick_stamp);
        switch (fjob->fjob_id)
          {
          case ENGINE_FLOW_JOB_ACCESS:
            fjob->access.access_func (&node->module, fjob->access.data);
            break;
          default:
            g_assert_not_reached (); /* FIXME */
          }
        fjob = _engine_node_pop_flow_job (node, max_tick);
      }
    while (fjob);

  return _engine_node_peek_flow_job_stamp (node);
}
/* Process @node until its counter reaches GSL_TICK_STAMP + @n_values.
 * Processing is split at flow job boundaries: each loop iteration first
 * runs the flow jobs due at the node's current counter and then lets the
 * module process up to the next pending flow job or the end of the block,
 * whichever comes first. Input nodes that lag behind are processed
 * recursively while their lock is held.
 * NOTE(review): going by the name and by the recursive calls below, the
 * caller is expected to hold the lock of @node - confirm for the call in
 * master_process_flow().
 */
static void
master_process_locked_node (EngineNode *node,
guint n_values)
{
guint64 final_counter = GSL_TICK_STAMP + n_values;
while (node->counter < final_counter)
{
/* run due flow jobs, next_counter is the stamp of the next pending one */
guint64 next_counter = master_handle_flow_jobs (node, node->counter);
guint64 new_counter = MIN (next_counter, final_counter);
/* diff is the offset of this chunk within the current block buffers.
 * NOTE(review): assumes node->counter >= GSL_TICK_STAMP, the unsigned
 * subtraction wraps otherwise - confirm that counters are synced on scheduling
 */
guint i, j, diff = node->counter - GSL_TICK_STAMP;
/* plain input streams: read from the source's output buffer, or from the zero block */
for (i = 0; i < ENGINE_NODE_N_ISTREAMS (node); i++)
{
EngineNode *inode = node->inputs[i].src_node;
if (inode)
{
ENGINE_NODE_LOCK (inode);
/* NOTE(review): the recursion passes final_counter - node->counter as
 * n_values, which is less than the full block once diff > 0 - confirm intent
 */
if (inode->counter < final_counter)
master_process_locked_node (inode, final_counter - node->counter);
node->module.istreams[i].values = inode->outputs[node->inputs[i].src_stream].buffer;
node->module.istreams[i].values += diff;
ENGINE_NODE_UNLOCK (inode);
}
else
node->module.istreams[i].values = gsl_engine_master_zero_block;
}
/* joint input streams: same procedure for every connection */
for (j = 0; j < ENGINE_NODE_N_JSTREAMS (node); j++)
for (i = 0; i < node->module.jstreams[j].n_connections; i++)
{
EngineNode *inode = node->jinputs[j][i].src_node;
ENGINE_NODE_LOCK (inode);
if (inode->counter < final_counter)
master_process_locked_node (inode, final_counter - node->counter);
node->module.jstreams[j].values[i] = inode->outputs[node->jinputs[j][i].src_stream].buffer;
node->module.jstreams[j].values[i] += diff;
ENGINE_NODE_UNLOCK (inode);
}
/* output streams write into the node's own buffers at the chunk offset */
for (i = 0; i < ENGINE_NODE_N_OSTREAMS (node); i++)
node->module.ostreams[i].values = node->outputs[i].buffer + diff;
/* notify the module once about changed connections (flag set by NODE_FLAG_RECONNECT) */
if_reject (node->reconnected)
{
node->module.klass->reconnect (&node->module);
node->reconnected = FALSE;
}
/* let the module compute this chunk */
node->module.klass->process (&node->module, new_counter - node->counter);
/* a module may have redirected an ostream values pointer to other memory,
 * in that case the data is copied into the node's output buffer
 */
for (i = 0; i < ENGINE_NODE_N_OSTREAMS (node); i++)
{
/* FIXME: this takes the worst possible performance hit to support virtualization */
if (node->module.ostreams[i].values != node->outputs[i].buffer + diff)
memcpy (node->outputs[i].buffer + diff, node->module.ostreams[i].values,
(new_counter - node->counter) * sizeof (gfloat));
}
node->counter = new_counter;
}
}
static GslLong gsl_profile_modules = 0; /* set to 1 in gdb to get profile output */

/* Process one block of values for the whole scheduled network.
 * Pops and processes nodes from the schedule until none is left, optionally
 * profiles the time spent per node, drops flow jobs of unscheduled nodes,
 * waits for nodes still being processed elsewhere and finally advances the
 * tick stamp. Without a schedule only master_need_process is reset.
 */
static void
master_process_flow (void)
{
  guint64 new_counter = GSL_TICK_STAMP + gsl_engine_block_size ();
  GslLong profile_maxtime = 0;
  GslLong profile_modules = gsl_profile_modules;
  EngineNode *profile_node = NULL;
  EngineNode *node, *next;

  g_return_if_fail (master_need_process == TRUE);

  g_assert (gsl_fpu_okround () == TRUE);

  MAS_DEBUG ("process_flow");

  if (!master_schedule)
    {
      master_need_process = FALSE;
      return;
    }

  _engine_schedule_restart (master_schedule);
  _engine_set_schedule (master_schedule);

  /* process all nodes the schedule hands out */
  for (node = _engine_pop_unprocessed_node (); node; node = _engine_pop_unprocessed_node ())
    {
      ToyprofStamp t_begin, t_end;

      if_reject (profile_modules)
        toyprof_stamp (t_begin);

      master_process_locked_node (node, gsl_engine_block_size ());

      if_reject (profile_modules)
        {
          GslLong elapsed;

          toyprof_stamp (t_end);
          elapsed = toyprof_elapsed (t_begin, t_end);
          /* remember the slowest node of this round */
          if (elapsed > profile_maxtime)
            {
              profile_maxtime = elapsed;
              profile_node = node;
            }
        }
      _engine_push_processed_node (node);
    }

  if_reject (profile_modules)
    {
      if (profile_node && profile_maxtime > profile_modules)
        g_print ("Excess Node: %p Duration: %lu usecs ((void(*)())%p) \n",
                 profile_node, profile_maxtime, profile_node->module.klass->process);
      else if (profile_node)
        g_print ("Slowest Node: %p Duration: %lu usecs ((void(*)())%p) \r",
                 profile_node, profile_maxtime, profile_node->module.klass->process);
    }

  /* walk unscheduled nodes which have flow jobs */
  for (node = _engine_mnl_head (); node && GSL_MNL_HEAD_NODE (node); node = next)
    {
      EngineFlowJob *fjob;
      gboolean dropped = FALSE;

      /* fetch the successor first, reordering may move the node */
      next = node->mnl_next;
      for (fjob = _engine_node_pop_flow_job (node, new_counter);
           fjob;
           fjob = _engine_node_pop_flow_job (node, new_counter))
        {
          g_printerr ("ignoring flow_job %p\n", fjob);
          dropped = TRUE;
        }
      if (dropped)
        _engine_mnl_reorder (node);
    }

  /* nothing new to process, wait on slaves */
  _engine_wait_on_unprocessed ();
  _engine_unset_schedule (master_schedule);
  _gsl_tick_stamp_inc ();
  _engine_recycle_const_values ();

  master_need_process = FALSE;
}
static void
master_reschedule_flow (void)
{
EngineNode *node;
g_return_if_fail (master_need_reflow == TRUE);
MAS_DEBUG ("flow_reschedule");
if (!master_schedule)
master_schedule = _engine_schedule_new ();
else
{
_engine_schedule_unsecure (master_schedule);
_engine_schedule_clear (master_schedule);
}
for (node = master_consumer_list; node; node = node->toplevel_next)
_engine_schedule_consumer_node (master_schedule, node);
_engine_schedule_secure (master_schedule);
master_need_reflow = FALSE;
}
/* Throw away the current schedule (if any), so it holds no references to
 * nodes anymore. May only be called while a reflow is pending, since the
 * schedule is recreated by master_reschedule_flow().
 */
static void
master_schedule_discard (void)
{
  g_return_if_fail (master_need_reflow == TRUE);

  if (!master_schedule)
    return;

  _engine_schedule_unsecure (master_schedule);
  _engine_schedule_destroy (master_schedule);
  master_schedule = NULL;
}
/* --- MasterThread main loop --- */
/* First stage of a main loop iteration: fill in @loop with the fds and the
 * timeout to poll with, and figure out whether dispatching is required
 * right away.
 *
 * loop: loop state to set up, must not be NULL
 *
 * Returns whether _engine_master_dispatch() needs to be called; in that
 * case loop->timeout is 0.
 */
gboolean
_engine_master_prepare (GslEngineLoop *loop)
{
  gboolean need_dispatch;
  guint i;

  g_return_val_if_fail (loop != NULL, FALSE);

  /* setup and clear pollfds here already, so master_poll_check() gets no junk (and IRIX can't handle non-0 revents) */
  loop->fds_changed = master_pollfds_changed;
  master_pollfds_changed = FALSE;
  loop->n_fds = master_n_pollfds;
  loop->fds = master_pollfds;
  i = 0;
  while (i < loop->n_fds)
    {
      loop->fds[i].revents = 0;
      i++;
    }
  loop->revents_filled = FALSE;
  loop->timeout = -1;

  /* cheap cached flags first, the job queue is only queried if those are unset */
  if (master_need_reflow || master_need_process)
    need_dispatch = TRUE;
  else
    need_dispatch = _engine_job_pending ();

  /* as a last resort, ask the custom poll functions */
  if (!need_dispatch)
    {
      master_poll_check (&loop->timeout, FALSE);
      need_dispatch = master_need_process;
    }

  /* pending work means poll() must not block */
  if (need_dispatch)
    loop->timeout = 0;

  MAS_DEBUG ("PREPARE: need_dispatch=%u timeout=%6ld n_fds=%u",
             need_dispatch,
             loop->timeout, loop->n_fds);

  return need_dispatch;
}
/* Second stage of a main loop iteration, to be called after polling the
 * fds handed out by _engine_master_prepare().
 *
 * loop: the loop state set up by _engine_master_prepare(), with revents
 *       filled in if there are any fds
 *
 * Returns whether _engine_master_dispatch() needs to be called.
 */
gboolean
_engine_master_check (const GslEngineLoop *loop)
{
  gboolean need_dispatch;

  g_return_val_if_fail (loop != NULL, FALSE);
  g_return_val_if_fail (loop->n_fds == master_n_pollfds, FALSE);
  g_return_val_if_fail (loop->fds == master_pollfds, FALSE);
  if (loop->n_fds)
    {
      g_return_val_if_fail (loop->revents_filled == TRUE, FALSE);
    }

  /* cheap cached flags first, the job queue is only queried if those are unset */
  if (master_need_reflow || master_need_process)
    need_dispatch = TRUE;
  else
    need_dispatch = _engine_job_pending ();

  /* as a last resort, ask the custom poll functions (the timeout is of no interest here) */
  if (!need_dispatch)
    {
      glong ignored_timeout = -1;

      master_poll_check (&ignored_timeout, TRUE);
      need_dispatch = master_need_process;
    }

  MAS_DEBUG ("CHECK: need_dispatch=%u", need_dispatch);

  return need_dispatch;
}
void
_engine_master_dispatch_jobs (void)
{
GslJob *job;
job = _engine_pop_job ();
while (job)
{
master_process_job (job);
job = _engine_pop_job (); /* have to process _all_ jobs */
}
}
/* Third stage of a main loop iteration: execute pending jobs, rebuild the
 * schedule if the network changed and process one block if required.
 */
void
_engine_master_dispatch (void)
{
  /* Processing takes priority, but the network can't be processed before
   * all jobs are handled and - if they changed the network - a new schedule
   * is in place. So all three steps are carried out in one go, there is no
   * early return after the job or reschedule step.
   */
  _engine_master_dispatch_jobs ();

  if (master_need_reflow)
    {
      master_reschedule_flow ();
    }

  if (master_need_process)
    {
      master_process_flow ();
    }
}
/* Main function of the master thread.
 * Registers the thread's wakeup pipe as first poll fd and then runs the
 * prepare/poll/check/dispatch cycle until gsl_thread_sleep() returns FALSE.
 *
 * data: not used
 */
void
_engine_master_thread (gpointer data)
{
  gboolean run = TRUE;

  /* assert sane configuration checks, since we're simply casting structures */
  g_assert (sizeof (struct pollfd) == sizeof (GPollFD) &&
            G_STRUCT_OFFSET (GPollFD, fd) == G_STRUCT_OFFSET (struct pollfd, fd) &&
            G_STRUCT_OFFSET (GPollFD, events) == G_STRUCT_OFFSET (struct pollfd, events) &&
            G_STRUCT_OFFSET (GPollFD, revents) == G_STRUCT_OFFSET (struct pollfd, revents));

  /* add the thread wakeup pipe to master pollfds, so we get woken
   * up in time (even though we evaluate the pipe contents later)
   */
  gsl_thread_get_pollfd (master_pollfds);
  master_n_pollfds += 1;
  master_pollfds_changed = TRUE;

  toyprof_stampinit ();

  while (run)
    {
      GslEngineLoop loop;
      gboolean need_dispatch;

      need_dispatch = _engine_master_prepare (&loop);

      if (!need_dispatch)
        {
          gint err;

          err = poll ((struct pollfd*) loop.fds, loop.n_fds, loop.timeout);

          if (err >= 0)
            loop.revents_filled = TRUE;
          else if (errno != EINTR)
            /* being interrupted by a signal is no error, the loop simply
             * starts over; everything else is worth a message
             */
            g_printerr (G_STRLOC ": poll() error: %s\n", g_strerror (errno));

          /* revents are only valid (and checked) after a successful poll() */
          if (loop.revents_filled)
            need_dispatch = _engine_master_check (&loop);
        }

      if (need_dispatch)
        _engine_master_dispatch ();

      /* handle thread pollfd messages */
      run = gsl_thread_sleep (0);
    }
}