/* GSL Engine - Flow module operation engine * Copyright (C) 2001 Tim Janik * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General * Public License along with this library; if not, write to the * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, * Boston, MA 02110-1301, USA. */ #include #include #include #include #include "gslopmaster.h" #include "gslcommon.h" #include "gslopnode.h" #include "gsloputil.h" #include "gslopschedule.h" #include "gslieee754.h" #define NODE_FLAG_RECONNECT(node) G_STMT_START { (node)->reconnected = (node)->module.klass->reconnect != NULL; } G_STMT_END /* --- time stamping (debugging) --- */ #define ToyprofStamp struct timeval #define toyprof_clock_name() ("Glibc gettimeofday(2)") #define toyprof_stampinit() /* nothing */ #define toyprof_stamp(st) gettimeofday (&(st), 0) #define toyprof_stamp_ticks() (1000000) static inline guint64 toyprof_elapsed (ToyprofStamp fstamp, ToyprofStamp lstamp) { guint64 first = fstamp.tv_sec * toyprof_stamp_ticks () + fstamp.tv_usec; guint64 last = lstamp.tv_sec * toyprof_stamp_ticks () + lstamp.tv_usec; return last - first; } /* --- typedefs & structures --- */ typedef struct _Poll Poll; struct _Poll { Poll *next; GslPollFunc poll_func; gpointer data; guint n_fds; GPollFD *fds; GslFreeFunc free_func; }; /* --- prototypes --- */ static void master_schedule_discard (void); /* --- variables --- */ static gboolean master_need_reflow = FALSE; static gboolean master_need_process = FALSE; static EngineNode *master_consumer_list = NULL; const gfloat gsl_engine_master_zero_block[GSL_STREAM_MAX_VALUES] = { 0, }; /* FIXME */ static Poll *master_poll_list = NULL; static guint master_n_pollfds = 0; static guint master_pollfds_changed = FALSE; static GPollFD master_pollfds[GSL_ENGINE_MAX_POLLFDS]; static EngineSchedule *master_schedule = NULL; /* --- functions --- */ static void add_consumer (EngineNode *node) { g_return_if_fail (ENGINE_NODE_IS_CONSUMER (node) && node->toplevel_next == NULL && node->integrated); node->toplevel_next = master_consumer_list; master_consumer_list = node; } static void remove_consumer (EngineNode *node) { EngineNode *tmp, *last = NULL; g_return_if_fail (!ENGINE_NODE_IS_CONSUMER (node) || !node->integrated); for (tmp = master_consumer_list; tmp; last = tmp, tmp = last->toplevel_next) if (tmp == node) break; g_return_if_fail (tmp != NULL); if (last) last->toplevel_next = node->toplevel_next; else master_consumer_list = node->toplevel_next; node->toplevel_next = NULL; } static void master_idisconnect_node (EngineNode *node, guint istream) { EngineNode *src_node = node->inputs[istream].src_node; guint ostream = node->inputs[istream].src_stream; gboolean was_consumer; g_assert (ostream < ENGINE_NODE_N_OSTREAMS (src_node) && src_node->outputs[ostream].n_outputs > 0); /* these checks better pass */ node->inputs[istream].src_node = NULL; node->inputs[istream].src_stream = ~0; node->module.istreams[istream].connected = FALSE; was_consumer = ENGINE_NODE_IS_CONSUMER (src_node); src_node->outputs[ostream].n_outputs -= 1; src_node->module.ostreams[ostream].connected = src_node->outputs[ostream].n_outputs > 0; src_node->output_nodes = gsl_ring_remove (src_node->output_nodes, node); NODE_FLAG_RECONNECT (node); NODE_FLAG_RECONNECT (src_node); /* add to consumer list */ if (!was_consumer && ENGINE_NODE_IS_CONSUMER (src_node)) add_consumer (src_node); } static void master_jdisconnect_node (EngineNode *node, guint jstream, guint con) { EngineNode *src_node = node->jinputs[jstream][con].src_node; guint i, ostream = node->jinputs[jstream][con].src_stream; gboolean was_consumer; g_assert (ostream < ENGINE_NODE_N_OSTREAMS (src_node) && node->module.jstreams[jstream].n_connections > 0 && src_node->outputs[ostream].n_outputs > 0); /* these checks better pass */ i = --node->module.jstreams[jstream].n_connections; node->jinputs[jstream][con] = node->jinputs[jstream][i]; node->module.jstreams[jstream].values[i] = NULL; /* float**values 0-termination */ was_consumer = ENGINE_NODE_IS_CONSUMER (src_node); src_node->outputs[ostream].n_outputs -= 1; src_node->module.ostreams[ostream].connected = src_node->outputs[ostream].n_outputs > 0; src_node->output_nodes = gsl_ring_remove (src_node->output_nodes, node); NODE_FLAG_RECONNECT (node); NODE_FLAG_RECONNECT (src_node); /* add to consumer list */ if (!was_consumer && ENGINE_NODE_IS_CONSUMER (src_node)) add_consumer (src_node); } static void master_disconnect_node_outputs (EngineNode *src_node, EngineNode *dest_node) { gint i, j; for (i = 0; i < ENGINE_NODE_N_ISTREAMS (dest_node); i++) if (dest_node->inputs[i].src_node == src_node) master_idisconnect_node (dest_node, i); for (j = 0; j < ENGINE_NODE_N_JSTREAMS (dest_node); j++) for (i = 0; i < dest_node->module.jstreams[j].n_connections; i++) if (dest_node->jinputs[j][i].src_node == src_node) master_jdisconnect_node (dest_node, j, i--); } static void master_process_job (GslJob *job) { switch (job->job_id) { EngineNode *node, *src_node; Poll *poll, *poll_last; guint istream, jstream, ostream, con; EngineFlowJob *fjob; gboolean was_consumer; case ENGINE_JOB_INTEGRATE: node = job->data.node; JOB_DEBUG ("integrate(%p)", node); g_return_if_fail (node->integrated == FALSE); g_return_if_fail (node->sched_tag == FALSE); _engine_mnl_integrate (node); if (ENGINE_NODE_IS_CONSUMER (node)) add_consumer (node); node->counter = 0; NODE_FLAG_RECONNECT (node); master_need_reflow |= TRUE; break; case ENGINE_JOB_DISCARD: /* FIXME: free pending flow jobs */ node = job->data.node; JOB_DEBUG ("discard(%p)", node); g_return_if_fail (node->integrated == TRUE); /* disconnect inputs */ for (istream = 0; istream < ENGINE_NODE_N_ISTREAMS (node); istream++) if (node->inputs[istream].src_node) master_idisconnect_node (node, istream); for (jstream = 0; jstream < ENGINE_NODE_N_JSTREAMS (node); jstream++) while (node->module.jstreams[jstream].n_connections) master_jdisconnect_node (node, jstream, node->module.jstreams[jstream].n_connections - 1); /* disconnect outputs */ while (node->output_nodes) master_disconnect_node_outputs (node, node->output_nodes->data); /* remove from consumer list */ if (ENGINE_NODE_IS_CONSUMER (node)) { _engine_mnl_remove (node); remove_consumer (node); } else _engine_mnl_remove (node); node->counter = 0; master_need_reflow |= TRUE; master_schedule_discard (); /* discard schedule so node may be freed */ break; case ENGINE_JOB_SET_CONSUMER: case ENGINE_JOB_UNSET_CONSUMER: node = job->data.node; JOB_DEBUG ("toggle_consumer(%p)", node); was_consumer = ENGINE_NODE_IS_CONSUMER (node); node->is_consumer = job->job_id == ENGINE_JOB_SET_CONSUMER; if (was_consumer != ENGINE_NODE_IS_CONSUMER (node)) { if (ENGINE_NODE_IS_CONSUMER (node)) add_consumer (node); else remove_consumer (node); master_need_reflow |= TRUE; } break; case ENGINE_JOB_ICONNECT: node = job->data.connection.dest_node; src_node = job->data.connection.src_node; istream = job->data.connection.dest_ijstream; ostream = job->data.connection.src_ostream; JOB_DEBUG ("connect(%p,%u,%p,%u)", node, istream, src_node, ostream); g_return_if_fail (node->integrated == TRUE); g_return_if_fail (src_node->integrated == TRUE); g_return_if_fail (node->inputs[istream].src_node == NULL); node->inputs[istream].src_node = src_node; node->inputs[istream].src_stream = ostream; node->module.istreams[istream].connected = TRUE; /* remove from consumer list */ was_consumer = ENGINE_NODE_IS_CONSUMER (src_node); src_node->outputs[ostream].n_outputs += 1; src_node->module.ostreams[ostream].connected = TRUE; src_node->output_nodes = gsl_ring_append (src_node->output_nodes, node); NODE_FLAG_RECONNECT (node); NODE_FLAG_RECONNECT (src_node); src_node->counter = 0; /* FIXME: counter reset? */ if (was_consumer && !ENGINE_NODE_IS_CONSUMER (src_node)) remove_consumer (src_node); master_need_reflow |= TRUE; break; case ENGINE_JOB_JCONNECT: node = job->data.connection.dest_node; src_node = job->data.connection.src_node; jstream = job->data.connection.dest_ijstream; ostream = job->data.connection.src_ostream; JOB_DEBUG ("jconnect(%p,%u,%p,%u)", node, jstream, src_node, ostream); g_return_if_fail (node->integrated == TRUE); g_return_if_fail (src_node->integrated == TRUE); con = node->module.jstreams[jstream].n_connections++; node->jinputs[jstream] = g_renew (EngineJInput, node->jinputs[jstream], node->module.jstreams[jstream].n_connections); node->module.jstreams[jstream].values = g_renew (const gfloat*, node->module.jstreams[jstream].values, node->module.jstreams[jstream].n_connections + 1); node->module.jstreams[jstream].values[node->module.jstreams[jstream].n_connections] = NULL; /* float**values 0-termination */ node->jinputs[jstream][con].src_node = src_node; node->jinputs[jstream][con].src_stream = ostream; /* remove from consumer list */ was_consumer = ENGINE_NODE_IS_CONSUMER (src_node); src_node->outputs[ostream].n_outputs += 1; src_node->module.ostreams[ostream].connected = TRUE; src_node->output_nodes = gsl_ring_append (src_node->output_nodes, node); NODE_FLAG_RECONNECT (node); NODE_FLAG_RECONNECT (src_node); src_node->counter = 0; /* FIXME: counter reset? */ if (was_consumer && !ENGINE_NODE_IS_CONSUMER (src_node)) remove_consumer (src_node); master_need_reflow |= TRUE; break; case ENGINE_JOB_IDISCONNECT: node = job->data.connection.dest_node; JOB_DEBUG ("idisconnect(%p,%u)", node, job->data.connection.dest_ijstream); g_return_if_fail (node->integrated == TRUE); g_return_if_fail (node->inputs[job->data.connection.dest_ijstream].src_node != NULL); master_idisconnect_node (node, job->data.connection.dest_ijstream); master_need_reflow |= TRUE; break; case ENGINE_JOB_JDISCONNECT: node = job->data.connection.dest_node; jstream = job->data.connection.dest_ijstream; src_node = job->data.connection.src_node; ostream = job->data.connection.src_ostream; JOB_DEBUG ("jdisconnect(%p,%u,%p,%u)", node, jstream, src_node, ostream); g_return_if_fail (node->integrated == TRUE); g_return_if_fail (node->module.jstreams[jstream].n_connections > 0); for (con = 0; con < node->module.jstreams[jstream].n_connections; con++) if (node->jinputs[jstream][con].src_node == src_node && node->jinputs[jstream][con].src_stream == ostream) break; if (con < node->module.jstreams[jstream].n_connections) { master_jdisconnect_node (node, jstream, con); master_need_reflow |= TRUE; } else g_warning ("jdisconnect(dest:%p,%u,src:%p,%u): no such connection", node, jstream, src_node, ostream); break; case ENGINE_JOB_ACCESS: node = job->data.access.node; JOB_DEBUG ("access node(%p): %p(%p)", node, job->data.access.access_func, job->data.access.data); g_return_if_fail (node->integrated == TRUE); job->data.access.access_func (&node->module, job->data.access.data); break; case ENGINE_JOB_FLOW_JOB: node = job->data.flow_job.node; fjob = job->data.flow_job.fjob; JOB_DEBUG ("add flow_job(%p,%p)", node, fjob); g_return_if_fail (node->integrated == TRUE); job->data.flow_job.fjob = NULL; /* ownership taken over */ _engine_node_insert_flow_job (node, fjob); _engine_mnl_reorder (node); break; case ENGINE_JOB_DEBUG: JOB_DEBUG ("debug"); g_printerr ("JOB-DEBUG: %s\n", job->data.debug); break; case ENGINE_JOB_ADD_POLL: JOB_DEBUG ("add poll %p(%p,%u)", job->data.poll.poll_func, job->data.poll.data, job->data.poll.n_fds); if (job->data.poll.n_fds + master_n_pollfds > GSL_ENGINE_MAX_POLLFDS) g_error ("adding poll job exceeds maximum number of poll-fds (%u > %u)", job->data.poll.n_fds + master_n_pollfds, GSL_ENGINE_MAX_POLLFDS); poll = gsl_new_struct0 (Poll, 1); poll->poll_func = job->data.poll.poll_func; poll->data = job->data.poll.data; poll->free_func = job->data.poll.free_func; job->data.poll.free_func = NULL; /* don't free data this round */ poll->n_fds = job->data.poll.n_fds; poll->fds = poll->n_fds ? master_pollfds + master_n_pollfds : master_pollfds; master_n_pollfds += poll->n_fds; if (poll->n_fds) master_pollfds_changed = TRUE; memcpy (poll->fds, job->data.poll.fds, sizeof (poll->fds[0]) * poll->n_fds); poll->next = master_poll_list; master_poll_list = poll; break; case ENGINE_JOB_REMOVE_POLL: JOB_DEBUG ("remove poll %p(%p)", job->data.poll.poll_func, job->data.poll.data); for (poll = master_poll_list, poll_last = NULL; poll; poll_last = poll, poll = poll_last->next) if (poll->poll_func == job->data.poll.poll_func && poll->data == job->data.poll.data) { if (poll_last) poll_last->next = poll->next; else master_poll_list = poll->next; break; } if (poll) { job->data.poll.free_func = poll->free_func; /* free data with job */ poll_last = poll; if (poll_last->n_fds) { for (poll = master_poll_list; poll; poll = poll->next) if (poll->fds > poll_last->fds) poll->fds -= poll_last->n_fds; g_memmove (poll_last->fds, poll_last->fds + poll_last->n_fds, ((guint8*) (master_pollfds + master_n_pollfds)) - ((guint8*) (poll_last->fds + poll_last->n_fds))); master_n_pollfds -= poll_last->n_fds; master_pollfds_changed = TRUE; } gsl_delete_struct (Poll, poll_last); } else g_warning (G_STRLOC ": failed to remove unknown poll function %p(%p)", job->data.poll.poll_func, job->data.poll.data); break; default: g_assert_not_reached (); } JOB_DEBUG ("done"); } static void master_poll_check (glong *timeout_p, gboolean check_with_revents) { gboolean need_processing = FALSE; Poll *poll; if (master_need_process || *timeout_p == 0) { master_need_process = TRUE; return; } for (poll = master_poll_list; poll; poll = poll->next) { glong timeout = -1; if (poll->poll_func (poll->data, gsl_engine_block_size (), &timeout, poll->n_fds, poll->n_fds ? poll->fds : NULL, check_with_revents) || timeout == 0) { need_processing |= TRUE; *timeout_p = 0; break; } else if (timeout > 0) *timeout_p = *timeout_p < 0 ? timeout : MIN (*timeout_p, timeout); } master_need_process = need_processing; } static inline guint64 master_handle_flow_jobs (EngineNode *node, guint64 max_tick) { EngineFlowJob *fjob = _engine_node_pop_flow_job (node, max_tick); if_reject (fjob) do { g_printerr ("FJob: at:%lld from:%lld \n", node->counter, fjob->any.tick_stamp); switch (fjob->fjob_id) { case ENGINE_FLOW_JOB_ACCESS: fjob->access.access_func (&node->module, fjob->access.data); break; default: g_assert_not_reached (); /* FIXME */ } fjob = _engine_node_pop_flow_job (node, max_tick); } while (fjob); return _engine_node_peek_flow_job_stamp (node); } static void master_process_locked_node (EngineNode *node, guint n_values) { guint64 final_counter = GSL_TICK_STAMP + n_values; while (node->counter < final_counter) { guint64 next_counter = master_handle_flow_jobs (node, node->counter); guint64 new_counter = MIN (next_counter, final_counter); guint i, j, diff = node->counter - GSL_TICK_STAMP; for (i = 0; i < ENGINE_NODE_N_ISTREAMS (node); i++) { EngineNode *inode = node->inputs[i].src_node; if (inode) { ENGINE_NODE_LOCK (inode); if (inode->counter < final_counter) master_process_locked_node (inode, final_counter - node->counter); node->module.istreams[i].values = inode->outputs[node->inputs[i].src_stream].buffer; node->module.istreams[i].values += diff; ENGINE_NODE_UNLOCK (inode); } else node->module.istreams[i].values = gsl_engine_master_zero_block; } for (j = 0; j < ENGINE_NODE_N_JSTREAMS (node); j++) for (i = 0; i < node->module.jstreams[j].n_connections; i++) { EngineNode *inode = node->jinputs[j][i].src_node; ENGINE_NODE_LOCK (inode); if (inode->counter < final_counter) master_process_locked_node (inode, final_counter - node->counter); node->module.jstreams[j].values[i] = inode->outputs[node->jinputs[j][i].src_stream].buffer; node->module.jstreams[j].values[i] += diff; ENGINE_NODE_UNLOCK (inode); } for (i = 0; i < ENGINE_NODE_N_OSTREAMS (node); i++) node->module.ostreams[i].values = node->outputs[i].buffer + diff; if_reject (node->reconnected) { node->module.klass->reconnect (&node->module); node->reconnected = FALSE; } node->module.klass->process (&node->module, new_counter - node->counter); for (i = 0; i < ENGINE_NODE_N_OSTREAMS (node); i++) { /* FIXME: this takes the worst possible performance hit to support virtualization */ if (node->module.ostreams[i].values != node->outputs[i].buffer + diff) memcpy (node->outputs[i].buffer + diff, node->module.ostreams[i].values, (new_counter - node->counter) * sizeof (gfloat)); } node->counter = new_counter; } } static GslLong gsl_profile_modules = 0; /* set to 1 in gdb to get profile output */ static void master_process_flow (void) { guint64 new_counter = GSL_TICK_STAMP + gsl_engine_block_size (); GslLong profile_maxtime = 0; GslLong profile_modules = gsl_profile_modules; EngineNode *profile_node = NULL; g_return_if_fail (master_need_process == TRUE); g_assert (gsl_fpu_okround () == TRUE); MAS_DEBUG ("process_flow"); if (master_schedule) { EngineNode *node; _engine_schedule_restart (master_schedule); _engine_set_schedule (master_schedule); node = _engine_pop_unprocessed_node (); while (node) { ToyprofStamp profile_stamp1, profile_stamp2; if_reject (profile_modules) toyprof_stamp (profile_stamp1); master_process_locked_node (node, gsl_engine_block_size ()); if_reject (profile_modules) { GslLong duration; toyprof_stamp (profile_stamp2); duration = toyprof_elapsed (profile_stamp1, profile_stamp2); if (duration > profile_maxtime) { profile_maxtime = duration; profile_node = node; } } _engine_push_processed_node (node); node = _engine_pop_unprocessed_node (); } if_reject (profile_modules) { if (profile_node) { if (profile_maxtime > profile_modules) g_print ("Excess Node: %p Duration: %lu usecs ((void(*)())%p) \n", profile_node, profile_maxtime, profile_node->module.klass->process); else g_print ("Slowest Node: %p Duration: %lu usecs ((void(*)())%p) \r", profile_node, profile_maxtime, profile_node->module.klass->process); } } /* walk unscheduled nodes which have flow jobs */ node = _engine_mnl_head (); while (node && GSL_MNL_HEAD_NODE (node)) { EngineNode *tmp = node->mnl_next; EngineFlowJob *fjob = _engine_node_pop_flow_job (node, new_counter); if (fjob) { while (fjob) { g_printerr ("ignoring flow_job %p\n", fjob); fjob = _engine_node_pop_flow_job (node, new_counter); } _engine_mnl_reorder (node); } node = tmp; } /* nothing new to process, wait on slaves */ _engine_wait_on_unprocessed (); _engine_unset_schedule (master_schedule); _gsl_tick_stamp_inc (); _engine_recycle_const_values (); } master_need_process = FALSE; } static void master_reschedule_flow (void) { EngineNode *node; g_return_if_fail (master_need_reflow == TRUE); MAS_DEBUG ("flow_reschedule"); if (!master_schedule) master_schedule = _engine_schedule_new (); else { _engine_schedule_unsecure (master_schedule); _engine_schedule_clear (master_schedule); } for (node = master_consumer_list; node; node = node->toplevel_next) _engine_schedule_consumer_node (master_schedule, node); _engine_schedule_secure (master_schedule); master_need_reflow = FALSE; } static void master_schedule_discard (void) { g_return_if_fail (master_need_reflow == TRUE); if (master_schedule) { _engine_schedule_unsecure (master_schedule); _engine_schedule_destroy (master_schedule); master_schedule = NULL; } } /* --- MasterThread main loop --- */ gboolean _engine_master_prepare (GslEngineLoop *loop) { gboolean need_dispatch; guint i; g_return_val_if_fail (loop != NULL, FALSE); /* setup and clear pollfds here already, so master_poll_check() gets no junk (and IRIX can't handle non-0 revents) */ loop->fds_changed = master_pollfds_changed; master_pollfds_changed = FALSE; loop->n_fds = master_n_pollfds; loop->fds = master_pollfds; for (i = 0; i < loop->n_fds; i++) loop->fds[i].revents = 0; loop->revents_filled = FALSE; loop->timeout = -1; /* cached checks first */ need_dispatch = master_need_reflow || master_need_process; /* lengthy query */ if (!need_dispatch) need_dispatch = _engine_job_pending (); /* invoke custom poll checks */ if (!need_dispatch) { master_poll_check (&loop->timeout, FALSE); need_dispatch = master_need_process; } if (need_dispatch) loop->timeout = 0; MAS_DEBUG ("PREPARE: need_dispatch=%u timeout=%6ld n_fds=%u", need_dispatch, loop->timeout, loop->n_fds); return need_dispatch; } gboolean _engine_master_check (const GslEngineLoop *loop) { gboolean need_dispatch; g_return_val_if_fail (loop != NULL, FALSE); g_return_val_if_fail (loop->n_fds == master_n_pollfds, FALSE); g_return_val_if_fail (loop->fds == master_pollfds, FALSE); if (loop->n_fds) g_return_val_if_fail (loop->revents_filled == TRUE, FALSE); /* cached checks first */ need_dispatch = master_need_reflow || master_need_process; /* lengthy query */ if (!need_dispatch) need_dispatch = _engine_job_pending (); /* invoke custom poll checks */ if (!need_dispatch) { glong dummy = -1; master_poll_check (&dummy, TRUE); need_dispatch = master_need_process; } MAS_DEBUG ("CHECK: need_dispatch=%u", need_dispatch); return need_dispatch; } void _engine_master_dispatch_jobs (void) { GslJob *job; job = _engine_pop_job (); while (job) { master_process_job (job); job = _engine_pop_job (); /* have to process _all_ jobs */ } } void _engine_master_dispatch (void) { /* processing has prime priority, but we can't process the * network, until all jobs have been handled and if necessary * rescheduled the network. * that's why we have to handle everything at once and can't * preliminarily return after just handling jobs or rescheduling. */ _engine_master_dispatch_jobs (); if (master_need_reflow) master_reschedule_flow (); if (master_need_process) master_process_flow (); } void _engine_master_thread (gpointer data) { gboolean run = TRUE; /* assert sane configuration checks, since we're simply casting structures */ g_assert (sizeof (struct pollfd) == sizeof (GPollFD) && G_STRUCT_OFFSET (GPollFD, fd) == G_STRUCT_OFFSET (struct pollfd, fd) && G_STRUCT_OFFSET (GPollFD, events) == G_STRUCT_OFFSET (struct pollfd, events) && G_STRUCT_OFFSET (GPollFD, revents) == G_STRUCT_OFFSET (struct pollfd, revents)); /* add the thread wakeup pipe to master pollfds, so we get woken * up in time (even though we evaluate the pipe contents later) */ gsl_thread_get_pollfd (master_pollfds); master_n_pollfds += 1; master_pollfds_changed = TRUE; toyprof_stampinit (); while (run) { GslEngineLoop loop; gboolean need_dispatch; need_dispatch = _engine_master_prepare (&loop); if (!need_dispatch) { gint err; err = poll ((struct pollfd*) loop.fds, loop.n_fds, loop.timeout); if (err >= 0) loop.revents_filled = TRUE; else g_printerr (G_STRLOC ": poll() error: %s\n", g_strerror (errno)); if (loop.revents_filled) need_dispatch = _engine_master_check (&loop); } if (need_dispatch) _engine_master_dispatch (); /* handle thread pollfd messages */ run = gsl_thread_sleep (0); } }