Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,8 @@ CHECK_CONST_EXISTS(KERN_ARND sys/sysctl.h EVENT__HAVE_DECL_KERN_ARND)
CHECK_CONST_EXISTS(KERN_RANDOM sys/sysctl.h EVENT__HAVE_DECL_KERN_RANDOM)
CHECK_CONST_EXISTS(RANDOM_UUID sys/sysctl.h EVENT__HAVE_DECL_RANDOM_UUID)
CHECK_SYMBOL_EXISTS(F_SETFD fcntl.h EVENT__HAVE_SETFD)
CHECK_CONST_EXISTS(SO_TIMESTAMP sys/socket.h EVENT__HAVE_DECL_SO_TIMESTAMP)
CHECK_CONST_EXISTS(SO_TIMESTAMPNS sys/socket.h EVENT__HAVE_DECL_SO_TIMESTAMPNS)

CHECK_TYPE_SIZE(fd_mask EVENT__HAVE_FD_MASK)

Expand Down
208 changes: 208 additions & 0 deletions buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,14 @@ advance_last_with_data(struct evbuffer *buf)
int
evbuffer_commit_space(struct evbuffer *buf,
struct evbuffer_iovec *vec, int n_vecs)
{
return evbuffer_commit_space_with_timespec(buf, vec, n_vecs, NULL, 0);
}

int
evbuffer_commit_space_with_timespec(struct evbuffer *buf,
struct evbuffer_iovec *vec, int n_vecs,
const struct timespec *ts, int ts_valid)
{
struct evbuffer_chain *chain, **firstchainp, **chainp;
int result = -1;
Expand All @@ -744,6 +752,10 @@ evbuffer_commit_space(struct evbuffer *buf,
goto done;
buf->last->off += vec[0].iov_len;
added = vec[0].iov_len;
if (ts_valid && buf->last->timestamp.valid == 0) {
buf->last->timestamp.ts = *ts;
buf->last->timestamp.valid = 1;
}
if (added)
advance_last_with_data(buf);
goto okay;
Expand Down Expand Up @@ -773,6 +785,10 @@ evbuffer_commit_space(struct evbuffer *buf,
for (i=0; i<n_vecs; ++i) {
(*chainp)->off += vec[i].iov_len;
added += vec[i].iov_len;
if (ts_valid && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = *ts;
(*chainp)->timestamp.valid = 1;
}
if (vec[i].iov_len) {
buf->last_with_datap = chainp;
}
Expand All @@ -790,6 +806,7 @@ evbuffer_commit_space(struct evbuffer *buf,
return result;
}


static inline int
HAS_PINNED_R(struct evbuffer *buf)
{
Expand Down Expand Up @@ -940,6 +957,7 @@ APPEND_CHAIN_MULTICAST(struct evbuffer *dst, struct evbuffer *src)
tmp->off = chain->off;
tmp->flags |= EVBUFFER_MULTICAST|EVBUFFER_IMMUTABLE;
tmp->buffer = chain->buffer;
tmp->timestamp = chain->timestamp;
evbuffer_chain_insert(dst, tmp);
}
}
Expand Down Expand Up @@ -1145,6 +1163,7 @@ evbuffer_drain(struct evbuffer *buf, size_t len)
EVUTIL_ASSERT(remaining == 0);
chain->misalign += chain->off;
chain->off = 0;
chain->timestamp.valid = 0;
break;
} else
evbuffer_chain_free(chain);
Expand Down Expand Up @@ -1386,6 +1405,11 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
}

if (CHAIN_PINNED(chain)) {
/* Pinned chain case: expand in-place by appending data from
* subsequent chains. Timestamps from subsequent chains being
* consolidated are intentionally discarded; only this chain's
* timestamp is preserved as it contains the oldest data.
*/
size_t old_off = chain->off;
if (CHAIN_SPACE_LEN(chain) < size - chain->off) {
/* not enough room at end of chunk. */
Expand All @@ -1397,6 +1421,11 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
size -= old_off;
chain = chain->next;
} else if (chain->buffer_len - chain->misalign >= (size_t)size) {
/* Sufficient space case: expand in-place without reallocation
* by appending data from subsequent chains. Timestamps from
* subsequent chains being consolidated are intentionally discarded;
* only this chain's timestamp is preserved.
*/
/* already have enough space in the first chain */
size_t old_off = chain->off;
buffer = chain->buffer + chain->misalign + chain->off;
Expand All @@ -1411,11 +1440,21 @@ evbuffer_pullup(struct evbuffer *buf, ev_ssize_t size)
}
buffer = tmp->buffer;
tmp->off = size;
/* Preserve timestamp from the original first chain */
if (chain->timestamp.valid) {
tmp->timestamp = chain->timestamp;
}
buf->first = tmp;
}

/* TODO(niels): deal with buffers that point to NULL like sendfile */

/* Note: When consolidating multiple chains into one during pullup,
* only the timestamp from the first (oldest) chain is preserved.
* Timestamps from subsequent chains are intentionally discarded.
* This design choice keeps the API simple by tracking only the
* receipt time of the oldest data in the buffer.
*/
/* Copy and free every chunk that will be entirely pulled into tmp */
last_with_data = *buf->last_with_datap;
for (; chain != NULL && (size_t)size >= chain->off; chain = next) {
Expand Down Expand Up @@ -2409,6 +2448,175 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch)
return result;
}

#if defined(_WIN32) || !defined(USE_IOVEC_IMPL)

/* Windows stub: SO_TIMESTAMPING not supported on Windows */
int
evbuffer_read_with_timestamp(
struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
return evbuffer_read(buf, fd, howmuch);
}

#else

int
evbuffer_read_with_timestamp(
struct evbuffer *buf, evutil_socket_t fd, int howmuch)
{
struct evbuffer_chain **chainp;
int n;
int result;
int nvecs;
int i;
int remaining;
IOV_TYPE vecs[NUM_READ_IOVEC];
struct msghdr msg;
/* Control message buffer for cmsg data.
* Sized to accommodate timestamp messages (SCM_TIMESTAMPNS,
* SCM_TIMESTAMP) plus additional space for other possible cmsg
* entries (SCM_RIGHTS, SCM_CREDENTIALS, etc.) to prevent truncation
* via MSG_CTRUNC which would silently lose timestamp information. */
#define EVBUFFER_RECVMSG_CTRLFN_SZ \
(CMSG_SPACE(sizeof(struct timespec)) + \
CMSG_SPACE(sizeof(struct timeval)) + 256)
unsigned char control[EVBUFFER_RECVMSG_CTRLFN_SZ];
#undef EVBUFFER_RECVMSG_CTRLFN_SZ

struct timespec ts;
int ts_found = 0;
memset(&ts, 0, sizeof(ts));

EVBUFFER_LOCK(buf);

if (buf->freeze_end) {
result = -1;
goto done;
}

if (howmuch < 0 || howmuch > EVBUFFER_MAX_READ) {
howmuch = EVBUFFER_MAX_READ;
}

/* Since we can use iovecs, we're willing to use the last
* NUM_READ_IOVEC chains. */
if (evbuffer_expand_fast_(buf, howmuch, NUM_READ_IOVEC) == -1) {
result = -1;
goto done;
}

nvecs = evbuffer_read_setup_vecs_(
buf, howmuch, vecs, NUM_READ_IOVEC, &chainp, 1);

/* Setup message header */
memset(&msg, 0, sizeof(msg));
msg.msg_iov = vecs;
msg.msg_iovlen = nvecs;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);

/* Receive with ancillary data */
n = recvmsg(fd, &msg, 0);

if (n == -1) {
result = -1;
goto done;
}
if (n == 0) {
result = 0;
goto done;
}

/* Check if control data was truncated */
if (!(msg.msg_flags & MSG_CTRUNC)) {
struct cmsghdr *cmsg;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET)
continue;
#if EVENT__HAVE_DECL_SO_TIMESTAMPNS
if (cmsg->cmsg_type == SCM_TIMESTAMPNS) {
if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct timespec)))
continue;
ts = *(struct timespec *)CMSG_DATA(cmsg);
ts_found = 1;
break;
}
#endif

#if EVENT__HAVE_DECL_SO_TIMESTAMP
if (cmsg->cmsg_type == SCM_TIMESTAMP) {
struct timeval *tv;
if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct timeval)))
continue;
tv = (struct timeval *)CMSG_DATA(cmsg);
ts.tv_sec = tv->tv_sec;
ts.tv_nsec = tv->tv_usec * 1000L;
ts_found = 1;
break;
}
#endif
}
}

remaining = n;
for (i=0; i < nvecs; ++i) {
/* can't overflow, since only mutable chains have
* huge misaligns. */
size_t space = (size_t) CHAIN_SPACE_LEN(*chainp);
/* XXXX This is a kludge that can waste space in perverse
* situations. */
if (space > EVBUFFER_CHAIN_MAX)
space = EVBUFFER_CHAIN_MAX;
if ((ev_ssize_t)space < remaining) {
(*chainp)->off += space;
remaining -= (int)space;
if (ts_found && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = ts;
(*chainp)->timestamp.valid = 1;
}
} else {
(*chainp)->off += remaining;
if (ts_found && (*chainp)->timestamp.valid == 0) {
(*chainp)->timestamp.ts = ts;
(*chainp)->timestamp.valid = 1;
}
buf->last_with_datap = chainp;
break;
}
chainp = &(*chainp)->next;
}

buf->total_len += n;
buf->n_add_for_cb += n;

/* Tell someone about changes in this buffer */
evbuffer_invoke_callbacks_(buf);
result = n;
done:
EVBUFFER_UNLOCK(buf);
return result;
}

#endif

int evbuffer_get_timestamp(
struct evbuffer *buf, struct timespec *timestamp)
{
int result = -1;
if (!timestamp) {
return -1;
}
EVBUFFER_LOCK(buf);
{
if (buf->first && buf->first->timestamp.valid) {
*timestamp = buf->first->timestamp.ts;
result = 0;
}
}
EVBUFFER_UNLOCK(buf);
return result;
}

#ifdef USE_IOVEC_IMPL
static inline int
evbuffer_write_iovec(struct evbuffer *buffer, evutil_socket_t fd,
Expand Down
6 changes: 6 additions & 0 deletions bufferevent-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ struct bufferevent_private {
} conn_address;

struct evdns_getaddrinfo_request *dns_request;

/** Flag: set if receive timestamps are enabled */
unsigned recv_timestamps_enabled : 1;
};

/** Possible operations for a control callback. */
Expand Down Expand Up @@ -452,6 +455,9 @@ EVENT2_EXPORT_SYMBOL
void
bufferevent_socket_set_conn_address_(struct bufferevent *bev, struct sockaddr *addr, size_t addrlen);

EVENT2_EXPORT_SYMBOL
int be_socket_enable_timestamps_(evutil_socket_t fd);


/** Internal use: We have just successfully read data into an inbuf, so
* reset the read timeout (if any). */
Expand Down
Loading