summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornotsecure <notsecure@marek.ca>2014-08-04 09:50:32 -0400
committernotsecure <notsecure@marek.ca>2014-08-04 09:50:32 -0400
commitd3e66d73a7b961d424ebfaa3680212f24c241f27 (patch)
tree3c5c69e0a7df5b4a8299f172c3da436f7914d9af
parent401982008eabdc9fd60820f720e65636273a3422 (diff)
one decoding thread per call
-rw-r--r--toxav/toxav.c207
1 files changed, 92 insertions, 115 deletions
diff --git a/toxav/toxav.c b/toxav/toxav.c
index 870dd111..f7b35934 100644
--- a/toxav/toxav.c
+++ b/toxav/toxav.c
@@ -71,6 +71,14 @@ const uint32_t av_VADd = 40;
71 71
72static const uint8_t audio_index = 0, video_index = 1; 72static const uint8_t audio_index = 0, video_index = 1;
73 73
74typedef struct {
75 uint32_t size;
76 uint8_t data[0];
77} DECODE_PACKET;
78
79#define VIDEO_DECODE_QUEUE_SIZE 2
80#define AUDIO_DECODE_QUEUE_SIZE 16
81
74typedef struct _CallSpecific { 82typedef struct _CallSpecific {
75 RTPSession *crtps[2]; /** Audio is first and video is second */ 83 RTPSession *crtps[2]; /** Audio is first and video is second */
76 CodecState *cs;/** Each call have its own encoders and decoders. 84 CodecState *cs;/** Each call have its own encoders and decoders.
@@ -88,16 +96,15 @@ typedef struct _CallSpecific {
88 96
89 _Bool call_active; 97 _Bool call_active;
90 pthread_mutex_t mutex; 98 pthread_mutex_t mutex;
91} CallSpecific;
92 99
93typedef struct { 100 /* used in the "decode on another thread" system */
94 int32_t call_index; 101 volatile _Bool exit, decoding;
95 uint32_t size; 102 uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write;
96 uint8_t data[0]; 103 pthread_mutex_t decode_cond_mutex;
97} DECODE_PACKET; 104 pthread_cond_t decode_cond;
98 105 DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE];
99#define VIDEO_DECODE_QUEUE_SIZE 2 106 DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE];
100#define AUDIO_DECODE_QUEUE_SIZE 16 107} CallSpecific;
101 108
102struct _ToxAv { 109struct _ToxAv {
103 Messenger *messenger; 110 Messenger *messenger;
@@ -111,14 +118,6 @@ struct _ToxAv {
111 void *video_callback_userdata; 118 void *video_callback_userdata;
112 119
113 uint32_t max_calls; 120 uint32_t max_calls;
114
115 /* used in the "decode on another thread" system */
116 volatile _Bool exit, decoding;
117 uint8_t video_decode_read, video_decode_write, audio_decode_read, audio_decode_write;
118 pthread_mutex_t decode_cond_mutex;
119 pthread_cond_t decode_cond;
120 DECODE_PACKET *volatile video_decode_queue[VIDEO_DECODE_QUEUE_SIZE];
121 DECODE_PACKET *volatile audio_decode_queue[AUDIO_DECODE_QUEUE_SIZE];
122}; 121};
123 122
124static void *toxav_decoding(void *arg); 123static void *toxav_decoding(void *arg);
@@ -183,12 +182,6 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls)
183 av->calls = calloc(sizeof(CallSpecific), max_calls); 182 av->calls = calloc(sizeof(CallSpecific), max_calls);
184 av->max_calls = max_calls; 183 av->max_calls = max_calls;
185 184
186 pthread_mutex_init(&av->decode_cond_mutex, NULL);
187 pthread_cond_init(&av->decode_cond, NULL);
188
189 pthread_t temp;
190 pthread_create(&temp, NULL, toxav_decoding, av);
191
192 return av; 185 return av;
193} 186}
194 187
@@ -201,36 +194,6 @@ ToxAv *toxav_new( Tox *messenger, int32_t max_calls)
201void toxav_kill ( ToxAv *av ) 194void toxav_kill ( ToxAv *av )
202{ 195{
203 int i; 196 int i;
204 DECODE_PACKET *p;
205
206 av->exit = 1;
207 pthread_mutex_lock(&av->decode_cond_mutex);
208 pthread_cond_signal(&av->decode_cond);
209
210 if (av->exit) {
211 pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex);
212 }
213
214 pthread_mutex_unlock(&av->decode_cond_mutex);
215
216 pthread_mutex_destroy(&av->decode_cond_mutex);
217 pthread_cond_destroy(&av->decode_cond);
218
219 for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) {
220 p = av->video_decode_queue[i];
221
222 if (p) {
223 free(p);
224 }
225 }
226
227 for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) {
228 p = av->audio_decode_queue[i];
229
230 if (p) {
231 free(p);
232 }
233 }
234 197
235 for (i = 0; i < av->max_calls; i ++) { 198 for (i = 0; i < av->max_calls; i ++) {
236 if ( av->calls[i].crtps[audio_index] ) 199 if ( av->calls[i].crtps[audio_index] )
@@ -525,6 +488,23 @@ int toxav_prepare_transmission ( ToxAv *av, int32_t call_index, uint32_t jbuf_ca
525 488
526 if ( pthread_mutex_init(&call->mutex, NULL) != 0 ) goto error; 489 if ( pthread_mutex_init(&call->mutex, NULL) != 0 ) goto error;
527 490
491 //todo: add error checks
492 pthread_mutex_init(&call->decode_cond_mutex, NULL);
493 pthread_cond_init(&call->decode_cond, NULL);
494
495 void **arg = malloc(2 * sizeof(void*));
496 arg[0] = av;
497 arg[1] = call;
498
499 pthread_t temp;
500 pthread_attr_t attr;
501
502 pthread_attr_init(&attr);
503 pthread_attr_setstacksize(&attr, 1 << 18);
504 pthread_create(&temp, &attr, toxav_decoding, arg);
505 pthread_attr_destroy(&attr);
506
507
528 LOGGER_WARNING("Got here"); 508 LOGGER_WARNING("Got here");
529 call->call_active = 1; 509 call->call_active = 1;
530 510
@@ -576,36 +556,34 @@ int toxav_kill_transmission ( ToxAv *av, int32_t call_index )
576 terminate_queue(call->j_buf); 556 terminate_queue(call->j_buf);
577 call->j_buf = NULL; 557 call->j_buf = NULL;
578 558
579
580 pthread_mutex_lock(&av->decode_cond_mutex);
581 int i; 559 int i;
582 DECODE_PACKET *p; 560 DECODE_PACKET *p;
583 561
584 for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) { 562 call->exit = 1;
585 p = av->video_decode_queue[i]; 563 pthread_mutex_lock(&call->decode_cond_mutex);
564 pthread_cond_signal(&call->decode_cond);
565 pthread_cond_wait(&call->decode_cond, &call->decode_cond_mutex);
566 pthread_mutex_unlock(&call->decode_cond_mutex);
567 pthread_mutex_destroy(&call->decode_cond_mutex);
568 pthread_cond_destroy(&call->decode_cond);
586 569
587 if (p && p->call_index == call_index) { 570 for (i = 0; i != VIDEO_DECODE_QUEUE_SIZE; i++) {
571 p = call->video_decode_queue[i];
572 if (p) {
588 free(p); 573 free(p);
589 av->video_decode_queue[i] = NULL;
590 } 574 }
591 } 575 }
592 576
593 for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) { 577 for (i = 0; i != AUDIO_DECODE_QUEUE_SIZE; i++) {
594 p = av->audio_decode_queue[i]; 578 p = call->audio_decode_queue[i];
595 579 if (p) {
596 if (p && p->call_index == call_index) {
597 free(p); 580 free(p);
598 av->audio_decode_queue[i] = NULL;
599 } 581 }
600 } 582 }
601 583
602 while (av->decoding) {} //use a pthread condition?
603
604 codec_terminate_session(call->cs); 584 codec_terminate_session(call->cs);
605 call->cs = NULL; 585 call->cs = NULL;
606 586
607 pthread_mutex_unlock(&av->decode_cond_mutex);
608
609 pthread_mutex_unlock(&call->mutex); 587 pthread_mutex_unlock(&call->mutex);
610 pthread_mutex_destroy(&call->mutex); 588 pthread_mutex_destroy(&call->mutex);
611 589
@@ -925,9 +903,9 @@ int toxav_has_activity(ToxAv *av, int32_t call_index, int16_t *PCM, uint16_t fra
925} 903}
926 904
927 905
928static void decode_video(ToxAv *av, DECODE_PACKET *p) 906static void decode_video(ToxAv *av, CallSpecific *call, DECODE_PACKET *p)
929{ 907{
930 CallSpecific *call = &av->calls[p->call_index]; 908 int32_t call_index = call - av->calls;
931 909
932 int rc = vpx_codec_decode(&call->cs->v_decoder, p->data, p->size, NULL, MAX_DECODE_TIME_US); 910 int rc = vpx_codec_decode(&call->cs->v_decoder, p->data, p->size, NULL, MAX_DECODE_TIME_US);
933 911
@@ -940,7 +918,7 @@ static void decode_video(ToxAv *av, DECODE_PACKET *p)
940 img = vpx_codec_get_frame(&call->cs->v_decoder, &iter); 918 img = vpx_codec_get_frame(&call->cs->v_decoder, &iter);
941 919
942 if (img && av->video_callback) { 920 if (img && av->video_callback) {
943 av->video_callback(av, p->call_index, img, av->video_callback_userdata); 921 av->video_callback(av, call_index, img, av->video_callback_userdata);
944 } else { 922 } else {
945 LOGGER_WARNING("Video packet dropped due to missing callback or no image!"); 923 LOGGER_WARNING("Video packet dropped due to missing callback or no image!");
946 } 924 }
@@ -948,10 +926,9 @@ static void decode_video(ToxAv *av, DECODE_PACKET *p)
948 free(p); 926 free(p);
949} 927}
950 928
951static void decode_audio(ToxAv *av, DECODE_PACKET *p) 929static void decode_audio(ToxAv *av, CallSpecific *call, DECODE_PACKET *p)
952{ 930{
953 int32_t call_index = p->call_index; 931 int32_t call_index = call - av->calls;
954 CallSpecific *call = &av->calls[call_index];
955 932
956 // ToxAvCSettings csettings; 933 // ToxAvCSettings csettings;
957 // toxav_get_peer_csettings(av, call_index, 0, &csettings); 934 // toxav_get_peer_csettings(av, call_index, 0, &csettings);
@@ -975,32 +952,37 @@ static void decode_audio(ToxAv *av, DECODE_PACKET *p)
975 952
976static void *toxav_decoding(void *arg) 953static void *toxav_decoding(void *arg)
977{ 954{
978 ToxAv *av = arg; 955 void **pp = arg;
956 ToxAv *av = pp[0];
957 CallSpecific *call = pp[1];
958 free(pp);
979 959
980 while (!av->exit) { 960 while (!call->exit) {
981 DECODE_PACKET *p; 961 DECODE_PACKET *p;
982 _Bool video = 0; 962 _Bool video = 0;
983 963
984 av->decoding = 0; 964 pthread_mutex_lock(&call->decode_cond_mutex);
985 pthread_mutex_lock(&av->decode_cond_mutex); 965 if(call->exit) {
966 break;
967 }
986 uint8_t r; 968 uint8_t r;
987 969
988 /* first check for available packets, otherwise wait for condition*/ 970 /* first check for available packets, otherwise wait for condition*/
989 r = av->audio_decode_read; 971 r = call->audio_decode_read;
990 p = av->audio_decode_queue[r]; 972 p = call->audio_decode_queue[r];
991 973
992 if (!p) { 974 if (!p) {
993 r = av->video_decode_read; 975 r = call->video_decode_read;
994 p = av->video_decode_queue[r]; 976 p = call->video_decode_queue[r];
995 977
996 if (!p) { 978 if (!p) {
997 pthread_cond_wait(&av->decode_cond, &av->decode_cond_mutex); 979 pthread_cond_wait(&call->decode_cond, &call->decode_cond_mutex);
998 r = av->audio_decode_read; 980 r = call->audio_decode_read;
999 p = av->audio_decode_queue[r]; 981 p = call->audio_decode_queue[r];
1000 982
1001 if (!p) { 983 if (!p) {
1002 r = av->video_decode_read; 984 r = call->video_decode_read;
1003 p = av->video_decode_queue[r]; 985 p = call->video_decode_queue[r];
1004 video = 1; 986 video = 1;
1005 } 987 }
1006 } else { 988 } else {
@@ -1010,30 +992,28 @@ static void *toxav_decoding(void *arg)
1010 992
1011 if (video) { 993 if (video) {
1012 if (p) { 994 if (p) {
1013 av->video_decode_queue[r] = NULL; 995 call->video_decode_queue[r] = NULL;
1014 av->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE; 996 call->video_decode_read = (r + 1) % VIDEO_DECODE_QUEUE_SIZE;
1015 } 997 }
1016 } else { 998 } else {
1017 av->audio_decode_queue[r] = NULL; 999 call->audio_decode_queue[r] = NULL;
1018 av->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE; 1000 call->audio_decode_read = (r + 1) % AUDIO_DECODE_QUEUE_SIZE;
1019 } 1001 }
1020 1002
1021 av->decoding = 1; 1003 pthread_mutex_unlock(&call->decode_cond_mutex);
1022 pthread_mutex_unlock(&av->decode_cond_mutex);
1023 1004
1024 if (p) { 1005 if (p) {
1025 if (video) { 1006 if (video) {
1026 decode_video(av, p); 1007 decode_video(av, call, p);
1027 } else { 1008 } else {
1028 decode_audio(av, p); 1009 decode_audio(av, call, p);
1029 } 1010 }
1030 } 1011 }
1031 } 1012 }
1032 1013
1033 pthread_mutex_lock(&av->decode_cond_mutex); 1014 call->exit = 0;
1034 av->exit = 0; 1015 pthread_cond_signal(&call->decode_cond);
1035 pthread_cond_signal(&av->decode_cond); 1016 pthread_mutex_unlock(&call->decode_cond_mutex);
1036 pthread_mutex_unlock(&av->decode_cond_mutex);
1037 1017
1038 return NULL; 1018 return NULL;
1039} 1019}
@@ -1058,14 +1038,12 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
1058 p = malloc(sizeof(DECODE_PACKET)); 1038 p = malloc(sizeof(DECODE_PACKET));
1059 1039
1060 if (p) { 1040 if (p) {
1061 p->call_index = call_index;
1062 p->size = 0; 1041 p->size = 0;
1063 } 1042 }
1064 } else { 1043 } else {
1065 p = malloc(sizeof(DECODE_PACKET) + _msg->length); 1044 p = malloc(sizeof(DECODE_PACKET) + _msg->length);
1066 1045
1067 if (p) { 1046 if (p) {
1068 p->call_index = call_index;
1069 p->size = _msg->length; 1047 p->size = _msg->length;
1070 memcpy(p->data, _msg->data, _msg->length); 1048 memcpy(p->data, _msg->data, _msg->length);
1071 } 1049 }
@@ -1075,19 +1053,19 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
1075 1053
1076 if (p) { 1054 if (p) {
1077 /* do the decoding on another thread */ 1055 /* do the decoding on another thread */
1078 pthread_mutex_lock(&av->decode_cond_mutex); 1056 pthread_mutex_lock(&call->decode_cond_mutex);
1079 uint8_t w = av->audio_decode_write; 1057 uint8_t w = call->audio_decode_write;
1080 1058
1081 if (av->audio_decode_queue[w] == NULL) { 1059 if (call->audio_decode_queue[w] == NULL) {
1082 av->audio_decode_queue[w] = p; 1060 call->audio_decode_queue[w] = p;
1083 av->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE; 1061 call->audio_decode_write = (w + 1) % AUDIO_DECODE_QUEUE_SIZE;
1084 pthread_cond_signal(&av->decode_cond); 1062 pthread_cond_signal(&call->decode_cond);
1085 } else { 1063 } else {
1086 LOGGER_DEBUG("Dropped audio frame\n"); 1064 LOGGER_DEBUG("Dropped audio frame\n");
1087 free(p); 1065 free(p);
1088 } 1066 }
1089 1067
1090 pthread_mutex_unlock(&av->decode_cond_mutex); 1068 pthread_mutex_unlock(&call->decode_cond_mutex);
1091 } else { 1069 } else {
1092 //malloc failed 1070 //malloc failed
1093 } 1071 }
@@ -1110,24 +1088,23 @@ void toxav_handle_packet(RTPSession *_session, RTPMessage *_msg)
1110 DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit); 1088 DECODE_PACKET *p = malloc(sizeof(DECODE_PACKET) + call->frame_limit);
1111 1089
1112 if (p) { 1090 if (p) {
1113 p->call_index = call_index;
1114 p->size = call->frame_limit; 1091 p->size = call->frame_limit;
1115 memcpy(p->data, call->frame_buf, call->frame_limit); 1092 memcpy(p->data, call->frame_buf, call->frame_limit);
1116 1093
1117 /* do the decoding on another thread */ 1094 /* do the decoding on another thread */
1118 pthread_mutex_lock(&av->decode_cond_mutex); 1095 pthread_mutex_lock(&call->decode_cond_mutex);
1119 uint8_t w = av->video_decode_write; 1096 uint8_t w = call->video_decode_write;
1120 1097
1121 if (av->video_decode_queue[w] == NULL) { 1098 if (call->video_decode_queue[w] == NULL) {
1122 av->video_decode_queue[w] = p; 1099 call->video_decode_queue[w] = p;
1123 av->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE; 1100 call->video_decode_write = (w + 1) % VIDEO_DECODE_QUEUE_SIZE;
1124 pthread_cond_signal(&av->decode_cond); 1101 pthread_cond_signal(&call->decode_cond);
1125 } else { 1102 } else {
1126 LOGGER_DEBUG("Dropped video frame\n"); 1103 LOGGER_DEBUG("Dropped video frame\n");
1127 free(p); 1104 free(p);
1128 } 1105 }
1129 1106
1130 pthread_mutex_unlock(&av->decode_cond_mutex); 1107 pthread_mutex_unlock(&call->decode_cond_mutex);
1131 } else { 1108 } else {
1132 //malloc failed 1109 //malloc failed
1133 } 1110 }