bind to the address we received UDP on

That in addition allocates a new UDP socket per client,
and forwards the initial client hello to the worker
process as auxillary data. That, eliminates the need to
re-open the main server's UDP socket per client connection.
This commit is contained in:
Nikos Mavrogiannopoulos
2015-01-10 22:49:25 +01:00
parent cb56984e8d
commit 3d7ac2c98c
5 changed files with 226 additions and 68 deletions

View File

@@ -94,6 +94,7 @@ message cli_stats_msg
message udp_fd_msg
{
required bool hello = 1 [default = true]; /* is that a client hello? */
optional bytes data = 2; /* the client hello data */
}
/* SESSION_INFO */

View File

@@ -60,6 +60,9 @@
#include <grp.h>
#include <ip-lease.h>
#include <ccan/list/list.h>
/* for recvmsg */
#include <netinet/in.h>
#include <netinet/ip.h>
int syslog_open = 0;
static unsigned int terminate = 0;
@@ -103,6 +106,23 @@ int y;
perror("setsockopt(IP_DF) failed");
#endif
}
#if defined(IP_PKTINFO)
y = 1;
if (setsockopt(fd, SOL_IP, IP_PKTINFO,
(const void *)&y, sizeof(y)) < 0)
perror("setsockopt(IP_PKTINFO) failed");
#elif defined(IP_RECVDSTADDR) /* *BSD */
y = 1;
if (setsockopt(fd, IPPROTO_IP, IP_RECVDSTADDR,
(const void *)&y, sizeof(y)) < 0)
perror("setsockopt(IP_RECVDSTADDR) failed");
#endif
#if defined(IPV6_RECVPKTINFO)
y = 1;
if (setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO,
(const void *)&y, sizeof(y)) < 0)
perror("setsockopt(IPV6_RECVPKTINFO) failed");
#endif
}
static void set_common_socket_options(int fd)
@@ -405,53 +425,33 @@ listen_ports(void *pool, struct cfg_st* config,
* that.
*/
static
int reopen_udp_port(struct listener_st *l)
void set_udp_opts(int fd, int family)
{
int s, y, e;
int y;
close(l->fd);
l->fd = -1;
s = socket(l->family, SOCK_DGRAM, l->protocol);
if (s < 0) {
perror("socket() failed");
return -1;
}
#if defined(IPV6_V6ONLY)
if (l->family == AF_INET6) {
if (family == AF_INET6) {
y = 1;
/* avoid listen on ipv6 addresses failing
* because already listening on ipv4 addresses: */
setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY,
(const void *) &y, sizeof(y));
}
#endif
y = 1;
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (const void *) &y, sizeof(y));
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *) &y, sizeof(y));
#if defined(IP_DONTFRAG)
y = 1;
setsockopt(s, IPPROTO_IP, IP_DONTFRAG,
setsockopt(fd, IPPROTO_IP, IP_DONTFRAG,
(const void *) &y, sizeof(y));
#elif defined(IP_MTU_DISCOVER)
y = IP_PMTUDISC_DO;
setsockopt(s, IPPROTO_IP, IP_MTU_DISCOVER,
setsockopt(fd, IPPROTO_IP, IP_MTU_DISCOVER,
(const void *) &y, sizeof(y));
#endif
set_cloexec_flag (s, 1);
set_cloexec_flag (fd, 1);
if (bind(s, (void*)&l->addr, l->addr_len) < 0) {
e = errno;
syslog(LOG_ERR, "bind() failed: %s", strerror(e));
close(s);
return -1;
}
l->fd = s;
return 0;
return;
}
@@ -665,27 +665,78 @@ static int forward_udp_to_owner(main_server_st* s, struct listener_st *listener)
{
int ret, e;
struct sockaddr_storage cli_addr;
struct sockaddr_storage our_addr;
struct proc_st *proc_to_send = NULL;
socklen_t cli_addr_size;
socklen_t cli_addr_size, our_addr_size = 0;
uint8_t buffer[1024];
char cmbuf[256];
char tbuf[64];
uint8_t *session_id = NULL;
int session_id_size = 0;
ssize_t buffer_size;
int connected = 0;
int match_ip_only = 0;
time_t now;
struct iovec iov = { buffer, sizeof(buffer) };
struct cmsghdr *cmsg;
int sfd = -1;
struct msghdr mh = {
.msg_name = &cli_addr,
.msg_namelen = sizeof(cli_addr),
.msg_iov = &iov,
.msg_iovlen = 1,
.msg_control = cmbuf,
.msg_controllen = sizeof(cmbuf),
};
/* first receive from the correct client and connect socket */
cli_addr_size = sizeof(cli_addr);
ret = recvfrom(listener->fd, buffer, sizeof(buffer), MSG_PEEK, (void*)&cli_addr, &cli_addr_size);
ret = recvmsg(listener->fd, &mh, 0);
if (ret < 0) {
mslog(s, NULL, LOG_INFO, "error receiving in UDP socket");
return -1;
}
buffer_size = ret;
/* find our address */
for (cmsg = CMSG_FIRSTHDR(&mh); cmsg != NULL; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
#if defined(IP_PKTINFO)
if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
struct in_pktinfo *pi = CMSG_DATA(cmsg);
struct sockaddr_in *a = (struct sockaddr_in*)&our_addr;
a->sin_family = AF_INET;
memcpy(&a->sin_addr, &pi->ipi_addr, sizeof(struct in_addr));
a->sin_port = htons(s->config->udp_port);
our_addr_size = sizeof(struct sockaddr_in);
break;
}
#elif defined(IP_RECVDSTADDR)
if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_RECVDSTADDR) {
struct in_addr *pi = CMSG_DATA(cmsg);
struct sockaddr_in *a = (struct sockaddr_in*)&our_addr;
a->sin_family = AF_INET;
memcpy(&a->sin_addr, &pi->ipi_addr, sizeof(struct in_addr));
a->sin_port = htons(s->config->udp_port);
our_addr_size = sizeof(struct sockaddr_in);
break;
}
#endif
#ifdef IPV6_RECVPKTINFO
if (cmsg->cmsg_level != IPPROTO_IPV6 || cmsg->cmsg_type != IPV6_RECVPKTINFO) {
struct in6_pktinfo *pi = CMSG_DATA(cmsg);
struct sockaddr_in6 *a = (struct sockaddr_in6*)&our_addr;
a->sin6_family = AF_INET6;
memcpy(&a->sin6_addr, &pi->ipi6_addr, sizeof(struct in6_addr));
a->sin6_port = htons(s->config->udp_port);
our_addr_size = sizeof(struct sockaddr_in6);
break;
}
#endif
}
/* obtain the session id */
if (buffer_size < RECORD_PAYLOAD_POS+HANDSHAKE_SESSION_ID_POS+GNUTLS_MAX_SESSION_ID+2) {
mslog(s, NULL, LOG_INFO, "%s: too short UDP packet",
@@ -747,7 +798,27 @@ time_t now;
goto fail;
}
ret = connect(listener->fd, (void*)&cli_addr, cli_addr_size);
sfd = socket(listener->family, SOCK_DGRAM, listener->protocol);
if (sfd < 0) {
e = errno;
mslog(s, proc_to_send, LOG_ERR, "new UDP socket failed: %s",
strerror(e));
goto fail;
}
set_udp_opts(sfd, listener->family);
if (our_addr_size > 0) {
ret = bind(sfd, (struct sockaddr *)&our_addr, our_addr_size);
if (ret == -1) {
e = errno;
mslog(s, proc_to_send, LOG_ERR, "bind UDP to %s: %s",
human_addr((struct sockaddr*)&listener->addr, listener->addr_len, tbuf, sizeof(tbuf)),
strerror(e));
}
}
ret = connect(sfd, (void*)&cli_addr, cli_addr_size);
if (ret == -1) {
e = errno;
mslog(s, proc_to_send, LOG_ERR, "connect UDP socket from %s: %s",
@@ -758,10 +829,14 @@ time_t now;
if (match_ip_only != 0) {
msg.hello = 0;
} else {
msg.data.data = buffer;
msg.data.len = buffer_size;
msg.has_data = 1;
}
ret = send_socket_msg_to_worker(s, proc_to_send, CMD_UDP_FD,
listener->fd,
sfd,
&msg,
(pack_size_func)udp_fd_msg__get_packed_size,
(pack_func)udp_fd_msg__pack);
@@ -773,18 +848,11 @@ time_t now;
mslog(s, proc_to_send, LOG_DEBUG, "passed UDP socket from %s",
human_addr((struct sockaddr*)&cli_addr, cli_addr_size, tbuf, sizeof(tbuf)));
proc_to_send->udp_fd_receive_time = now;
connected = 1;
reopen_udp_port(listener);
}
fail:
if (connected == 0) {
/* received packet from unknown host. Ignore it. */
recv(listener->fd, buffer, buffer_size, 0);
return -1;
}
if (sfd != -1)
close(sfd);
return 0;

View File

@@ -48,6 +48,7 @@
# include <sys/mman.h>
#endif
int handle_worker_commands(struct worker_st *ws)
{
struct iovec iov[3];
@@ -55,7 +56,7 @@ int handle_worker_commands(struct worker_st *ws)
uint16_t length;
int e;
struct msghdr hdr;
uint8_t cmd_data[32];
uint8_t cmd_data[1024];
union {
struct cmsghdr cm;
char control[CMSG_SPACE(sizeof(int))];
@@ -118,7 +119,12 @@ int handle_worker_commands(struct worker_st *ws)
tmsg = udp_fd_msg__unpack(NULL, length, cmd_data);
if (tmsg) {
hello = tmsg->hello;
udp_fd_msg__free_unpacked(tmsg, NULL);
if (hello) {
ws->dtls_tptr.msg = tmsg;
} else {
udp_fd_msg__free_unpacked(tmsg, NULL);
tmsg = NULL;
}
}
if ( (cmptr = CMSG_FIRSTHDR(&hdr)) != NULL && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
@@ -137,8 +143,9 @@ int handle_worker_commands(struct worker_st *ws)
close(fd);
return 0;
}
if (ws->dtls_session != NULL)
gnutls_transport_set_ptr(ws->dtls_session, (gnutls_transport_ptr_t)(long)fd);
if (ws->dtls_session != NULL) {
ws->dtls_tptr.fd = fd;
}
} else { /* received client hello */
ws->udp_state = UP_SETUP;
}

View File

@@ -547,6 +547,76 @@ int body_cb(http_parser * parser, const char *at, size_t length)
return 0;
}
inline static ssize_t dtls_pull_buffer_size(gnutls_transport_ptr_t ptr)
{
dtls_transport_ptr *p = ptr;
if (p->msg && p->consumed < p->msg->data.len)
return 1;
return 0;
}
static
ssize_t dtls_pull(gnutls_transport_ptr_t ptr, void *data, size_t size)
{
dtls_transport_ptr *p = ptr;
if (p->msg) {
if (p->consumed < p->msg->data.len) {
ssize_t need = p->msg->data.len - p->consumed;
if (need > size) {
need = size;
}
memcpy(data, &p->msg->data.data[p->consumed], need);
p->consumed += need;
return need;
} else {
udp_fd_msg__free_unpacked(p->msg, NULL);
p->msg = NULL;
}
}
return recv(p->fd, data, size, 0);
}
static
int dtls_pull_timeout(gnutls_transport_ptr_t ptr, unsigned int ms)
{
fd_set rfds;
struct timeval tv;
int ret;
dtls_transport_ptr *p = ptr;
int fd = p->fd;
if (dtls_pull_buffer_size(ptr)) {
return 1;
}
FD_ZERO(&rfds);
FD_SET(fd, &rfds);
tv.tv_sec = 0;
tv.tv_usec = ms * 1000;
while (tv.tv_usec >= 1000000) {
tv.tv_usec -= 1000000;
tv.tv_sec++;
}
ret = select(fd + 1, &rfds, NULL, NULL, &tv);
if (ret <= 0)
return ret;
return ret;
}
static
ssize_t dtls_push(gnutls_transport_ptr_t ptr, const void *data, size_t size)
{
dtls_transport_ptr *p = ptr;
return send(p->fd, data, size, 0);
}
static int setup_dtls_connection(struct worker_st *ws)
{
int ret;
@@ -605,8 +675,12 @@ static int setup_dtls_connection(struct worker_st *ws)
goto fail;
}
gnutls_transport_set_ptr(session,
(gnutls_transport_ptr_t) (long)ws->udp_fd);
ws->dtls_tptr.fd = ws->udp_fd;
gnutls_transport_set_push_function(session, dtls_push);
gnutls_transport_set_pull_function(session, dtls_pull);
gnutls_transport_set_pull_timeout_function(session, dtls_pull_timeout);
gnutls_transport_set_ptr(session, &ws->dtls_tptr);
gnutls_session_set_ptr(session, ws);
gnutls_certificate_server_set_request(session, GNUTLS_CERT_IGNORE);
@@ -2011,17 +2085,6 @@ static int connect_handler(worker_st * ws)
for (;;) {
FD_ZERO(&rfds);
FD_SET(ws->conn_fd, &rfds);
FD_SET(ws->cmd_fd, &rfds);
FD_SET(ws->tun_fd, &rfds);
max = MAX(ws->cmd_fd, ws->conn_fd);
max = MAX(max, ws->tun_fd);
if (ws->udp_state > UP_WAIT_FD) {
FD_SET(ws->udp_fd, &rfds);
max = MAX(max, ws->udp_fd);
}
if (terminate != 0) {
terminate:
ws->buffer[0] = 'S';
@@ -2045,14 +2108,27 @@ static int connect_handler(worker_st * ws)
else
tls_pending = 0;
if (ws->dtls_session != NULL && ws->udp_state > UP_WAIT_FD) {
dtls_pending =
gnutls_record_check_pending(ws->dtls_session);
if (ws->udp_state > UP_WAIT_FD) {
dtls_pending = dtls_pull_buffer_size(&ws->dtls_tptr);
if (ws->dtls_session != NULL)
dtls_pending +=
gnutls_record_check_pending(ws->dtls_session);
} else {
dtls_pending = 0;
}
if (tls_pending == 0 && dtls_pending == 0) {
FD_SET(ws->conn_fd, &rfds);
FD_SET(ws->cmd_fd, &rfds);
FD_SET(ws->tun_fd, &rfds);
max = MAX(ws->cmd_fd, ws->conn_fd);
max = MAX(max, ws->tun_fd);
if (ws->udp_state > UP_WAIT_FD) {
FD_SET(ws->udp_fd, &rfds);
max = MAX(max, ws->udp_fd);
}
#ifdef HAVE_PSELECT
tv.tv_nsec = 0;
tv.tv_sec = 10;
@@ -2096,8 +2172,8 @@ static int connect_handler(worker_st * ws)
}
/* read data from UDP channel */
if (ws->udp_state > UP_WAIT_FD
&& (FD_ISSET(ws->udp_fd, &rfds) || dtls_pending != 0)) {
if (ws->udp_state > UP_WAIT_FD &&
(FD_ISSET(ws->udp_fd, &rfds) || dtls_pending != 0)) {
ret = dtls_mainloop(ws, &tnow);
if (ret < 0)
@@ -2115,7 +2191,6 @@ static int connect_handler(worker_st * ws)
goto exit;
}
}
}
return 0;

View File

@@ -129,10 +129,17 @@ struct http_req_st {
unsigned no_ipv6;
};
typedef struct dtls_transport_ptr {
int fd;
UdpFdMsg *msg; /* holds the data of the first client hello */
int consumed;
} dtls_transport_ptr;
typedef struct worker_st {
struct tls_st *creds;
gnutls_session_t session;
gnutls_session_t dtls_session;
dtls_transport_ptr dtls_tptr;
struct http_req_st req;