summaryrefslogtreecommitdiff
path: root/toxcore/Lossless_UDP.c
diff options
context:
space:
mode:
authorjin-eld <jin at mediatomb dot cc>2013-08-04 15:10:37 +0300
committerjin-eld <jin at mediatomb dot cc>2013-08-24 03:25:07 +0300
commite658892793c42b2d058eed0937025ef2ddaaa372 (patch)
tree2a022cab057f2c16ca95860ed980092880052f6e /toxcore/Lossless_UDP.c
parente2aa8161adc85795fe4d63d4642f47e90937ddc2 (diff)
Rename core directory because of autoconf name clash
While doing the checks configure might generate "core" files and will then try to remove them. Having a "core" directory generates an error while runing the configure script. There's no workaround but to rename the core directory.
Diffstat (limited to 'toxcore/Lossless_UDP.c')
-rw-r--r--toxcore/Lossless_UDP.c842
1 files changed, 842 insertions, 0 deletions
diff --git a/toxcore/Lossless_UDP.c b/toxcore/Lossless_UDP.c
new file mode 100644
index 00000000..c30eb903
--- /dev/null
+++ b/toxcore/Lossless_UDP.c
@@ -0,0 +1,842 @@
1/* Lossless_UDP.c
2 *
3 * An implementation of the Lossless_UDP protocol as seen in http://wiki.tox.im/index.php/Lossless_UDP
4 *
5 * Copyright (C) 2013 Tox project All Rights Reserved.
6 *
7 * This file is part of Tox.
8 *
9 * Tox is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation, either version 3 of the License, or
12 * (at your option) any later version.
13 *
14 * Tox is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with Tox. If not, see <http://www.gnu.org/licenses/>.
21 *
22 */
23
24/*
25 * TODO: clean this file a bit.
26 * There are a couple of useless variables to get rid of.
27 */
28
29#include "Lossless_UDP.h"
30
31
32/* Functions */
33
34/*
35 * Get connection id from IP_Port
36 * Return -1 if there are no connections like we are looking for
37 * Return id if it found it
38 */
39int getconnection_id(Lossless_UDP *ludp, IP_Port ip_port)
40{
41 uint32_t i;
42
43 for (i = 0; i < ludp->connections_length; ++i) {
44 if (ludp->connections[i].ip_port.ip.i == ip_port.ip.i &&
45 ludp->connections[i].ip_port.port == ip_port.port &&
46 ludp->connections[i].status > 0)
47 return i;
48 }
49
50 return -1;
51}
52
53
54/*
55 * Generate a handshake_id which depends on the ip_port.
56 * This function will always give one unique handshake_id per ip_port.
57 *
58 * TODO: make this better
59 */
60static uint32_t handshake_id(Lossless_UDP *ludp, IP_Port source)
61{
62 uint32_t id = 0, i;
63
64 for (i = 0; i < 6; ++i) {
65 if (ludp->randtable[i][((uint8_t *)&source)[i]] == 0)
66 ludp->randtable[i][((uint8_t *)&source)[i]] = random_int();
67
68 id ^= ludp->randtable[i][((uint8_t *)&source)[i]];
69 }
70
71 if (id == 0) /* id can't be zero */
72 id = 1;
73
74 return id;
75}
76
77/*
78 * Change the hanshake id associated with that ip_port
79 *
80 * TODO: make this better
81 */
82static void change_handshake(Lossless_UDP *ludp, IP_Port source)
83{
84 uint8_t rand = random_int() % 4;
85 ludp->randtable[rand][((uint8_t *)&source)[rand]] = random_int();
86}
87
88/*
89 * Initialize a new connection to ip_port
90 * Returns an integer corresponding to the connection idt
91 * Return -1 if it could not initialize the connectiont
92 * If there already was an existing connection to that ip_port return its number.
93 */
94int new_connection(Lossless_UDP *ludp, IP_Port ip_port)
95{
96 int connect = getconnection_id(ludp, ip_port);
97
98 if (connect != -1)
99 return connect;
100
101 if (ludp->connections_number == ludp->connections_length) {
102 Connection *temp;
103 temp = realloc(ludp->connections, sizeof(Connection) * (ludp->connections_length + 1));
104
105 if (temp == NULL)
106 return -1;
107
108 memset(&temp[ludp->connections_length], 0, sizeof(Connection));
109 ++ludp->connections_length;
110 ludp->connections = temp;
111 }
112
113 uint32_t i;
114
115 for (i = 0; i < ludp->connections_length; ++i) {
116 if (ludp->connections[i].status == 0) {
117 memset(&ludp->connections[i], 0, sizeof(Connection));
118 uint32_t handshake_id1 = handshake_id(ludp, ip_port);
119
120 ludp->connections[i] = (Connection) {
121 .ip_port = ip_port,
122 .status = 1,
123 .inbound = 0,
124 .handshake_id1 = handshake_id1,
125 .sent_packetnum = handshake_id1,
126 .sendbuff_packetnum = handshake_id1,
127 .successful_sent = handshake_id1,
128 .SYNC_rate = SYNC_RATE,
129 .data_rate = DATA_SYNC_RATE,
130 .last_recvSYNC = current_time(),
131 .last_sent = current_time(),
132 .killat = ~0,
133 .send_counter = 0,
134 /* add randomness to timeout to prevent connections getting stuck in a loop. */
135 .timeout = CONNEXION_TIMEOUT + rand() % CONNEXION_TIMEOUT
136 };
137 ++ludp->connections_number;
138
139 return i;
140 }
141 }
142
143 return -1;
144}
145
146/*
147 * Initialize a new inbound connection from ip_port
148 * Returns an integer corresponding to the connection id.
149 * Return -1 if it could not initialize the connection.
150 */
151static int new_inconnection(Lossless_UDP *ludp, IP_Port ip_port)
152{
153 if (getconnection_id(ludp, ip_port) != -1)
154 return -1;
155
156 if (ludp->connections_number == ludp->connections_length) {
157 Connection *temp;
158 temp = realloc(ludp->connections, sizeof(Connection) * (ludp->connections_length + 1));
159
160 if (temp == NULL)
161 return -1;
162
163 memset(&temp[ludp->connections_length], 0, sizeof(Connection));
164 ++ludp->connections_length;
165 ludp->connections = temp;
166 }
167
168 uint32_t i;
169
170 for (i = 0; i < ludp->connections_length; ++i) {
171 if (ludp->connections[i].status == 0) {
172 memset(&ludp->connections[i], 0, sizeof(Connection));
173 uint64_t timeout = CONNEXION_TIMEOUT + rand() % CONNEXION_TIMEOUT;
174
175 ludp->connections[i] = (Connection) {
176 .ip_port = ip_port,
177 .status = 2,
178 .inbound = 2,
179 .SYNC_rate = SYNC_RATE,
180 .data_rate = DATA_SYNC_RATE,
181 .last_recvSYNC = current_time(),
182 .last_sent = current_time(),
183 .send_counter = 127,
184
185 /* add randomness to timeout to prevent connections getting stuck in a loop. */
186 .timeout = timeout,
187
188 /* if this connection isn't handled within the timeout kill it. */
189 .killat = current_time() + 1000000UL * timeout
190 };
191 ++ludp->connections_number;
192 return i;
193 }
194 }
195
196 return -1;
197}
198
199/*
200 * Returns an integer corresponding to the next connection in our incoming connection list.
201 * Return -1 if there are no new incoming connections in the list.
202 */
203int incoming_connection(Lossless_UDP *ludp)
204{
205 uint32_t i;
206
207 for (i = 0; i < ludp->connections_length; ++i) {
208 if (ludp->connections[i].inbound == 2) {
209 ludp->connections[i].inbound = 1;
210 return i;
211 }
212 }
213
214 return -1;
215}
216
217/* Try to free some memory from the connections array. */
218static void free_connections(Lossless_UDP *ludp)
219{
220 uint32_t i;
221
222 for (i = ludp->connections_length; i != 0; --i)
223 if (ludp->connections[i - 1].status != 0)
224 break;
225
226 if (ludp->connections_length == i)
227 return;
228
229 if (i == 0) {
230 free(ludp->connections);
231 ludp->connections = NULL;
232 ludp->connections_length = i;
233 return;
234 }
235
236 Connection *temp;
237 temp = realloc(ludp->connections, sizeof(Connection) * i);
238
239 if (temp == NULL && i != 0)
240 return;
241
242 ludp->connections = temp;
243 ludp->connections_length = i;
244}
245
246/*
247 * Return -1 if it could not kill the connection.
248 * Return 0 if killed successfully
249 */
250int kill_connection(Lossless_UDP *ludp, int connection_id)
251{
252 if (connection_id >= 0 && connection_id < ludp->connections_length) {
253 if (ludp->connections[connection_id].status > 0) {
254 ludp->connections[connection_id].status = 0;
255 change_handshake(ludp, ludp->connections[connection_id].ip_port);
256 --ludp->connections_number;
257 free_connections(ludp);
258 return 0;
259 }
260 }
261
262 return -1;
263}
264
265/*
266 * Kill connection in seconds.
267 * Return -1 if it can not kill the connection.
268 * Return 0 if it will kill it.
269 */
270int kill_connection_in(Lossless_UDP *ludp, int connection_id, uint32_t seconds)
271{
272 if (connection_id >= 0 && connection_id < ludp->connections_length) {
273 if (ludp->connections[connection_id].status > 0) {
274 ludp->connections[connection_id].killat = current_time() + 1000000UL * seconds;
275 return 0;
276 }
277 }
278
279 return -1;
280}
281
282/*
283 * Check if connection is connected:
284 * Return 0 no.
285 * Return 1 if attempting handshake.
286 * Return 2 if handshake is done.
287 * Return 3 if fully connected.
288 * Return 4 if timed out and waiting to be killed.
289 */
290int is_connected(Lossless_UDP *ludp, int connection_id)
291{
292 if (connection_id >= 0 && connection_id < ludp->connections_length)
293 return ludp->connections[connection_id].status;
294
295 return 0;
296}
297
298/* returns the ip_port of the corresponding connection. */
299IP_Port connection_ip(Lossless_UDP *ludp, int connection_id)
300{
301 if (connection_id >= 0 && connection_id < ludp->connections_length)
302 return ludp->connections[connection_id].ip_port;
303
304 IP_Port zero = {{{0}}, 0};
305 return zero;
306}
307
308/* returns the number of packets in the queue waiting to be successfully sent. */
309uint32_t sendqueue(Lossless_UDP *ludp, int connection_id)
310{
311 if (connection_id < 0 || connection_id >= ludp->connections_length)
312 return 0;
313
314 return ludp->connections[connection_id].sendbuff_packetnum - ludp->connections[connection_id].successful_sent;
315}
316
317/* returns the number of packets in the queue waiting to be successfully read with read_packet(...) */
318uint32_t recvqueue(Lossless_UDP *ludp, int connection_id)
319{
320 if (connection_id < 0 || connection_id >= ludp->connections_length)
321 return 0;
322
323 return ludp->connections[connection_id].recv_packetnum - ludp->connections[connection_id].successful_read;
324}
325
326/* returns the id of the next packet in the queue
327 return -1 if no packet in queue */
328char id_packet(Lossless_UDP *ludp, int connection_id)
329{
330 if (connection_id < 0 || connection_id >= ludp->connections_length)
331 return -1;
332
333 if (recvqueue(ludp, connection_id) != 0 && ludp->connections[connection_id].status != 0)
334 return ludp->connections[connection_id].recvbuffer[ludp->connections[connection_id].successful_read %
335 MAX_QUEUE_NUM].data[0];
336
337 return -1;
338}
339
340/* return 0 if there is no received data in the buffer.
341 return length of received packet if successful */
342int read_packet(Lossless_UDP *ludp, int connection_id, uint8_t *data)
343{
344 if (recvqueue(ludp, connection_id) != 0) {
345 uint16_t index = ludp->connections[connection_id].successful_read % MAX_QUEUE_NUM;
346 uint16_t size = ludp->connections[connection_id].recvbuffer[index].size;
347 memcpy(data, ludp->connections[connection_id].recvbuffer[index].data, size);
348 ++ludp->connections[connection_id].successful_read;
349 ludp->connections[connection_id].recvbuffer[index].size = 0;
350 return size;
351 }
352
353 return 0;
354}
355
356/*
357 * Return 0 if data could not be put in packet queue
358 * Return 1 if data was put into the queue
359 */
360int write_packet(Lossless_UDP *ludp, int connection_id, uint8_t *data, uint32_t length)
361{
362 if (length > MAX_DATA_SIZE || length == 0)
363 return 0;
364
365 if (sendqueue(ludp, connection_id) < BUFFER_PACKET_NUM) {
366 uint32_t index = ludp->connections[connection_id].sendbuff_packetnum % MAX_QUEUE_NUM;
367 memcpy(ludp->connections[connection_id].sendbuffer[index].data, data, length);
368 ludp->connections[connection_id].sendbuffer[index].size = length;
369 ludp->connections[connection_id].sendbuff_packetnum++;
370 return 1;
371 }
372
373 return 0;
374}
375
376/* put the packet numbers the we are missing in requested and return the number */
377uint32_t missing_packets(Lossless_UDP *ludp, int connection_id, uint32_t *requested)
378{
379 uint32_t number = 0;
380 uint32_t i;
381 uint32_t temp;
382
383 /* don't request packets if the buffer is full. */
384 if (recvqueue(ludp, connection_id) >= (BUFFER_PACKET_NUM - 1))
385 return 0;
386
387 for (i = ludp->connections[connection_id].recv_packetnum; i != ludp->connections[connection_id].osent_packetnum; i++) {
388 if (ludp->connections[connection_id].recvbuffer[i % MAX_QUEUE_NUM].size == 0) {
389 temp = htonl(i);
390 memcpy(requested + number, &temp, 4);
391 ++number;
392 }
393 }
394
395 if (number == 0)
396 ludp->connections[connection_id].recv_packetnum = ludp->connections[connection_id].osent_packetnum;
397
398 return number;
399}
400
401/*
402 * BEGIN Packet sending functions
403 * One per packet type.
404 * see http://wiki.tox.im/index.php/Lossless_UDP for more information.
405 */
406
407static int send_handshake(Lossless_UDP *ludp, IP_Port ip_port, uint32_t handshake_id1, uint32_t handshake_id2)
408{
409 uint8_t packet[1 + 4 + 4];
410 uint32_t temp;
411
412 packet[0] = NET_PACKET_HANDSHAKE;
413 temp = htonl(handshake_id1);
414 memcpy(packet + 1, &temp, 4);
415 temp = htonl(handshake_id2);
416 memcpy(packet + 5, &temp, 4);
417
418 return sendpacket(ludp->net->sock, ip_port, packet, sizeof(packet));
419}
420
421static int send_SYNC(Lossless_UDP *ludp, uint32_t connection_id)
422{
423 uint8_t packet[(BUFFER_PACKET_NUM * 4 + 4 + 4 + 2)];
424 uint16_t index = 0;
425
426 IP_Port ip_port = ludp->connections[connection_id].ip_port;
427 uint8_t counter = ludp->connections[connection_id].send_counter;
428 uint32_t recv_packetnum = htonl(ludp->connections[connection_id].recv_packetnum);
429 uint32_t sent_packetnum = htonl(ludp->connections[connection_id].sent_packetnum);
430
431 uint32_t requested[BUFFER_PACKET_NUM];
432 uint32_t number = missing_packets(ludp, connection_id, requested);
433
434 packet[0] = NET_PACKET_SYNC;
435 index += 1;
436 memcpy(packet + index, &counter, 1);
437 index += 1;
438 memcpy(packet + index, &recv_packetnum, 4);
439 index += 4;
440 memcpy(packet + index, &sent_packetnum, 4);
441 index += 4;
442 memcpy(packet + index, requested, 4 * number);
443
444 return sendpacket(ludp->net->sock, ip_port, packet, (number * 4 + 4 + 4 + 2));
445
446}
447
448static int send_data_packet(Lossless_UDP *ludp, uint32_t connection_id, uint32_t packet_num)
449{
450 uint32_t index = packet_num % MAX_QUEUE_NUM;
451 uint32_t temp;
452 uint8_t packet[1 + 4 + MAX_DATA_SIZE];
453 packet[0] = NET_PACKET_DATA;
454 temp = htonl(packet_num);
455 memcpy(packet + 1, &temp, 4);
456 memcpy(packet + 5, ludp->connections[connection_id].sendbuffer[index].data,
457 ludp->connections[connection_id].sendbuffer[index].size);
458 return sendpacket(ludp->net->sock, ludp->connections[connection_id].ip_port, packet,
459 1 + 4 + ludp->connections[connection_id].sendbuffer[index].size);
460}
461
462/* sends 1 data packet */
463static int send_DATA(Lossless_UDP *ludp, uint32_t connection_id)
464{
465 int ret;
466 uint32_t buffer[BUFFER_PACKET_NUM];
467
468 if (ludp->connections[connection_id].num_req_paquets > 0) {
469 ret = send_data_packet(ludp, connection_id, ludp->connections[connection_id].req_packets[0]);
470 ludp->connections[connection_id].num_req_paquets--;
471 memcpy(buffer, ludp->connections[connection_id].req_packets + 1, ludp->connections[connection_id].num_req_paquets * 4);
472 memcpy(ludp->connections[connection_id].req_packets, buffer, ludp->connections[connection_id].num_req_paquets * 4);
473 return ret;
474 }
475
476 if (ludp->connections[connection_id].sendbuff_packetnum != ludp->connections[connection_id].sent_packetnum) {
477 ret = send_data_packet(ludp, connection_id, ludp->connections[connection_id].sent_packetnum);
478 ludp->connections[connection_id].sent_packetnum++;
479 return ret;
480 }
481
482 return 0;
483}
484
485/*
486 * END of packet sending functions
487 *
488 *
489 * BEGIN Packet handling functions
490 * One to handle each type of packets we receive
491 */
492
493
494/* Return 0 if handled correctly, 1 if packet is bad. */
495static int handle_handshake(void *object, IP_Port source, uint8_t *packet, uint32_t length)
496{
497 Lossless_UDP *ludp = object;
498
499 if (length != (1 + 4 + 4))
500 return 1;
501
502 uint32_t temp;
503 uint32_t handshake_id1, handshake_id2;
504
505 int connection = getconnection_id(ludp, source);
506 memcpy(&temp, packet + 1, 4);
507 handshake_id1 = ntohl(temp);
508 memcpy(&temp, packet + 5, 4);
509 handshake_id2 = ntohl(temp);
510
511 if (handshake_id2 == 0 && is_connected(ludp, connection) < 3) {
512 send_handshake(ludp, source, handshake_id(ludp, source), handshake_id1);
513 return 0;
514 }
515
516 if (is_connected(ludp, connection) != 1)
517 return 1;
518
519 /* if handshake_id2 is what we sent previously as handshake_id1 */
520 if (handshake_id2 == ludp->connections[connection].handshake_id1) {
521 ludp->connections[connection].status = 2;
522 /* NOTE: is this necessary?
523 ludp->connections[connection].handshake_id2 = handshake_id1; */
524 ludp->connections[connection].orecv_packetnum = handshake_id2;
525 ludp->connections[connection].osent_packetnum = handshake_id1;
526 ludp->connections[connection].recv_packetnum = handshake_id1;
527 ludp->connections[connection].successful_read = handshake_id1;
528 }
529
530 return 0;
531}
532
533/* returns 1 if sync packet is valid 0 if not. */
534static int SYNC_valid(uint32_t length)
535{
536 if (length < 4 + 4 + 2)
537 return 0;
538
539 if (length > (BUFFER_PACKET_NUM * 4 + 4 + 4 + 2) ||
540 ((length - 4 - 4 - 2) % 4) != 0)
541 return 0;
542
543 return 1;
544}
545
546/* case 1 in handle_SYNC: */
547static int handle_SYNC1(Lossless_UDP *ludp, IP_Port source, uint32_t recv_packetnum, uint32_t sent_packetnum)
548{
549 if (handshake_id(ludp, source) == recv_packetnum) {
550 int x = new_inconnection(ludp, source);
551
552 if (x != -1) {
553 ludp->connections[x].orecv_packetnum = recv_packetnum;
554 ludp->connections[x].sent_packetnum = recv_packetnum;
555 ludp->connections[x].sendbuff_packetnum = recv_packetnum;
556 ludp->connections[x].successful_sent = recv_packetnum;
557 ludp->connections[x].osent_packetnum = sent_packetnum;
558 ludp->connections[x].recv_packetnum = sent_packetnum;
559 ludp->connections[x].successful_read = sent_packetnum;
560
561 return x;
562 }
563 }
564
565 return -1;
566}
567
568/* case 2 in handle_SYNC: */
569static int handle_SYNC2(Lossless_UDP *ludp, int connection_id, uint8_t counter, uint32_t recv_packetnum,
570 uint32_t sent_packetnum)
571{
572 if (recv_packetnum == ludp->connections[connection_id].orecv_packetnum) {
573 /* && sent_packetnum == ludp->connections[connection_id].osent_packetnum) */
574 ludp->connections[connection_id].status = 3;
575 ludp->connections[connection_id].recv_counter = counter;
576 ++ludp->connections[connection_id].send_counter;
577 send_SYNC(ludp, connection_id);
578 return 0;
579 }
580
581 return 1;
582}
583/* case 3 in handle_SYNC: */
584static int handle_SYNC3(Lossless_UDP *ludp, int connection_id, uint8_t counter, uint32_t recv_packetnum,
585 uint32_t sent_packetnum,
586 uint32_t *req_packets,
587 uint16_t number)
588{
589 uint8_t comp_counter = (counter - ludp->connections[connection_id].recv_counter );
590 uint32_t i, temp;
591 /* uint32_t comp_1 = (recv_packetnum - ludp->connections[connection_id].successful_sent);
592 uint32_t comp_2 = (sent_packetnum - ludp->connections[connection_id].successful_read); */
593 uint32_t comp_1 = (recv_packetnum - ludp->connections[connection_id].orecv_packetnum);
594 uint32_t comp_2 = (sent_packetnum - ludp->connections[connection_id].osent_packetnum);
595
596 /* packet valid */
597 if (comp_1 <= BUFFER_PACKET_NUM &&
598 comp_2 <= BUFFER_PACKET_NUM &&
599 comp_counter < 10 && comp_counter != 0) {
600
601 ludp->connections[connection_id].orecv_packetnum = recv_packetnum;
602 ludp->connections[connection_id].osent_packetnum = sent_packetnum;
603 ludp->connections[connection_id].successful_sent = recv_packetnum;
604 ludp->connections[connection_id].last_recvSYNC = current_time();
605 ludp->connections[connection_id].recv_counter = counter;
606
607 ++ludp->connections[connection_id].send_counter;
608
609 for (i = 0; i < number; ++i) {
610 temp = ntohl(req_packets[i]);
611 memcpy(ludp->connections[connection_id].req_packets + i, &temp, 4 * number);
612 }
613
614 ludp->connections[connection_id].num_req_paquets = number;
615 return 0;
616 }
617
618 return 1;
619}
620
621static int handle_SYNC(void *object, IP_Port source, uint8_t *packet, uint32_t length)
622{
623 Lossless_UDP *ludp = object;
624
625 if (!SYNC_valid(length))
626 return 1;
627
628 int connection = getconnection_id(ludp, source);
629 uint8_t counter;
630 uint32_t temp;
631 uint32_t recv_packetnum, sent_packetnum;
632 uint32_t req_packets[BUFFER_PACKET_NUM];
633 uint16_t number = (length - 4 - 4 - 2) / 4;
634
635 memcpy(&counter, packet + 1, 1);
636 memcpy(&temp, packet + 2, 4);
637 recv_packetnum = ntohl(temp);
638 memcpy(&temp, packet + 6, 4);
639 sent_packetnum = ntohl(temp);
640
641 if (number != 0)
642 memcpy(req_packets, packet + 10, 4 * number);
643
644 if (connection == -1)
645 return handle_SYNC1(ludp, source, recv_packetnum, sent_packetnum);
646
647 if (ludp->connections[connection].status == 2)
648 return handle_SYNC2(ludp, connection, counter,
649 recv_packetnum, sent_packetnum);
650
651 if (ludp->connections[connection].status == 3)
652 return handle_SYNC3(ludp, connection, counter, recv_packetnum,
653 sent_packetnum, req_packets, number);
654
655 return 0;
656}
657
658/*
659 * Add a packet to the received buffer and set the recv_packetnum of the
660 * connection to its proper value. Return 1 if data was too big, 0 if not.
661 */
662static int add_recv(Lossless_UDP *ludp, int connection_id, uint32_t data_num, uint8_t *data, uint16_t size)
663{
664 if (size > MAX_DATA_SIZE)
665 return 1;
666
667 uint32_t i;
668 uint32_t maxnum = ludp->connections[connection_id].successful_read + BUFFER_PACKET_NUM;
669 uint32_t sent_packet = data_num - ludp->connections[connection_id].osent_packetnum;
670
671 for (i = ludp->connections[connection_id].recv_packetnum; i != maxnum; ++i) {
672 if (i == data_num) {
673 memcpy(ludp->connections[connection_id].recvbuffer[i % MAX_QUEUE_NUM].data, data, size);
674
675 ludp->connections[connection_id].recvbuffer[i % MAX_QUEUE_NUM].size = size;
676 ludp->connections[connection_id].last_recvdata = current_time();
677
678 if (sent_packet < BUFFER_PACKET_NUM) {
679 ludp->connections[connection_id].osent_packetnum = data_num;
680 }
681
682 break;
683 }
684 }
685
686 for (i = ludp->connections[connection_id].recv_packetnum; i != maxnum; ++i) {
687 if (ludp->connections[connection_id].recvbuffer[i % MAX_QUEUE_NUM].size != 0)
688 ludp->connections[connection_id].recv_packetnum = i;
689 else
690 break;
691 }
692
693 return 0;
694}
695
696static int handle_data(void *object, IP_Port source, uint8_t *packet, uint32_t length)
697{
698 Lossless_UDP *ludp = object;
699 int connection = getconnection_id(ludp, source);
700
701 if (connection == -1)
702 return 1;
703
704 /* Drop the data packet if connection is not connected. */
705 if (ludp->connections[connection].status != 3)
706 return 1;
707
708 if (length > 1 + 4 + MAX_DATA_SIZE || length < 1 + 4 + 1)
709 return 1;
710
711 uint32_t temp;
712 uint32_t number;
713 uint16_t size = length - 1 - 4;
714
715 memcpy(&temp, packet + 1, 4);
716 number = ntohl(temp);
717
718 return add_recv(ludp, connection, number, packet + 5, size);
719}
720
721/*
722 * END of packet handling functions
723 */
724
725Lossless_UDP *new_lossless_udp(Networking_Core *net)
726{
727 if (net == NULL)
728 return NULL;
729
730 Lossless_UDP *temp = calloc(1, sizeof(Lossless_UDP));
731
732 if (temp == NULL)
733 return NULL;
734
735 temp->net = net;
736 networking_registerhandler(net, NET_PACKET_HANDSHAKE, &handle_handshake, temp);
737 networking_registerhandler(net, NET_PACKET_SYNC, &handle_SYNC, temp);
738 networking_registerhandler(net, NET_PACKET_DATA, &handle_data, temp);
739 return temp;
740}
741
742/*
743 * Send handshake requests
744 * handshake packets are sent at the same rate as SYNC packets
745 */
746static void do_new(Lossless_UDP *ludp)
747{
748 uint32_t i;
749 uint64_t temp_time = current_time();
750
751 for (i = 0; i < ludp->connections_length; ++i) {
752 if (ludp->connections[i].status == 1)
753 if ((ludp->connections[i].last_sent + (1000000UL / ludp->connections[i].SYNC_rate)) <= temp_time) {
754 send_handshake(ludp, ludp->connections[i].ip_port, ludp->connections[i].handshake_id1, 0);
755 ludp->connections[i].last_sent = temp_time;
756 }
757
758 /* kill all timed out connections */
759 if (ludp->connections[i].status > 0 &&
760 (ludp->connections[i].last_recvSYNC + ludp->connections[i].timeout * 1000000UL) < temp_time &&
761 ludp->connections[i].status != 4) {
762 ludp->connections[i].status = 4;
763 /* kill_connection(i); */
764 }
765
766 if (ludp->connections[i].status > 0 && ludp->connections[i].killat < temp_time)
767 kill_connection(ludp, i);
768 }
769}
770
771static void do_SYNC(Lossless_UDP *ludp)
772{
773 uint32_t i;
774 uint64_t temp_time = current_time();
775
776 for (i = 0; i < ludp->connections_length; ++i) {
777 if (ludp->connections[i].status == 2 || ludp->connections[i].status == 3)
778 if ((ludp->connections[i].last_SYNC + (1000000UL / ludp->connections[i].SYNC_rate)) <= temp_time) {
779 send_SYNC(ludp, i);
780 ludp->connections[i].last_SYNC = temp_time;
781 }
782 }
783}
784
785static void do_data(Lossless_UDP *ludp)
786{
787 uint32_t i;
788 uint64_t j;
789 uint64_t temp_time = current_time();
790
791 for (i = 0; i < ludp->connections_length; ++i)
792 if (ludp->connections[i].status == 3 && sendqueue(ludp, i) != 0)
793 if ((ludp->connections[i].last_sent + (1000000UL / ludp->connections[i].data_rate)) <= temp_time) {
794 for (j = ludp->connections[i].last_sent; j < temp_time; j += (1000000UL / ludp->connections[i].data_rate))
795 send_DATA(ludp, i);
796
797 ludp->connections[i].last_sent = temp_time;
798 }
799}
800
801#define MAX_SYNC_RATE 10
802
803/*
804 * Automatically adjusts send rates of packets for optimal transmission.
805 *
806 * TODO: flow control.
807 */
808static void adjust_rates(Lossless_UDP *ludp)
809{
810 uint32_t i;
811 uint64_t temp_time = current_time();
812
813 for (i = 0; i < ludp->connections_length; ++i) {
814 if (ludp->connections[i].status == 1 || ludp->connections[i].status == 2)
815 ludp->connections[i].SYNC_rate = MAX_SYNC_RATE;
816
817 if (ludp->connections[i].status == 3) {
818 if (sendqueue(ludp, i) != 0) {
819 ludp->connections[i].data_rate = (BUFFER_PACKET_NUM - ludp->connections[i].num_req_paquets) * MAX_SYNC_RATE;
820 ludp->connections[i].SYNC_rate = MAX_SYNC_RATE;
821 } else if (ludp->connections[i].last_recvdata + 1000000UL > temp_time)
822 ludp->connections[i].SYNC_rate = MAX_SYNC_RATE;
823 else
824 ludp->connections[i].SYNC_rate = SYNC_RATE;
825 }
826 }
827}
828
829/* Call this function a couple times per second It's the main loop. */
830void do_lossless_udp(Lossless_UDP *ludp)
831{
832 do_new(ludp);
833 do_SYNC(ludp);
834 do_data(ludp);
835 adjust_rates(ludp);
836}
837
838void kill_lossless_udp(Lossless_UDP *ludp)
839{
840 free(ludp->connections);
841 free(ludp);
842}