From 1c5a96c112eea193f30360eae831de2e54a32ac2 Mon Sep 17 00:00:00 2001 From: Ilya Lukyanov Date: Tue, 8 Aug 2017 19:23:50 +0300 Subject: [PATCH] Implement nonblocking IO in IPC server Added client write buffer and handler for writable status on client socket. --- sway/ipc-server.c | 101 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 96 insertions(+), 5 deletions(-) diff --git a/sway/ipc-server.c b/sway/ipc-server.c index dca881fa..46a37225 100644 --- a/sway/ipc-server.c +++ b/sway/ipc-server.c @@ -40,11 +40,15 @@ static const char ipc_magic[] = {'i', '3', '-', 'i', 'p', 'c'}; struct ipc_client { struct wlc_event_source *event_source; + struct wlc_event_source *writable_event_source; int fd; uint32_t payload_length; uint32_t security_policy; enum ipc_command_type current_command; enum ipc_command_type subscribed_events; + size_t write_buffer_len; + size_t write_buffer_size; + char *write_buffer; }; static list_t *ipc_get_pixel_requests = NULL; @@ -58,6 +62,7 @@ struct get_pixels_request { struct sockaddr_un *ipc_user_sockaddr(void); int ipc_handle_connection(int fd, uint32_t mask, void *data); int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data); +int ipc_client_handle_writable(int client_fd, uint32_t mask, void *data); void ipc_client_disconnect(struct ipc_client *client); void ipc_client_handle_command(struct ipc_client *client); bool ipc_send_reply(struct ipc_client *client, const char *payload, uint32_t payload_length); @@ -168,6 +173,12 @@ int ipc_handle_connection(int fd, uint32_t mask, void *data) { close(client_fd); return 0; } + if ((flags = fcntl(client_fd, F_GETFL)) == -1 + || fcntl(client_fd, F_SETFL, flags|O_NONBLOCK) == -1) { + sway_log_errno(L_ERROR, "Unable to set NONBLOCK on IPC client socket"); + close(client_fd); + return 0; + } struct ipc_client* client = malloc(sizeof(struct ipc_client)); if (!client) { @@ -179,10 +190,22 @@ int ipc_handle_connection(int fd, uint32_t mask, void *data) { client->fd = client_fd; client->subscribed_events = 0; client->event_source = wlc_event_loop_add_fd(client_fd, WLC_EVENT_READABLE, ipc_client_handle_readable, client); + client->writable_event_source = NULL; + + client->write_buffer_size = 128; + client->write_buffer_len = 0; + client->write_buffer = malloc(client->write_buffer_size); + if (!client->write_buffer) { + sway_log(L_ERROR, "Unable to allocate ipc client write buffer"); + close(client_fd); + return 0; + } pid_t pid = get_client_pid(client->fd); client->security_policy = get_ipc_policy_mask(pid); + sway_log(L_DEBUG, "New client: fd %d, pid %d", client_fd, pid); + list_add(ipc_client_list, client); return 0; @@ -205,6 +228,8 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) { return 0; } + sway_log(L_DEBUG, "Client %d readable", client->fd); + int read_available; if (ioctl(client_fd, FIONREAD, &read_available) == -1) { sway_log_errno(L_INFO, "Unable to read IPC socket buffer size"); @@ -226,6 +251,7 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) { uint8_t buf[ipc_header_size]; uint32_t *buf32 = (uint32_t*)(buf + sizeof(ipc_magic)); + // Should be fully available, because read_available >= ipc_header_size ssize_t received = recv(client_fd, buf, ipc_header_size, 0); if (received == -1) { sway_log_errno(L_INFO, "Unable to receive header from IPC client"); @@ -249,6 +275,48 @@ int ipc_client_handle_readable(int client_fd, uint32_t mask, void *data) { return 0; } +int ipc_client_handle_writable(int client_fd, uint32_t mask, void *data) { + struct ipc_client *client = data; + + if (mask & WLC_EVENT_ERROR) { + sway_log(L_ERROR, "IPC Client socket error, removing client"); + ipc_client_disconnect(client); + return 0; + } + + if (mask & WLC_EVENT_HANGUP) { + sway_log(L_DEBUG, "Client %d hung up", client->fd); + ipc_client_disconnect(client); + return 0; + } + + if (client->write_buffer_len <= 0) { + return 0; + } + + sway_log(L_DEBUG, "Client %d writable", client->fd); + + ssize_t written = write(client->fd, client->write_buffer, client->write_buffer_len); + + if (written == -1 && errno == EAGAIN) { + return 0; + } else if (written == -1) { + sway_log_errno(L_INFO, "Unable to send data from queue to IPC client"); + ipc_client_disconnect(client); + return 0; + } + + memmove(client->write_buffer, client->write_buffer + written, client->write_buffer_len - written); + client->write_buffer_len -= written; + + if (client->write_buffer_len == 0 && client->writable_event_source) { + wlc_event_source_remove(client->writable_event_source); + client->writable_event_source = NULL; + } + + return 0; +} + void ipc_client_disconnect(struct ipc_client *client) { if (!sway_assert(client != NULL, "client != NULL")) { return; @@ -260,9 +328,13 @@ void ipc_client_disconnect(struct ipc_client *client) { sway_log(L_INFO, "IPC Client %d disconnected", client->fd); wlc_event_source_remove(client->event_source); + if (client->writable_event_source) { + wlc_event_source_remove(client->writable_event_source); + } int i = 0; while (i < ipc_client_list->length && ipc_client_list->items[i] != client) i++; list_del(ipc_client_list, i); + free(client->write_buffer); close(client->fd); free(client); } @@ -334,6 +406,7 @@ void ipc_client_handle_command(struct ipc_client *client) { return; } if (client->payload_length > 0) { + // Payload should be fully available ssize_t received = recv(client->fd, buf, client->payload_length, 0); if (received == -1) { @@ -590,17 +663,35 @@ bool ipc_send_reply(struct ipc_client *client, const char *payload, uint32_t pay data32[0] = payload_length; data32[1] = client->current_command; - if (write(client->fd, data, ipc_header_size) == -1) { - sway_log_errno(L_INFO, "Unable to send header to IPC client"); + while (client->write_buffer_len + ipc_header_size + payload_length >= + client->write_buffer_size) { + client->write_buffer_size *= 2; + } + + if (client->write_buffer_size > (1 << 22)) { // 4 MB + sway_log(L_ERROR, "Client write buffer too big, disconnecting client"); + ipc_client_disconnect(client); return false; } - if (write(client->fd, payload, payload_length) == -1) { - sway_log_errno(L_INFO, "Unable to send payload to IPC client"); + char *new_buffer = realloc(client->write_buffer, client->write_buffer_size); + if (!new_buffer) { + sway_log(L_ERROR, "Unable to reallocate ipc client write buffer"); + ipc_client_disconnect(client); return false; } + client->write_buffer = new_buffer; - sway_log(L_DEBUG, "Send IPC reply: %s", payload); + memcpy(client->write_buffer + client->write_buffer_len, data, ipc_header_size); + client->write_buffer_len += ipc_header_size; + memcpy(client->write_buffer + client->write_buffer_len, payload, payload_length); + client->write_buffer_len += payload_length; + + if (!client->writable_event_source) { + client->writable_event_source = wlc_event_loop_add_fd(client->fd, WLC_EVENT_WRITABLE, ipc_client_handle_writable, client); + } + + sway_log(L_DEBUG, "Added IPC reply to client %d queue: %s", client->fd, payload); return true; }