diff --git a/Android.bp b/Android.bp index 5fa331e466d38a7f27c14e2428eee493dfafe922..fa190bcd4036e624021fcdc2463a23cb81cfd00b 100644 --- a/Android.bp +++ b/Android.bp @@ -6,15 +6,22 @@ cc_library { name: "liburingutils", srcs: [ - "src/LibUringUtils.cpp", + "src/IOUringSocketHandler.cpp", ], - cflags: ["-Werror"], + cflags: [ + "-Wall", + "-Werror", + ], export_include_dirs: ["include"], + static_libs: [ + "liburing", + ], shared_libs: [ "libbase", + "liblog", ], tidy: true, @@ -32,12 +39,15 @@ cc_library { } cc_test { - name: "liburingutils_tests", + name: "IOUringSocketHandler_tests", test_suites: ["device-tests"], srcs: [ - "src/LibUringUtils_test.cpp", + "src/IOUringSocketHandler_test.cpp", ], - shared_libs: [ + static_libs: [ + "liburing", "liburingutils", + "libbase", + "liblog", ], } diff --git a/include/IOUringSocketHandler/IOUringSocketHandler.h b/include/IOUringSocketHandler/IOUringSocketHandler.h new file mode 100644 index 0000000000000000000000000000000000000000..6322ef0e803e8b734285762633bec2237e499aa9 --- /dev/null +++ b/include/IOUringSocketHandler/IOUringSocketHandler.h @@ -0,0 +1,174 @@ +/* + * Copyright (C) 2025 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <vector> + +#include <liburing.h> + +/* + * IOUringSocketHandler is a helper class for using io_uring with a socket. + * + * Typical usage from a given thread: + * + * As a one time setup: + * 1. Create an instance of IOUringSocketHandler with the socket file descriptor. + * 2. Setup io_uring ring buffer. + * 3. Allocate buffers for the ring buffer. + * 4. Register buffers with io_uring. + * 5. EnqueueMultishotRecvmsg() will submit the SQE to receive the data + * + * In the I/O path: + * + * 6. Receive data from the socket through ReceiveData() + * 7. Release the buffer to io_uring. + * + * Note that the thread which sets up the io_uring instance should handle the + * I/O through ReceiveData() call. + */ + +class IOUringSocketHandler { +public: + IOUringSocketHandler(int socket_fd); + ~IOUringSocketHandler(); + + // Setup io_uring ring buffer + // queue_size: The size of the io_uring submission queue. + // Determines the maximum number of outstanding I/O requests. + // return: true on success, false on failure (e.g., if io_uring_setup fails). + // + // This function initializes the io_uring context and sets up the submission + // and completion queues. It prepares the io_uring instance for I/O operations. + // A larger queue_size allows for more concurrent I/O operations but consumes + // more memory. + bool SetupIoUring(int queue_size); + + // Allocate 'num_buffers' of size 'buf_size' + // + // num_buffers: The number of buffers to allocate - Should be power of 2. + // buf_size: The size of each buffer in bytes. + // + // This function allocates a set of buffers that will be used for I/O operations + // with io_uring. These buffers are typically used to hold data that is read from + // or written to files or sockets. The allocated buffers are managed internally + // and are later registered with io_uring. + // + // The num_buffers will be the payload for the caller. Internally, it + // allocates additional metadata: + // a: sizeof(struct ucred) + sizeof(struct cmsghdr) + // b: sizeof(struct io_uring_recvmsg_out) + // This allows sender to send the ucred credential information if required. + // + // This function also registers the allocated buffers with the io_uring instance. + // Registering buffers allows the kernel to access them directly, avoiding the need + // to copy data between user space and kernel space during I/O operations. This + // improves performance. + // + // Please see additional details on how num_buffers will be used + // by the io_uring: https://man7.org/linux/man-pages/man3/io_uring_setup_buf_ring.3.html + bool AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size); + + // Free up registered buffers with the io_uring instance. + // + // All the buffers allocated using AllocateAndRegisterBuffers() API will be + // freed and de-registered. Callers can then call + // AllocateAndRegisterBuffers() to re-register new set of bufferes with the + // ring. + void DeRegisterBuffers(); + + // ARM io_uring recvmsg opcode + // + // return: true on success, false on failure (e.g., if submission queue is full). + // + // This function enqueues a "multishot recvmsg" operation into the io_uring submission queue. + // Multishot recvmsg allows receiving multiple messages from a socket with a single + // io_uring submission. The function prepares the submission queue + // entry (SQE) for the recvmsg operation. + bool EnqueueMultishotRecvmsg(); + + // Release the buffer to io_uring + // + // This function releases a buffer back to the io_uring subsystem after it has been + // used for an I/O operation. This makes the buffer available for reuse in subsequent + // I/O operations. + // + // Additionally, when the buffer is released, a check is done to see if + // there are more CQE entries available. If not, EnqueueMultishotRecvmsg() + // is invoked so that the SQE submission is done for receiving next set of + // I/O. + void ReleaseBuffer(); + + // Receive payload data of size payload_len. Additionally, receive + // credential data. + // + // payload: A pointer to a void pointer. This will be set to point to the received + // payload data. + // + // payload_len: A reference to a size_t. This will be set to the length of the + // received payload data. + // + // cred: A pointer to a struct ucred pointer. This will be set to point to the + // user credentials associated with the received data (if available). + // If the sender doesn't have credential information in the payload, + // then nullptr will be returned. + // + // This function retrieves the data received from a recvmsg operation. It extracts the payload + // data and its length, as well as the user credentials associated with the sender. The + // caller is responsible for freeing the allocated memory for the payload and credentials + // when they are no longer needed. + void ReceiveData(void** payload, size_t& payload_len, struct ucred** cred); + + // check if io_uring is supported + // + // return: true if io_uring is supported by the kernel, false otherwise. + // + // This function checks if the io_uring feature is supported by the underlying Linux kernel. + static bool isIouringEnabled(); + +private: + static bool isIouringSupportedByKernel(); + // Register buffers with io_uring + // + // return: true on success, false on failure (e.g., if io_uring_register_buffers fails). + // + // This function registers the previously allocated buffers with the io_uring instance. + // Registering buffers allows the kernel to access them directly, avoiding the need + // to copy data between user space and kernel space during I/O operations. This + // improves performance. + bool RegisterBuffers(); + + struct uring_context { + struct io_uring ring; + }; + // Socket fd + int socket_; + std::unique_ptr<uring_context> mCtx; + std::vector<std::unique_ptr<uint8_t[]>> buffers_; + struct msghdr msg; + int control_len_; + size_t num_buffers_ = 0; + int buffer_size_; + int active_buffer_id_ = -1; + struct io_uring_cqe* cqe; + // A constant buffer group id as we don't support multiple buffer groups + // yet. + const int bgid_ = 7; + struct io_uring_buf_ring* br_; + bool registered_buffers_ = false; + bool ring_setup_ = false; +}; diff --git a/include/liburingutils/LibUringUtils.h b/include/liburingutils/LibUringUtils.h deleted file mode 100644 index a6dbefc8909d6774601bb9ca01539f8b76c53639..0000000000000000000000000000000000000000 --- a/include/liburingutils/LibUringUtils.h +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (C) 2025 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef __LIBURING_UTILS_H -#define __LIBURING_UTILS_H - -class LibUringUtils { -public: - static bool isIouringEnabled(); - -private: - static bool isIouringSupportedByKernel(); -}; - -#endif diff --git a/src/IOUringSocketHandler.cpp b/src/IOUringSocketHandler.cpp new file mode 100644 index 0000000000000000000000000000000000000000..253158b4f7302458d417c1220356b89363872f26 --- /dev/null +++ b/src/IOUringSocketHandler.cpp @@ -0,0 +1,207 @@ +/* + * Copyright (C) 2025 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#define LOG_TAG "IOUringSocketHandler" + +#include <sys/resource.h> +#include <sys/utsname.h> +#include <unistd.h> + +#include <limits.h> +#include <linux/time_types.h> +#include <sys/cdefs.h> +#include <sys/prctl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/un.h> +#include <unistd.h> + +#include <chrono> +#include <thread> + +#include <cutils/sockets.h> +#include <private/android_logger.h> + +#include <IOUringSocketHandler/IOUringSocketHandler.h> + +#include <android-base/logging.h> +#include <android-base/scopeguard.h> + +bool IOUringSocketHandler::isIouringEnabled() { + return isIouringSupportedByKernel(); +} + +bool IOUringSocketHandler::isIouringSupportedByKernel() { + struct utsname uts {}; + unsigned int major, minor; + + uname(&uts); + if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) { + return false; + } + + // We will only support kernels from 6.1 and higher. + return major > 6 || (major == 6 && minor >= 1); +} + +IOUringSocketHandler::IOUringSocketHandler(int socket_fd) : socket_(socket_fd) {} + +IOUringSocketHandler::~IOUringSocketHandler() { + DeRegisterBuffers(); + if (ring_setup_) { + io_uring_queue_exit(&mCtx->ring); + } +} + +bool IOUringSocketHandler::EnqueueMultishotRecvmsg() { + struct io_uring_sqe* sqe = io_uring_get_sqe(&mCtx->ring); + memset(&msg, 0, sizeof(msg)); + msg.msg_controllen = control_len_; + io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0); + sqe->flags |= IOSQE_BUFFER_SELECT; + sqe->buf_group = bgid_; + int ret = io_uring_submit(&mCtx->ring); + if (ret < 0) { + LOG(ERROR) << "EnqueueMultishotRecvmsg failed: ret: " << ret; + return false; + } + return true; +} + +bool IOUringSocketHandler::AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size) { + num_buffers_ = num_buffers; + control_len_ = CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr); + + buffer_size_ = sizeof(struct io_uring_recvmsg_out) + control_len_ + buf_size; + + for (size_t i = 0; i < num_buffers_; i++) { + std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(buffer_size_); + buffers_.push_back(std::move(buffer)); + } + return RegisterBuffers(); +} + +bool IOUringSocketHandler::RegisterBuffers() { + int ret = 0; + br_ = io_uring_setup_buf_ring(&mCtx->ring, num_buffers_, bgid_, 0, &ret); + if (!br_) { + LOG(ERROR) << "io_uring_setup_buf_ring failed with error: " << ret; + return false; + } + for (size_t i = 0; i < num_buffers_; i++) { + void* buffer = buffers_[i].get(); + io_uring_buf_ring_add(br_, buffer, buffer_size_, i, io_uring_buf_ring_mask(num_buffers_), + i); + } + io_uring_buf_ring_advance(br_, num_buffers_); + LOG(DEBUG) << "RegisterBuffers success: " << num_buffers_; + registered_buffers_ = true; + return true; +} + +void IOUringSocketHandler::DeRegisterBuffers() { + if (registered_buffers_) { + io_uring_free_buf_ring(&mCtx->ring, br_, num_buffers_, bgid_); + registered_buffers_ = false; + } + buffers_.clear(); + num_buffers_ = 0; + control_len_ = 0; + buffer_size_ = 0; +} + +bool IOUringSocketHandler::SetupIoUring(int queue_size) { + mCtx = std::unique_ptr<uring_context>(new uring_context()); + struct io_uring_params params = {}; + + // COOP_TASKRUN - No IPI to logd + // SINGLE_ISSUER - Only one thread is doing the work on the ring + // TASKRUN_FLAG - we use peek_cqe - Hence, trigger task work if required + // DEFER_TASKRUN - trigger task work when CQE is explicitly polled + params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER | + IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_DEFER_TASKRUN); + + int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, ¶ms); + if (ret) { + LOG(ERROR) << "io_uring_queue_init_params failed with ret: " << ret; + return false; + } else { + LOG(INFO) << "io_uring_queue_init_params success"; + } + + ring_setup_ = true; + return true; +} + +void IOUringSocketHandler::ReleaseBuffer() { + if (active_buffer_id_ == -1) { + return; + } + + // Put the buffer back to the pool + io_uring_buf_ring_add(br_, buffers_[active_buffer_id_].get(), buffer_size_, active_buffer_id_, + io_uring_buf_ring_mask(num_buffers_), 0); + io_uring_buf_ring_cq_advance(&mCtx->ring, br_, 1); + active_buffer_id_ = -1; + + // If there are no more CQE data, re-arm the SQE + bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE); + if (!is_more_cqe) { + EnqueueMultishotRecvmsg(); + } +} + +void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) { + if (io_uring_peek_cqe(&mCtx->ring, &cqe) < 0) { + int ret = io_uring_wait_cqe(&mCtx->ring, &cqe); + if (ret) { + LOG(ERROR) << "WaitCqe failed: " << ret; + EnqueueMultishotRecvmsg(); + return; + } + } + + if (cqe->res < 0) { + io_uring_cqe_seen(&mCtx->ring, cqe); + EnqueueMultishotRecvmsg(); + return; + } + + active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT; + + void* this_recv = buffers_[active_buffer_id_].get(); + struct io_uring_recvmsg_out* o = io_uring_recvmsg_validate(this_recv, cqe->res, &msg); + + if (!o) { + return; + } + + struct cmsghdr* cmsg; + cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg); + + struct ucred* cr = nullptr; + while (cmsg != nullptr) { + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) { + cr = (struct ucred*)CMSG_DATA(cmsg); + break; + } + cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg); + } + + *payload = io_uring_recvmsg_payload(o, &msg); + payload_len = io_uring_recvmsg_payload_length(o, cqe->res, &msg); + *cred = cr; +} diff --git a/src/IOUringSocketHandler_test.cpp b/src/IOUringSocketHandler_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bd352cffb1190b63257af4ce03191069ec0b19d5 --- /dev/null +++ b/src/IOUringSocketHandler_test.cpp @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2025 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <IOUringSocketHandler/IOUringSocketHandler.h> + +#include <gtest/gtest.h> + +class IOUringSocketHandlerTest : public testing::Test { +public: + bool IsIouringEnabled() { + return IOUringSocketHandler::isIouringEnabled(); + } + +protected: + std::unique_ptr<IOUringSocketHandler> handler_; + void InitializeHandler(int socket_fd = 1); + int queue_depth_ = 10; +}; + +void IOUringSocketHandlerTest::InitializeHandler(int socket_fd) { + handler_ = std::make_unique<IOUringSocketHandler>(socket_fd); +} + +TEST_F(IOUringSocketHandlerTest, SetupIoUring) { + if (!IsIouringEnabled()) { + return; + } + InitializeHandler(); + EXPECT_TRUE(handler_->SetupIoUring(queue_depth_)); +} + +TEST_F(IOUringSocketHandlerTest, AllocateAndRegisterBuffers) { + if (!IsIouringEnabled()) { + return; + } + InitializeHandler(); + EXPECT_TRUE(handler_->SetupIoUring(queue_depth_)); + EXPECT_TRUE(handler_->AllocateAndRegisterBuffers(8, 4096)); +} + +TEST_F(IOUringSocketHandlerTest, MultipleAllocateAndRegisterBuffers) { + if (!IsIouringEnabled()) { + return; + } + InitializeHandler(); + + EXPECT_TRUE(handler_->SetupIoUring(queue_depth_)); + + EXPECT_TRUE(handler_->AllocateAndRegisterBuffers(4, 4096)); + handler_->DeRegisterBuffers(); + + EXPECT_TRUE(handler_->AllocateAndRegisterBuffers(2, 1024*1024L)); + handler_->DeRegisterBuffers(); + + EXPECT_TRUE(handler_->AllocateAndRegisterBuffers(32, 1024)); + handler_->DeRegisterBuffers(); + + // num_buffers should be power of 2 + EXPECT_FALSE(handler_->AllocateAndRegisterBuffers(5, 4096)); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/LibUringUtils.cpp b/src/LibUringUtils.cpp deleted file mode 100644 index 25151d35cd0f0ec4acdfef25ee40740c756c37fb..0000000000000000000000000000000000000000 --- a/src/LibUringUtils.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (C) 2025 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#define LOG_TAG "LibUringUtils" - -#include <android-base/strings.h> -#include <liburingutils/LibUringUtils.h> -#include <sys/resource.h> -#include <sys/utsname.h> -#include <unistd.h> - -bool LibUringUtils::isIouringEnabled() { - // TODO: b/385143770 - Change this behavior to also check the Liburing version. - return isIouringSupportedByKernel(); -} - -bool LibUringUtils::isIouringSupportedByKernel() { - struct utsname uts {}; - unsigned int major, minor; - - uname(&uts); - if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) { - return false; - } - - // We will only support kernels from 6.1 and higher. - return major > 6 || (major == 6 && minor >= 1); -} diff --git a/src/LibUringUtils_test.cpp b/src/LibUringUtils_test.cpp deleted file mode 100644 index aff520640ea67d416c683934e274687021779f6f..0000000000000000000000000000000000000000 --- a/src/LibUringUtils_test.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2025 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <liburingutils/LibUringUtils.h> - -#include <gtest/gtest.h> - -class LibUringUtilsTest : public testing::Test { -public: - void testIsIouringEnabled(bool expectedResult) { - EXPECT_EQ(LibUringUtils::isIouringEnabled(), expectedResult); - } -}; - -TEST_F(LibUringUtilsTest, ReturnsIouringNotEnabled) { - // TODO: b/385143770 - Change this test to base on the real OS version, - // this default expected value is true for the binary built from the - // latest version of source code. - testIsIouringEnabled(true); -}