network: unix
unix-socket
/* Size, in bytes, of each of the two per-connection buffers (ibuf and obuf
   in struct mill_unixconn). The #ifndef guard lets a definition made
   earlier (e.g. on the compiler command line) take precedence. */
#ifndef MILL_UNIX_BUFLEN
#define MILL_UNIX_BUFLEN (4096)
#endif
/* Tag stored in struct mill_unixsock_ that tells which concrete structure
   a socket handle actually points to. */
enum mill_unixtype {
/* The handle is a struct mill_unixlistener. */
MILL_UNIXLISTENER,
/* The handle is a struct mill_unixconn. */
MILL_UNIXCONN
};
/* Common header of every socket object in this file. It is the first member
   of both struct mill_unixlistener and struct mill_unixconn, and the code
   casts pointers to this header to the concrete type after checking 'type'. */
struct mill_unixsock_ {
/* Which concrete structure this header is embedded in. */
enum mill_unixtype type;
};
/* A listening socket, as created by mill_unixlisten_(). */
struct mill_unixlistener {
/* Must stay the first member so the object can be addressed via its header. */
struct mill_unixsock_ sock;
/* Underlying file descriptor of the listening socket. */
int fd;
};
/* A connected stream socket together with its input and output buffers. */
struct mill_unixconn {
/* Must stay the first member so the object can be addressed via its header. */
struct mill_unixsock_ sock;
/* Underlying file descriptor of the connection. */
int fd;
/* Offset in ibuf of the first received byte not yet handed to the caller. */
size_t ifirst;
/* Number of received-but-unconsumed bytes in ibuf, starting at ifirst. */
size_t ilen;
/* Number of bytes queued at the start of obuf and not yet flushed. */
size_t olen;
/* Input buffer: data read from the socket ahead of the caller's requests. */
char ibuf[MILL_UNIX_BUFLEN];
/* Output buffer: small sends are collected here until a flush. */
char obuf[MILL_UNIX_BUFLEN];
};
/* Prepares descriptor 's' for use by this module: switches it to
   non-blocking mode and, on platforms that define SO_NOSIGPIPE, enables
   that socket option. Failures are treated as programming errors and
   trip mill_assert. */
static void mill_unixtune(int s) {
    /* If the current flags cannot be read, fall back to an empty flag set. */
    int flags = fcntl(s, F_GETFL, 0);
    int rc = fcntl(s, F_SETFL, (flags == -1 ? 0 : flags) | O_NONBLOCK);
    mill_assert(rc != -1);
#ifdef SO_NOSIGPIPE
    /* EINVAL is tolerated here; any other failure is fatal. */
    int on = 1;
    rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof (on));
    mill_assert (rc == 0 || errno == EINVAL);
#endif
}
/* Fills 'su' with an AF_UNIX address whose path is 'addr'.
   Returns 0 and sets errno to 0 on success. Returns -1 with errno set to
   EINVAL when 'addr' (including its terminating NUL) does not fit into
   su->sun_path. */
static int mill_unixresolve(const char *addr, struct sockaddr_un *su) {
    mill_assert(su);
    size_t pathlen = strlen(addr);
    if (pathlen >= sizeof(su->sun_path)) {
        errno = EINVAL;
        return -1;
    }
    su->sun_family = AF_UNIX;
    /* Zero the whole path field, then lay the name over its start, so the
       path is NUL-terminated and every trailing byte is zero. */
    memset(su->sun_path, 0, sizeof(su->sun_path));
    memcpy(su->sun_path, addr, pathlen);
    errno = 0;
    return 0;
}
/* Initialises a freshly allocated connection object: tags it as a
   connection, attaches descriptor 'fd' and marks both buffers as empty.
   The buffer contents themselves are left untouched. */
static void unixconn_init(struct mill_unixconn *conn, int fd) {
    /* Both buffers start out empty. */
    conn->olen = 0;
    conn->ilen = 0;
    conn->ifirst = 0;
    conn->fd = fd;
    conn->sock.type = MILL_UNIXCONN;
}
/* Creates a listening UNIX domain stream socket bound to path 'addr'.
   'backlog' is passed straight to listen().
   Returns the new socket handle with errno set to 0, or NULL with errno
   describing the failure (EINVAL for an over-long path, ENOMEM when the
   handle cannot be allocated, otherwise whatever socket(), bind() or
   listen() reported). */
struct mill_unixsock_ *mill_unixlisten_(const char *addr, int backlog) {
    struct sockaddr_un su;
    int rc = mill_unixresolve(addr, &su);
    if (rc != 0) {
        return NULL;
    }
    int s = socket(AF_UNIX, SOCK_STREAM, 0);
    if(s == -1)
        return NULL;
    mill_unixtune(s);
    rc = bind(s, (struct sockaddr*)&su, sizeof(struct sockaddr_un));
    if(rc == 0)
        rc = listen(s, backlog);
    if(rc != 0) {
        /* Do not leak the descriptor when bind() or listen() fails. Keep
           the original error code, as the cleanup calls may overwrite it. */
        int err = errno;
        fdclean(s);
        close(s);
        errno = err;
        return NULL;
    }
    struct mill_unixlistener *l = malloc(sizeof(struct mill_unixlistener));
    if(!l) {
        fdclean(s);
        close(s);
        errno = ENOMEM;
        return NULL;
    }
    l->sock.type = MILL_UNIXLISTENER;
    l->fd = s;
    errno = 0;
    return &l->sock;
}
/* Accepts one incoming connection on listening socket 's', waiting via
   fdwait() until 'deadline' if none is pending.
   Returns a new connection handle with errno set to 0. Returns NULL on
   failure: errno is ETIMEDOUT when fdwait() reports 0, ENOMEM when the
   handle cannot be allocated, otherwise it is left as set by the failing
   call. Panics if 's' is not a listening socket. */
struct mill_unixsock_ *mill_unixaccept_(struct mill_unixsock_ *s, int64_t deadline) {
    if(s->type != MILL_UNIXLISTENER)
        mill_panic("trying to accept on a socket that isn't listening");
    struct mill_unixlistener *listener = (struct mill_unixlistener*)s;
    for(;;) {
        int as = accept(listener->fd, NULL, NULL);
        if(as < 0) {
            mill_assert(as == -1);
            /* Anything other than "no connection pending" is a hard error. */
            if(!(errno == EAGAIN || errno == EWOULDBLOCK))
                return NULL;
            /* Nothing to accept yet; wait for the listener to become readable. */
            int rc = fdwait(listener->fd, FDW_IN, deadline);
            if(rc == 0) {
                errno = ETIMEDOUT;
                return NULL;
            }
            if(rc & FDW_ERR)
                return NULL;
            mill_assert(rc == FDW_IN);
            continue;
        }
        /* Got a connection: tune the descriptor and wrap it in a handle. */
        mill_unixtune(as);
        struct mill_unixconn *conn = malloc(sizeof(struct mill_unixconn));
        if(!conn) {
            fdclean(as);
            close(as);
            errno = ENOMEM;
            return NULL;
        }
        unixconn_init(conn, as);
        errno = 0;
        return (struct mill_unixsock_*)conn;
    }
}
/* Connects to the UNIX domain stream socket at path 'addr'.
   Returns a connection handle with errno set to 0, or NULL with errno set
   (EINVAL for an over-long path, ENOMEM when the handle cannot be
   allocated, otherwise the error from socket() or connect()). */
struct mill_unixsock_ *mill_unixconnect_(const char *addr) {
    struct sockaddr_un su;
    if (mill_unixresolve(addr, &su) != 0)
        return NULL;
    int s = socket(AF_UNIX, SOCK_STREAM, 0);
    if(s == -1)
        return NULL;
    mill_unixtune(s);
    int err;
    struct mill_unixconn *conn = NULL;
    int rc = connect(s, (struct sockaddr*)&su, sizeof(struct sockaddr_un));
    if(rc != 0) {
        /* Remember the reason before cleanup can overwrite errno. */
        err = errno;
        mill_assert(rc == -1);
        goto fail;
    }
    conn = malloc(sizeof(struct mill_unixconn));
    if(!conn) {
        err = ENOMEM;
        goto fail;
    }
    unixconn_init(conn, s);
    errno = 0;
    return (struct mill_unixsock_*)conn;
fail:
    /* Shared failure path: release the descriptor and report 'err'. */
    fdclean(s);
    close(s);
    errno = err;
    return NULL;
}
/* Creates a pair of connected UNIX domain stream sockets and stores their
   handles in '*a' and '*b'.
   On success errno is set to 0. On failure errno is set (EINVAL when 'a'
   or 'b' is NULL, ENOMEM when a handle cannot be allocated, otherwise the
   error from socketpair()) and neither '*a' nor '*b' is modified. */
void mill_unixpair_(struct mill_unixsock_ **a, struct mill_unixsock_ **b) {
    if(!a || !b) {
        errno = EINVAL;
        return;
    }
    int fd[2];
    int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fd);
    if (rc != 0)
        return;
    mill_unixtune(fd[0]);
    mill_unixtune(fd[1]);
    /* Allocate both handles before publishing either one, so that a failed
       second allocation cannot leave '*a' pointing at freed memory. */
    struct mill_unixconn *first = malloc(sizeof(struct mill_unixconn));
    struct mill_unixconn *second = malloc(sizeof(struct mill_unixconn));
    if(!first || !second) {
        free(first);
        free(second);
        fdclean(fd[0]);
        close(fd[0]);
        fdclean(fd[1]);
        close(fd[1]);
        errno = ENOMEM;
        return;
    }
    unixconn_init(first, fd[0]);
    unixconn_init(second, fd[1]);
    *a = (struct mill_unixsock_*)first;
    *b = (struct mill_unixsock_*)second;
    errno = 0;
}
/* Queues or sends 'len' bytes from 'buf' on connection 's'.
   Data that fits into the free part of the output buffer is only copied
   there; it reaches the socket on a later flush. Otherwise the buffer is
   flushed first, and data larger than the whole buffer is written to the
   socket directly from 'buf', waiting via fdwait() until 'deadline'.
   Returns the number of bytes of 'buf' accepted (buffered or sent) and
   sets errno to 0 on full success. On failure errno is set (ETIMEDOUT on
   deadline expiry, ECONNRESET when the peer is gone, otherwise the error
   from send() or from the flush) and the count of bytes already accepted
   is returned. Panics if 's' is not a connection. */
size_t mill_unixsend_(struct mill_unixsock_ *s, const void *buf, size_t len, int64_t deadline) {
    if(s->type != MILL_UNIXCONN)
        mill_panic("trying to send to an unconnected socket");
    struct mill_unixconn *conn = (struct mill_unixconn*)s;
    /* Compare against the free space rather than computing olen + len,
       which could wrap around for a very large 'len'. */
    if(len <= MILL_UNIX_BUFLEN - conn->olen) {
        memcpy(&conn->obuf[conn->olen], buf, len);
        conn->olen += len;
        errno = 0;
        return len;
    }
    /* Not enough room: push out what is already queued. */
    unixflush(s, deadline);
    if(errno != 0)
        return 0;
    /* Try the buffer once more now that it has been flushed. */
    if(len <= MILL_UNIX_BUFLEN - conn->olen) {
        memcpy(&conn->obuf[conn->olen], buf, len);
        conn->olen += len;
        errno = 0;
        return len;
    }
    /* The chunk is larger than the whole buffer; send it in place. */
    const char *pos = (const char*)buf;
    size_t remaining = len;
    while(remaining) {
        ssize_t sz = send(conn->fd, pos, remaining, 0);
        if(sz == -1) {
            /* EPIPE is reported to the caller as ECONNRESET. */
            if(errno == EPIPE) {
                errno = ECONNRESET;
                return len - remaining;
            }
            /* Report how much was sent before a hard error, the same way
               the timeout path below does. */
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                return len - remaining;
            int rc = fdwait(conn->fd, FDW_OUT, deadline);
            if(rc == 0) {
                errno = ETIMEDOUT;
                return len - remaining;
            }
            continue;
        }
        pos += sz;
        remaining -= sz;
    }
    errno = 0;
    return len;
}
/* Writes everything queued in the output buffer of connection 's' to the
   socket, waiting via fdwait() until 'deadline' when the socket is not
   writable.
   On success the buffer is empty and errno is 0. On failure errno is set
   (ETIMEDOUT on deadline expiry, ECONNRESET when send() reports EPIPE,
   otherwise the error from send()) and the buffer keeps only the bytes
   that were NOT transmitted, so a later flush does not send any byte
   twice. Panics if 's' is not a connection. */
void mill_unixflush_(struct mill_unixsock_ *s, int64_t deadline) {
    if(s->type != MILL_UNIXCONN)
        mill_panic("trying to send to an unconnected socket");
    struct mill_unixconn *conn = (struct mill_unixconn*)s;
    if(!conn->olen) {
        errno = 0;
        return;
    }
    char *pos = conn->obuf;
    size_t remaining = conn->olen;
    while(remaining) {
        ssize_t sz = send(conn->fd, pos, remaining, 0);
        if(sz == -1) {
            if(errno == EPIPE) {
                /* EPIPE is reported to the caller as ECONNRESET. */
                errno = ECONNRESET;
            }
            else if(errno == EAGAIN || errno == EWOULDBLOCK) {
                int rc = fdwait(conn->fd, FDW_OUT, deadline);
                if(rc != 0)
                    continue;
                errno = ETIMEDOUT;
            }
            /* Giving up: discard the prefix that already went out and keep
               the unsent tail at the start of the buffer. The regions may
               overlap, hence memmove. */
            if(pos != conn->obuf)
                memmove(conn->obuf, pos, remaining);
            conn->olen = remaining;
            return;
        }
        pos += sz;
        remaining -= sz;
    }
    conn->olen = 0;
    errno = 0;
}
/* Reads exactly 'len' bytes from connection 's' into 'buf', waiting via
   fdwait() until 'deadline' whenever no data is available.
   Returns 'len' with errno set to 0 on success. On failure returns the
   number of bytes stored in 'buf' so far with errno set: ECONNRESET when
   recv() returns 0, ETIMEDOUT when fdwait() reports 0, otherwise the
   error from recv(). Panics if 's' is not a connection. */
size_t mill_unixrecv_(struct mill_unixsock_ *s, void *buf, size_t len, int64_t deadline) {
    if(s->type != MILL_UNIXCONN)
        mill_panic("trying to receive from an unconnected socket");
    struct mill_unixconn *conn = (struct mill_unixconn*)s;
    /* Fast path: the request is fully covered by already-buffered data. */
    if(len <= conn->ilen) {
        memcpy(buf, conn->ibuf + conn->ifirst, len);
        conn->ifirst += len;
        conn->ilen -= len;
        errno = 0;
        return len;
    }
    /* Hand over whatever is buffered and mark the input buffer empty. */
    char *out = (char*)buf;
    size_t remaining = len - conn->ilen;
    memcpy(out, conn->ibuf + conn->ifirst, conn->ilen);
    out += conn->ilen;
    conn->ifirst = 0;
    conn->ilen = 0;
    mill_assert(remaining);
    for(;;) {
        /* Requests larger than the input buffer are read straight into the
           caller's memory; smaller ones go through the input buffer so that
           surplus bytes can be kept for later calls. */
        int direct = remaining > MILL_UNIX_BUFLEN;
        ssize_t got;
        if(direct)
            got = recv(conn->fd, out, remaining, 0);
        else
            got = recv(conn->fd, conn->ibuf, MILL_UNIX_BUFLEN, 0);
        if(got == 0) {
            errno = ECONNRESET;
            return len - remaining;
        }
        if(got == -1) {
            if(errno != EAGAIN && errno != EWOULDBLOCK)
                return len - remaining;
            /* No data right now: treat as zero bytes and wait below. */
            got = 0;
        }
        if(direct) {
            if((size_t)got == remaining) {
                errno = 0;
                return len;
            }
            out += got;
            remaining -= got;
        }
        else if((size_t)got >= remaining) {
            /* Enough data arrived: satisfy the request and keep the rest. */
            memcpy(out, conn->ibuf, remaining);
            conn->ifirst = remaining;
            conn->ilen = got - remaining;
            errno = 0;
            return len;
        }
        else {
            /* Still short: pass on everything received, buffer is empty. */
            memcpy(out, conn->ibuf, got);
            out += got;
            remaining -= got;
            conn->ifirst = 0;
            conn->ilen = 0;
        }
        /* Wait for more data to arrive. */
        if(fdwait(conn->fd, FDW_IN, deadline) == 0) {
            errno = ETIMEDOUT;
            return len - remaining;
        }
    }
}
/* Reads bytes from connection 's' into 'buf', one at a time, until a byte
   equal to one of the 'delimcount' characters in 'delims' has been read or
   'len' bytes have been stored.
   Returns the number of bytes stored, including the delimiter when one was
   found. When 'len' bytes are read without meeting a delimiter, errno is
   set to ENOBUFS and 'len' is returned. When the underlying receive fails,
   its errno is kept and the number of bytes stored so far is returned.
   Panics if 's' is not a connection. */
size_t mill_unixrecvuntil_(struct mill_unixsock_ *s, void *buf, size_t len,
      const char *delims, size_t delimcount, int64_t deadline) {
    if(s->type != MILL_UNIXCONN)
        mill_panic("trying to receive from an unconnected socket");
    unsigned char *pos = (unsigned char*)buf;
    size_t i;
    for(i = 0; i != len; ++i, ++pos) {
        size_t res = unixrecv(s, pos, 1, deadline);
        if(res == 1) {
            size_t j;
            for(j = 0; j != delimcount; ++j) {
                /* Compare as unsigned char on both sides: where plain char
                   is signed, a delimiter >= 0x80 would otherwise promote to
                   a negative int and never equal the received byte. */
                if(*pos == (unsigned char)delims[j])
                    return i + 1;
            }
        }
        if (errno != 0)
            return i + res;
    }
    errno = ENOBUFS;
    return len;
}
/* Calls shutdown() with mode 'how' on the descriptor of connection 's'.
   Asserts that 's' is a connection. A shutdown() failure other than
   ENOTCONN trips mill_assert. */
void mill_unixshutdown_(struct mill_unixsock_ *s, int how) {
    mill_assert(s->type == MILL_UNIXCONN);
    int rc = shutdown(((struct mill_unixconn*)s)->fd, how);
    mill_assert(rc == 0 || errno == ENOTCONN);
}
/* Closes socket 's' (listener or connection) and frees its handle.
   The descriptor is passed to fdclean() before being closed. Data still
   queued in a connection's output buffer is not flushed here. A failing
   close() or an unknown socket type trips mill_assert. */
void mill_unixclose_(struct mill_unixsock_ *s) {
    switch(s->type) {
    case MILL_UNIXLISTENER: {
        struct mill_unixlistener *listener = (struct mill_unixlistener*)s;
        fdclean(listener->fd);
        int rc = close(listener->fd);
        mill_assert(rc == 0);
        free(listener);
        return;
    }
    case MILL_UNIXCONN: {
        struct mill_unixconn *conn = (struct mill_unixconn*)s;
        fdclean(conn->fd);
        int rc = close(conn->fd);
        mill_assert(rc == 0);
        free(conn);
        return;
    }
    default:
        /* Neither a listener nor a connection: the handle is corrupt. */
        mill_assert(0);
    }
}