Kill misbehaving subscribed clients instead of hanging

This change only affects clients that are subscribed to events, which
should be the main cause of our problems.

In the common case (no buffered data) the behaviour doesn't change at
all: the message is sent directly, no ev_io / ev_timeout callback is
enabled. Once a write to a client's socket is not completed fully
(returns with EAGAIN error), we put the message in the tail of a queue
and init an ev_io callback and a corresponding timer. If the timer is
triggered first, the socket is closed and the client connection is
removed. If the socket becomes writeable before the timeout we either
reset the timer if we couldn't push all the buffered data or completely
remove it if everything was pushed.

We could also replace ipc_send_message() for all client connections in
i3, not just those subscribed to events.

Furthermore, we could limit the amount of messages stored and increase
the timeout (or use multiple timeouts): eg it's ok if a client is not
reading for 10 seconds and we are only holding 5KB of messages for them
but it is not ok if they are inactive for 5 seconds and we have 30MB of
messages held.

Closes #2999
Closes #2539
This commit is contained in:
Orestis Floros
2018-04-23 12:20:05 +03:00
parent b0bbe53d04
commit 37d0105c83
10 changed files with 287 additions and 5 deletions

162
src/ipc.c
View File

@ -38,9 +38,39 @@ static void set_nonblock(int sockfd) {
err(-1, "Could not set O_NONBLOCK");
}
/*
* Given a message and a message type, create the corresponding header, merge it
* with the message and append it to the given client's output buffer.
*
*/
static void append_payload(ipc_client *client, uint32_t message_type, const char *payload) {
const size_t size = strlen(payload);
const i3_ipc_header_t header = {
.magic = {'i', '3', '-', 'i', 'p', 'c'},
.size = size,
.type = message_type};
const size_t header_size = sizeof(i3_ipc_header_t);
const size_t message_size = header_size + size;
client->buffer = srealloc(client->buffer, client->buffer_size + message_size);
memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size);
memcpy(client->buffer + client->buffer_size + header_size, payload, size);
client->buffer_size += message_size;
}
static void free_ipc_client(ipc_client *client) {
close(client->fd);
for (int i = 0; i < client->num_events; i++){
ev_io_stop(main_loop, client->callback);
FREE(client->callback);
if (client->timeout) {
ev_timer_stop(main_loop, client->timeout);
FREE(client->timeout);
}
free(client->buffer);
for (int i = 0; i < client->num_events; i++) {
free(client->events[i]);
}
free(client->events);
@ -48,6 +78,68 @@ static void free_ipc_client(ipc_client *client) {
free(client);
}
static void ipc_client_timeout(EV_P_ ev_timer *w, int revents);
static void ipc_socket_writeable_cb(EV_P_ struct ev_io *w, int revents);
static ev_tstamp kill_timeout = 10.0;
void ipc_set_kill_timeout(ev_tstamp new) {
kill_timeout = new;
}
/*
* Try to write the contents of the pending buffer to the client's subscription
* socket. Will set, reset or clear the timeout and io callbacks depending on
* the result of the write operation.
*
*/
static void ipc_push_pending(ipc_client *client) {
const ssize_t result = writeall_nonblock(client->fd, client->buffer, client->buffer_size);
if (result < 0) {
return;
}
if ((size_t)result == client->buffer_size) {
/* Everything was written successfully: clear the timer and stop the io
* callback. */
FREE(client->buffer);
client->buffer_size = 0;
if (client->timeout) {
ev_timer_stop(main_loop, client->timeout);
FREE(client->timeout);
}
ev_io_stop(main_loop, client->callback);
return;
}
/* Otherwise, make sure that the io callback is enabled and create a new
* timer if needed. */
ev_io_start(main_loop, client->callback);
if (!client->timeout) {
struct ev_timer *timeout = scalloc(1, sizeof(struct ev_timer));
ev_timer_init(timeout, ipc_client_timeout, kill_timeout, 0.);
timeout->data = client;
client->timeout = timeout;
ev_set_priority(timeout, EV_MINPRI);
ev_timer_start(main_loop, client->timeout);
} else if (result > 0) {
/* Keep the old timeout when nothing is written. Otherwise, we would
* keep a dead connection by continuously renewing its timeouts. */
ev_timer_stop(main_loop, client->timeout);
ev_timer_set(client->timeout, kill_timeout, 0.0);
ev_timer_start(main_loop, client->timeout);
}
if (result == 0) {
return;
}
/* Shift the buffer to the left and reduce the allocated space. */
client->buffer_size -= (size_t)result;
memmove(client->buffer, client->buffer + result, client->buffer_size);
client->buffer = srealloc(client->buffer, client->buffer_size);
}
/*
* Sends the specified event to all IPC clients which are currently connected
* and subscribed to this kind of event.
@ -67,7 +159,11 @@ void ipc_send_event(const char *event, uint32_t message_type, const char *payloa
if (!interested)
continue;
ipc_send_message(current->fd, strlen(payload), message_type, (const uint8_t *)payload);
const bool push_now = (current->buffer_size == 0);
append_payload(current, message_type, payload);
if (push_now) {
ipc_push_pending(current);
}
}
}
@ -1286,6 +1382,60 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
FREE(message);
}
static void ipc_client_timeout(EV_P_ ev_timer *w, int revents) {
/* No need to be polite and check for writeability, the other callback would
* have been called by now. */
ipc_client *client = (ipc_client *)w->data;
char *cmdline = NULL;
#if defined(__linux__) && defined(SO_PEERCRED)
struct ucred peercred;
socklen_t so_len = sizeof(peercred);
if (getsockopt(client->fd, SOL_SOCKET, SO_PEERCRED, &peercred, &so_len) != 0) {
goto end;
}
char *exepath;
sasprintf(&exepath, "/proc/%d/cmdline", peercred.pid);
int fd = open(exepath, O_RDONLY);
free(exepath);
if (fd == -1) {
goto end;
}
char buf[512] = {'\0'}; /* cut off cmdline for the error message. */
const ssize_t n = read(fd, buf, sizeof(buf));
close(fd);
if (n < 0) {
goto end;
}
for (char *walk = buf; walk < buf + n - 1; walk++) {
if (*walk == '\0') {
*walk = ' ';
}
}
cmdline = buf;
#endif
end:
if (cmdline) {
ELOG("client %p with pid %d and cmdline '%s' on fd %d timed out, killing\n", client, peercred.pid, cmdline, client->fd);
} else {
ELOG("client %p on fd %d timed out, killing\n", client, client->fd);
}
free_ipc_client(client);
}
static void ipc_socket_writeable_cb(EV_P_ ev_io *w, int revents) {
DLOG("fd %d writeable\n", w->fd);
ipc_client *client = (ipc_client *)w->data;
/* If this callback is called then there should be a corresponding active
* timer. */
assert(client->timeout != NULL);
ipc_push_pending(client);
}
/*
* Handler for activity on the listening socket, meaning that a new client
* has just connected and we should accept() him. Sets up the event handler
@ -1314,10 +1464,16 @@ void ipc_new_client(EV_P_ struct ev_io *w, int revents) {
ev_io_init(package, ipc_receive_message, client, EV_READ);
ev_io_start(EV_A_ package);
ipc_client *new = scalloc(1, sizeof(ipc_client));
package = scalloc(1, sizeof(struct ev_io));
package->data = new;
ev_io_init(package, ipc_socket_writeable_cb, client, EV_WRITE);
DLOG("IPC: new client connected on fd %d\n", w->fd);
ipc_client *new = scalloc(1, sizeof(ipc_client));
new->fd = client;
new->callback = package;
TAILQ_INSERT_TAIL(&all_clients, new, clients);
}