summaryrefslogtreecommitdiff
path: root/nacl/curvecp/curvecpmessage.c
diff options
context:
space:
mode:
Diffstat (limited to 'nacl/curvecp/curvecpmessage.c')
-rw-r--r--nacl/curvecp/curvecpmessage.c654
1 files changed, 654 insertions, 0 deletions
diff --git a/nacl/curvecp/curvecpmessage.c b/nacl/curvecp/curvecpmessage.c
new file mode 100644
index 00000000..df1e1664
--- /dev/null
+++ b/nacl/curvecp/curvecpmessage.c
@@ -0,0 +1,654 @@
1#include <sys/types.h>
2#include <sys/wait.h>
3#include <unistd.h>
4#include <signal.h>
5#include <poll.h>
6#include "open.h"
7#include "blocking.h"
8#include "e.h"
9#include "die.h"
10#include "randommod.h"
11#include "byte.h"
12#include "crypto_uint32.h"
13#include "uint16_pack.h"
14#include "uint32_pack.h"
15#include "uint64_pack.h"
16#include "uint16_unpack.h"
17#include "uint32_unpack.h"
18#include "uint64_unpack.h"
19#include "nanoseconds.h"
20#include "writeall.h"
21
22int flagverbose = 1;
23int flagserver = 1;
24int wantping = 0; /* 1: ping after a second; 2: ping immediately */
25
26#define USAGE "\
27curvecpmessage: how to use:\n\
28curvecpmessage: -q (optional): no error messages\n\
29curvecpmessage: -Q (optional): print error messages (default)\n\
30curvecpmessage: -v (optional): print extra information\n\
31curvecpmessage: -c (optional): program is a client; server starts first\n\
32curvecpmessage: -C (optional): program is a client that starts first\n\
33curvecpmessage: -s (optional): program is a server (default)\n\
34curvecpmessage: prog: run this program\n\
35"
36
37void die_usage(const char *s)
38{
39 if (s) die_4(100,USAGE,"curvecpmessage: fatal: ",s,"\n");
40 die_1(100,USAGE);
41}
42
43void die_fatal(const char *trouble,const char *d,const char *fn)
44{
45 if (!flagverbose) die_0(111);
46 if (d) {
47 if (fn) die_9(111,"curvecpmessage: fatal: ",trouble," ",d,"/",fn,": ",e_str(errno),"\n");
48 die_7(111,"curvecpmessage: fatal: ",trouble," ",d,": ",e_str(errno),"\n");
49 }
50 if (errno) die_5(111,"curvecpmessage: fatal: ",trouble,": ",e_str(errno),"\n");
51 die_3(111,"curvecpmessage: fatal: ",trouble,"\n");
52}
53
54void die_badmessage(void)
55{
56 errno = EPROTO;
57 die_fatal("unable to read from file descriptor 8",0,0);
58}
59
60void die_internalerror(void)
61{
62 errno = EPROTO;
63 die_fatal("internal error",0,0);
64}
65
66
67int tochild[2] = {-1,-1};
68int fromchild[2] = {-1,-1};
69pid_t child = -1;
70int childstatus;
71
72struct pollfd p[3];
73
74long long sendacked = 0; /* number of initial bytes sent and fully acknowledged */
75long long sendbytes = 0; /* number of additional bytes to send */
76unsigned char sendbuf[131072]; /* circular queue with the additional bytes; size must be power of 2 */
77long long sendprocessed = 0; /* within sendbytes, number of bytes absorbed into blocks */
78
79crypto_uint16 sendeof = 0; /* 2048 for normal eof after sendbytes, 4096 for error after sendbytes */
80int sendeofprocessed = 0;
81int sendeofacked = 0;
82
83long long totalblocktransmissions = 0;
84long long totalblocks = 0;
85
86#define OUTGOING 128 /* must be power of 2 */
87long long blocknum = 0; /* number of outgoing blocks being tracked */
88long long blockfirst = 0; /* circular queue */
89long long blockpos[OUTGOING]; /* position of block's first byte within stream */
90long long blocklen[OUTGOING]; /* number of bytes in this block */
91crypto_uint16 blockeof[OUTGOING]; /* 0, 2048, 4096 */
92long long blocktransmissions[OUTGOING];
93long long blocktime[OUTGOING]; /* time of last message sending this block; 0 means acked */
94long long earliestblocktime = 0; /* if nonzero, minimum of active blocktime values */
95crypto_uint32 blockid[OUTGOING]; /* ID of last message sending this block */
96
97#define INCOMING 64 /* must be power of 2 */
98long long messagenum = 0; /* number of messages in incoming queue */
99long long messagefirst = 0; /* position of first message; circular queue */
100unsigned char messagelen[INCOMING]; /* times 16 */
101unsigned char message[INCOMING][1088];
102unsigned char messagetodo[2048];
103long long messagetodolen = 0;
104
105long long receivebytes = 0; /* number of initial bytes fully received */
106long long receivewritten = 0; /* within receivebytes, number of bytes given to child */
107crypto_uint16 receiveeof = 0; /* 0, 2048, 4096 */
108long long receivetotalbytes = 0; /* total number of bytes in stream, if receiveeof */
109unsigned char receivebuf[131072]; /* circular queue beyond receivewritten; size must be power of 2 */
110unsigned char receivevalid[131072]; /* 1 for byte successfully received; XXX: use buddy structure to speed this up */
111
112long long maxblocklen = 512;
113crypto_uint32 nextmessageid = 1;
114
115unsigned char buf[4096];
116
117long long lastblocktime = 0;
118long long nsecperblock = 1000000000;
119long long lastspeedadjustment = 0;
120long long lastedge = 0;
121long long lastdoubling = 0;
122
123long long rtt;
124long long rtt_delta;
125long long rtt_average = 0;
126long long rtt_deviation = 0;
127long long rtt_lowwater = 0;
128long long rtt_highwater = 0;
129long long rtt_timeout = 1000000000;
130long long rtt_seenrecenthigh = 0;
131long long rtt_seenrecentlow = 0;
132long long rtt_seenolderhigh = 0;
133long long rtt_seenolderlow = 0;
134long long rtt_phase = 0;
135
136long long lastpanic = 0;
137
138void earliestblocktime_compute(void) /* XXX: use priority queue */
139{
140 long long i;
141 long long pos;
142 earliestblocktime = 0;
143 for (i = 0;i < blocknum;++i) {
144 pos = (blockfirst + i) & (OUTGOING - 1);
145 if (blocktime[pos]) {
146 if (!earliestblocktime)
147 earliestblocktime = blocktime[pos];
148 else
149 if (blocktime[pos] < earliestblocktime)
150 earliestblocktime = blocktime[pos];
151 }
152 }
153}
154
155void acknowledged(unsigned long long start,unsigned long long stop)
156{
157 long long i;
158 long long pos;
159 if (stop == start) return;
160 for (i = 0;i < blocknum;++i) {
161 pos = (blockfirst + i) & (OUTGOING - 1);
162 if (blockpos[pos] >= start && blockpos[pos] + blocklen[pos] <= stop) {
163 blocktime[pos] = 0;
164 totalblocktransmissions += blocktransmissions[pos];
165 totalblocks += 1;
166 }
167 }
168 while (blocknum) {
169 pos = blockfirst & (OUTGOING - 1);
170 if (blocktime[pos]) break;
171 sendacked += blocklen[pos];
172 sendbytes -= blocklen[pos];
173 sendprocessed -= blocklen[pos];
174 ++blockfirst;
175 --blocknum;
176 }
177 if (sendeof)
178 if (start == 0)
179 if (stop > sendacked + sendbytes)
180 if (!sendeofacked) {
181 sendeofacked = 1;
182 }
183 earliestblocktime_compute();
184}
185
186int main(int argc,char **argv)
187{
188 long long pos;
189 long long len;
190 long long u;
191 long long r;
192 long long i;
193 long long k;
194 long long recent;
195 long long nextaction;
196 long long timeout;
197 struct pollfd *q;
198 struct pollfd *watch8;
199 struct pollfd *watchtochild;
200 struct pollfd *watchfromchild;
201
202 signal(SIGPIPE,SIG_IGN);
203
204 if (!argv[0]) die_usage(0);
205 for (;;) {
206 char *x;
207 if (!argv[1]) break;
208 if (argv[1][0] != '-') break;
209 x = *++argv;
210 if (x[0] == '-' && x[1] == 0) break;
211 if (x[0] == '-' && x[1] == '-' && x[2] == 0) break;
212 while (*++x) {
213 if (*x == 'q') { flagverbose = 0; continue; }
214 if (*x == 'Q') { flagverbose = 1; continue; }
215 if (*x == 'v') { if (flagverbose == 2) flagverbose = 3; else flagverbose = 2; continue; }
216 if (*x == 'c') { flagserver = 0; wantping = 2; continue; }
217 if (*x == 'C') { flagserver = 0; wantping = 1; continue; }
218 if (*x == 's') { flagserver = 1; wantping = 0; continue; }
219 die_usage(0);
220 }
221 }
222 if (!*++argv) die_usage("missing prog");
223
224 for (;;) {
225 r = open_read("/dev/null");
226 if (r == -1) die_fatal("unable to open /dev/null",0,0);
227 if (r > 9) { close(r); break; }
228 }
229
230 if (open_pipe(tochild) == -1) die_fatal("unable to create pipe",0,0);
231 if (open_pipe(fromchild) == -1) die_fatal("unable to create pipe",0,0);
232
233 blocking_enable(tochild[0]);
234 blocking_enable(fromchild[1]);
235
236 child = fork();
237 if (child == -1) die_fatal("unable to fork",0,0);
238 if (child == 0) {
239 close(8);
240 close(9);
241 if (flagserver) {
242 close(0);
243 if (dup(tochild[0]) != 0) die_fatal("unable to dup",0,0);
244 close(1);
245 if (dup(fromchild[1]) != 1) die_fatal("unable to dup",0,0);
246 } else {
247 close(6);
248 if (dup(tochild[0]) != 6) die_fatal("unable to dup",0,0);
249 close(7);
250 if (dup(fromchild[1]) != 7) die_fatal("unable to dup",0,0);
251 }
252 signal(SIGPIPE,SIG_DFL);
253 execvp(*argv,argv);
254 die_fatal("unable to run",*argv,0);
255 }
256
257 close(tochild[0]);
258 close(fromchild[1]);
259
260 recent = nanoseconds();
261 lastspeedadjustment = recent;
262 if (flagserver) maxblocklen = 1024;
263
264 for (;;) {
265 if (sendeofacked)
266 if (receivewritten == receivetotalbytes)
267 if (receiveeof)
268 if (tochild[1] < 0)
269 break; /* XXX: to re-ack should enter a TIME-WAIT state here */
270
271 q = p;
272
273 watch8 = q;
274 if (watch8) { q->fd = 8; q->events = POLLIN; ++q; }
275
276 watchtochild = q;
277 if (tochild[1] < 0) watchtochild = 0;
278 if (receivewritten >= receivebytes) watchtochild = 0;
279 if (watchtochild) { q->fd = tochild[1]; q->events = POLLOUT; ++q; }
280
281 watchfromchild = q;
282 if (sendeof) watchfromchild = 0;
283 if (sendbytes + 4096 > sizeof sendbuf) watchfromchild = 0;
284 if (watchfromchild) { q->fd = fromchild[0]; q->events = POLLIN; ++q; }
285
286 nextaction = recent + 60000000000LL;
287 if (wantping == 1) nextaction = recent + 1000000000;
288 if (wantping == 2)
289 if (nextaction > lastblocktime + nsecperblock) nextaction = lastblocktime + nsecperblock;
290 if (blocknum < OUTGOING)
291 if (!(sendeof ? sendeofprocessed : sendprocessed >= sendbytes))
292 if (nextaction > lastblocktime + nsecperblock) nextaction = lastblocktime + nsecperblock;
293 if (earliestblocktime)
294 if (earliestblocktime + rtt_timeout > lastblocktime + nsecperblock)
295 if (earliestblocktime + rtt_timeout < nextaction)
296 nextaction = earliestblocktime + rtt_timeout;
297
298 if (messagenum)
299 if (!watchtochild)
300 nextaction = 0;
301
302 if (nextaction <= recent)
303 timeout = 0;
304 else
305 timeout = (nextaction - recent) / 1000000 + 1;
306
307 if (poll(p,q - p,timeout) < 0) {
308 watch8 = 0;
309 watchtochild = 0;
310 watchfromchild = 0;
311 } else {
312 if (watch8) if (!watch8->revents) watch8 = 0;
313 if (watchtochild) if (!watchtochild->revents) watchtochild = 0;
314 if (watchfromchild) if (!watchfromchild->revents) watchfromchild = 0;
315 }
316
317 /* XXX: keepalives */
318
319 do { /* try receiving data from child: */
320 if (!watchfromchild) break;
321 if (sendeof) break;
322 if (sendbytes + 4096 > sizeof sendbuf) break;
323
324 pos = (sendacked & (sizeof sendbuf - 1)) + sendbytes;
325 if (pos < sizeof sendbuf) {
326 r = read(fromchild[0],sendbuf + pos,sizeof sendbuf - pos);
327 } else {
328 r = read(fromchild[0],sendbuf + pos - sizeof sendbuf,sizeof sendbuf - sendbytes);
329 }
330 if (r == -1) if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) break;
331 if (r < 0) { sendeof = 4096; break; }
332 if (r == 0) { sendeof = 2048; break; }
333 sendbytes += r;
334 if (sendbytes >= 1152921504606846976LL) die_internalerror();
335 } while(0);
336
337 recent = nanoseconds();
338
339 do { /* try re-sending an old block: */
340 if (recent < lastblocktime + nsecperblock) break;
341 if (earliestblocktime == 0) break;
342 if (recent < earliestblocktime + rtt_timeout) break;
343
344 for (i = 0;i < blocknum;++i) {
345 pos = (blockfirst + i) & (OUTGOING - 1);
346 if (blocktime[pos] == earliestblocktime) {
347 if (recent > lastpanic + 4 * rtt_timeout) {
348 nsecperblock *= 2;
349 lastpanic = recent;
350 lastedge = recent;
351 }
352 goto sendblock;
353 }
354 }
355 } while(0);
356
357 do { /* try sending a new block: */
358 if (recent < lastblocktime + nsecperblock) break;
359 if (blocknum >= OUTGOING) break;
360 if (!wantping)
361 if (sendeof ? sendeofprocessed : sendprocessed >= sendbytes) break;
362 /* XXX: if any Nagle-type processing is desired, do it here */
363
364 pos = (blockfirst + blocknum) & (OUTGOING - 1);
365 ++blocknum;
366 blockpos[pos] = sendacked + sendprocessed;
367 blocklen[pos] = sendbytes - sendprocessed;
368 if (blocklen[pos] > maxblocklen) blocklen[pos] = maxblocklen;
369 if ((blockpos[pos] & (sizeof sendbuf - 1)) + blocklen[pos] > sizeof sendbuf)
370 blocklen[pos] = sizeof sendbuf - (blockpos[pos] & (sizeof sendbuf - 1));
371 /* XXX: or could have the full block in post-buffer space */
372 sendprocessed += blocklen[pos];
373 blockeof[pos] = 0;
374 if (sendprocessed == sendbytes) {
375 blockeof[pos] = sendeof;
376 if (sendeof) sendeofprocessed = 1;
377 }
378 blocktransmissions[pos] = 0;
379
380 sendblock:
381
382 blocktransmissions[pos] += 1;
383 blocktime[pos] = recent;
384 blockid[pos] = nextmessageid;
385 if (!++nextmessageid) ++nextmessageid;
386
387 /* constraints: u multiple of 16; u >= 16; u <= 1088; u >= 48 + blocklen[pos] */
388 u = 64 + blocklen[pos];
389 if (u <= 192) u = 192;
390 else if (u <= 320) u = 320;
391 else if (u <= 576) u = 576;
392 else if (u <= 1088) u = 1088;
393 else die_internalerror();
394 if (blocklen[pos] < 0 || blocklen[pos] > 1024) die_internalerror();
395
396 byte_zero(buf + 8,u);
397 buf[7] = u / 16;
398 uint32_pack(buf + 8,blockid[pos]);
399 /* XXX: include any acknowledgments that have piled up */
400 uint16_pack(buf + 46,blockeof[pos] | (crypto_uint16) blocklen[pos]);
401 uint64_pack(buf + 48,blockpos[pos]);
402 byte_copy(buf + 8 + u - blocklen[pos],blocklen[pos],sendbuf + (blockpos[pos] & (sizeof sendbuf - 1)));
403
404 if (writeall(9,buf + 7,u + 1) == -1) die_fatal("unable to write descriptor 9",0,0);
405 lastblocktime = recent;
406 wantping = 0;
407
408 earliestblocktime_compute();
409 } while(0);
410
411 do { /* try receiving messages: */
412 if (!watch8) break;
413 r = read(8,buf,sizeof buf);
414 if (r == -1) if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) break;
415 if (r == 0) die_badmessage();
416 if (r < 0) die_fatal("unable to read from file descriptor 8",0,0);
417 for (k = 0;k < r;++k) {
418 messagetodo[messagetodolen++] = buf[k];
419 u = 16 * (unsigned long long) messagetodo[0];
420 if (u < 16) die_badmessage();
421 if (u > 1088) die_badmessage();
422 if (messagetodolen == 1 + u) {
423 if (messagenum < INCOMING) {
424 pos = (messagefirst + messagenum) & (INCOMING - 1);
425 messagelen[pos] = messagetodo[0];
426 byte_copy(message[pos],u,messagetodo + 1);
427 ++messagenum;
428 } else {
429 ; /* drop tail */
430 }
431 messagetodolen = 0;
432 }
433 }
434 } while(0);
435
436 do { /* try processing a message: */
437 if (!messagenum) break;
438 if (tochild[1] >= 0 && receivewritten < receivebytes) break;
439
440 maxblocklen = 1024;
441
442 pos = messagefirst & (INCOMING - 1);
443 len = 16 * (unsigned long long) messagelen[pos];
444 do { /* handle this message if it's comprehensible: */
445 unsigned long long D;
446 unsigned long long SF;
447 unsigned long long startbyte;
448 unsigned long long stopbyte;
449 crypto_uint32 id;
450 long long i;
451
452 if (len < 48) break;
453 if (len > 1088) break;
454
455 id = uint32_unpack(message[pos] + 4);
456 for (i = 0;i < blocknum;++i) {
457 k = (blockfirst + i) & (OUTGOING - 1);
458 if (blockid[k] == id) {
459 rtt = recent - blocktime[k];
460 if (!rtt_average) {
461 nsecperblock = rtt;
462 rtt_average = rtt;
463 rtt_deviation = rtt / 2;
464 rtt_highwater = rtt;
465 rtt_lowwater = rtt;
466 }
467
468 /* Jacobson's retransmission timeout calculation: */
469 rtt_delta = rtt - rtt_average;
470 rtt_average += rtt_delta / 8;
471 if (rtt_delta < 0) rtt_delta = -rtt_delta;
472 rtt_delta -= rtt_deviation;
473 rtt_deviation += rtt_delta / 4;
474 rtt_timeout = rtt_average + 4 * rtt_deviation;
475 /* adjust for delayed acks with anti-spiking: */
476 rtt_timeout += 8 * nsecperblock;
477
478 /* recognizing top and bottom of congestion cycle: */
479 rtt_delta = rtt - rtt_highwater;
480 rtt_highwater += rtt_delta / 1024;
481 rtt_delta = rtt - rtt_lowwater;
482 if (rtt_delta > 0) rtt_lowwater += rtt_delta / 8192;
483 else rtt_lowwater += rtt_delta / 256;
484
485 if (rtt_average > rtt_highwater + 5000000) rtt_seenrecenthigh = 1;
486 else if (rtt_average < rtt_lowwater) rtt_seenrecentlow = 1;
487
488 if (recent >= lastspeedadjustment + 16 * nsecperblock) {
489 if (recent - lastspeedadjustment > 10000000000LL) {
490 nsecperblock = 1000000000; /* slow restart */
491 nsecperblock += randommod(nsecperblock / 8);
492 }
493
494 lastspeedadjustment = recent;
495
496 if (nsecperblock >= 131072) {
497 /* additive increase: adjust 1/N by a constant c */
498 /* rtt-fair additive increase: adjust 1/N by a constant c every nanosecond */
499 /* approximation: adjust 1/N by cN every N nanoseconds */
500 /* i.e., N <- 1/(1/N + cN) = N/(1 + cN^2) every N nanoseconds */
501 if (nsecperblock < 16777216) {
502 /* N/(1+cN^2) approx N - cN^3 */
503 u = nsecperblock / 131072;
504 nsecperblock -= u * u * u;
505 } else {
506 double d = nsecperblock;
507 nsecperblock = d/(1 + d*d / 2251799813685248.0);
508 }
509 }
510
511 if (rtt_phase == 0) {
512 if (rtt_seenolderhigh) {
513 rtt_phase = 1;
514 lastedge = recent;
515 nsecperblock += randommod(nsecperblock / 4);
516 }
517 } else {
518 if (rtt_seenolderlow) {
519 rtt_phase = 0;
520 }
521 }
522
523 rtt_seenolderhigh = rtt_seenrecenthigh;
524 rtt_seenolderlow = rtt_seenrecentlow;
525 rtt_seenrecenthigh = 0;
526 rtt_seenrecentlow = 0;
527 }
528
529 do {
530 if (recent - lastedge < 60000000000LL) {
531 if (recent < lastdoubling + 4 * nsecperblock + 64 * rtt_timeout + 5000000000LL) break;
532 } else {
533 if (recent < lastdoubling + 4 * nsecperblock + 2 * rtt_timeout) break;
534 }
535 if (nsecperblock <= 65535) break;
536
537 nsecperblock /= 2;
538 lastdoubling = recent;
539 if (lastedge) lastedge = recent;
540 } while(0);
541 }
542 }
543
544 stopbyte = uint64_unpack(message[pos] + 8);
545 acknowledged(0,stopbyte);
546 startbyte = stopbyte + (unsigned long long) uint32_unpack(message[pos] + 16);
547 stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 20);
548 acknowledged(startbyte,stopbyte);
549 startbyte = stopbyte + (unsigned long long) uint16_unpack(message[pos] + 22);
550 stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 24);
551 acknowledged(startbyte,stopbyte);
552 startbyte = stopbyte + (unsigned long long) uint16_unpack(message[pos] + 26);
553 stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 28);
554 acknowledged(startbyte,stopbyte);
555 startbyte = stopbyte + (unsigned long long) uint16_unpack(message[pos] + 30);
556 stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 32);
557 acknowledged(startbyte,stopbyte);
558 startbyte = stopbyte + (unsigned long long) uint16_unpack(message[pos] + 34);
559 stopbyte = startbyte + (unsigned long long) uint16_unpack(message[pos] + 36);
560 acknowledged(startbyte,stopbyte);
561
562 D = uint16_unpack(message[pos] + 38);
563 SF = D & (2048 + 4096);
564 D -= SF;
565 if (D > 1024) break;
566 if (48 + D > len) break;
567
568 startbyte = uint64_unpack(message[pos] + 40);
569 stopbyte = startbyte + D;
570
571 if (stopbyte > receivewritten + sizeof receivebuf) {
572 break;
573 /* of course, flow control would avoid this case */
574 }
575
576 if (SF) {
577 receiveeof = SF;
578 receivetotalbytes = stopbyte;
579 }
580
581 for (k = 0;k < D;++k) {
582 unsigned char ch = message[pos][len - D + k];
583 unsigned long long where = startbyte + k;
584 if (where >= receivewritten && where < receivewritten + sizeof receivebuf) {
585 receivevalid[where & (sizeof receivebuf - 1)] = 1;
586 receivebuf[where & (sizeof receivebuf - 1)] = ch;
587 }
588 }
589 for (;;) {
590 if (receivebytes >= receivewritten + sizeof receivebuf) break;
591 if (!receivevalid[receivebytes & (sizeof receivebuf - 1)]) break;
592 ++receivebytes;
593 }
594
595 if (!uint32_unpack(message[pos])) break; /* never acknowledge a pure acknowledgment */
596
597 /* XXX: delay acknowledgments */
598 u = 192;
599 byte_zero(buf + 8,u);
600 buf[7] = u / 16;
601 byte_copy(buf + 12,4,message[pos]);
602 if (receiveeof && receivebytes == receivetotalbytes) {
603 uint64_pack(buf + 16,receivebytes + 1);
604 } else
605 uint64_pack(buf + 16,receivebytes);
606 /* XXX: incorporate selective acknowledgments */
607
608 if (writeall(9,buf + 7,u + 1) == -1) die_fatal("unable to write descriptor 9",0,0);
609 } while(0);
610
611 ++messagefirst;
612 --messagenum;
613 } while(0);
614
615 do { /* try sending data to child: */
616 if (!watchtochild) break;
617 if (tochild[1] < 0) { receivewritten = receivebytes; break; }
618 if (receivewritten >= receivebytes) break;
619
620 pos = receivewritten & (sizeof receivebuf - 1);
621 len = receivebytes - receivewritten;
622 if (pos + len > sizeof receivebuf) len = sizeof receivebuf - pos;
623 r = write(tochild[1],receivebuf + pos,len);
624 if (r == -1) if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) break;
625 if (r <= 0) {
626 close(tochild[1]);
627 tochild[1] = -1;
628 break;
629 }
630 byte_zero(receivevalid + pos,r);
631 receivewritten += r;
632 } while(0);
633
634 do { /* try closing pipe to child: */
635 if (!receiveeof) break;
636 if (receivewritten < receivetotalbytes) break;
637 if (tochild[1] < 0) break;
638
639 if (receiveeof == 4096)
640 ; /* XXX: UNIX doesn't provide a way to signal an error through a pipe */
641 close(tochild[1]);
642 tochild[1] = -1;
643 } while(0);
644
645 }
646
647
648 do {
649 r = waitpid(child,&childstatus,0);
650 } while (r == -1 && errno == EINTR);
651
652 if (!WIFEXITED(childstatus)) { errno = 0; die_fatal("process killed by signal",0,0); }
653 return WEXITSTATUS(childstatus);
654}