summaryrefslogtreecommitdiff
path: root/toxav/rtp.c
diff options
context:
space:
mode:
Diffstat (limited to 'toxav/rtp.c')
-rw-r--r--toxav/rtp.c874
1 files changed, 629 insertions, 245 deletions
diff --git a/toxav/rtp.c b/toxav/rtp.c
index 06bc6df5..c5d940e2 100644
--- a/toxav/rtp.c
+++ b/toxav/rtp.c
@@ -33,11 +33,557 @@
33#include <errno.h> 33#include <errno.h>
34#include <stdlib.h> 34#include <stdlib.h>
35 35
36enum {
37 /**
38 * The number of milliseconds we want to keep a keyframe in the buffer for,
39 * even though there are no free slots for incoming frames.
40 */
41 VIDEO_KEEP_KEYFRAME_IN_BUFFER_FOR_MS = 15,
42};
43
44// allocate_len is NOT including header!
45static struct RTPMessage *new_message(const struct RTPHeader *header, size_t allocate_len, const uint8_t *data,
46 uint16_t data_length)
47{
48 assert(allocate_len >= data_length);
49 struct RTPMessage *msg = (struct RTPMessage *)calloc(1, sizeof(struct RTPMessage) + allocate_len);
50
51 if (msg == nullptr) {
52 return nullptr;
53 }
54
55 msg->len = data_length; // result without header
56 msg->header = *header;
57 memcpy(msg->data, data, msg->len);
58 return msg;
59}
60
61enum {
62 /**
63 * Instruct the caller to clear slot 0.
64 */
65 GET_SLOT_RESULT_DROP_OLDEST_SLOT = -1,
66 /**
67 * Instruct the caller to drop the incoming packet.
68 */
69 GET_SLOT_RESULT_DROP_INCOMING = -2,
70};
71
72/**
73 * Find the next free slot in work_buffer for the incoming data packet.
74 *
75 * - If the data packet belongs to a frame thats already in the work_buffer then
76 * use that slot.
77 * - If there is no free slot return GET_SLOT_RESULT_DROP_OLDEST_SLOT.
78 * - If the data packet is too old return GET_SLOT_RESULT_DROP_INCOMING.
79 *
80 * If there is a keyframe beeing assembled in slot 0, keep it a bit longer and
81 * do not kick it out right away if all slots are full instead kick out the new
82 * incoming interframe.
83 */
84static int8_t get_slot(Logger *log, struct RTPWorkBufferList *wkbl, bool is_keyframe,
85 const struct RTPHeader *header, bool is_multipart)
86{
87 if (is_multipart) {
88 // This RTP message is part of a multipart frame, so we try to find an
89 // existing slot with the previous parts of the frame in it.
90 for (uint8_t i = 0; i < wkbl->next_free_entry; i++) {
91 const struct RTPWorkBuffer *slot = &wkbl->work_buffer[i];
92
93 if ((slot->buf->header.sequnum == header->sequnum) && (slot->buf->header.timestamp == header->timestamp)) {
94 // Sequence number and timestamp match, so this slot belongs to
95 // the same frame.
96 //
97 // In reality, these will almost certainly either both match or
98 // both not match. Only if somehow there were 65535 frames
99 // between, the timestamp will matter.
100 return i;
101 }
102 }
103 }
104
105 // The message may or may not be part of a multipart frame.
106 //
107 // If it is part of a multipart frame, then this is an entirely new frame
108 // for which we did not have a slot *or* the frame is so old that its slot
109 // has been evicted by now.
110 //
111 // |----------- time ----------->
112 // _________________
113 // slot 0 | |
114 // -----------------
115 // _________________
116 // slot 1 | |
117 // -----------------
118 // ____________
119 // slot 2 | | -> frame too old, drop
120 // ------------
121 //
122 //
123 //
124 // |----------- time ----------->
125 // _________________
126 // slot 0 | |
127 // -----------------
128 // _________________
129 // slot 1 | |
130 // -----------------
131 // ____________
132 // slot 2 | | -> ok, start filling in a new slot
133 // ------------
134
135 // If there is a free slot:
136 if (wkbl->next_free_entry < USED_RTP_WORKBUFFER_COUNT) {
137 // If there is at least one filled slot:
138 if (wkbl->next_free_entry > 0) {
139 // Get the most recently filled slot.
140 const struct RTPWorkBuffer *slot = &wkbl->work_buffer[wkbl->next_free_entry - 1];
141
142 // If the incoming packet is older than our newest slot, drop it.
143 // This is the first situation in the above diagram.
144 if (slot->buf->header.timestamp > header->timestamp) {
145 LOGGER_DEBUG(log, "workbuffer:2:timestamp too old");
146 return GET_SLOT_RESULT_DROP_INCOMING;
147 }
148 }
149
150 // Not all slots are filled, and the packet is newer than our most
151 // recent slot, so it's a new frame we want to start assembling. This is
152 // the second situation in the above diagram.
153 return wkbl->next_free_entry;
154 }
155
156 // If the incoming frame is a key frame, then stop assembling the oldest
157 // slot, regardless of whether there was a keyframe in that or not.
158 if (is_keyframe) {
159 return GET_SLOT_RESULT_DROP_OLDEST_SLOT;
160 }
161
162 // The incoming slot is not a key frame, so we look at slot 0 to see what to
163 // do next.
164 const struct RTPWorkBuffer *slot = &wkbl->work_buffer[0];
165
166 // The incoming frame is not a key frame, but the existing slot 0 is also
167 // not a keyframe, so we stop assembling the existing frame and make space
168 // for the new one.
169 if (!slot->is_keyframe) {
170 return GET_SLOT_RESULT_DROP_OLDEST_SLOT;
171 }
172
173 // If this key frame is fully received, we also stop assembling and clear
174 // slot 0. This also means sending the frame to the decoder.
175 if (slot->received_len == slot->buf->header.data_length_full) {
176 return GET_SLOT_RESULT_DROP_OLDEST_SLOT;
177 }
178
179 // This is a key frame, not fully received yet, but it's already much older
180 // than the incoming frame, so we stop assembling it and send whatever part
181 // we did receive to the decoder.
182 if (slot->buf->header.timestamp + VIDEO_KEEP_KEYFRAME_IN_BUFFER_FOR_MS <= header->timestamp) {
183 return GET_SLOT_RESULT_DROP_OLDEST_SLOT;
184 }
185
186 // This is a key frame, it's not too old yet, so we keep it in its slot for
187 // a little longer.
188 LOGGER_INFO(log, "keep KEYFRAME in workbuffer");
189 return GET_SLOT_RESULT_DROP_INCOMING;
190}
191
192/**
193 * Returns an assembled frame (as much data as we currently have for this frame,
194 * some pieces may be missing)
195 *
196 * If there are no frames ready, we return NULL. If this function returns
197 * non-NULL, it transfers ownership of the message to the caller, i.e. the
198 * caller is responsible for storing it elsewhere or calling free().
199 */
200static struct RTPMessage *process_frame(Logger *log, struct RTPWorkBufferList *wkbl, uint8_t slot_id)
201{
202 assert(wkbl->next_free_entry >= 0);
203
204 if (wkbl->next_free_entry == 0) {
205 // There are no frames in any slot.
206 return nullptr;
207 }
208
209 // Slot 0 contains a key frame, slot_id points at an interframe that is
210 // relative to that key frame, so we don't use it yet.
211 if (wkbl->work_buffer[0].is_keyframe && slot_id != 0) {
212 LOGGER_DEBUG(log, "process_frame:KEYFRAME waiting in slot 0");
213 return nullptr;
214 }
215
216 // Either slot_id is 0 and slot 0 is a key frame, or there is no key frame
217 // in slot 0 (and slot_id is anything).
218 struct RTPWorkBuffer *const slot = &wkbl->work_buffer[slot_id];
219
220 // Move ownership of the frame out of the slot into m_new.
221 struct RTPMessage *const m_new = slot->buf;
222 slot->buf = nullptr;
223
224 assert(wkbl->next_free_entry >= 1);
225
226 if (slot_id != wkbl->next_free_entry - 1) {
227 // The slot is not the last slot, so we created a gap. We move all the
228 // entries after it one step up.
229 for (uint8_t i = slot_id; i < wkbl->next_free_entry - 1; i++) {
230 // Move entry (i+1) into entry (i).
231 wkbl->work_buffer[i] = wkbl->work_buffer[i + 1];
232 }
233 }
234
235 // We now have a free entry at the end of the array.
236 wkbl->next_free_entry--;
237
238 // Clear the newly freed entry.
239 const struct RTPWorkBuffer empty = {0};
240 wkbl->work_buffer[wkbl->next_free_entry] = empty;
241
242 // Move ownership of the frame to the caller.
243 return m_new;
244}
245
246/**
247 * @param log A logger.
248 * @param wkbl The list of in-progress frames, i.e. all the slots.
249 * @param slot_id The slot we want to fill the data into.
250 * @param is_keyframe Whether the data is part of a key frame.
251 * @param header The RTP header from the incoming packet.
252 * @param incoming_data The pure payload without header.
253 * @param incoming_data_length The length in bytes of the incoming data payload.
254 */
255static bool fill_data_into_slot(Logger *log, struct RTPWorkBufferList *wkbl, const uint8_t slot_id, bool is_keyframe,
256 const struct RTPHeader *header, const uint8_t *incoming_data, uint16_t incoming_data_length)
257{
258 // We're either filling the data into an existing slot, or in a new one that
259 // is the next free entry.
260 assert(slot_id <= wkbl->next_free_entry);
261 struct RTPWorkBuffer *const slot = &wkbl->work_buffer[slot_id];
262
263 assert(header != nullptr);
264 assert(is_keyframe == (bool)(header->flags & RTP_KEY_FRAME));
265
266 if (slot->received_len == 0) {
267 assert(slot->buf == nullptr);
268
269 // No data for this slot has been received, yet, so we create a new
270 // message for it with enough memory for the entire frame.
271 struct RTPMessage *msg = (struct RTPMessage *)calloc(1, sizeof(struct RTPMessage) + header->data_length_full);
272
273 if (msg == nullptr) {
274 LOGGER_ERROR(log, "Out of memory while trying to allocate for frame of size %u\n",
275 (unsigned)header->data_length_full);
276 // Out of memory: throw away the incoming data.
277 return false;
278 }
279
280 // Unused in the new video receiving code, as it's 16 bit and can't hold
281 // the full length of large frames. Instead, we use slot->received_len.
282 msg->len = 0;
283 msg->header = *header;
284
285 slot->buf = msg;
286 slot->is_keyframe = is_keyframe;
287 slot->received_len = 0;
288
289 assert(wkbl->next_free_entry < USED_RTP_WORKBUFFER_COUNT);
290 wkbl->next_free_entry++;
291 }
292
293 // We already checked this when we received the packet, but we rely on it
294 // here, so assert again.
295 assert(header->offset_full < header->data_length_full);
296
297 // Copy the incoming chunk of data into the correct position in the full
298 // frame data array.
299 memcpy(
300 slot->buf->data + header->offset_full,
301 incoming_data,
302 incoming_data_length
303 );
304
305 // Update the total received length of this slot.
306 slot->received_len += incoming_data_length;
307
308 // Update received length also in the header of the message, for later use.
309 slot->buf->header.received_length_full = slot->received_len;
310
311 return slot->received_len == header->data_length_full;
312}
313
314static void update_bwc_values(Logger *log, RTPSession *session, const struct RTPMessage *msg)
315{
316 if (session->first_packets_counter < DISMISS_FIRST_LOST_VIDEO_PACKET_COUNT) {
317 session->first_packets_counter++;
318 } else {
319 uint32_t data_length_full = msg->header.data_length_full; // without header
320 uint32_t received_length_full = msg->header.received_length_full; // without header
321 bwc_add_recv(session->bwc, data_length_full);
322
323 if (received_length_full < data_length_full) {
324 LOGGER_DEBUG(log, "BWC: full length=%u received length=%d", data_length_full, received_length_full);
325 bwc_add_lost(session->bwc, (data_length_full - received_length_full));
326 }
327 }
328}
329
330/**
331 * Handle a single RTP video packet.
332 *
333 * The packet may or may not be part of a multipart frame. This function will
334 * find out and handle it appropriately.
335 *
336 * @param session The current RTP session with:
337 * session->mcb == vc_queue_message() // this function is called from here
338 * session->mp == struct RTPMessage *
339 * session->cs == call->video.second // == VCSession created by vc_new() call
340 * @param header The RTP header deserialised from the packet.
341 * @param incoming_data The packet data *not* header, i.e. this is the actual
342 * payload.
343 * @param incoming_data_length The packet length *not* including header, i.e.
344 * this is the actual payload length.
345 * @param log A logger.
346 *
347 * @return -1 on error, 0 on success.
348 */
349static int handle_video_packet(RTPSession *session, const struct RTPHeader *header,
350 const uint8_t *incoming_data, uint16_t incoming_data_length, Logger *log)
351{
352 // Full frame length in bytes. The frame may be split into multiple packets,
353 // but this value is the complete assembled frame size.
354 const uint32_t full_frame_length = header->data_length_full;
355
356 // Current offset in the frame. If this is the first packet of a multipart
357 // frame or it's not a multipart frame, then this value is 0.
358 const uint32_t offset = header->offset_full; // without header
359
360 // The sender tells us whether this is a key frame.
361 const bool is_keyframe = (header->flags & RTP_KEY_FRAME) != 0;
362
363 LOGGER_DEBUG(log, "-- handle_video_packet -- full lens=%u len=%u offset=%u is_keyframe=%s",
364 (unsigned)incoming_data_length, (unsigned)full_frame_length, (unsigned)offset, is_keyframe ? "K" : ".");
365 LOGGER_DEBUG(log, "wkbl->next_free_entry:003=%d", session->work_buffer_list->next_free_entry);
366
367 const bool is_multipart = full_frame_length != incoming_data_length;
368
369 /* The message was sent in single part */
370 int8_t slot_id = get_slot(log, session->work_buffer_list, is_keyframe, header, is_multipart);
371 LOGGER_DEBUG(log, "slot num=%d", slot_id);
372
373 // get_slot told us to drop the packet, so we ignore it.
374 if (slot_id == GET_SLOT_RESULT_DROP_INCOMING) {
375 return -1;
376 }
377
378 // get_slot said there is no free slot.
379 if (slot_id == GET_SLOT_RESULT_DROP_OLDEST_SLOT) {
380 LOGGER_DEBUG(log, "there was no free slot, so we process the oldest frame");
381 // We now own the frame.
382 struct RTPMessage *m_new = process_frame(log, session->work_buffer_list, 0);
383
384 // The process_frame function returns NULL if there is no slot 0, i.e.
385 // the work buffer list is completely empty. It can't be empty, because
386 // get_slot just told us it's full, so process_frame must return non-null.
387 assert(m_new != nullptr);
388
389 LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-001a b0=%d b1=%d", (int)m_new->data[0], (int)m_new->data[1]);
390 update_bwc_values(log, session, m_new);
391 // Pass ownership of m_new to the callback.
392 session->mcb(session->cs, m_new);
393 // Now we no longer own m_new.
394 m_new = nullptr;
395
396 // Now we must have a free slot, so we either get that slot, i.e. >= 0,
397 // or get told to drop the incoming packet if it's too old.
398 slot_id = get_slot(log, session->work_buffer_list, is_keyframe, header, /* is_multipart */false);
399
400 if (slot_id == GET_SLOT_RESULT_DROP_INCOMING) {
401 // The incoming frame is too old, so we drop it.
402 return -1;
403 }
404 }
405
406 // We must have a valid slot here.
407 assert(slot_id >= 0);
408
409 LOGGER_DEBUG(log, "fill_data_into_slot.1");
410
411 // fill in this part into the slot buffer at the correct offset
412 if (!fill_data_into_slot(
413 log,
414 session->work_buffer_list,
415 slot_id,
416 is_keyframe,
417 header,
418 incoming_data,
419 incoming_data_length)) {
420 // Memory allocation failed. Return error.
421 return -1;
422 }
423
424 struct RTPMessage *m_new = process_frame(log, session->work_buffer_list, slot_id);
425
426 if (m_new) {
427 LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-003a b0=%d b1=%d", (int)m_new->data[0], (int)m_new->data[1]);
428 update_bwc_values(log, session, m_new);
429 session->mcb(session->cs, m_new);
430
431 m_new = nullptr;
432 }
433
434 return 0;
435}
436
437/**
438 * @return -1 on error, 0 on success.
439 */
440static int handle_rtp_packet(Messenger *m, uint32_t friendnumber, const uint8_t *data, uint16_t length, void *object)
441{
442 RTPSession *session = (RTPSession *)object;
443
444 if (!session || length < RTP_HEADER_SIZE + 1) {
445 LOGGER_WARNING(m->log, "No session or invalid length of received buffer!");
446 return -1;
447 }
448
449 // Get the packet type.
450 const uint8_t packet_type = data[0];
451 ++data;
452 --length;
453
454 // Unpack the header.
455 struct RTPHeader header;
456 rtp_header_unpack(data, &header);
457
458 if (header.pt != packet_type % 128) {
459 LOGGER_WARNING(m->log, "RTPHeader packet type and Tox protocol packet type did not agree: %d != %d",
460 header.pt, packet_type % 128);
461 return -1;
462 }
463
464 if (header.pt != session->payload_type % 128) {
465 LOGGER_WARNING(m->log, "RTPHeader packet type does not match this session's payload type: %d != %d",
466 header.pt, session->payload_type % 128);
467 return -1;
468 }
469
470 if (header.offset_full >= header.data_length_full) {
471 LOGGER_ERROR(m->log, "Invalid video packet: frame offset (%u) >= full frame length (%u)",
472 (unsigned)header.offset_full, (unsigned)header.data_length_full);
473 return -1;
474 }
475
476 if (header.offset_lower >= header.data_length_lower) {
477 LOGGER_ERROR(m->log, "Invalid old protocol video packet: frame offset (%u) >= full frame length (%u)",
478 (unsigned)header.offset_lower, (unsigned)header.data_length_lower);
479 return -1;
480 }
481
482 LOGGER_DEBUG(m->log, "header.pt %d, video %d", (uint8_t)header.pt, (rtp_TypeVideo % 128));
483
484 // The sender uses the new large-frame capable protocol and is sending a
485 // video packet.
486 if ((header.flags & RTP_LARGE_FRAME) && header.pt == (rtp_TypeVideo % 128)) {
487 return handle_video_packet(session, &header, data + RTP_HEADER_SIZE, length - RTP_HEADER_SIZE, m->log);
488 }
489
490 // everything below here is for the old 16 bit protocol ------------------
491
492 if (header.data_length_lower == length - RTP_HEADER_SIZE) {
493 /* The message is sent in single part */
494
495 /* Message is not late; pick up the latest parameters */
496 session->rsequnum = header.sequnum;
497 session->rtimestamp = header.timestamp;
498 bwc_add_recv(session->bwc, length);
499
500 /* Invoke processing of active multiparted message */
501 if (session->mp) {
502 session->mcb(session->cs, session->mp);
503 session->mp = nullptr;
504 }
505
506 /* The message came in the allowed time;
507 */
508
509 return session->mcb(session->cs, new_message(&header, length - RTP_HEADER_SIZE, data + RTP_HEADER_SIZE,
510 length - RTP_HEADER_SIZE));
511 }
512
513 /* The message is sent in multiple parts */
514
515 if (session->mp) {
516 /* There are 2 possible situations in this case:
517 * 1) being that we got the part of already processing message.
518 * 2) being that we got the part of a new/old message.
519 *
520 * We handle them differently as we only allow a single multiparted
521 * processing message
522 */
523 if (session->mp->header.sequnum == header.sequnum &&
524 session->mp->header.timestamp == header.timestamp) {
525 /* First case */
526
527 /* Make sure we have enough allocated memory */
528 if (session->mp->header.data_length_lower - session->mp->len < length - RTP_HEADER_SIZE ||
529 session->mp->header.data_length_lower <= header.offset_lower) {
530 /* There happened to be some corruption on the stream;
531 * continue wihtout this part
532 */
533 return 0;
534 }
535
536 memcpy(session->mp->data + header.offset_lower, data + RTP_HEADER_SIZE,
537 length - RTP_HEADER_SIZE);
538 session->mp->len += length - RTP_HEADER_SIZE;
539 bwc_add_recv(session->bwc, length);
540
541 if (session->mp->len == session->mp->header.data_length_lower) {
542 /* Received a full message; now push it for the further
543 * processing.
544 */
545 session->mcb(session->cs, session->mp);
546 session->mp = nullptr;
547 }
548 } else {
549 /* Second case */
550 if (session->mp->header.timestamp > header.timestamp) {
551 /* The received message part is from the old message;
552 * discard it.
553 */
554 return 0;
555 }
556
557 /* Push the previous message for processing */
558 session->mcb(session->cs, session->mp);
559
560 session->mp = nullptr;
561 goto NEW_MULTIPARTED;
562 }
563 } else {
564 /* In this case threat the message as if it was received in order
565 */
566 /* This is also a point for new multiparted messages */
567NEW_MULTIPARTED:
568
569 /* Message is not late; pick up the latest parameters */
570 session->rsequnum = header.sequnum;
571 session->rtimestamp = header.timestamp;
572 bwc_add_recv(session->bwc, length);
573
574 /* Store message.
575 */
576 session->mp = new_message(&header, header.data_length_lower, data + RTP_HEADER_SIZE, length - RTP_HEADER_SIZE);
577 memmove(session->mp->data + header.offset_lower, session->mp->data, session->mp->len);
578 }
579
580 return 0;
581}
36 582
37size_t rtp_header_pack(uint8_t *const rdata, const struct RTPHeader *header) 583size_t rtp_header_pack(uint8_t *const rdata, const struct RTPHeader *header)
38{ 584{
39 uint8_t *p = rdata; 585 uint8_t *p = rdata;
40 *p++ = (header->protocol_version & 3) << 6 586 *p++ = (header->ve & 3) << 6
41 | (header->pe & 1) << 5 587 | (header->pe & 1) << 5
42 | (header->xe & 1) << 4 588 | (header->xe & 1) << 4
43 | (header->cc & 0xf); 589 | (header->cc & 0xf);
@@ -62,11 +608,10 @@ size_t rtp_header_pack(uint8_t *const rdata, const struct RTPHeader *header)
62 return p - rdata; 608 return p - rdata;
63} 609}
64 610
65
66size_t rtp_header_unpack(const uint8_t *data, struct RTPHeader *header) 611size_t rtp_header_unpack(const uint8_t *data, struct RTPHeader *header)
67{ 612{
68 const uint8_t *p = data; 613 const uint8_t *p = data;
69 header->protocol_version = (*p >> 6) & 3; 614 header->ve = (*p >> 6) & 3;
70 header->pe = (*p >> 5) & 1; 615 header->pe = (*p >> 5) & 1;
71 header->xe = (*p >> 4) & 1; 616 header->xe = (*p >> 4) & 1;
72 header->cc = *p & 0xf; 617 header->cc = *p & 0xf;
@@ -93,44 +638,56 @@ size_t rtp_header_unpack(const uint8_t *data, struct RTPHeader *header)
93} 638}
94 639
95 640
96int handle_rtp_packet(Messenger *m, uint32_t friendnumber, const uint8_t *data, uint16_t length, void *object);
97
98
99RTPSession *rtp_new(int payload_type, Messenger *m, uint32_t friendnumber, 641RTPSession *rtp_new(int payload_type, Messenger *m, uint32_t friendnumber,
100 BWController *bwc, void *cs, 642 BWController *bwc, void *cs,
101 int (*mcb)(void *, struct RTPMessage *)) 643 int (*mcb)(void *, struct RTPMessage *))
102{ 644{
103 assert(mcb); 645 assert(mcb != nullptr);
104 assert(cs); 646 assert(cs != nullptr);
105 assert(m); 647 assert(m != nullptr);
106 648
107 RTPSession *retu = (RTPSession *)calloc(1, sizeof(RTPSession)); 649 RTPSession *session = (RTPSession *)calloc(1, sizeof(RTPSession));
108 650
109 if (!retu) { 651 if (!session) {
110 LOGGER_WARNING(m->log, "Alloc failed! Program might misbehave!"); 652 LOGGER_WARNING(m->log, "Alloc failed! Program might misbehave!");
111 return nullptr; 653 return nullptr;
112 } 654 }
113 655
114 retu->ssrc = random_u32(); 656 session->work_buffer_list = (struct RTPWorkBufferList *)calloc(1, sizeof(struct RTPWorkBufferList));
115 retu->payload_type = payload_type;
116 657
117 retu->m = m; 658 if (session->work_buffer_list == nullptr) {
118 retu->friend_number = friendnumber; 659 LOGGER_ERROR(m->log, "out of memory while allocating work buffer list");
660 free(session);
661 return nullptr;
662 }
119 663
120 /* Also set payload type as prefix */ 664 // First entry is free.
665 session->work_buffer_list->next_free_entry = 0;
666
667 session->ssrc = payload_type == rtp_TypeVideo ? 0 : random_u32();
668 session->payload_type = payload_type;
669 session->m = m;
670 session->friend_number = friendnumber;
121 671
122 retu->bwc = bwc; 672 // set NULL just in case
123 retu->cs = cs; 673 session->mp = nullptr;
124 retu->mcb = mcb; 674 session->first_packets_counter = 1;
125 675
126 if (-1 == rtp_allow_receiving(retu)) { 676 /* Also set payload type as prefix */
677 session->bwc = bwc;
678 session->cs = cs;
679 session->mcb = mcb;
680
681 if (-1 == rtp_allow_receiving(session)) {
127 LOGGER_WARNING(m->log, "Failed to start rtp receiving mode"); 682 LOGGER_WARNING(m->log, "Failed to start rtp receiving mode");
128 free(retu); 683 free(session->work_buffer_list);
684 free(session);
129 return nullptr; 685 return nullptr;
130 } 686 }
131 687
132 return retu; 688 return session;
133} 689}
690
134void rtp_kill(RTPSession *session) 691void rtp_kill(RTPSession *session)
135{ 692{
136 if (!session) { 693 if (!session) {
@@ -138,10 +695,15 @@ void rtp_kill(RTPSession *session)
138 } 695 }
139 696
140 LOGGER_DEBUG(session->m->log, "Terminated RTP session: %p", session); 697 LOGGER_DEBUG(session->m->log, "Terminated RTP session: %p", session);
141
142 rtp_stop_receiving(session); 698 rtp_stop_receiving(session);
699
700 LOGGER_DEBUG(session->m->log, "Terminated RTP session V3 work_buffer_list->next_free_entry: %d",
701 (int)session->work_buffer_list->next_free_entry);
702
703 free(session->work_buffer_list);
143 free(session); 704 free(session);
144} 705}
706
145int rtp_allow_receiving(RTPSession *session) 707int rtp_allow_receiving(RTPSession *session)
146{ 708{
147 if (session == nullptr) { 709 if (session == nullptr) {
@@ -157,6 +719,7 @@ int rtp_allow_receiving(RTPSession *session)
157 LOGGER_DEBUG(session->m->log, "Started receiving on session: %p", session); 719 LOGGER_DEBUG(session->m->log, "Started receiving on session: %p", session);
158 return 0; 720 return 0;
159} 721}
722
160int rtp_stop_receiving(RTPSession *session) 723int rtp_stop_receiving(RTPSession *session)
161{ 724{
162 if (session == nullptr) { 725 if (session == nullptr) {
@@ -168,42 +731,76 @@ int rtp_stop_receiving(RTPSession *session)
168 LOGGER_DEBUG(session->m->log, "Stopped receiving on session: %p", session); 731 LOGGER_DEBUG(session->m->log, "Stopped receiving on session: %p", session);
169 return 0; 732 return 0;
170} 733}
171int rtp_send_data(RTPSession *session, const uint8_t *data, uint16_t length, Logger *log) 734
735/**
736 * @param input is raw vpx data.
737 * @param length is the length of the raw data.
738 */
739int rtp_send_data(RTPSession *session, const uint8_t *data, uint32_t length,
740 bool is_keyframe, Logger *log)
172{ 741{
173 if (!session) { 742 if (!session) {
174 LOGGER_ERROR(log, "No session!"); 743 LOGGER_ERROR(log, "No session!");
175 return -1; 744 return -1;
176 } 745 }
177 746
178 VLA(uint8_t, rdata, length + RTP_HEADER_SIZE + 1); 747 uint8_t is_video_payload = 0;
179 memset(rdata, 0, SIZEOF_VLA(rdata));
180 748
181 rdata[0] = session->payload_type; 749 if (session->payload_type == rtp_TypeVideo) {
750 is_video_payload = 1;
751 }
182 752
183 struct RTPHeader header = {0}; 753 struct RTPHeader header = {0};
184 754
185 header.protocol_version = 2; 755 header.ve = 2; // this is unused in toxav
756
186 header.pe = 0; 757 header.pe = 0;
758
187 header.xe = 0; 759 header.xe = 0;
760
188 header.cc = 0; 761 header.cc = 0;
189 762
190 header.ma = 0; 763 header.ma = 0;
764
191 header.pt = session->payload_type % 128; 765 header.pt = session->payload_type % 128;
192 766
193 header.sequnum = session->sequnum; 767 header.sequnum = session->sequnum;
768
194 header.timestamp = current_time_monotonic(); 769 header.timestamp = current_time_monotonic();
770
195 header.ssrc = session->ssrc; 771 header.ssrc = session->ssrc;
196 772
197 header.offset_lower = 0; 773 header.offset_lower = 0;
774
775 // here the highest bits gets stripped anyway, no need to do keyframe bit magic here!
198 header.data_length_lower = length; 776 header.data_length_lower = length;
199 777
200 if (MAX_CRYPTO_DATA_SIZE > length + RTP_HEADER_SIZE + 1) { 778 header.flags = RTP_LARGE_FRAME;
201 779
780 uint16_t length_safe = (uint16_t)length;
781
782 if (length > UINT16_MAX) {
783 length_safe = UINT16_MAX;
784 }
785
786 header.data_length_lower = length_safe;
787 header.data_length_full = length; // without header
788 header.offset_lower = 0;
789 header.offset_full = 0;
790
791 if (is_keyframe) {
792 header.flags |= RTP_KEY_FRAME;
793 }
794
795 VLA(uint8_t, rdata, length + RTP_HEADER_SIZE + 1);
796 memset(rdata, 0, SIZEOF_VLA(rdata));
797 rdata[0] = session->payload_type; // packet id == payload_type
798
799 if (MAX_CRYPTO_DATA_SIZE > (length + RTP_HEADER_SIZE + 1)) {
202 /** 800 /**
203 * The length is lesser than the maximum allowed length (including header) 801 * The length is lesser than the maximum allowed length (including header)
204 * Send the packet in single piece. 802 * Send the packet in single piece.
205 */ 803 */
206
207 rtp_header_pack(rdata + 1, &header); 804 rtp_header_pack(rdata + 1, &header);
208 memcpy(rdata + 1 + RTP_HEADER_SIZE, data, length); 805 memcpy(rdata + 1 + RTP_HEADER_SIZE, data, length);
209 806
@@ -211,13 +808,11 @@ int rtp_send_data(RTPSession *session, const uint8_t *data, uint16_t length, Log
211 LOGGER_WARNING(session->m->log, "RTP send failed (len: %d)! std error: %s", SIZEOF_VLA(rdata), strerror(errno)); 808 LOGGER_WARNING(session->m->log, "RTP send failed (len: %d)! std error: %s", SIZEOF_VLA(rdata), strerror(errno));
212 } 809 }
213 } else { 810 } else {
214
215 /** 811 /**
216 * The length is greater than the maximum allowed length (including header) 812 * The length is greater than the maximum allowed length (including header)
217 * Send the packet in multiple pieces. 813 * Send the packet in multiple pieces.
218 */ 814 */
219 815 uint32_t sent = 0;
220 uint16_t sent = 0;
221 uint16_t piece = MAX_CRYPTO_DATA_SIZE - (RTP_HEADER_SIZE + 1); 816 uint16_t piece = MAX_CRYPTO_DATA_SIZE - (RTP_HEADER_SIZE + 1);
222 817
223 while ((length - sent) + RTP_HEADER_SIZE + 1 > MAX_CRYPTO_DATA_SIZE) { 818 while ((length - sent) + RTP_HEADER_SIZE + 1 > MAX_CRYPTO_DATA_SIZE) {
@@ -232,6 +827,7 @@ int rtp_send_data(RTPSession *session, const uint8_t *data, uint16_t length, Log
232 827
233 sent += piece; 828 sent += piece;
234 header.offset_lower = sent; 829 header.offset_lower = sent;
830 header.offset_full = sent; // raw data offset, without any header
235 } 831 }
236 832
237 /* Send remaining */ 833 /* Send remaining */
@@ -252,215 +848,3 @@ int rtp_send_data(RTPSession *session, const uint8_t *data, uint16_t length, Log
252 session->sequnum ++; 848 session->sequnum ++;
253 return 0; 849 return 0;
254} 850}
255
256
257static bool chloss(const RTPSession *session, const struct RTPHeader *header)
258{
259 if (header->timestamp < session->rtimestamp) {
260 uint16_t hosq, lost = 0;
261
262 hosq = header->sequnum;
263
264 lost = (hosq > session->rsequnum) ?
265 (session->rsequnum + 65535) - hosq :
266 session->rsequnum - hosq;
267
268 fprintf(stderr, "Lost packet\n");
269
270 while (lost --) {
271 bwc_add_lost(session->bwc, 0);
272 }
273
274 return true;
275 }
276
277 return false;
278}
279static struct RTPMessage *new_message(size_t allocate_len, const uint8_t *data, uint16_t data_length)
280{
281 assert(allocate_len >= data_length);
282
283 struct RTPMessage *msg = (struct RTPMessage *)calloc(sizeof(struct RTPMessage) +
284 (allocate_len - RTP_HEADER_SIZE), 1);
285
286 if (msg == nullptr) {
287 return nullptr;
288 }
289
290 msg->len = data_length - RTP_HEADER_SIZE;
291 rtp_header_unpack(data, &msg->header);
292 memcpy(msg->data, data + RTP_HEADER_SIZE, allocate_len - RTP_HEADER_SIZE);
293
294 return msg;
295}
296
297int handle_rtp_packet(Messenger *m, uint32_t friendnumber, const uint8_t *data, uint16_t length, void *object)
298{
299 (void) m;
300 (void) friendnumber;
301
302 RTPSession *session = (RTPSession *)object;
303
304 data ++;
305 length--;
306
307 if (!session || length < RTP_HEADER_SIZE) {
308 LOGGER_WARNING(m->log, "No session or invalid length of received buffer!");
309 return -1;
310 }
311
312 struct RTPHeader header;
313
314 rtp_header_unpack(data, &header);
315
316 if (header.pt != session->payload_type % 128) {
317 LOGGER_WARNING(m->log, "Invalid payload type with the session");
318 return -1;
319 }
320
321 if (header.offset_lower >= header.data_length_lower) {
322 /* Never allow this case to happen */
323 return -1;
324 }
325
326 bwc_feed_avg(session->bwc, length);
327
328 if (header.data_length_lower == length - RTP_HEADER_SIZE) {
329 /* The message is sent in single part */
330
331 /* Only allow messages which have arrived in order;
332 * drop late messages
333 */
334 if (chloss(session, &header)) {
335 return 0;
336 }
337
338 /* Message is not late; pick up the latest parameters */
339 session->rsequnum = header.sequnum;
340 session->rtimestamp = header.timestamp;
341
342 bwc_add_recv(session->bwc, length);
343
344 /* Invoke processing of active multiparted message */
345 if (session->mp) {
346 if (session->mcb) {
347 session->mcb(session->cs, session->mp);
348 } else {
349 free(session->mp);
350 }
351
352 session->mp = nullptr;
353 }
354
355 /* The message came in the allowed time;
356 * process it only if handler for the session is present.
357 */
358
359 if (!session->mcb) {
360 return 0;
361 }
362
363 return session->mcb(session->cs, new_message(length, data, length));
364 }
365
366 /* The message is sent in multiple parts */
367
368 if (session->mp) {
369 /* There are 2 possible situations in this case:
370 * 1) being that we got the part of already processing message.
371 * 2) being that we got the part of a new/old message.
372 *
373 * We handle them differently as we only allow a single multiparted
374 * processing message
375 */
376
377 if (session->mp->header.sequnum == header.sequnum &&
378 session->mp->header.timestamp == header.timestamp) {
379 /* First case */
380
381 /* Make sure we have enough allocated memory */
382 if (session->mp->header.data_length_lower - session->mp->len < length - RTP_HEADER_SIZE ||
383 session->mp->header.data_length_lower <= header.offset_lower) {
384 /* There happened to be some corruption on the stream;
385 * continue wihtout this part
386 */
387 return 0;
388 }
389
390 memcpy(session->mp->data + header.offset_lower, data + RTP_HEADER_SIZE,
391 length - RTP_HEADER_SIZE);
392
393 session->mp->len += length - RTP_HEADER_SIZE;
394
395 bwc_add_recv(session->bwc, length);
396
397 if (session->mp->len == session->mp->header.data_length_lower) {
398 /* Received a full message; now push it for the further
399 * processing.
400 */
401 if (session->mcb) {
402 session->mcb(session->cs, session->mp);
403 } else {
404 free(session->mp);
405 }
406
407 session->mp = nullptr;
408 }
409 } else {
410 /* Second case */
411
412 if (session->mp->header.timestamp > header.timestamp) {
413 /* The received message part is from the old message;
414 * discard it.
415 */
416 return 0;
417 }
418
419 /* Measure missing parts of the old message */
420 bwc_add_lost(session->bwc,
421 (session->mp->header.data_length_lower - session->mp->len) +
422
423 /* Must account sizes of rtp headers too */
424 ((session->mp->header.data_length_lower - session->mp->len) /
425 MAX_CRYPTO_DATA_SIZE) * RTP_HEADER_SIZE);
426
427 /* Push the previous message for processing */
428 if (session->mcb) {
429 session->mcb(session->cs, session->mp);
430 } else {
431 free(session->mp);
432 }
433
434 session->mp = nullptr;
435 goto NEW_MULTIPARTED;
436 }
437 } else {
438 /* In this case threat the message as if it was received in order
439 */
440
441 /* This is also a point for new multiparted messages */
442NEW_MULTIPARTED:
443
444 /* Only allow messages which have arrived in order;
445 * drop late messages
446 */
447 if (chloss(session, &header)) {
448 return 0;
449 }
450
451 /* Message is not late; pick up the latest parameters */
452 session->rsequnum = header.sequnum;
453 session->rtimestamp = header.timestamp;
454
455 bwc_add_recv(session->bwc, length);
456
457 /* Again, only store message if handler is present
458 */
459 if (session->mcb) {
460 session->mp = new_message(header.data_length_lower + RTP_HEADER_SIZE, data, length);
461 memmove(session->mp->data + header.offset_lower, session->mp->data, session->mp->len);
462 }
463 }
464
465 return 0;
466}