diff --git a/sesman/chansrv/pulse/module-xrdp-sink.c b/sesman/chansrv/pulse/module-xrdp-sink.c index b062b299..e985c4b3 100644 --- a/sesman/chansrv/pulse/module-xrdp-sink.c +++ b/sesman/chansrv/pulse/module-xrdp-sink.c @@ -109,14 +109,18 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, struct userdata *u = PA_SINK(o)->userdata; pa_usec_t now; + long lat; - //pa_log("sink_process_msg: code %d", code); + pa_log("sink_process_msg: code %d", code); switch (code) { - case PA_SINK_MESSAGE_SET_STATE: + case PA_SINK_MESSAGE_SET_STATE: /* 9 */ - if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) { + if (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING) /* 0 */ { + pa_log("sink_process_msg: running"); u->timestamp = pa_rtclock_now(); + } else { + pa_log("sink_process_msg: not running"); } break; @@ -124,7 +128,9 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, case PA_SINK_MESSAGE_GET_LATENCY: now = pa_rtclock_now(); - *((pa_usec_t*) data) = u->timestamp > now ? u->timestamp - now : 0ULL; + lat = u->timestamp > now ? u->timestamp - now : 0ULL; + pa_log("sink_process_msg: lat %ld", lat); + *((pa_usec_t*) data) = lat; return 0; } @@ -300,12 +306,47 @@ static int data_send(struct userdata *u) { static void process_render(struct userdata *u, pa_usec_t now) { + pa_memchunk chunk; + int request_bytes; + int index; + size_t ate = 0; + + pa_assert(u); + + /* This is the configured latency. Sink inputs connected to us + might not have a single frame more than the maxrequest value + queed. Hence: at maximum read this many bytes from the sink + inputs. */ + + index = 0; + /* Fill the buffer up the the latency size */ + while (u->timestamp < now + u->block_usec) { + + request_bytes = u->sink->thread_info.max_request; + request_bytes = MIN(request_bytes, 8192); + pa_sink_render(u->sink, request_bytes, &chunk); + index++; + pa_log("bytes %d index %d", chunk.length, index); + pa_memblock_unref(chunk.memblock); + +/* pa_log_debug("Ate %lu bytes.", (unsigned long) chunk.length); */ + u->timestamp += pa_bytes_to_usec(chunk.length, &u->sink->sample_spec); + + ate += chunk.length; + + //if (ate >= u->sink->thread_info.max_request) + // break; + } + +/* pa_log_debug("Ate in sum %lu bytes (of %lu)", (unsigned long) ate, (unsigned long) nbytes); */ + + //pa_log("%d", u->memchunk.length); //pa_log("a"); //if (u->memchunk.length <= 0) - pa_sink_render(u->sink, 8192, &u->memchunk); + // pa_sink_render(u->sink, 8192, &u->memchunk); //pa_log("b"); //data_send(u); @@ -314,7 +355,10 @@ static void process_render(struct userdata *u, pa_usec_t now) { } static void thread_func(void *userdata) { + struct userdata *u = userdata; + int ret; + pa_usec_t now; pa_assert(u); @@ -325,40 +369,44 @@ static void thread_func(void *userdata) { u->timestamp = pa_rtclock_now(); for (;;) { - int ret; - /* Render some data and drop it immediately */ - if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) { - pa_usec_t now; + if (u->sink->thread_info.state == PA_SINK_RUNNING) { now = pa_rtclock_now(); if (u->sink->thread_info.rewind_requested) { - if (u->sink->thread_info.rewind_nbytes > 0) + if (u->sink->thread_info.rewind_nbytes > 0) { process_rewind(u, now); - else + } else { pa_sink_process_rewind(u->sink, 0); + } } - if (u->timestamp <= now) + if (u->timestamp <= now) { process_render(u, now); + } pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp); - } else + + } else { pa_rtpoll_set_timer_disabled(u->rtpoll); + } - /* Hmm, nothing to do. Let's sleep */ - if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) + if ((ret = pa_rtpoll_run(u->rtpoll, TRUE)) < 0) { goto fail; + } - if (ret == 0) + if (ret == 0) { goto finish; + } } fail: /* If this was no regular exit from the loop we have to continue * processing messages until we received PA_MESSAGE_SHUTDOWN */ - pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL); + pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core), + PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, + NULL, NULL); pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN); finish: @@ -430,8 +478,12 @@ int pa__init(pa_module*m) { pa_memchunk_reset(&u->memchunk); -#if defined(PA_CHECK_VERSION) && (PA_CHECK_VERSION(0, 9, 22)) +#if defined(PA_CHECK_VERSION) +#if PA_CHECK_VERSION(0, 9, 22) if (!(u->thread = pa_thread_new("xrdp-sink", thread_func, u))) { +#else + if (!(u->thread = pa_thread_new(thread_func, u))) { +#endif #else if (!(u->thread = pa_thread_new(thread_func, u))) { #endif