You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
arts/flow/gsl/gslopnode.h

248 lines
6.3 KiB

/* GSL Engine - Flow module operation engine
* Copyright (C) 2001 Tim Janik
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General
* Public License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GSL_ENGINE_NODE_H__
#define __GSL_ENGINE_NODE_H__
#include "gslengine.h"
#include "gsloputil.h"
#include "gslcommon.h"
#ifdef __cplusplus
extern "C" {
#endif /* __cplusplus */
#define ENGINE_NODE(module) ((EngineNode*) (module))
#define ENGINE_NODE_N_OSTREAMS(node) ((node)->module.klass->n_ostreams)
#define ENGINE_NODE_N_ISTREAMS(node) ((node)->module.klass->n_istreams)
#define ENGINE_NODE_N_JSTREAMS(node) ((node)->module.klass->n_jstreams)
#define ENGINE_NODE_IS_CONSUMER(node) ((node)->is_consumer && \
(node)->output_nodes == NULL)
#define ENGINE_NODE_IS_DEFERRED(node) (FALSE)
#define ENGINE_NODE_IS_SCHEDULED(node) (ENGINE_NODE (node)->sched_tag)
#define ENGINE_NODE_IS_CHEAP(node) (((node)->module.klass->mflags & GSL_COST_CHEAP) != 0)
#define ENGINE_NODE_IS_EXPENSIVE(node) (((node)->module.klass->mflags & GSL_COST_EXPENSIVE) != 0)
#define ENGINE_NODE_LOCK(node) gsl_rec_mutex_lock (&(node)->rec_mutex)
#define ENGINE_NODE_UNLOCK(node) gsl_rec_mutex_unlock (&(node)->rec_mutex)
/* --- debugging and messages --- */
#define ENG_DEBUG GSL_DEBUG_FUNCTION (GSL_MSG_ENGINE, NULL)
#define MAS_DEBUG GSL_DEBUG_FUNCTION (GSL_MSG_MASTER, NULL)
#define JOB_DEBUG GSL_DEBUG_FUNCTION (GSL_MSG_JOBS, NULL)
#define SCHED_DEBUG GSL_DEBUG_FUNCTION (GSL_MSG_SCHED, NULL)
/* --- transactions --- */
typedef union _EngineFlowJob EngineFlowJob;
typedef enum {
ENGINE_JOB_NOP,
ENGINE_JOB_INTEGRATE,
ENGINE_JOB_DISCARD,
ENGINE_JOB_ICONNECT,
ENGINE_JOB_JCONNECT,
ENGINE_JOB_IDISCONNECT,
ENGINE_JOB_JDISCONNECT,
ENGINE_JOB_SET_CONSUMER,
ENGINE_JOB_UNSET_CONSUMER,
ENGINE_JOB_ACCESS,
ENGINE_JOB_ADD_POLL,
ENGINE_JOB_REMOVE_POLL,
ENGINE_JOB_FLOW_JOB,
ENGINE_JOB_DEBUG,
ENGINE_JOB_LAST
} EngineJobType;
struct _GslJob
{
EngineJobType job_id;
GslJob *next;
union {
EngineNode *node;
struct {
EngineNode *dest_node;
guint dest_ijstream;
EngineNode *src_node;
guint src_ostream;
} connection;
struct {
EngineNode *node;
GslAccessFunc access_func;
gpointer data;
GslFreeFunc free_func;
} access;
struct {
GslPollFunc poll_func;
gpointer data;
GslFreeFunc free_func;
guint n_fds;
GPollFD *fds;
} poll;
struct {
EngineNode *node;
EngineFlowJob *fjob;
} flow_job;
gchar *debug;
} data;
};
struct _GslTrans
{
GslJob *jobs_head;
GslJob *jobs_tail;
guint comitted : 1;
GslTrans *cqt_next; /* com-thread-queue */
};
typedef enum {
ENGINE_FLOW_JOB_NOP,
ENGINE_FLOW_JOB_SUSPEND,
ENGINE_FLOW_JOB_RESUME,
ENGINE_FLOW_JOB_ACCESS,
ENGINE_FLOW_JOB_LAST
} EngineFlowJobType;
typedef struct
{
EngineFlowJobType fjob_id;
EngineFlowJob *next;
guint64 tick_stamp;
} EngineFlowJobAny;
typedef struct
{
EngineFlowJobType fjob_id;
EngineFlowJob *next;
guint64 tick_stamp;
GslAccessFunc access_func;
gpointer data;
GslFreeFunc free_func;
} EngineFlowJobAccess;
union _EngineFlowJob
{
EngineFlowJobType fjob_id;
EngineFlowJobAny any;
EngineFlowJobAccess access;
};
/* --- module nodes --- */
typedef struct
{
EngineNode *src_node;
guint src_stream; /* ostream of src_node */
} EngineInput;
typedef struct
{
EngineNode *src_node;
guint src_stream; /* ostream of src_node */
} EngineJInput;
typedef struct
{
gfloat *buffer;
guint n_outputs;
} EngineOutput;
struct _EngineNode /* fields sorted by order of processing access */
{
GslModule module;
GslRecMutex rec_mutex; /* processing lock */
guint64 counter; /* <= GSL_TICK_STAMP */
EngineInput *inputs; /* [ENGINE_NODE_N_ISTREAMS()] */
EngineJInput **jinputs; /* [ENGINE_NODE_N_JSTREAMS()] */
EngineOutput *outputs; /* [ENGINE_NODE_N_OSTREAMS()] */
/* flow jobs */
EngineFlowJob *flow_jobs; /* active jobs */
EngineFlowJob *fjob_first, *fjob_last; /* trash list */
/* master-node-list */
EngineNode *mnl_next;
EngineNode *mnl_prev;
guint integrated : 1;
guint reconnected : 1;
guint is_consumer : 1;
/* scheduler */
guint sched_tag : 1;
guint sched_router_tag : 1;
guint sched_leaf_level;
EngineNode *toplevel_next; /* master-consumer-list, FIXME: overkill, using a GslRing is good enough */
GslRing *output_nodes; /* EngineNode* ring of nodes in ->outputs[] */
};
static void
_engine_node_insert_flow_job (EngineNode *node,
EngineFlowJob *fjob)
{
EngineFlowJob *last = NULL, *tmp = node->flow_jobs;
/* find next position */
while (tmp && tmp->any.tick_stamp <= fjob->any.tick_stamp)
{
last = tmp;
tmp = last->any.next;
}
/* insert before */
fjob->any.next = tmp;
if (last)
last->any.next = fjob;
else
node->flow_jobs = fjob;
}
static inline EngineFlowJob*
_engine_node_pop_flow_job (EngineNode *node,
guint64 tick_stamp)
{
EngineFlowJob *fjob = node->flow_jobs;
if_reject (fjob)
{
if (fjob->any.tick_stamp <= tick_stamp)
{
node->flow_jobs = fjob->any.next;
fjob->any.next = node->fjob_first;
node->fjob_first = fjob;
if (!node->fjob_last)
node->fjob_last = node->fjob_first;
}
else
fjob = NULL;
}
return fjob;
}
static inline guint64
_engine_node_peek_flow_job_stamp (EngineNode *node)
{
EngineFlowJob *fjob = node->flow_jobs;
if_reject (fjob)
return fjob->any.tick_stamp;
return GSL_MAX_TICK_STAMP;
}
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* __GSL_ENGINE_NODE_H__ */