From 95710edf400dca97f31c3e210bfba05796849b85 Mon Sep 17 00:00:00 2001 From: notsecure Date: Thu, 31 Jul 2014 14:56:32 -0400 Subject: decode video on separate thread --- toxav/toxav.c | 130 +++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 116 insertions(+), 14 deletions(-) diff --git a/toxav/toxav.c b/toxav/toxav.c index 0aa88919..219f4d8c 100644 --- a/toxav/toxav.c +++ b/toxav/toxav.c @@ -90,6 +90,11 @@ typedef struct _CallSpecific { pthread_mutex_t mutex; } CallSpecific; +typedef struct { + int32_t call_index; + uint32_t size; + uint8_t data[0]; +} DECODE_PACKET; struct _ToxAv { Messenger *messenger; @@ -99,9 +104,16 @@ struct _ToxAv { void (*audio_callback)(ToxAv *, int32_t, int16_t *, int); void (*video_callback)(ToxAv *, int32_t, vpx_image_t *); + volatile _Bool exit, decoding; + pthread_mutex_t decode_cond_mutex; + pthread_cond_t decode_cond; + DECODE_PACKET* volatile video_packet, audio_packet; + uint32_t max_calls; }; +static void* toxav_decoding(void *arg); + static MSICSettings msicsettings_cast (const ToxAvCSettings *from) { MSICSettings csettings; @@ -162,6 +174,12 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls) av->calls = calloc(sizeof(CallSpecific), max_calls); av->max_calls = max_calls; + pthread_t temp; + pthread_create(&temp, NULL, toxav_decoding, av); + + pthread_mutex_init(&av->decode_cond_mutex, NULL); + pthread_cond_init(&av->decode_cond, NULL); + return av; } @@ -175,6 +193,21 @@ void toxav_kill ( ToxAv *av ) { int i = 0; + av->exit = 1; + pthread_mutex_lock(&av->decode_cond_mutex); + pthread_cond_signal(&av->decode_cond); + if(av->exit) { + pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); + } + pthread_mutex_unlock(&av->decode_cond_mutex); + + pthread_mutex_destroy(&av->decode_cond_mutex); + pthread_cond_destroy(&av->decode_cond); + + if(av->video_packet) { + free(av->video_packet); + } + for (; i < av->max_calls; i ++) { if ( av->calls[i].crtps[audio_index] ) rtp_terminate_session(av->calls[i].crtps[audio_index], av->msi_session->messenger_handle); @@ -514,9 +547,21 @@ int toxav_kill_transmission ( ToxAv *av, int32_t call_index ) call->crtps[video_index] = NULL; terminate_queue(call->j_buf); call->j_buf = NULL; + + + pthread_mutex_lock(&av->decode_cond_mutex); + if(av->video_packet && av->video_packet->call_index == call_index) { + free(av->video_packet); + av->video_packet = NULL; + } + + while(av->decoding) {} //use a pthread condition? + codec_terminate_session(call->cs); call->cs = NULL; + pthread_mutex_unlock(&av->decode_cond_mutex); + pthread_mutex_unlock(&call->mutex); pthread_mutex_destroy(&call->mutex); @@ -835,6 +880,59 @@ int toxav_has_activity(ToxAv *av, int32_t call_index, int16_t *PCM, uint16_t fra return energy_VAD(av->calls[call_index].cs, PCM, frame_size, ref_energy); } + +static void decode_video(ToxAv *av, DECODE_PACKET *p) +{ + CallSpecific *call = &av->calls[p->call_index]; + + int rc = vpx_codec_decode(&call->cs->v_decoder, p->data, p->size, NULL, MAX_DECODE_TIME_US); + if (rc != VPX_CODEC_OK) { + LOGGER_ERROR("Error decoding video: %u %s\n", i, vpx_codec_err_to_string(rc)); + } + + vpx_codec_iter_t iter = NULL; + vpx_image_t *img; + img = vpx_codec_get_frame(&call->cs->v_decoder, &iter); + + if (img && av->video_callback) { + av->video_callback(av, p->call_index, img); + } else { + LOGGER_WARNING("Video packet dropped due to missing callback or no image!"); + } + + free(p); +} + +static void* toxav_decoding(void *arg) +{ + ToxAv *av = arg; + + while(!av->exit) { + DECODE_PACKET *p; + av->decoding = 0; + pthread_mutex_lock(&av->decode_cond_mutex); + p = av->video_packet; + if(!p) { + pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); + p = av->video_packet; + } + av->video_packet = NULL; + av->decoding = 1; + pthread_mutex_unlock(&av->decode_cond_mutex); + + if(p) { + decode_video(av, p); + } + } + + pthread_mutex_lock(&av->decode_cond_mutex); + av->exit = 0; + pthread_cond_signal(&av->decode_cond); + pthread_mutex_unlock(&av->decode_cond_mutex); + + return NULL; +} + void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg) { ToxAv *av = _session->av; @@ -887,14 +985,27 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg) /* piece of current frame */ } else if (i > 0 && i < 128) { /* recieved a piece of a frame ahead, flush current frame and start reading this new frame */ - int rc = vpx_codec_decode(&call->cs->v_decoder, call->frame_buf, call->frame_limit, NULL, MAX_DECODE_TIME_US); + DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit); + p->call_index = call_index; + p->size = call->frame_limit; + memcpy(p->data, call->frame_buf, call->frame_limit); + + /* do the decoding on another thread */ + pthread_mutex_lock(&av->decode_cond_mutex); + + if(!av->video_packet) { + av->video_packet = p; + pthread_cond_signal(&av->decode_cond); + } else { + printf("dropped video frame\n"); + free(p); + } + + pthread_mutex_unlock(&av->decode_cond_mutex); + call->frame_id = packet[0]; memset(call->frame_buf, 0, call->frame_limit); call->frame_limit = 0; - - if (rc != VPX_CODEC_OK) { - LOGGER_ERROR("Error decoding video: %u %s\n", i, vpx_codec_err_to_string(rc)); - } } else { /* old packet, dont read */ LOGGER_DEBUG("Old packet: %u\n", i); @@ -919,15 +1030,6 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg) end: ; - vpx_codec_iter_t iter = NULL; - vpx_image_t *img; - img = vpx_codec_get_frame(&call->cs->v_decoder, &iter); - - if (img && av->video_callback) { - av->video_callback(av, call_index, img); - } else - LOGGER_WARNING("Video packet dropped due to missing callback or no image!"); - rtp_free_msg(NULL, _msg); } } -- cgit v1.2.3 From c0a7cbbf7388c1e4881cc4bd054e679760fa7c51 Mon Sep 17 00:00:00 2001 From: notsecure Date: Thu, 31 Jul 2014 15:32:22 -0400 Subject: fix test --- auto_tests/toxav_many_test.c | 1 + 1 file changed, 1 insertion(+) diff --git a/auto_tests/toxav_many_test.c b/auto_tests/toxav_many_test.c index df2be467..299e701d 100644 --- a/auto_tests/toxav_many_test.c +++ b/auto_tests/toxav_many_test.c @@ -127,6 +127,7 @@ static void callback_audio(ToxAv *av, int32_t call_index, int16_t *data, int len static void callback_video(ToxAv *av, int32_t call_index, vpx_image_t *img) { } + void register_callbacks(ToxAv *av, void *data) { toxav_register_callstate_callback(av, callback_call_started, av_OnStart, data); -- cgit v1.2.3 From 7e806aef066ddcc97f90a7cf3a9279018490a57a Mon Sep 17 00:00:00 2001 From: notsecure Date: Thu, 31 Jul 2014 15:35:59 -0400 Subject: fix initialization order --- toxav/toxav.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/toxav/toxav.c b/toxav/toxav.c index 219f4d8c..2d2bcd98 100644 --- a/toxav/toxav.c +++ b/toxav/toxav.c @@ -174,12 +174,12 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls) av->calls = calloc(sizeof(CallSpecific), max_calls); av->max_calls = max_calls; - pthread_t temp; - pthread_create(&temp, NULL, toxav_decoding, av); - pthread_mutex_init(&av->decode_cond_mutex, NULL); pthread_cond_init(&av->decode_cond, NULL); + pthread_t temp; + pthread_create(&temp, NULL, toxav_decoding, av); + return av; } -- cgit v1.2.3 From 8ee3f645b158a85dcb921e471225b48c20f5559a Mon Sep 17 00:00:00 2001 From: notsecure Date: Fri, 1 Aug 2014 10:18:14 -0400 Subject: audio/video decoding on separate thread with a queue --- toxav/toxav.c | 196 ++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 151 insertions(+), 45 deletions(-) diff --git a/toxav/toxav.c b/toxav/toxav.c index 2d2bcd98..280b57d9 100644 --- a/toxav/toxav.c +++ b/toxav/toxav.c @@ -96,6 +96,9 @@ typedef struct { uint8_t data[0]; } DECODE_PACKET; +#define VIDEO_DECODE_QUEUE_SIZE 2 +#define AUDIO_DECODE_QUEUE_SIZE 8 + struct _ToxAv { Messenger *messenger; MSISession *msi_session; /** Main msi session */ @@ -104,12 +107,15 @@ struct _ToxAv { void (*audio_callback)(ToxAv *, int32_t, int16_t *, int); void (*video_callback)(ToxAv *, int32_t, vpx_image_t *); + uint32_t max_calls; + + /* used in the "decode on another thread" system */ volatile _Bool exit, decoding; + uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write; pthread_mutex_t decode_cond_mutex; pthread_cond_t decode_cond; - DECODE_PACKET* volatile video_packet, audio_packet; - - uint32_t max_calls; + DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE]; + DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE]; }; static void* toxav_decoding(void *arg); @@ -191,7 +197,8 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls) */ void toxav_kill ( ToxAv *av ) { - int i = 0; + int i; + DECODE_PACKET *p; av->exit = 1; pthread_mutex_lock(&av->decode_cond_mutex); @@ -204,11 +211,21 @@ void toxav_kill ( ToxAv *av ) pthread_mutex_destroy(&av->decode_cond_mutex); pthread_cond_destroy(&av->decode_cond); - if(av->video_packet) { - free(av->video_packet); + for(i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) { + p = av->video_decode_queue[i]; + if(p) { + free(p); + } + } + + for(i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) { + p = av->audio_decode_queue[i]; + if(p) { + free(p); + } } - for (; i < av->max_calls; i ++) { + for (i = 0; i < av->max_calls; i ++) { if ( av->calls[i].crtps[audio_index] ) rtp_terminate_session(av->calls[i].crtps[audio_index], av->msi_session->messenger_handle); @@ -550,9 +567,23 @@ int toxav_kill_transmission ( ToxAv *av, int32_t call_index ) pthread_mutex_lock(&av->decode_cond_mutex); - if(av->video_packet && av->video_packet->call_index == call_index) { - free(av->video_packet); - av->video_packet = NULL; + int i; + DECODE_PACKET *p; + + for(i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) { + p = av->video_decode_queue[i]; + if(p && p->call_index == call_index) { + free(p); + av->video_decode_queue[i] = NULL; + } + } + + for(i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) { + p = av->audio_decode_queue[i]; + if(p && p->call_index == call_index) { + free(p); + av->audio_decode_queue[i] = NULL; + } } while(av->decoding) {} //use a pthread condition? @@ -903,25 +934,81 @@ static void decode_video(ToxAv *av, DECODE_PACKET *p) free(p); } +static void decode_audio(ToxAv *av, DECODE_PACKET *p) +{ + int32_t call_index = p->call_index; + CallSpecific *call = &av->calls[call_index]; + + // ToxAvCSettings csettings; + // toxav_get_peer_csettings(av, call_index, 0, &csettings); + + int frame_size = 10000; /* FIXME: not static? */ + int16_t dest[frame_size]; + + int dec_size = opus_decode(call->cs->audio_decoder, p->data, p->size, dest, frame_size, (p->size == 0)); + free(p); + if (dec_size < 0) { + LOGGER_WARNING("Decoding error: %s", opus_strerror(dec_size)); + return; + } + + if ( av->audio_callback ) + av->audio_callback(av, call_index, dest, dec_size); + else + LOGGER_WARNING("Audio packet dropped due to missing callback!"); +} + static void* toxav_decoding(void *arg) { ToxAv *av = arg; while(!av->exit) { DECODE_PACKET *p; + _Bool video = 0; + av->decoding = 0; pthread_mutex_lock(&av->decode_cond_mutex); - p = av->video_packet; + uint8_t r; + + /* first check for available packets, otherwise wait for condition*/ + r = av->audio_decode_read; + p = av->audio_decode_queue[r]; if(!p) { - pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); - p = av->video_packet; + r = av->video_decode_read; + p = av->video_decode_queue[r]; + if(!p) { + pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); + r = av->audio_decode_read; + p = av->audio_decode_queue[r]; + if(!p) { + r = av->video_decode_read; + p = av->video_decode_queue[r]; + video = 1; + } + } else { + video = 1; + } } - av->video_packet = NULL; + + if(video) { + if(p) { + av->video_decode_queue[r] = NULL; + av->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE; + } + } else { + av->audio_decode_queue[r] = NULL; + av->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE; + } + av->decoding = 1; pthread_mutex_unlock(&av->decode_cond_mutex); if(p) { - decode_video(av, p); + if(video) { + decode_video(av, p); + } else { + decode_audio(av, p); + } } } @@ -944,31 +1031,44 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg) if (_session->payload_type == type_audio % 128) { queue(call->j_buf, _msg); - int success = 0, dec_size; - - ToxAvCSettings csettings; - toxav_get_peer_csettings(av, call_index, 0, &csettings); - - int frame_size = 10000; /* FIXME: not static? */ - int16_t dest[frame_size]; + int success = 0; while ((_msg = dequeue(call->j_buf, &success)) || success == 2) { + DECODE_PACKET *p; if (success == 2) { - dec_size = opus_decode(call->cs->audio_decoder, NULL, 0, dest, frame_size, 1); + p = malloc(sizeof(DECODE_PACKET)); + if(p) { + p->call_index = call_index; + p->size = 0; + } } else { - dec_size = opus_decode(call->cs->audio_decoder, _msg->data, _msg->length, dest, frame_size, 0); + p = malloc(sizeof(DECODE_PACKET) + _msg->length); + if(p) { + p->call_index = call_index; + p->size = _msg->length; + memcpy(p->data, _msg->data, _msg->length); + } rtp_free_msg(NULL, _msg); } - if (dec_size < 0) { - LOGGER_WARNING("Decoding error: %s", opus_strerror(dec_size)); - continue; - } + if(p) { + /* do the decoding on another thread */ + pthread_mutex_lock(&av->decode_cond_mutex); + uint8_t w = av->audio_decode_write; + + if(av->audio_decode_queue[w] == NULL) { + av->audio_decode_queue[w] = p; + av->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE; + pthread_cond_signal(&av->decode_cond); + } else { + printf("dropped audio frame\n"); + free(p); + } - if ( av->audio_callback ) - av->audio_callback(av, call_index, dest, dec_size); - else - LOGGER_WARNING("Audio packet dropped due to missing callback!"); + pthread_mutex_unlock(&av->decode_cond_mutex); + } else { + //malloc failed + } } } else { @@ -986,23 +1086,29 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg) } else if (i > 0 && i < 128) { /* recieved a piece of a frame ahead, flush current frame and start reading this new frame */ DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit); - p->call_index = call_index; - p->size = call->frame_limit; - memcpy(p->data, call->frame_buf, call->frame_limit); - - /* do the decoding on another thread */ - pthread_mutex_lock(&av->decode_cond_mutex); + if(p) { + p->call_index = call_index; + p->size = call->frame_limit; + memcpy(p->data, call->frame_buf, call->frame_limit); + + /* do the decoding on another thread */ + pthread_mutex_lock(&av->decode_cond_mutex); + uint8_t w = av->video_decode_write; + + if(av->video_decode_queue[w] == NULL) { + av->video_decode_queue[w] = p; + av->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE; + pthread_cond_signal(&av->decode_cond); + } else { + printf("dropped video frame\n"); + free(p); + } - if(!av->video_packet) { - av->video_packet = p; - pthread_cond_signal(&av->decode_cond); + pthread_mutex_unlock(&av->decode_cond_mutex); } else { - printf("dropped video frame\n"); - free(p); + //malloc failed } - pthread_mutex_unlock(&av->decode_cond_mutex); - call->frame_id = packet[0]; memset(call->frame_buf, 0, call->frame_limit); call->frame_limit = 0; -- cgit v1.2.3