network: tcp
tcp
#ifndef MILL_TCP_BUFLEN
#define MILL_TCP_BUFLEN (1500 - 68)
#endif
enum mill_tcptype {
MILL_TCPLISTENER,
MILL_TCPCONN
};
struct mill_tcpsock_ {
enum mill_tcptype type;
};
struct mill_tcplistener {
struct mill_tcpsock_ sock;
int fd;
int port;
};
struct mill_tcpconn {
struct mill_tcpsock_ sock;
int fd;
size_t ifirst;
size_t ilen;
size_t olen;
char ibuf[MILL_TCP_BUFLEN];
char obuf[MILL_TCP_BUFLEN];
ipaddr addr;
};
static void mill_tcptune(int s) {
int opt = fcntl(s, F_GETFL, 0);
if (opt == -1)
opt = 0;
int rc = fcntl(s, F_SETFL, opt | O_NONBLOCK);
mill_assert(rc != -1);
opt = 1;
rc = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
mill_assert(rc == 0);
#ifdef SO_NOSIGPIPE
opt = 1;
rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
mill_assert (rc == 0 || errno == EINVAL);
#endif
}
static void tcpconn_init(struct mill_tcpconn *conn, int fd) {
conn->sock.type = MILL_TCPCONN;
conn->fd = fd;
conn->ifirst = 0;
conn->ilen = 0;
conn->olen = 0;
}
struct mill_tcpsock_ *mill_tcplisten_(ipaddr addr, int backlog) {
int s = socket(mill_ipfamily(addr), SOCK_STREAM, 0);
if(s == -1)
return NULL;
mill_tcptune(s);
int rc = bind(s, (struct sockaddr*)&addr, mill_iplen(addr));
if(rc != 0)
return NULL;
rc = listen(s, backlog);
if(rc != 0)
return NULL;
int port = mill_ipport(addr);
if(!port) {
ipaddr baddr;
socklen_t len = sizeof(ipaddr);
rc = getsockname(s, (struct sockaddr*)&baddr, &len);
if(rc == -1) {
int err = errno;
fdclean(s);
close(s);
errno = err;
return NULL;
}
port = mill_ipport(baddr);
}
struct mill_tcplistener *l = malloc(sizeof(struct mill_tcplistener));
if(!l) {
fdclean(s);
close(s);
errno = ENOMEM;
return NULL;
}
l->sock.type = MILL_TCPLISTENER;
l->fd = s;
l->port = port;
errno = 0;
return &l->sock;
}
int mill_tcpport_(struct mill_tcpsock_ *s) {
if(s->type == MILL_TCPCONN) {
struct mill_tcpconn *c = (struct mill_tcpconn*)s;
return mill_ipport(c->addr);
}
else if(s->type == MILL_TCPLISTENER) {
struct mill_tcplistener *l = (struct mill_tcplistener*)s;
return l->port;
}
mill_assert(0);
}
struct mill_tcpsock_ *mill_tcpaccept_(struct mill_tcpsock_ *s, int64_t deadline) {
if(s->type != MILL_TCPLISTENER)
mill_panic("trying to accept on a socket that isn't listening");
struct mill_tcplistener *l = (struct mill_tcplistener*)s;
socklen_t addrlen;
ipaddr addr;
while(1) {
addrlen = sizeof(addr);
int as = accept(l->fd, (struct sockaddr *)&addr, &addrlen);
if (as >= 0) {
mill_tcptune(as);
struct mill_tcpconn *conn = malloc(sizeof(struct mill_tcpconn));
if(!conn) {
fdclean(as);
close(as);
errno = ENOMEM;
return NULL;
}
tcpconn_init(conn, as);
conn->addr = addr;
errno = 0;
return (tcpsock)conn;
}
mill_assert(as == -1);
if(errno != EAGAIN && errno != EWOULDBLOCK)
return NULL;
int rc = fdwait(l->fd, FDW_IN, deadline);
if(rc == 0) {
errno = ETIMEDOUT;
return NULL;
}
if(rc & FDW_ERR)
return NULL;
mill_assert(rc == FDW_IN);
}
}
struct mill_tcpsock_ *mill_tcpconnect_(ipaddr addr, int64_t deadline) {
int s = socket(mill_ipfamily(addr), SOCK_STREAM, 0);
if(s == -1)
return NULL;
mill_tcptune(s);
int rc = connect(s, (struct sockaddr*)&addr, mill_iplen(addr));
if(rc != 0) {
mill_assert(rc == -1);
if(errno != EINPROGRESS)
return NULL;
rc = fdwait(s, FDW_OUT, deadline);
if(rc == 0) {
errno = ETIMEDOUT;
return NULL;
}
int err;
socklen_t errsz = sizeof(err);
rc = getsockopt(s, SOL_SOCKET, SO_ERROR, (void*)&err, &errsz);
if(rc != 0) {
err = errno;
fdclean(s);
close(s);
errno = err;
return NULL;
}
if(err != 0) {
fdclean(s);
close(s);
errno = err;
return NULL;
}
}
struct mill_tcpconn *conn = malloc(sizeof(struct mill_tcpconn));
if(!conn) {
fdclean(s);
close(s);
errno = ENOMEM;
return NULL;
}
tcpconn_init(conn, s);
errno = 0;
return (tcpsock)conn;
}
size_t mill_tcpsend_(struct mill_tcpsock_ *s, const void *buf, size_t len, int64_t deadline) {
if(s->type != MILL_TCPCONN)
mill_panic("trying to send to an unconnected socket");
struct mill_tcpconn *conn = (struct mill_tcpconn*)s;
if(conn->olen + len <= MILL_TCP_BUFLEN) {
memcpy(&conn->obuf[conn->olen], buf, len);
conn->olen += len;
errno = 0;
return len;
}
tcpflush(s, deadline);
if(errno != 0)
return 0;
if(conn->olen + len <= MILL_TCP_BUFLEN) {
memcpy(&conn->obuf[conn->olen], buf, len);
conn->olen += len;
errno = 0;
return len;
}
char *pos = (char*)buf;
size_t remaining = len;
while(remaining) {
ssize_t sz = send(conn->fd, pos, remaining, 0);
if(sz == -1) {
if(errno == EPIPE) {
errno = ECONNRESET;
return 0;
}
if(errno != EAGAIN && errno != EWOULDBLOCK)
return 0;
int rc = fdwait(conn->fd, FDW_OUT, deadline);
if(rc == 0) {
errno = ETIMEDOUT;
return len - remaining;
}
continue;
}
pos += sz;
remaining -= sz;
}
errno = 0;
return len;
}
void mill_tcpflush_(struct mill_tcpsock_ *s, int64_t deadline) {
if(s->type != MILL_TCPCONN)
mill_panic("trying to send to an unconnected socket");
struct mill_tcpconn *conn = (struct mill_tcpconn*)s;
if(!conn->olen) {
errno = 0;
return;
}
char *pos = conn->obuf;
size_t remaining = conn->olen;
while(remaining) {
ssize_t sz = send(conn->fd, pos, remaining, 0);
if(sz == -1) {
if(errno == EPIPE) {
errno = ECONNRESET;
return;
}
if(errno != EAGAIN && errno != EWOULDBLOCK)
return;
int rc = fdwait(conn->fd, FDW_OUT, deadline);
if(rc == 0) {
errno = ETIMEDOUT;
return;
}
continue;
}
pos += sz;
remaining -= sz;
}
conn->olen = 0;
errno = 0;
}
size_t mill_tcprecv_(struct mill_tcpsock_ *s, void *buf, size_t len, int64_t deadline) {
if(s->type != MILL_TCPCONN)
mill_panic("trying to receive from an unconnected socket");
struct mill_tcpconn *conn = (struct mill_tcpconn*)s;
if(conn->ilen >= len) {
memcpy(buf, &conn->ibuf[conn->ifirst], len);
conn->ifirst += len;
conn->ilen -= len;
errno = 0;
return len;
}
char *pos = (char*)buf;
size_t remaining = len;
memcpy(pos, &conn->ibuf[conn->ifirst], conn->ilen);
pos += conn->ilen;
remaining -= conn->ilen;
conn->ifirst = 0;
conn->ilen = 0;
mill_assert(remaining);
while(1) {
if(remaining > MILL_TCP_BUFLEN) {
ssize_t sz = recv(conn->fd, pos, remaining, 0);
if(!sz) {
errno = ECONNRESET;
return len - remaining;
}
if(sz == -1) {
if(errno != EAGAIN && errno != EWOULDBLOCK)
return len - remaining;
sz = 0;
}
if((size_t)sz == remaining) {
errno = 0;
return len;
}
pos += sz;
remaining -= sz;
}
else {
ssize_t sz = recv(conn->fd, conn->ibuf, MILL_TCP_BUFLEN, 0);
if(!sz) {
errno = ECONNRESET;
return len - remaining;
}
if(sz == -1) {
if(errno != EAGAIN && errno != EWOULDBLOCK)
return len - remaining;
sz = 0;
}
if((size_t)sz < remaining) {
memcpy(pos, conn->ibuf, sz);
pos += sz;
remaining -= sz;
conn->ifirst = 0;
conn->ilen = 0;
}
else {
memcpy(pos, conn->ibuf, remaining);
conn->ifirst = remaining;
conn->ilen = sz - remaining;
errno = 0;
return len;
}
}
int res = fdwait(conn->fd, FDW_IN, deadline);
if(!res) {
errno = ETIMEDOUT;
return len - remaining;
}
}
}
size_t mill_tcprecvuntil_(struct mill_tcpsock_ *s, void *buf, size_t len,
const char *delims, size_t delimcount, int64_t deadline) {
if(s->type != MILL_TCPCONN)
mill_panic("trying to receive from an unconnected socket");
char *pos = (char*)buf;
size_t i;
for(i = 0; i != len; ++i, ++pos) {
size_t res = tcprecv(s, pos, 1, deadline);
if(res == 1) {
size_t j;
for(j = 0; j != delimcount; ++j)
if(*pos == delims[j])
return i + 1;
}
if (errno != 0)
return i + res;
}
errno = ENOBUFS;
return len;
}
void mill_tcpshutdown_(struct mill_tcpsock_ *s, int how) {
mill_assert(s->type == MILL_TCPCONN);
struct mill_tcpconn *c = (struct mill_tcpconn*)s;
int rc = shutdown(c->fd, how);
mill_assert(rc == 0 || errno == ENOTCONN);
}
void mill_tcpclose_(struct mill_tcpsock_ *s) {
if(s->type == MILL_TCPLISTENER) {
struct mill_tcplistener *l = (struct mill_tcplistener*)s;
fdclean(l->fd);
int rc = close(l->fd);
mill_assert(rc == 0);
free(l);
return;
}
if(s->type == MILL_TCPCONN) {
struct mill_tcpconn *c = (struct mill_tcpconn*)s;
fdclean(c->fd);
int rc = close(c->fd);
mill_assert(rc == 0);
free(c);
return;
}
mill_assert(0);
}
ipaddr mill_tcpaddr_(struct mill_tcpsock_ *s) {
if(s->type != MILL_TCPCONN)
mill_panic("trying to get address from a socket that isn't connected");
struct mill_tcpconn *l = (struct mill_tcpconn *)s;
return l->addr;
}
int mill_tcpfd(struct mill_tcpsock_ *s) {
return ((struct mill_tcpconn*)s)->fd;
}