From cc8a536cb07484396354c4880f3631666a1cf874 Mon Sep 17 00:00:00 2001 From: irungentoo Date: Thu, 5 Sep 2013 17:00:41 -0400 Subject: Base of group chats seems to be working now. --- testing/experiment/group_chats.c | 99 +++++++++++++++++++++++++++++------ testing/experiment/group_chats.h | 8 ++- testing/experiment/group_chats_test.c | 83 ++++++++++++++++++++++++++++- 3 files changed, 169 insertions(+), 21 deletions(-) diff --git a/testing/experiment/group_chats.c b/testing/experiment/group_chats.c index 665707b2..faf274d5 100644 --- a/testing/experiment/group_chats.c +++ b/testing/experiment/group_chats.c @@ -111,8 +111,10 @@ static int peer_okping(Group_Chat *chat, uint8_t *client_id) uint64_t temp_time = unix_time(); for (i = 0; i < GROUP_CLOSE_CONNECTIONS; ++i) { - if (chat->close[i].last_recv < temp_time + BAD_NODE_TIMEOUT) + if (chat->close[i].last_recv + BAD_NODE_TIMEOUT < temp_time) { + ++j; continue; + } /* Equal */ if (memcmp(chat->close[i].client_id, client_id, crypto_box_PUBLICKEYBYTES) == 0) @@ -147,7 +149,7 @@ static int add_closepeer(Group_Chat *chat, uint8_t *client_id, IP_Port ip_port) } for (i = 0; i < GROUP_CLOSE_CONNECTIONS; ++i) { /* Try replacing bad nodes first */ - if (chat->close[i].last_recv < temp_time + BAD_NODE_TIMEOUT) { + if (chat->close[i].last_recv + BAD_NODE_TIMEOUT < temp_time) { memcpy(chat->close[i].client_id, client_id, crypto_box_PUBLICKEYBYTES); chat->close[i].ip_port = ip_port; chat->close[i].last_recv = temp_time; @@ -224,6 +226,7 @@ static int addpeer(Group_Chat *chat, uint8_t *client_id) Group_Peer *temp; temp = realloc(chat->group, sizeof(Group_Peer) * (chat->numpeers + 1)); + memset(&(temp[chat->numpeers]), 0, sizeof(Group_Peer)); if (temp == NULL) return -1; @@ -301,14 +304,11 @@ static int send_sendnodes(Group_Chat *chat, IP_Port ip_port, int peernum, uint64 for (i = 0; i < GROUP_CLOSE_CONNECTIONS; ++i) { if (chat->close[i].last_recv + BAD_NODE_TIMEOUT > temp_time) { memcpy(contents.nodes[j].client_id, chat->close[i].client_id, crypto_box_PUBLICKEYBYTES); - contents.nodes[j].ip_port = ip_port; + contents.nodes[j].ip_port = chat->close[i].ip_port; ++j; } } - if (j == 0) - return -1; - return send_groupchatpacket(chat, ip_port, chat->group[peernum].client_id, (uint8_t *)&contents, sizeof(contents.pingid) + sizeof(groupchat_nodes) * j, 49); } @@ -324,6 +324,10 @@ static int handle_getnodes(Group_Chat *chat, IP_Port source, int peernum, uint8_ getnodes_data contents; memcpy(&contents, data, sizeof(contents)); send_sendnodes(chat, source, peernum, contents.pingid); + + if (peer_okping(chat, chat->group[peernum].client_id) > 0) + send_getnodes(chat, source, peernum); + return 0; } @@ -332,7 +336,7 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8 if (peernum < 0 || peernum >= chat->numpeers) return 1; - if (len > sizeof(sendnodes_data) || len < (sizeof(uint64_t) + sizeof(groupchat_nodes))) + if (len > sizeof(sendnodes_data) || len < sizeof(uint64_t)) return 1; if ((len - sizeof(uint64_t)) % sizeof(groupchat_nodes) != 0) @@ -354,6 +358,10 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8 if (peer_okping(chat, contents.nodes[i].client_id) > 0) { int peern = peer_in_chat(chat, contents.nodes[i].client_id); + if (peern == -1) { /*NOTE: This is just for testing and will be removed later.*/ + peern = addpeer(chat, contents.nodes[i].client_id); + } + if (peern == -1) continue; @@ -364,17 +372,37 @@ static int handle_sendnodes(Group_Chat *chat, IP_Port source, int peernum, uint8 add_closepeer(chat, chat->group[peernum].client_id, source); return 0; } - +#define GROUP_DATA_MIN_SIZE (crypto_box_PUBLICKEYBYTES + sizeof(uint32_t) + 1) static int handle_data(Group_Chat *chat, uint8_t *data, uint32_t len) { - if (len < 2) + if (len < GROUP_DATA_MIN_SIZE) return 1; +//TODO: + int peernum = peer_in_chat(chat, data); + + if (peernum == -1) { /*NOTE: This is just for testing and will be removed later.*/ + peernum = addpeer(chat, data); + } + + if (peernum == -1) + return 1; + + uint32_t message_num; + memcpy(&message_num, data + crypto_box_PUBLICKEYBYTES, sizeof(uint32_t)); + message_num = ntohl(message_num); + + if (message_num - chat->group[peernum].last_message_number > 64 || + message_num == chat->group[peernum].last_message_number) + return 1; + + chat->group[peernum].last_message_number = message_num; + int handled = 0; - if (data[0] == 64 && chat->group_message != NULL) { -//TODO - (*chat->group_message)(chat, 0, data + 1, len - 1, chat->group_message_userdata); + if (data[crypto_box_PUBLICKEYBYTES + sizeof(message_num)] == 64 + && chat->group_message != NULL) { /* If message is chat message */ + (*chat->group_message)(chat, peernum, data + GROUP_DATA_MIN_SIZE, len - 1, chat->group_message_userdata); handled = 1; } @@ -386,6 +414,19 @@ static int handle_data(Group_Chat *chat, uint8_t *data, uint32_t len) return 1; } +static uint8_t send_data(Group_Chat *chat, uint8_t *data, uint32_t len, uint8_t message_id) +{ + if (len + GROUP_DATA_MIN_SIZE > MAX_DATA_SIZE) /*NOTE: not the real maximum len.*/ + return 1; + + uint8_t packet[MAX_DATA_SIZE]; + uint32_t message_num = htonl(chat->message_number); +//TODO + memcpy(packet, chat->self_public_key, crypto_box_PUBLICKEYBYTES); + memcpy(packet + crypto_box_PUBLICKEYBYTES, &message_num, sizeof(message_num)); + packet[crypto_box_PUBLICKEYBYTES + sizeof(message_num)] = message_id; + return sendto_allpeers(chat, packet, len + GROUP_DATA_MIN_SIZE, 50); +} /* * Handle get nodes group packet. * @@ -417,7 +458,6 @@ int handle_groupchatpacket(Group_Chat *chat, IP_Port source, uint8_t *packet, ui if (peernum == -1) return 1; - switch (number) { case 48: return handle_getnodes(chat, source, peernum, data, len); @@ -435,9 +475,9 @@ int handle_groupchatpacket(Group_Chat *chat, IP_Port source, uint8_t *packet, ui return 1; } -uint32_t m_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length) +uint32_t group_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length) { - + return send_data(chat, message, length, 64); //TODO: better return values? } void callback_groupmessage(Group_Chat *chat, void (*function)(Group_Chat *chat, int, uint8_t *, uint16_t, void *), @@ -454,9 +494,34 @@ Group_Chat *new_groupchat(Networking_Core *net) Group_Chat *chat = calloc(1, sizeof(Group_Chat)); chat->net = net; + crypto_box_keypair(chat->self_public_key, chat->self_secret_key); return chat; } +#define NODE_PING_INTERVAL 10 + +static void ping_close(Group_Chat *chat) +{ + uint32_t i; + uint64_t temp_time = unix_time(); + + for (i = 0; i < GROUP_CLOSE_CONNECTIONS; ++i) { + if (chat->close[i].last_recv < temp_time + BAD_NODE_TIMEOUT) { + int peernum = peer_in_chat(chat, chat->close[i].client_id); + + if (peernum == -1) + continue; + + if (chat->group[peernum].last_pinged + NODE_PING_INTERVAL < temp_time) + send_getnodes(chat, chat->close[i].ip_port, peernum); + } + } +} + +void do_groupchat(Group_Chat *chat) +{ + ping_close(chat); +} void kill_groupchat(Group_Chat *chat) { @@ -464,7 +529,7 @@ void kill_groupchat(Group_Chat *chat) free(chat); } -void chat_bootstrap(Group_Chat *chat, IP_Port ip_port, int peernum) +void chat_bootstrap(Group_Chat *chat, IP_Port ip_port, uint8_t *client_id) { - send_getnodes(chat, ip_port, peernum); + send_getnodes(chat, ip_port, addpeer(chat, client_id)); } diff --git a/testing/experiment/group_chats.h b/testing/experiment/group_chats.h index 527610d8..42ea3e08 100644 --- a/testing/experiment/group_chats.h +++ b/testing/experiment/group_chats.h @@ -76,7 +76,7 @@ void callback_groupmessage(Group_Chat *chat, void (*function)(Group_Chat *chat, * Send a message to the group. * */ -uint32_t m_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length); +uint32_t group_sendmessage(Group_Chat *chat, uint8_t *message, uint32_t length); /* Create a new group chat. * @@ -93,6 +93,10 @@ Group_Chat *new_groupchat(Networking_Core *net); */ void kill_groupchat(Group_Chat *chat); +/* + * This is the main loop. + */ +void do_groupchat(Group_Chat *chat); /* if we receive a group chat packet we call this function so it can be handled. return 0 if packet is handled correctly. @@ -100,7 +104,7 @@ void kill_groupchat(Group_Chat *chat); int handle_groupchatpacket(Group_Chat *chat, IP_Port source, uint8_t *packet, uint32_t length); -void chat_bootstrap(Group_Chat *chat, IP_Port ip_port, int peernum); +void chat_bootstrap(Group_Chat *chat, IP_Port ip_port, uint8_t *client_id); #ifdef __cplusplus } diff --git a/testing/experiment/group_chats_test.c b/testing/experiment/group_chats_test.c index f08e4e15..8ef5b10e 100644 --- a/testing/experiment/group_chats_test.c +++ b/testing/experiment/group_chats_test.c @@ -1,18 +1,97 @@ #include "group_chats.h" #define NUM_CHATS 8 +#ifdef WIN32 +#define c_sleep(x) Sleep(1*x) +#else +#define c_sleep(x) usleep(1000*x) +#endif +Group_Chat *chats[NUM_CHATS]; + +void print_close(Group_Close *close) +{ + uint32_t i, j; + IP_Port p_ip; + printf("___________________CLOSE________________________________\n"); + + for (i = 0; i < GROUP_CLOSE_CONNECTIONS; i++) { + printf("ClientID: "); + + for (j = 0; j < CLIENT_ID_SIZE; j++) { + printf("%02hhX", close[i].client_id[j]); + } + + p_ip = close[i].ip_port; + printf("\nIP: %u.%u.%u.%u Port: %u", p_ip.ip.uint8[0], p_ip.ip.uint8[1], p_ip.ip.uint8[2], p_ip.ip.uint8[3], + ntohs(p_ip.port)); + printf("\nTimestamp: %llu", (long long unsigned int) close[i].last_recv); + printf("\n"); + } +} + +void print_group(Group_Chat *chat) +{ + uint32_t i, j; + printf("-----------------\nClientID: "); + + for (j = 0; j < CLIENT_ID_SIZE; j++) { + printf("%02hhX", chat->self_public_key[j]); + } + + printf("\n___________________GROUP________________________________\n"); + + for (i = 0; i < chat->numpeers; i++) { + printf("ClientID: "); + + for (j = 0; j < CLIENT_ID_SIZE; j++) { + printf("%02hhX", chat->group[i].client_id[j]); + } + + printf("\nTimestamp: %llu", (long long unsigned int) chat->group[i].last_recv); + printf("\nlast_pinged: %llu", (long long unsigned int) chat->group[i].last_pinged); + printf("\npingid: %llu", (long long unsigned int) chat->group[i].pingid); + printf("\n"); + } +} + int main() { IP ip; ip.uint32 = 0; uint32_t i; - Group_Chat *chats[NUM_CHATS]; + for (i = 0; i < NUM_CHATS; ++i) { - chats[i] = new_groupchat(new_networking(ip, 1234)); + chats[i] = new_groupchat(new_networking(ip, 12745)); if (chats[i] == 0) exit(1); + + networking_registerhandler(chats[i]->net, 48, &handle_groupchatpacket, chats[i]); + } + + printf("ok\n"); + IP_Port ip_port; + ip_port.ip.uint32 = 0; + ip_port.ip.uint8[0] = 127; + ip_port.ip.uint8[3] = 1; + ip_port.port = htons(12745); + + for (i = 0; i < NUM_CHATS; ++i) { + chat_bootstrap(chats[i], ip_port, chats[0]->self_public_key); + printf("%u\n", i); + } + + while (1) { + for (i = 0; i < NUM_CHATS; ++i) { + networking_poll(chats[i]->net); + do_groupchat(chats[i]); + printf("%u\n", chats[i]->numpeers); + print_close(chats[i]->close); + print_group(chats[i]); + } + + c_sleep(100); } return 0; -- cgit v1.2.3