diff options
Diffstat (limited to 'toxav/rtp.c')
-rw-r--r-- | toxav/rtp.c | 874 |
1 files changed, 629 insertions, 245 deletions
diff --git a/toxav/rtp.c b/toxav/rtp.c index 06bc6df5..c5d940e2 100644 --- a/toxav/rtp.c +++ b/toxav/rtp.c | |||
@@ -33,11 +33,557 @@ | |||
33 | #include <errno.h> | 33 | #include <errno.h> |
34 | #include <stdlib.h> | 34 | #include <stdlib.h> |
35 | 35 | ||
36 | enum { | ||
37 | /** | ||
38 | * The number of milliseconds we want to keep a keyframe in the buffer for, | ||
39 | * even though there are no free slots for incoming frames. | ||
40 | */ | ||
41 | VIDEO_KEEP_KEYFRAME_IN_BUFFER_FOR_MS = 15, | ||
42 | }; | ||
43 | |||
44 | // allocate_len is NOT including header! | ||
45 | static struct RTPMessage *new_message(const struct RTPHeader *header, size_t allocate_len, const uint8_t *data, | ||
46 | uint16_t data_length) | ||
47 | { | ||
48 | assert(allocate_len >= data_length); | ||
49 | struct RTPMessage *msg = (struct RTPMessage *)calloc(1, sizeof(struct RTPMessage) + allocate_len); | ||
50 | |||
51 | if (msg == nullptr) { | ||
52 | return nullptr; | ||
53 | } | ||
54 | |||
55 | msg->len = data_length; // result without header | ||
56 | msg->header = *header; | ||
57 | memcpy(msg->data, data, msg->len); | ||
58 | return msg; | ||
59 | } | ||
60 | |||
61 | enum { | ||
62 | /** | ||
63 | * Instruct the caller to clear slot 0. | ||
64 | */ | ||
65 | GET_SLOT_RESULT_DROP_OLDEST_SLOT = -1, | ||
66 | /** | ||
67 | * Instruct the caller to drop the incoming packet. | ||
68 | */ | ||
69 | GET_SLOT_RESULT_DROP_INCOMING = -2, | ||
70 | }; | ||
71 | |||
72 | /** | ||
73 | * Find the next free slot in work_buffer for the incoming data packet. | ||
74 | * | ||
75 | * - If the data packet belongs to a frame thats already in the work_buffer then | ||
76 | * use that slot. | ||
77 | * - If there is no free slot return GET_SLOT_RESULT_DROP_OLDEST_SLOT. | ||
78 | * - If the data packet is too old return GET_SLOT_RESULT_DROP_INCOMING. | ||
79 | * | ||
80 | * If there is a keyframe beeing assembled in slot 0, keep it a bit longer and | ||
81 | * do not kick it out right away if all slots are full instead kick out the new | ||
82 | * incoming interframe. | ||
83 | */ | ||
84 | static int8_t get_slot(Logger *log, struct RTPWorkBufferList *wkbl, bool is_keyframe, | ||
85 | const struct RTPHeader *header, bool is_multipart) | ||
86 | { | ||
87 | if (is_multipart) { | ||
88 | // This RTP message is part of a multipart frame, so we try to find an | ||
89 | // existing slot with the previous parts of the frame in it. | ||
90 | for (uint8_t i = 0; i < wkbl->next_free_entry; i++) { | ||
91 | const struct RTPWorkBuffer *slot = &wkbl->work_buffer[i]; | ||
92 | |||
93 | if ((slot->buf->header.sequnum == header->sequnum) && (slot->buf->header.timestamp == header->timestamp)) { | ||
94 | // Sequence number and timestamp match, so this slot belongs to | ||
95 | // the same frame. | ||
96 | // | ||
97 | // In reality, these will almost certainly either both match or | ||
98 | // both not match. Only if somehow there were 65535 frames | ||
99 | // between, the timestamp will matter. | ||
100 | return i; | ||
101 | } | ||
102 | } | ||
103 | } | ||
104 | |||
105 | // The message may or may not be part of a multipart frame. | ||
106 | // | ||
107 | // If it is part of a multipart frame, then this is an entirely new frame | ||
108 | // for which we did not have a slot *or* the frame is so old that its slot | ||
109 | // has been evicted by now. | ||
110 | // | ||
111 | // |----------- time -----------> | ||
112 | // _________________ | ||
113 | // slot 0 | | | ||
114 | // ----------------- | ||
115 | // _________________ | ||
116 | // slot 1 | | | ||
117 | // ----------------- | ||
118 | // ____________ | ||
119 | // slot 2 | | -> frame too old, drop | ||
120 | // ------------ | ||
121 | // | ||
122 | // | ||
123 | // | ||
124 | // |----------- time -----------> | ||
125 | // _________________ | ||
126 | // slot 0 | | | ||
127 | // ----------------- | ||
128 | // _________________ | ||
129 | // slot 1 | | | ||
130 | // ----------------- | ||
131 | // ____________ | ||
132 | // slot 2 | | -> ok, start filling in a new slot | ||
133 | // ------------ | ||
134 | |||
135 | // If there is a free slot: | ||
136 | if (wkbl->next_free_entry < USED_RTP_WORKBUFFER_COUNT) { | ||
137 | // If there is at least one filled slot: | ||
138 | if (wkbl->next_free_entry > 0) { | ||
139 | // Get the most recently filled slot. | ||
140 | const struct RTPWorkBuffer *slot = &wkbl->work_buffer[wkbl->next_free_entry - 1]; | ||
141 | |||
142 | // If the incoming packet is older than our newest slot, drop it. | ||
143 | // This is the first situation in the above diagram. | ||
144 | if (slot->buf->header.timestamp > header->timestamp) { | ||
145 | LOGGER_DEBUG(log, "workbuffer:2:timestamp too old"); | ||
146 | return GET_SLOT_RESULT_DROP_INCOMING; | ||
147 | } | ||
148 | } | ||
149 | |||
150 | // Not all slots are filled, and the packet is newer than our most | ||
151 | // recent slot, so it's a new frame we want to start assembling. This is | ||
152 | // the second situation in the above diagram. | ||
153 | return wkbl->next_free_entry; | ||
154 | } | ||
155 | |||
156 | // If the incoming frame is a key frame, then stop assembling the oldest | ||
157 | // slot, regardless of whether there was a keyframe in that or not. | ||
158 | if (is_keyframe) { | ||
159 | return GET_SLOT_RESULT_DROP_OLDEST_SLOT; | ||
160 | } | ||
161 | |||
162 | // The incoming slot is not a key frame, so we look at slot 0 to see what to | ||
163 | // do next. | ||
164 | const struct RTPWorkBuffer *slot = &wkbl->work_buffer[0]; | ||
165 | |||
166 | // The incoming frame is not a key frame, but the existing slot 0 is also | ||
167 | // not a keyframe, so we stop assembling the existing frame and make space | ||
168 | // for the new one. | ||
169 | if (!slot->is_keyframe) { | ||
170 | return GET_SLOT_RESULT_DROP_OLDEST_SLOT; | ||
171 | } | ||
172 | |||
173 | // If this key frame is fully received, we also stop assembling and clear | ||
174 | // slot 0. This also means sending the frame to the decoder. | ||
175 | if (slot->received_len == slot->buf->header.data_length_full) { | ||
176 | return GET_SLOT_RESULT_DROP_OLDEST_SLOT; | ||
177 | } | ||
178 | |||
179 | // This is a key frame, not fully received yet, but it's already much older | ||
180 | // than the incoming frame, so we stop assembling it and send whatever part | ||
181 | // we did receive to the decoder. | ||
182 | if (slot->buf->header.timestamp + VIDEO_KEEP_KEYFRAME_IN_BUFFER_FOR_MS <= header->timestamp) { | ||
183 | return GET_SLOT_RESULT_DROP_OLDEST_SLOT; | ||
184 | } | ||
185 | |||
186 | // This is a key frame, it's not too old yet, so we keep it in its slot for | ||
187 | // a little longer. | ||
188 | LOGGER_INFO(log, "keep KEYFRAME in workbuffer"); | ||
189 | return GET_SLOT_RESULT_DROP_INCOMING; | ||
190 | } | ||
191 | |||
192 | /** | ||
193 | * Returns an assembled frame (as much data as we currently have for this frame, | ||
194 | * some pieces may be missing) | ||
195 | * | ||
196 | * If there are no frames ready, we return NULL. If this function returns | ||
197 | * non-NULL, it transfers ownership of the message to the caller, i.e. the | ||
198 | * caller is responsible for storing it elsewhere or calling free(). | ||
199 | */ | ||
200 | static struct RTPMessage *process_frame(Logger *log, struct RTPWorkBufferList *wkbl, uint8_t slot_id) | ||
201 | { | ||
202 | assert(wkbl->next_free_entry >= 0); | ||
203 | |||
204 | if (wkbl->next_free_entry == 0) { | ||
205 | // There are no frames in any slot. | ||
206 | return nullptr; | ||
207 | } | ||
208 | |||
209 | // Slot 0 contains a key frame, slot_id points at an interframe that is | ||
210 | // relative to that key frame, so we don't use it yet. | ||
211 | if (wkbl->work_buffer[0].is_keyframe && slot_id != 0) { | ||
212 | LOGGER_DEBUG(log, "process_frame:KEYFRAME waiting in slot 0"); | ||
213 | return nullptr; | ||
214 | } | ||
215 | |||
216 | // Either slot_id is 0 and slot 0 is a key frame, or there is no key frame | ||
217 | // in slot 0 (and slot_id is anything). | ||
218 | struct RTPWorkBuffer *const slot = &wkbl->work_buffer[slot_id]; | ||
219 | |||
220 | // Move ownership of the frame out of the slot into m_new. | ||
221 | struct RTPMessage *const m_new = slot->buf; | ||
222 | slot->buf = nullptr; | ||
223 | |||
224 | assert(wkbl->next_free_entry >= 1); | ||
225 | |||
226 | if (slot_id != wkbl->next_free_entry - 1) { | ||
227 | // The slot is not the last slot, so we created a gap. We move all the | ||
228 | // entries after it one step up. | ||
229 | for (uint8_t i = slot_id; i < wkbl->next_free_entry - 1; i++) { | ||
230 | // Move entry (i+1) into entry (i). | ||
231 | wkbl->work_buffer[i] = wkbl->work_buffer[i + 1]; | ||
232 | } | ||
233 | } | ||
234 | |||
235 | // We now have a free entry at the end of the array. | ||
236 | wkbl->next_free_entry--; | ||
237 | |||
238 | // Clear the newly freed entry. | ||
239 | const struct RTPWorkBuffer empty = {0}; | ||
240 | wkbl->work_buffer[wkbl->next_free_entry] = empty; | ||
241 | |||
242 | // Move ownership of the frame to the caller. | ||
243 | return m_new; | ||
244 | } | ||
245 | |||
246 | /** | ||
247 | * @param log A logger. | ||
248 | * @param wkbl The list of in-progress frames, i.e. all the slots. | ||
249 | * @param slot_id The slot we want to fill the data into. | ||
250 | * @param is_keyframe Whether the data is part of a key frame. | ||
251 | * @param header The RTP header from the incoming packet. | ||
252 | * @param incoming_data The pure payload without header. | ||
253 | * @param incoming_data_length The length in bytes of the incoming data payload. | ||
254 | */ | ||
255 | static bool fill_data_into_slot(Logger *log, struct RTPWorkBufferList *wkbl, const uint8_t slot_id, bool is_keyframe, | ||
256 | const struct RTPHeader *header, const uint8_t *incoming_data, uint16_t incoming_data_length) | ||
257 | { | ||
258 | // We're either filling the data into an existing slot, or in a new one that | ||
259 | // is the next free entry. | ||
260 | assert(slot_id <= wkbl->next_free_entry); | ||
261 | struct RTPWorkBuffer *const slot = &wkbl->work_buffer[slot_id]; | ||
262 | |||
263 | assert(header != nullptr); | ||
264 | assert(is_keyframe == (bool)(header->flags & RTP_KEY_FRAME)); | ||
265 | |||
266 | if (slot->received_len == 0) { | ||
267 | assert(slot->buf == nullptr); | ||
268 | |||
269 | // No data for this slot has been received, yet, so we create a new | ||
270 | // message for it with enough memory for the entire frame. | ||
271 | struct RTPMessage *msg = (struct RTPMessage *)calloc(1, sizeof(struct RTPMessage) + header->data_length_full); | ||
272 | |||
273 | if (msg == nullptr) { | ||
274 | LOGGER_ERROR(log, "Out of memory while trying to allocate for frame of size %u\n", | ||
275 | (unsigned)header->data_length_full); | ||
276 | // Out of memory: throw away the incoming data. | ||
277 | return false; | ||
278 | } | ||
279 | |||
280 | // Unused in the new video receiving code, as it's 16 bit and can't hold | ||
281 | // the full length of large frames. Instead, we use slot->received_len. | ||
282 | msg->len = 0; | ||
283 | msg->header = *header; | ||
284 | |||
285 | slot->buf = msg; | ||
286 | slot->is_keyframe = is_keyframe; | ||
287 | slot->received_len = 0; | ||
288 | |||
289 | assert(wkbl->next_free_entry < USED_RTP_WORKBUFFER_COUNT); | ||
290 | wkbl->next_free_entry++; | ||
291 | } | ||
292 | |||
293 | // We already checked this when we received the packet, but we rely on it | ||
294 | // here, so assert again. | ||
295 | assert(header->offset_full < header->data_length_full); | ||
296 | |||
297 | // Copy the incoming chunk of data into the correct position in the full | ||
298 | // frame data array. | ||
299 | memcpy( | ||
300 | slot->buf->data + header->offset_full, | ||
301 | incoming_data, | ||
302 | incoming_data_length | ||
303 | ); | ||
304 | |||
305 | // Update the total received length of this slot. | ||
306 | slot->received_len += incoming_data_length; | ||
307 | |||
308 | // Update received length also in the header of the message, for later use. | ||
309 | slot->buf->header.received_length_full = slot->received_len; | ||
310 | |||
311 | return slot->received_len == header->data_length_full; | ||
312 | } | ||
313 | |||
314 | static void update_bwc_values(Logger *log, RTPSession *session, const struct RTPMessage *msg) | ||
315 | { | ||
316 | if (session->first_packets_counter < DISMISS_FIRST_LOST_VIDEO_PACKET_COUNT) { | ||
317 | session->first_packets_counter++; | ||
318 | } else { | ||
319 | uint32_t data_length_full = msg->header.data_length_full; // without header | ||
320 | uint32_t received_length_full = msg->header.received_length_full; // without header | ||
321 | bwc_add_recv(session->bwc, data_length_full); | ||
322 | |||
323 | if (received_length_full < data_length_full) { | ||
324 | LOGGER_DEBUG(log, "BWC: full length=%u received length=%d", data_length_full, received_length_full); | ||
325 | bwc_add_lost(session->bwc, (data_length_full - received_length_full)); | ||
326 | } | ||
327 | } | ||
328 | } | ||
329 | |||
330 | /** | ||
331 | * Handle a single RTP video packet. | ||
332 | * | ||
333 | * The packet may or may not be part of a multipart frame. This function will | ||
334 | * find out and handle it appropriately. | ||
335 | * | ||
336 | * @param session The current RTP session with: | ||
337 | * session->mcb == vc_queue_message() // this function is called from here | ||
338 | * session->mp == struct RTPMessage * | ||
339 | * session->cs == call->video.second // == VCSession created by vc_new() call | ||
340 | * @param header The RTP header deserialised from the packet. | ||
341 | * @param incoming_data The packet data *not* header, i.e. this is the actual | ||
342 | * payload. | ||
343 | * @param incoming_data_length The packet length *not* including header, i.e. | ||
344 | * this is the actual payload length. | ||
345 | * @param log A logger. | ||
346 | * | ||
347 | * @return -1 on error, 0 on success. | ||
348 | */ | ||
349 | static int handle_video_packet(RTPSession *session, const struct RTPHeader *header, | ||
350 | const uint8_t *incoming_data, uint16_t incoming_data_length, Logger *log) | ||
351 | { | ||
352 | // Full frame length in bytes. The frame may be split into multiple packets, | ||
353 | // but this value is the complete assembled frame size. | ||
354 | const uint32_t full_frame_length = header->data_length_full; | ||
355 | |||
356 | // Current offset in the frame. If this is the first packet of a multipart | ||
357 | // frame or it's not a multipart frame, then this value is 0. | ||
358 | const uint32_t offset = header->offset_full; // without header | ||
359 | |||
360 | // The sender tells us whether this is a key frame. | ||
361 | const bool is_keyframe = (header->flags & RTP_KEY_FRAME) != 0; | ||
362 | |||
363 | LOGGER_DEBUG(log, "-- handle_video_packet -- full lens=%u len=%u offset=%u is_keyframe=%s", | ||
364 | (unsigned)incoming_data_length, (unsigned)full_frame_length, (unsigned)offset, is_keyframe ? "K" : "."); | ||
365 | LOGGER_DEBUG(log, "wkbl->next_free_entry:003=%d", session->work_buffer_list->next_free_entry); | ||
366 | |||
367 | const bool is_multipart = full_frame_length != incoming_data_length; | ||
368 | |||
369 | /* The message was sent in single part */ | ||
370 | int8_t slot_id = get_slot(log, session->work_buffer_list, is_keyframe, header, is_multipart); | ||
371 | LOGGER_DEBUG(log, "slot num=%d", slot_id); | ||
372 | |||
373 | // get_slot told us to drop the packet, so we ignore it. | ||
374 | if (slot_id == GET_SLOT_RESULT_DROP_INCOMING) { | ||
375 | return -1; | ||
376 | } | ||
377 | |||
378 | // get_slot said there is no free slot. | ||
379 | if (slot_id == GET_SLOT_RESULT_DROP_OLDEST_SLOT) { | ||
380 | LOGGER_DEBUG(log, "there was no free slot, so we process the oldest frame"); | ||
381 | // We now own the frame. | ||
382 | struct RTPMessage *m_new = process_frame(log, session->work_buffer_list, 0); | ||
383 | |||
384 | // The process_frame function returns NULL if there is no slot 0, i.e. | ||
385 | // the work buffer list is completely empty. It can't be empty, because | ||
386 | // get_slot just told us it's full, so process_frame must return non-null. | ||
387 | assert(m_new != nullptr); | ||
388 | |||
389 | LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-001a b0=%d b1=%d", (int)m_new->data[0], (int)m_new->data[1]); | ||
390 | update_bwc_values(log, session, m_new); | ||
391 | // Pass ownership of m_new to the callback. | ||
392 | session->mcb(session->cs, m_new); | ||
393 | // Now we no longer own m_new. | ||
394 | m_new = nullptr; | ||
395 | |||
396 | // Now we must have a free slot, so we either get that slot, i.e. >= 0, | ||
397 | // or get told to drop the incoming packet if it's too old. | ||
398 | slot_id = get_slot(log, session->work_buffer_list, is_keyframe, header, /* is_multipart */false); | ||
399 | |||
400 | if (slot_id == GET_SLOT_RESULT_DROP_INCOMING) { | ||
401 | // The incoming frame is too old, so we drop it. | ||
402 | return -1; | ||
403 | } | ||
404 | } | ||
405 | |||
406 | // We must have a valid slot here. | ||
407 | assert(slot_id >= 0); | ||
408 | |||
409 | LOGGER_DEBUG(log, "fill_data_into_slot.1"); | ||
410 | |||
411 | // fill in this part into the slot buffer at the correct offset | ||
412 | if (!fill_data_into_slot( | ||
413 | log, | ||
414 | session->work_buffer_list, | ||
415 | slot_id, | ||
416 | is_keyframe, | ||
417 | header, | ||
418 | incoming_data, | ||
419 | incoming_data_length)) { | ||
420 | // Memory allocation failed. Return error. | ||
421 | return -1; | ||
422 | } | ||
423 | |||
424 | struct RTPMessage *m_new = process_frame(log, session->work_buffer_list, slot_id); | ||
425 | |||
426 | if (m_new) { | ||
427 | LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-003a b0=%d b1=%d", (int)m_new->data[0], (int)m_new->data[1]); | ||
428 | update_bwc_values(log, session, m_new); | ||
429 | session->mcb(session->cs, m_new); | ||
430 | |||
431 | m_new = nullptr; | ||
432 | } | ||
433 | |||
434 | return 0; | ||
435 | } | ||
436 | |||
437 | /** | ||
438 | * @return -1 on error, 0 on success. | ||
439 | */ | ||
440 | static int handle_rtp_packet(Messenger *m, uint32_t friendnumber, const uint8_t *data, uint16_t length, void *object) | ||
441 | { | ||
442 | RTPSession *session = (RTPSession *)object; | ||
443 | |||
444 | if (!session || length < RTP_HEADER_SIZE + 1) { | ||
445 | LOGGER_WARNING(m->log, "No session or invalid length of received buffer!"); | ||
446 | return -1; | ||
447 | } | ||
448 | |||
449 | // Get the packet type. | ||
450 | const uint8_t packet_type = data[0]; | ||
451 | ++data; | ||
452 | --length; | ||
453 | |||
454 | // Unpack the header. | ||
455 | struct RTPHeader header; | ||
456 | rtp_header_unpack(data, &header); | ||
457 | |||
458 | if (header.pt != packet_type % 128) { | ||
459 | LOGGER_WARNING(m->log, "RTPHeader packet type and Tox protocol packet type did not agree: %d != %d", | ||
460 | header.pt, packet_type % 128); | ||
461 | return -1; | ||
462 | } | ||
463 | |||
464 | if (header.pt != session->payload_type % 128) { | ||
465 | LOGGER_WARNING(m->log, "RTPHeader packet type does not match this session's payload type: %d != %d", | ||
466 | header.pt, session->payload_type % 128); | ||
467 | return -1; | ||
468 | } | ||
469 | |||
470 | if (header.offset_full >= header.data_length_full) { | ||
471 | LOGGER_ERROR(m->log, "Invalid video packet: frame offset (%u) >= full frame length (%u)", | ||
472 | (unsigned)header.offset_full, (unsigned)header.data_length_full); | ||
473 | return -1; | ||
474 | } | ||
475 | |||
476 | if (header.offset_lower >= header.data_length_lower) { | ||
477 | LOGGER_ERROR(m->log, "Invalid old protocol video packet: frame offset (%u) >= full frame length (%u)", | ||
478 | (unsigned)header.offset_lower, (unsigned)header.data_length_lower); | ||
479 | return -1; | ||
480 | } | ||
481 | |||
482 | LOGGER_DEBUG(m->log, "header.pt %d, video %d", (uint8_t)header.pt, (rtp_TypeVideo % 128)); | ||
483 | |||
484 | // The sender uses the new large-frame capable protocol and is sending a | ||
485 | // video packet. | ||
486 | if ((header.flags & RTP_LARGE_FRAME) && header.pt == (rtp_TypeVideo % 128)) { | ||
487 | return handle_video_packet(session, &header, data + RTP_HEADER_SIZE, length - RTP_HEADER_SIZE, m->log); | ||
488 | } | ||
489 | |||
490 | // everything below here is for the old 16 bit protocol ------------------ | ||
491 | |||
492 | if (header.data_length_lower == length - RTP_HEADER_SIZE) { | ||
493 | /* The message is sent in single part */ | ||
494 | |||
495 | /* Message is not late; pick up the latest parameters */ | ||
496 | session->rsequnum = header.sequnum; | ||
497 | session->rtimestamp = header.timestamp; | ||
498 | bwc_add_recv(session->bwc, length); | ||
499 | |||
500 | /* Invoke processing of active multiparted message */ | ||
501 | if (session->mp) { | ||
502 | session->mcb(session->cs, session->mp); | ||
503 | session->mp = nullptr; | ||
504 | } | ||
505 | |||
506 | /* The message came in the allowed time; | ||
507 | */ | ||
508 | |||
509 | return session->mcb(session->cs, new_message(&header, length - RTP_HEADER_SIZE, data + RTP_HEADER_SIZE, | ||
510 | length - RTP_HEADER_SIZE)); | ||
511 | } | ||
512 | |||
513 | /* The message is sent in multiple parts */ | ||
514 | |||
515 | if (session->mp) { | ||
516 | /* There are 2 possible situations in this case: | ||
517 | * 1) being that we got the part of already processing message. | ||
518 | * 2) being that we got the part of a new/old message. | ||
519 | * | ||
520 | * We handle them differently as we only allow a single multiparted | ||
521 | * processing message | ||
522 | */ | ||
523 | if (session->mp->header.sequnum == header.sequnum && | ||
524 | session->mp->header.timestamp == header.timestamp) { | ||
525 | /* First case */ | ||
526 | |||
527 | /* Make sure we have enough allocated memory */ | ||
528 | if (session->mp->header.data_length_lower - session->mp->len < length - RTP_HEADER_SIZE || | ||
529 | session->mp->header.data_length_lower <= header.offset_lower) { | ||
530 | /* There happened to be some corruption on the stream; | ||
531 | * continue wihtout this part | ||
532 | */ | ||
533 | return 0; | ||
534 | } | ||
535 | |||
536 | memcpy(session->mp->data + header.offset_lower, data + RTP_HEADER_SIZE, | ||
537 | length - RTP_HEADER_SIZE); | ||
538 | session->mp->len += length - RTP_HEADER_SIZE; | ||
539 | bwc_add_recv(session->bwc, length); | ||
540 | |||
541 | if (session->mp->len == session->mp->header.data_length_lower) { | ||
542 | /* Received a full message; now push it for the further | ||
543 | * processing. | ||
544 | */ | ||
545 | session->mcb(session->cs, session->mp); | ||
546 | session->mp = nullptr; | ||
547 | } | ||
548 | } else { | ||
549 | /* Second case */ | ||
550 | if (session->mp->header.timestamp > header.timestamp) { | ||
551 | /* The received message part is from the old message; | ||
552 | * discard it. | ||
553 | */ | ||
554 | return 0; | ||
555 | } | ||
556 | |||
557 | /* Push the previous message for processing */ | ||
558 | session->mcb(session->cs, session->mp); | ||
559 | |||
560 | session->mp = nullptr; | ||
561 | goto NEW_MULTIPARTED; | ||
562 | } | ||
563 | } else { | ||
564 | /* In this case threat the message as if it was received in order | ||
565 | */ | ||
566 | /* This is also a point for new multiparted messages */ | ||
567 | NEW_MULTIPARTED: | ||
568 | |||
569 | /* Message is not late; pick up the latest parameters */ | ||
570 | session->rsequnum = header.sequnum; | ||
571 | session->rtimestamp = header.timestamp; | ||
572 | bwc_add_recv(session->bwc, length); | ||
573 | |||
574 | /* Store message. | ||
575 | */ | ||
576 | session->mp = new_message(&header, header.data_length_lower, data + RTP_HEADER_SIZE, length - RTP_HEADER_SIZE); | ||
577 | memmove(session->mp->data + header.offset_lower, session->mp->data, session->mp->len); | ||
578 | } | ||
579 | |||
580 | return 0; | ||
581 | } | ||
36 | 582 | ||
37 | size_t rtp_header_pack(uint8_t *const rdata, const struct RTPHeader *header) | 583 | size_t rtp_header_pack(uint8_t *const rdata, const struct RTPHeader *header) |
38 | { | 584 | { |
39 | uint8_t *p = rdata; | 585 | uint8_t *p = rdata; |
40 | *p++ = (header->protocol_version & 3) << 6 | 586 | *p++ = (header->ve & 3) << 6 |
41 | | (header->pe & 1) << 5 | 587 | | (header->pe & 1) << 5 |
42 | | (header->xe & 1) << 4 | 588 | | (header->xe & 1) << 4 |
43 | | (header->cc & 0xf); | 589 | | (header->cc & 0xf); |
@@ -62,11 +608,10 @@ size_t rtp_header_pack(uint8_t *const rdata, const struct RTPHeader *header) | |||
62 | return p - rdata; | 608 | return p - rdata; |
63 | } | 609 | } |
64 | 610 | ||
65 | |||
66 | size_t rtp_header_unpack(const uint8_t *data, struct RTPHeader *header) | 611 | size_t rtp_header_unpack(const uint8_t *data, struct RTPHeader *header) |
67 | { | 612 | { |
68 | const uint8_t *p = data; | 613 | const uint8_t *p = data; |
69 | header->protocol_version = (*p >> 6) & 3; | 614 | header->ve = (*p >> 6) & 3; |
70 | header->pe = (*p >> 5) & 1; | 615 | header->pe = (*p >> 5) & 1; |
71 | header->xe = (*p >> 4) & 1; | 616 | header->xe = (*p >> 4) & 1; |
72 | header->cc = *p & 0xf; | 617 | header->cc = *p & 0xf; |
@@ -93,44 +638,56 @@ size_t rtp_header_unpack(const uint8_t *data, struct RTPHeader *header) | |||
93 | } | 638 | } |
94 | 639 | ||
95 | 640 | ||
96 | int handle_rtp_packet(Messenger *m, uint32_t friendnumber, const uint8_t *data, uint16_t length, void *object); | ||
97 | |||
98 | |||
99 | RTPSession *rtp_new(int payload_type, Messenger *m, uint32_t friendnumber, | 641 | RTPSession *rtp_new(int payload_type, Messenger *m, uint32_t friendnumber, |
100 | BWController *bwc, void *cs, | 642 | BWController *bwc, void *cs, |
101 | int (*mcb)(void *, struct RTPMessage *)) | 643 | int (*mcb)(void *, struct RTPMessage *)) |
102 | { | 644 | { |
103 | assert(mcb); | 645 | assert(mcb != nullptr); |
104 | assert(cs); | 646 | assert(cs != nullptr); |
105 | assert(m); | 647 | assert(m != nullptr); |
106 | 648 | ||
107 | RTPSession *retu = (RTPSession *)calloc(1, sizeof(RTPSession)); | 649 | RTPSession *session = (RTPSession *)calloc(1, sizeof(RTPSession)); |
108 | 650 | ||
109 | if (!retu) { | 651 | if (!session) { |
110 | LOGGER_WARNING(m->log, "Alloc failed! Program might misbehave!"); | 652 | LOGGER_WARNING(m->log, "Alloc failed! Program might misbehave!"); |
111 | return nullptr; | 653 | return nullptr; |
112 | } | 654 | } |
113 | 655 | ||
114 | retu->ssrc = random_u32(); | 656 | session->work_buffer_list = (struct RTPWorkBufferList *)calloc(1, sizeof(struct RTPWorkBufferList)); |
115 | retu->payload_type = payload_type; | ||
116 | 657 | ||
117 | retu->m = m; | 658 | if (session->work_buffer_list == nullptr) { |
118 | retu->friend_number = friendnumber; | 659 | LOGGER_ERROR(m->log, "out of memory while allocating work buffer list"); |
660 | free(session); | ||
661 | return nullptr; | ||
662 | } | ||
119 | 663 | ||
120 | /* Also set payload type as prefix */ | 664 | // First entry is free. |
665 | session->work_buffer_list->next_free_entry = 0; | ||
666 | |||
667 | session->ssrc = payload_type == rtp_TypeVideo ? 0 : random_u32(); | ||
668 | session->payload_type = payload_type; | ||
669 | session->m = m; | ||
670 | session->friend_number = friendnumber; | ||
121 | 671 | ||
122 | retu->bwc = bwc; | 672 | // set NULL just in case |
123 | retu->cs = cs; | 673 | session->mp = nullptr; |
124 | retu->mcb = mcb; | 674 | session->first_packets_counter = 1; |
125 | 675 | ||
126 | if (-1 == rtp_allow_receiving(retu)) { | 676 | /* Also set payload type as prefix */ |
677 | session->bwc = bwc; | ||
678 | session->cs = cs; | ||
679 | session->mcb = mcb; | ||
680 | |||
681 | if (-1 == rtp_allow_receiving(session)) { | ||
127 | LOGGER_WARNING(m->log, "Failed to start rtp receiving mode"); | 682 | LOGGER_WARNING(m->log, "Failed to start rtp receiving mode"); |
128 | free(retu); | 683 | free(session->work_buffer_list); |
684 | free(session); | ||
129 | return nullptr; | 685 | return nullptr; |
130 | } | 686 | } |
131 | 687 | ||
132 | return retu; | 688 | return session; |
133 | } | 689 | } |
690 | |||
134 | void rtp_kill(RTPSession *session) | 691 | void rtp_kill(RTPSession *session) |
135 | { | 692 | { |
136 | if (!session) { | 693 | if (!session) { |
@@ -138,10 +695,15 @@ void rtp_kill(RTPSession *session) | |||
138 | } | 695 | } |
139 | 696 | ||
140 | LOGGER_DEBUG(session->m->log, "Terminated RTP session: %p", session); | 697 | LOGGER_DEBUG(session->m->log, "Terminated RTP session: %p", session); |
141 | |||
142 | rtp_stop_receiving(session); | 698 | rtp_stop_receiving(session); |
699 | |||
700 | LOGGER_DEBUG(session->m->log, "Terminated RTP session V3 work_buffer_list->next_free_entry: %d", | ||
701 | (int)session->work_buffer_list->next_free_entry); | ||
702 | |||
703 | free(session->work_buffer_list); | ||
143 | free(session); | 704 | free(session); |
144 | } | 705 | } |
706 | |||
145 | int rtp_allow_receiving(RTPSession *session) | 707 | int rtp_allow_receiving(RTPSession *session) |
146 | { | 708 | { |
147 | if (session == nullptr) { | 709 | if (session == nullptr) { |
@@ -157,6 +719,7 @@ int rtp_allow_receiving(RTPSession *session) | |||
157 | LOGGER_DEBUG(session->m->log, "Started receiving on session: %p", session); | 719 | LOGGER_DEBUG(session->m->log, "Started receiving on session: %p", session); |
158 | return 0; | 720 | return 0; |
159 | } | 721 | } |
722 | |||
160 | int rtp_stop_receiving(RTPSession *session) | 723 | int rtp_stop_receiving(RTPSession *session) |
161 | { | 724 | { |
162 | if (session == nullptr) { | 725 | if (session == nullptr) { |
@@ -168,42 +731,76 @@ int rtp_stop_receiving(RTPSession *session) | |||
168 | LOGGER_DEBUG(session->m->log, "Stopped receiving on session: %p", session); | 731 | LOGGER_DEBUG(session->m->log, "Stopped receiving on session: %p", session); |
169 | return 0; | 732 | return 0; |
170 | } | 733 | } |
171 | int rtp_send_data(RTPSession *session, const uint8_t *data, uint16_t length, Logger *log) | 734 | |
735 | /** | ||
736 | * @param input is raw vpx data. | ||
737 | * @param length is the length of the raw data. | ||
738 | */ | ||
739 | int rtp_send_data(RTPSession *session, const uint8_t *data, uint32_t length, | ||
740 | bool is_keyframe, Logger *log) | ||
172 | { | 741 | { |
173 | if (!session) { | 742 | if (!session) { |
174 | LOGGER_ERROR(log, "No session!"); | 743 | LOGGER_ERROR(log, "No session!"); |
175 | return -1; | 744 | return -1; |
176 | } | 745 | } |
177 | 746 | ||
178 | VLA(uint8_t, rdata, length + RTP_HEADER_SIZE + 1); | 747 | uint8_t is_video_payload = 0; |
179 | memset(rdata, 0, SIZEOF_VLA(rdata)); | ||
180 | 748 | ||
181 | rdata[0] = session->payload_type; | 749 | if (session->payload_type == rtp_TypeVideo) { |
750 | is_video_payload = 1; | ||
751 | } | ||
182 | 752 | ||
183 | struct RTPHeader header = {0}; | 753 | struct RTPHeader header = {0}; |
184 | 754 | ||
185 | header.protocol_version = 2; | 755 | header.ve = 2; // this is unused in toxav |
756 | |||
186 | header.pe = 0; | 757 | header.pe = 0; |
758 | |||
187 | header.xe = 0; | 759 | header.xe = 0; |
760 | |||
188 | header.cc = 0; | 761 | header.cc = 0; |
189 | 762 | ||
190 | header.ma = 0; | 763 | header.ma = 0; |
764 | |||
191 | header.pt = session->payload_type % 128; | 765 | header.pt = session->payload_type % 128; |
192 | 766 | ||
193 | header.sequnum = session->sequnum; | 767 | header.sequnum = session->sequnum; |
768 | |||
194 | header.timestamp = current_time_monotonic(); | 769 | header.timestamp = current_time_monotonic(); |
770 | |||
195 | header.ssrc = session->ssrc; | 771 | header.ssrc = session->ssrc; |
196 | 772 | ||
197 | header.offset_lower = 0; | 773 | header.offset_lower = 0; |
774 | |||
775 | // here the highest bits gets stripped anyway, no need to do keyframe bit magic here! | ||
198 | header.data_length_lower = length; | 776 | header.data_length_lower = length; |
199 | 777 | ||
200 | if (MAX_CRYPTO_DATA_SIZE > length + RTP_HEADER_SIZE + 1) { | 778 | header.flags = RTP_LARGE_FRAME; |
201 | 779 | ||
780 | uint16_t length_safe = (uint16_t)length; | ||
781 | |||
782 | if (length > UINT16_MAX) { | ||
783 | length_safe = UINT16_MAX; | ||
784 | } | ||
785 | |||
786 | header.data_length_lower = length_safe; | ||
787 | header.data_length_full = length; // without header | ||
788 | header.offset_lower = 0; | ||
789 | header.offset_full = 0; | ||
790 | |||
791 | if (is_keyframe) { | ||
792 | header.flags |= RTP_KEY_FRAME; | ||
793 | } | ||
794 | |||
795 | VLA(uint8_t, rdata, length + RTP_HEADER_SIZE + 1); | ||
796 | memset(rdata, 0, SIZEOF_VLA(rdata)); | ||
797 | rdata[0] = session->payload_type; // packet id == payload_type | ||
798 | |||
799 | if (MAX_CRYPTO_DATA_SIZE > (length + RTP_HEADER_SIZE + 1)) { | ||
202 | /** | 800 | /** |
203 | * The length is lesser than the maximum allowed length (including header) | 801 | * The length is lesser than the maximum allowed length (including header) |
204 | * Send the packet in single piece. | 802 | * Send the packet in single piece. |
205 | */ | 803 | */ |
206 | |||
207 | rtp_header_pack(rdata + 1, &header); | 804 | rtp_header_pack(rdata + 1, &header); |
208 | memcpy(rdata + 1 + RTP_HEADER_SIZE, data, length); | 805 | memcpy(rdata + 1 + RTP_HEADER_SIZE, data, length); |
209 | 806 | ||
@@ -211,13 +808,11 @@ int rtp_send_data(RTPSession *session, const uint8_t *data, uint16_t length, Log | |||
211 | LOGGER_WARNING(session->m->log, "RTP send failed (len: %d)! std error: %s", SIZEOF_VLA(rdata), strerror(errno)); | 808 | LOGGER_WARNING(session->m->log, "RTP send failed (len: %d)! std error: %s", SIZEOF_VLA(rdata), strerror(errno)); |
212 | } | 809 | } |
213 | } else { | 810 | } else { |
214 | |||
215 | /** | 811 | /** |
216 | * The length is greater than the maximum allowed length (including header) | 812 | * The length is greater than the maximum allowed length (including header) |
217 | * Send the packet in multiple pieces. | 813 | * Send the packet in multiple pieces. |
218 | */ | 814 | */ |
219 | 815 | uint32_t sent = 0; | |
220 | uint16_t sent = 0; | ||
221 | uint16_t piece = MAX_CRYPTO_DATA_SIZE - (RTP_HEADER_SIZE + 1); | 816 | uint16_t piece = MAX_CRYPTO_DATA_SIZE - (RTP_HEADER_SIZE + 1); |
222 | 817 | ||
223 | while ((length - sent) + RTP_HEADER_SIZE + 1 > MAX_CRYPTO_DATA_SIZE) { | 818 | while ((length - sent) + RTP_HEADER_SIZE + 1 > MAX_CRYPTO_DATA_SIZE) { |
@@ -232,6 +827,7 @@ int rtp_send_data(RTPSession *session, const uint8_t *data, uint16_t length, Log | |||
232 | 827 | ||
233 | sent += piece; | 828 | sent += piece; |
234 | header.offset_lower = sent; | 829 | header.offset_lower = sent; |
830 | header.offset_full = sent; // raw data offset, without any header | ||
235 | } | 831 | } |
236 | 832 | ||
237 | /* Send remaining */ | 833 | /* Send remaining */ |
@@ -252,215 +848,3 @@ int rtp_send_data(RTPSession *session, const uint8_t *data, uint16_t length, Log | |||
252 | session->sequnum ++; | 848 | session->sequnum ++; |
253 | return 0; | 849 | return 0; |
254 | } | 850 | } |
255 | |||
256 | |||
257 | static bool chloss(const RTPSession *session, const struct RTPHeader *header) | ||
258 | { | ||
259 | if (header->timestamp < session->rtimestamp) { | ||
260 | uint16_t hosq, lost = 0; | ||
261 | |||
262 | hosq = header->sequnum; | ||
263 | |||
264 | lost = (hosq > session->rsequnum) ? | ||
265 | (session->rsequnum + 65535) - hosq : | ||
266 | session->rsequnum - hosq; | ||
267 | |||
268 | fprintf(stderr, "Lost packet\n"); | ||
269 | |||
270 | while (lost --) { | ||
271 | bwc_add_lost(session->bwc, 0); | ||
272 | } | ||
273 | |||
274 | return true; | ||
275 | } | ||
276 | |||
277 | return false; | ||
278 | } | ||
279 | static struct RTPMessage *new_message(size_t allocate_len, const uint8_t *data, uint16_t data_length) | ||
280 | { | ||
281 | assert(allocate_len >= data_length); | ||
282 | |||
283 | struct RTPMessage *msg = (struct RTPMessage *)calloc(sizeof(struct RTPMessage) + | ||
284 | (allocate_len - RTP_HEADER_SIZE), 1); | ||
285 | |||
286 | if (msg == nullptr) { | ||
287 | return nullptr; | ||
288 | } | ||
289 | |||
290 | msg->len = data_length - RTP_HEADER_SIZE; | ||
291 | rtp_header_unpack(data, &msg->header); | ||
292 | memcpy(msg->data, data + RTP_HEADER_SIZE, allocate_len - RTP_HEADER_SIZE); | ||
293 | |||
294 | return msg; | ||
295 | } | ||
296 | |||
297 | int handle_rtp_packet(Messenger *m, uint32_t friendnumber, const uint8_t *data, uint16_t length, void *object) | ||
298 | { | ||
299 | (void) m; | ||
300 | (void) friendnumber; | ||
301 | |||
302 | RTPSession *session = (RTPSession *)object; | ||
303 | |||
304 | data ++; | ||
305 | length--; | ||
306 | |||
307 | if (!session || length < RTP_HEADER_SIZE) { | ||
308 | LOGGER_WARNING(m->log, "No session or invalid length of received buffer!"); | ||
309 | return -1; | ||
310 | } | ||
311 | |||
312 | struct RTPHeader header; | ||
313 | |||
314 | rtp_header_unpack(data, &header); | ||
315 | |||
316 | if (header.pt != session->payload_type % 128) { | ||
317 | LOGGER_WARNING(m->log, "Invalid payload type with the session"); | ||
318 | return -1; | ||
319 | } | ||
320 | |||
321 | if (header.offset_lower >= header.data_length_lower) { | ||
322 | /* Never allow this case to happen */ | ||
323 | return -1; | ||
324 | } | ||
325 | |||
326 | bwc_feed_avg(session->bwc, length); | ||
327 | |||
328 | if (header.data_length_lower == length - RTP_HEADER_SIZE) { | ||
329 | /* The message is sent in single part */ | ||
330 | |||
331 | /* Only allow messages which have arrived in order; | ||
332 | * drop late messages | ||
333 | */ | ||
334 | if (chloss(session, &header)) { | ||
335 | return 0; | ||
336 | } | ||
337 | |||
338 | /* Message is not late; pick up the latest parameters */ | ||
339 | session->rsequnum = header.sequnum; | ||
340 | session->rtimestamp = header.timestamp; | ||
341 | |||
342 | bwc_add_recv(session->bwc, length); | ||
343 | |||
344 | /* Invoke processing of active multiparted message */ | ||
345 | if (session->mp) { | ||
346 | if (session->mcb) { | ||
347 | session->mcb(session->cs, session->mp); | ||
348 | } else { | ||
349 | free(session->mp); | ||
350 | } | ||
351 | |||
352 | session->mp = nullptr; | ||
353 | } | ||
354 | |||
355 | /* The message came in the allowed time; | ||
356 | * process it only if handler for the session is present. | ||
357 | */ | ||
358 | |||
359 | if (!session->mcb) { | ||
360 | return 0; | ||
361 | } | ||
362 | |||
363 | return session->mcb(session->cs, new_message(length, data, length)); | ||
364 | } | ||
365 | |||
366 | /* The message is sent in multiple parts */ | ||
367 | |||
368 | if (session->mp) { | ||
369 | /* There are 2 possible situations in this case: | ||
370 | * 1) being that we got the part of already processing message. | ||
371 | * 2) being that we got the part of a new/old message. | ||
372 | * | ||
373 | * We handle them differently as we only allow a single multiparted | ||
374 | * processing message | ||
375 | */ | ||
376 | |||
377 | if (session->mp->header.sequnum == header.sequnum && | ||
378 | session->mp->header.timestamp == header.timestamp) { | ||
379 | /* First case */ | ||
380 | |||
381 | /* Make sure we have enough allocated memory */ | ||
382 | if (session->mp->header.data_length_lower - session->mp->len < length - RTP_HEADER_SIZE || | ||
383 | session->mp->header.data_length_lower <= header.offset_lower) { | ||
384 | /* There happened to be some corruption on the stream; | ||
385 | * continue wihtout this part | ||
386 | */ | ||
387 | return 0; | ||
388 | } | ||
389 | |||
390 | memcpy(session->mp->data + header.offset_lower, data + RTP_HEADER_SIZE, | ||
391 | length - RTP_HEADER_SIZE); | ||
392 | |||
393 | session->mp->len += length - RTP_HEADER_SIZE; | ||
394 | |||
395 | bwc_add_recv(session->bwc, length); | ||
396 | |||
397 | if (session->mp->len == session->mp->header.data_length_lower) { | ||
398 | /* Received a full message; now push it for the further | ||
399 | * processing. | ||
400 | */ | ||
401 | if (session->mcb) { | ||
402 | session->mcb(session->cs, session->mp); | ||
403 | } else { | ||
404 | free(session->mp); | ||
405 | } | ||
406 | |||
407 | session->mp = nullptr; | ||
408 | } | ||
409 | } else { | ||
410 | /* Second case */ | ||
411 | |||
412 | if (session->mp->header.timestamp > header.timestamp) { | ||
413 | /* The received message part is from the old message; | ||
414 | * discard it. | ||
415 | */ | ||
416 | return 0; | ||
417 | } | ||
418 | |||
419 | /* Measure missing parts of the old message */ | ||
420 | bwc_add_lost(session->bwc, | ||
421 | (session->mp->header.data_length_lower - session->mp->len) + | ||
422 | |||
423 | /* Must account sizes of rtp headers too */ | ||
424 | ((session->mp->header.data_length_lower - session->mp->len) / | ||
425 | MAX_CRYPTO_DATA_SIZE) * RTP_HEADER_SIZE); | ||
426 | |||
427 | /* Push the previous message for processing */ | ||
428 | if (session->mcb) { | ||
429 | session->mcb(session->cs, session->mp); | ||
430 | } else { | ||
431 | free(session->mp); | ||
432 | } | ||
433 | |||
434 | session->mp = nullptr; | ||
435 | goto NEW_MULTIPARTED; | ||
436 | } | ||
437 | } else { | ||
438 | /* In this case threat the message as if it was received in order | ||
439 | */ | ||
440 | |||
441 | /* This is also a point for new multiparted messages */ | ||
442 | NEW_MULTIPARTED: | ||
443 | |||
444 | /* Only allow messages which have arrived in order; | ||
445 | * drop late messages | ||
446 | */ | ||
447 | if (chloss(session, &header)) { | ||
448 | return 0; | ||
449 | } | ||
450 | |||
451 | /* Message is not late; pick up the latest parameters */ | ||
452 | session->rsequnum = header.sequnum; | ||
453 | session->rtimestamp = header.timestamp; | ||
454 | |||
455 | bwc_add_recv(session->bwc, length); | ||
456 | |||
457 | /* Again, only store message if handler is present | ||
458 | */ | ||
459 | if (session->mcb) { | ||
460 | session->mp = new_message(header.data_length_lower + RTP_HEADER_SIZE, data, length); | ||
461 | memmove(session->mp->data + header.offset_lower, session->mp->data, session->mp->len); | ||
462 | } | ||
463 | } | ||
464 | |||
465 | return 0; | ||
466 | } | ||