summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornotsecure <notsecure@marek.ca>2014-08-01 10:18:14 -0400
committernotsecure <notsecure@marek.ca>2014-08-01 10:18:14 -0400
commit8ee3f645b158a85dcb921e471225b48c20f5559a (patch)
tree9cc764f26becb3aac5228bae32503126042022ad
parent7e806aef066ddcc97f90a7cf3a9279018490a57a (diff)
audio/video decoding on separate thread with a queue
-rw-r--r--toxav/toxav.c196
1 files changed, 151 insertions, 45 deletions
diff --git a/toxav/toxav.c b/toxav/toxav.c
index 2d2bcd98..280b57d9 100644
--- a/toxav/toxav.c
+++ b/toxav/toxav.c
@@ -96,6 +96,9 @@ typedef struct {
96 uint8_t data[0]; 96 uint8_t data[0];
97} DECODE_PACKET; 97} DECODE_PACKET;
98 98
99#define VIDEO_DECODE_QUEUE_SIZE 2
100#define AUDIO_DECODE_QUEUE_SIZE 8
101
99struct _ToxAv { 102struct _ToxAv {
100 Messenger *messenger; 103 Messenger *messenger;
101 MSISession *msi_session; /** Main msi session */ 104 MSISession *msi_session; /** Main msi session */
@@ -104,12 +107,15 @@ struct _ToxAv {
104 void (*audio_callback)(ToxAv *, int32_t, int16_t *, int); 107 void (*audio_callback)(ToxAv *, int32_t, int16_t *, int);
105 void (*video_callback)(ToxAv *, int32_t, vpx_image_t *); 108 void (*video_callback)(ToxAv *, int32_t, vpx_image_t *);
106 109
110 uint32_t max_calls;
111
112 /* used in the "decode on another thread" system */
107 volatile _Bool exit, decoding; 113 volatile _Bool exit, decoding;
114 uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write;
108 pthread_mutex_t decode_cond_mutex; 115 pthread_mutex_t decode_cond_mutex;
109 pthread_cond_t decode_cond; 116 pthread_cond_t decode_cond;
110 DECODE_PACKET* volatile video_packet, audio_packet; 117 DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE];
111 118 DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE];
112 uint32_t max_calls;
113}; 119};
114 120
115static void* toxav_decoding(void *arg); 121static void* toxav_decoding(void *arg);
@@ -191,7 +197,8 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls)
191 */ 197 */
192void toxav_kill ( ToxAv *av ) 198void toxav_kill ( ToxAv *av )
193{ 199{
194 int i = 0; 200 int i;
201 DECODE_PACKET *p;
195 202
196 av->exit = 1; 203 av->exit = 1;
197 pthread_mutex_lock(&av->decode_cond_mutex); 204 pthread_mutex_lock(&av->decode_cond_mutex);
@@ -204,11 +211,21 @@ void toxav_kill ( ToxAv *av )
204 pthread_mutex_destroy(&av->decode_cond_mutex); 211 pthread_mutex_destroy(&av->decode_cond_mutex);
205 pthread_cond_destroy(&av->decode_cond); 212 pthread_cond_destroy(&av->decode_cond);
206 213
207 if(av->video_packet) { 214 for(i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) {
208 free(av->video_packet); 215 p = av->video_decode_queue[i];
216 if(p) {
217 free(p);
218 }
219 }
220
221 for(i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) {
222 p = av->audio_decode_queue[i];
223 if(p) {
224 free(p);
225 }
209 } 226 }
210 227
211 for (; i < av->max_calls; i ++) { 228 for (i = 0; i < av->max_calls; i ++) {
212 if ( av->calls[i].crtps[audio_index] ) 229 if ( av->calls[i].crtps[audio_index] )
213 rtp_terminate_session(av->calls[i].crtps[audio_index], av->msi_session->messenger_handle); 230 rtp_terminate_session(av->calls[i].crtps[audio_index], av->msi_session->messenger_handle);
214 231
@@ -550,9 +567,23 @@ int toxav_kill_transmission ( ToxAv *av, int32_t call_index )
550 567
551 568
552 pthread_mutex_lock(&av->decode_cond_mutex); 569 pthread_mutex_lock(&av->decode_cond_mutex);
553 if(av->video_packet && av->video_packet->call_index == call_index) { 570 int i;
554 free(av->video_packet); 571 DECODE_PACKET *p;
555 av->video_packet = NULL; 572
573 for(i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) {
574 p = av->video_decode_queue[i];
575 if(p && p->call_index == call_index) {
576 free(p);
577 av->video_decode_queue[i] = NULL;
578 }
579 }
580
581 for(i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) {
582 p = av->audio_decode_queue[i];
583 if(p && p->call_index == call_index) {
584 free(p);
585 av->audio_decode_queue[i] = NULL;
586 }
556 } 587 }
557 588
558 while(av->decoding) {} //use a pthread condition? 589 while(av->decoding) {} //use a pthread condition?
@@ -903,25 +934,81 @@ static void decode_video(ToxAv *av, DECODE_PACKET *p)
903 free(p); 934 free(p);
904} 935}
905 936
937static void decode_audio(ToxAv *av, DECODE_PACKET *p)
938{
939 int32_t call_index = p->call_index;
940 CallSpecific *call = &av->calls[call_index];
941
942 // ToxAvCSettings csettings;
943 // toxav_get_peer_csettings(av, call_index, 0, &csettings);
944
945 int frame_size = 10000; /* FIXME: not static? */
946 int16_t dest[frame_size];
947
948 int dec_size = opus_decode(call->cs->audio_decoder, p->data, p->size, dest, frame_size, (p->size == 0));
949 free(p);
950 if (dec_size < 0) {
951 LOGGER_WARNING("Decoding error: %s", opus_strerror(dec_size));
952 return;
953 }
954
955 if ( av->audio_callback )
956 av->audio_callback(av, call_index, dest, dec_size);
957 else
958 LOGGER_WARNING("Audio packet dropped due to missing callback!");
959}
960
906static void* toxav_decoding(void *arg) 961static void* toxav_decoding(void *arg)
907{ 962{
908 ToxAv *av = arg; 963 ToxAv *av = arg;
909 964
910 while(!av->exit) { 965 while(!av->exit) {
911 DECODE_PACKET *p; 966 DECODE_PACKET *p;
967 _Bool video = 0;
968
912 av->decoding = 0; 969 av->decoding = 0;
913 pthread_mutex_lock(&av->decode_cond_mutex); 970 pthread_mutex_lock(&av->decode_cond_mutex);
914 p = av->video_packet; 971 uint8_t r;
972
973 /* first check for available packets, otherwise wait for condition*/
974 r = av->audio_decode_read;
975 p = av->audio_decode_queue[r];
915 if(!p) { 976 if(!p) {
916 pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); 977 r = av->video_decode_read;
917 p = av->video_packet; 978 p = av->video_decode_queue[r];
979 if(!p) {
980 pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex);
981 r = av->audio_decode_read;
982 p = av->audio_decode_queue[r];
983 if(!p) {
984 r = av->video_decode_read;
985 p = av->video_decode_queue[r];
986 video = 1;
987 }
988 } else {
989 video = 1;
990 }
918 } 991 }
919 av->video_packet = NULL; 992
993 if(video) {
994 if(p) {
995 av->video_decode_queue[r] = NULL;
996 av->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE;
997 }
998 } else {
999 av->audio_decode_queue[r] = NULL;
1000 av->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE;
1001 }
1002
920 av->decoding = 1; 1003 av->decoding = 1;
921 pthread_mutex_unlock(&av->decode_cond_mutex); 1004 pthread_mutex_unlock(&av->decode_cond_mutex);
922 1005
923 if(p) { 1006 if(p) {
924 decode_video(av, p); 1007 if(video) {
1008 decode_video(av, p);
1009 } else {
1010 decode_audio(av, p);
1011 }
925 } 1012 }
926 } 1013 }
927 1014
@@ -944,31 +1031,44 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
944 if (_session->payload_type == type_audio % 128) { 1031 if (_session->payload_type == type_audio % 128) {
945 queue(call->j_buf, _msg); 1032 queue(call->j_buf, _msg);
946 1033
947 int success = 0, dec_size; 1034 int success = 0;
948
949 ToxAvCSettings csettings;
950 toxav_get_peer_csettings(av, call_index, 0, &csettings);
951
952 int frame_size = 10000; /* FIXME: not static? */
953 int16_t dest[frame_size];
954 1035
955 while ((_msg = dequeue(call->j_buf, &success)) || success == 2) { 1036 while ((_msg = dequeue(call->j_buf, &success)) || success == 2) {
1037 DECODE_PACKET *p;
956 if (success == 2) { 1038 if (success == 2) {
957 dec_size = opus_decode(call->cs->audio_decoder, NULL, 0, dest, frame_size, 1); 1039 p = malloc(sizeof(DECODE_PACKET));
1040 if(p) {
1041 p->call_index = call_index;
1042 p->size = 0;
1043 }
958 } else { 1044 } else {
959 dec_size = opus_decode(call->cs->audio_decoder, _msg->data, _msg->length, dest, frame_size, 0); 1045 p = malloc(sizeof(DECODE_PACKET) + _msg->length);
1046 if(p) {
1047 p->call_index = call_index;
1048 p->size = _msg->length;
1049 memcpy(p->data, _msg->data, _msg->length);
1050 }
960 rtp_free_msg(NULL, _msg); 1051 rtp_free_msg(NULL, _msg);
961 } 1052 }
962 1053
963 if (dec_size < 0) { 1054 if(p) {
964 LOGGER_WARNING("Decoding error: %s", opus_strerror(dec_size)); 1055 /* do the decoding on another thread */
965 continue; 1056 pthread_mutex_lock(&av->decode_cond_mutex);
966 } 1057 uint8_t w = av->audio_decode_write;
1058
1059 if(av->audio_decode_queue[w] == NULL) {
1060 av->audio_decode_queue[w] = p;
1061 av->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE;
1062 pthread_cond_signal(&av->decode_cond);
1063 } else {
1064 printf("dropped audio frame\n");
1065 free(p);
1066 }
967 1067
968 if ( av->audio_callback ) 1068 pthread_mutex_unlock(&av->decode_cond_mutex);
969 av->audio_callback(av, call_index, dest, dec_size); 1069 } else {
970 else 1070 //malloc failed
971 LOGGER_WARNING("Audio packet dropped due to missing callback!"); 1071 }
972 } 1072 }
973 1073
974 } else { 1074 } else {
@@ -986,23 +1086,29 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
986 } else if (i > 0 && i < 128) { 1086 } else if (i > 0 && i < 128) {
987 /* recieved a piece of a frame ahead, flush current frame and start reading this new frame */ 1087 /* recieved a piece of a frame ahead, flush current frame and start reading this new frame */
988 DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit); 1088 DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit);
989 p->call_index = call_index; 1089 if(p) {
990 p->size = call->frame_limit; 1090 p->call_index = call_index;
991 memcpy(p->data, call->frame_buf, call->frame_limit); 1091 p->size = call->frame_limit;
992 1092 memcpy(p->data, call->frame_buf, call->frame_limit);
993 /* do the decoding on another thread */ 1093
994 pthread_mutex_lock(&av->decode_cond_mutex); 1094 /* do the decoding on another thread */
1095 pthread_mutex_lock(&av->decode_cond_mutex);
1096 uint8_t w = av->video_decode_write;
1097
1098 if(av->video_decode_queue[w] == NULL) {
1099 av->video_decode_queue[w] = p;
1100 av->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE;
1101 pthread_cond_signal(&av->decode_cond);
1102 } else {
1103 printf("dropped video frame\n");
1104 free(p);
1105 }
995 1106
996 if(!av->video_packet) { 1107 pthread_mutex_unlock(&av->decode_cond_mutex);
997 av->video_packet = p;
998 pthread_cond_signal(&av->decode_cond);
999 } else { 1108 } else {
1000 printf("dropped video frame\n"); 1109 //malloc failed
1001 free(p);
1002 } 1110 }
1003 1111
1004 pthread_mutex_unlock(&av->decode_cond_mutex);
1005
1006 call->frame_id = packet[0]; 1112 call->frame_id = packet[0];
1007 memset(call->frame_buf, 0, call->frame_limit); 1113 memset(call->frame_buf, 0, call->frame_limit);
1008 call->frame_limit = 0; 1114 call->frame_limit = 0;