Subject
This POC app uses Boost.Asio with coroutines to serve HTTP requests. Once a request has been read completely, the connection handler hands the actual request handling off to a separate thread pool for CPU-bound operations, and the connection-handler coroutine is paused. Once the response is ready, the coroutine is resumed to send it.
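To make the intended handoff concrete, here is a stripped-down toy of the same idea. async_offload is just my name for a condensed work_queue::async_run; it spawns a one-shot worker thread instead of reusing a pool, and it deliberately runs io.run() on a single thread only, so the posted resume cannot race the coroutine's suspension. The real app runs io.run() on many threads, which is exactly where my doubts start.

#include <functional>
#include <iostream>
#include <memory>
#include <thread>

#include <boost/asio/async_result.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/error_code.hpp>

using boost::asio::async_completion;
using boost::asio::io_service;
using boost::asio::spawn;
using boost::asio::yield_context;
using boost::system::error_code;

// Run task on a one-shot worker thread; suspend the calling coroutine
// until the worker posts the completion back to the io_service.
void async_offload(io_service& io, std::function<void()> task, yield_context yc) {
    async_completion<yield_context, void(error_code)> init (yc);
    auto handler (init.completion_handler);

    std::thread([&io, task, handler]() {
        task();                                    // the CPU-bound part
        io.post(std::bind(handler, error_code())); // schedule the resume
    }).detach();

    init.result.get(); // the coroutine pauses here until handler is invoked
}

int main() {
    io_service io;
    std::unique_ptr<io_service::work> work (new io_service::work(io));

    spawn(io, [&io, &work](yield_context yc) {
        async_offload(io, [](){ std::cout << "I is working..." << std::endl; }, yc);
        std::cout << "I has been resumed." << std::endl;
        work.reset(); // allow io.run() to return
    });

    io.run(); // one I/O thread only in this toy
}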
Problem
While being load-tested with wrk -d 30 -c 100 -t 100 http://127.0.0.1:8910/
the app sooner or later fails with the following assertion (which, as far as I can tell, means the coroutine is being resumed while it is still running):
grumpycathttpd: /usr/include/boost/coroutine/detail/push_coroutine_impl.hpp:258: void boost::coroutines::detail::push_coroutine_impl<void>::push(): Assertion `! is_running()' failed.
My Questions
- Is the app's concept (an ASIO thread pool running I/O coroutines plus a CPU-bound thread pool fed by a queue) realistic at all?
- If yes: what exactly am I doing wrong when pausing/resuming the coroutine? Should I not use async_completion, completion_handler and asio_handler_invoke the way the Boost libs themselves do? (See the sketch right after this list for the calling convention I mean.)
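By "like the Boost libs themselves do" I mean the hook mechanism sketched below: from my reading of boost/asio/detail/handler_invoke_helpers.hpp, Boost always calls asio_handler_invoke with the handler as the second (context) argument, so that a custom overload for the handler's type can be found via ADL. my_handler and invoke_via_context are made-up names for illustration:

#include <iostream>

#include <boost/asio/handler_invoke_hook.hpp>

// Shortened from my reading of boost/asio/detail/handler_invoke_helpers.hpp:
// the handler goes in as the second (context) argument, so that a custom
// asio_handler_invoke overload for its type can be found via ADL.
template<class Function, class Context>
void invoke_via_context(Function function, Context& context) {
    using boost::asio::asio_handler_invoke;
    asio_handler_invoke(function, &context);
}

struct my_handler {
    void operator()() { std::cout << "handler body" << std::endl; }
};

// Made-up custom hook: this is the overload a plain one-argument
// asio_handler_invoke(function) call would never reach.
template<class Function>
void asio_handler_invoke(Function function, my_handler*) {
    std::cout << "custom hook" << std::endl;
    function();
}

int main() {
    my_handler h;
    invoke_via_context(h, h); // prints "custom hook", then "handler body"
}

For comparison, my work_queue only ever calls the one-argument form (code line 98), and I am not sure that ends up in the same overload.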
Code
AFAIK I should post the app's complete code, so here it is... I hope it doesn't matter that it is 223 lines.
IMO the important lines are 160, 193-198, 92-102 and 106-109.
/* 1 */ #include <condition_variable>
/* 2 */ using std::condition_variable;
/* 3 */
/* 4 */ #include <exception>
/* 5 */ using std::exception;
/* 6 */
/* 7 */ #include <functional>
/* 8 */ using std::function;
/* 9 */
/* 10 */ #include <iostream>
/* 11 */ using std::cout;
/* 12 */ using std::endl;
/* 13 */
/* 14 */ #include <limits>
/* 15 */ using std::numeric_limits;
/* 16 */
/* 17 */ #include <memory>
/* 18 */ using std::make_shared;
/* 19 */ using std::shared_ptr;
/* 20 */
/* 21 */ #include <mutex>
/* 22 */ using std::mutex;
/* 23 */ using std::unique_lock;
/* 24 */
/* 25 */ #include <queue>
/* 26 */ using std::queue;
/* 27 */
/* 28 */ #include <thread>
/* 29 */ using std::thread;
/* 30 */
/* 31 */ #include <utility>
/* 32 */ using std::move;
/* 33 */
/* 34 */ #include <vector>
/* 35 */ using std::vector;
/* 36 */
/* 37 */ #include <boost/asio/async_result.hpp>
/* 38 */ using boost::asio::async_completion;
/* 39 */
/* 40 */ #include <boost/asio/buffer.hpp>
/* 41 */ using boost::asio::mutable_buffer;
/* 42 */
/* 43 */ #include <boost/asio/buffered_stream.hpp>
/* 44 */ using boost::asio::buffered_stream;
/* 45 */
/* 46 */ #include <boost/asio/handler_invoke_hook.hpp>
/* 47 */ using boost::asio::asio_handler_invoke;
/* 48 */
/* 49 */ #include <boost/asio/io_service.hpp>
/* 50 */ using boost::asio::io_service;
/* 51 */
/* 52 */ #include <boost/asio/ip/tcp.hpp>
/* 53 */ using boost::asio::ip::tcp;
/* 54 */
/* 55 */ #include <boost/asio/spawn.hpp>
/* 56 */ using boost::asio::spawn;
/* 57 */
/* 58 */ #include <boost/asio/yield.hpp>
/* 59 */ using boost::asio::yield_context;
/* 60 */
/* 61 */ #include <boost/beast/core.hpp>
/* 62 */ namespace beast = boost::beast;
/* 63 */
/* 64 */ #include <boost/beast/http.hpp>
/* 65 */ namespace http = beast::http;
/* 66 */
/* 67 */ #include <boost/system/error_code.hpp>
/* 68 */ using boost::system::error_code;
/* 69 */
/* 70 */
/* 71 */ class work_queue {
/* 72 */ public:
/* 73 */     work_queue(io_service& io) : io(io), pool((vector<thread>::size_type)(thread::hardware_concurrency())), stop(false) {
/* 74 */         for (auto& thrd : pool) {
/* 75 */             thrd = thread([this](){ run(); });
/* 76 */         }
/* 77 */     }
/* 78 */
/* 79 */     ~work_queue() {
/* 80 */         {
/* 81 */             unique_lock<mutex> ul (mtx);
/* 82 */             stop = true;
/* 83 */             cond_var.notify_all();
/* 84 */         }
/* 85 */
/* 86 */         for (auto& thrd : pool) {
/* 87 */             thrd.join();
/* 88 */         }
/* 89 */     }
/* 90 */
/* 91 */     template<class Handler>
/* 92 */     void async_run(function<void()> task, Handler&& handler) {
/* 93 */         async_completion<Handler, void(error_code)> init(handler); // ties the completion (e.g. a yield_context) to this op
/* 94 */
/* 95 */         {
/* 96 */             auto completion_handler (make_shared<decltype(init.completion_handler)>(init.completion_handler));
/* 97 */             unique_lock<mutex> ul (mtx);
/* 98 */             tasks.emplace(enqueued_task({move(task), [completion_handler](){ asio_handler_invoke(*completion_handler); }})); // on_done invokes the saved completion handler
/* 99 */             cond_var.notify_all();
/* 100 */         }
/* 101 */
/* 102 */         init.result.get(); // with a yield_context this is where the coroutine pauses
/* 103 */     }
/* 104 */
/* 105 */ private:
/* 106 */     struct enqueued_task {
/* 107 */         function<void()> task;
/* 108 */         function<void()> on_done;
/* 109 */     };
/* 110 */
/* 111 */     mutex mtx;
/* 112 */     condition_variable cond_var;
/* 113 */     io_service& io;
/* 114 */     queue<enqueued_task> tasks;
/* 115 */     vector<thread> pool;
/* 116 */     bool stop;
/* 117 */
/* 118 */     void run() {
/* 119 */         unique_lock<mutex> ul (mtx);
/* 120 */
/* 121 */         while (!stop) {
/* 122 */             while (!tasks.empty()) {
/* 123 */                 auto task (move(tasks.front()));
/* 124 */                 tasks.pop();
/* 125 */
/* 126 */                 ul.unlock();
/* 127 */
/* 128 */                 try {
/* 129 */                     task.task();
/* 130 */                 } catch (...) {
/* 131 */                 }
/* 132 */
/* 133 */                 io.post(move(task.on_done)); // hand the completion back to the ASIO threads
/* 134 */
/* 135 */                 ul.lock();
/* 136 */             }
/* 137 */
/* 138 */             cond_var.wait(ul);
/* 139 */         }
/* 140 */     }
/* 141 */ };
/* 142 */
/* 143 */ int main() {
/* 144 */     vector<thread> pool ((vector<thread>::size_type)(thread::hardware_concurrency())); // the ASIO I/O threads
/* 145 */     io_service io;
/* 146 */     work_queue wq (io);
/* 147 */     tcp::acceptor acceptor (io);
/* 148 */     tcp::endpoint endpoint (tcp::v6(), 8910);
/* 149 */
/* 150 */     acceptor.open(endpoint.protocol());
/* 151 */     acceptor.set_option(tcp::acceptor::reuse_address(true));
/* 152 */     acceptor.bind(endpoint);
/* 153 */     acceptor.listen(numeric_limits<int>::max());
/* 154 */
/* 155 */     spawn(acceptor.get_io_context(), [&acceptor, &wq](yield_context yc) { // accept-loop coroutine
/* 156 */         for (;;) {
/* 157 */             shared_ptr<tcp::socket> peer (new tcp::socket(acceptor.get_io_context()));
/* 158 */             acceptor.async_accept(*peer, yc);
/* 159 */
/* 160 */             spawn(acceptor.get_io_context(), [peer, &wq](yield_context yc) { // one coroutine per connection
/* 161 */                 try {
/* 162 */                     {
/* 163 */                         auto remote (peer->remote_endpoint());
/* 164 */                         cout << "I has conn: [" << remote.address().to_string() << "]:" << remote.port() << endl;
/* 165 */                     }
/* 166 */
/* 167 */                     buffered_stream<decltype(*peer)> iobuf (*peer);
/* 168 */
/* 169 */                     iobuf.async_fill(yc);
/* 170 */
/* 171 */                     if (iobuf.in_avail() > 0) {
/* 172 */                         char first_char;
/* 173 */
/* 174 */                         {
/* 175 */                             mutable_buffer first_char_buf (&first_char, 1);
/* 176 */                             iobuf.peek(first_char_buf);
/* 177 */                         }
/* 178 */
/* 179 */                         if ('0' <= first_char && first_char <= '9') {
/* 180 */                             cout << "I has JSON-RPC!" << endl;
/* 181 */                         } else {
/* 182 */                             beast::flat_buffer buf;
/* 183 */
/* 184 */                             for (;;) {
/* 185 */                                 http::request<http::string_body> req;
/* 186 */
/* 187 */                                 http::async_read(iobuf, buf, req, yc);
/* 188 */
/* 189 */                                 cout << "I has req: '" << req.body() << '\'' << endl;
/* 190 */
/* 191 */                                 http::response<http::string_body> res;
/* 192 */
/* 193 */                                 wq.async_run([&req, &res](){ // the CPU-bound part, run on a pool thread
/* 194 */                                     res.result(http::status::internal_server_error);
/* 195 */                                     res.set(http::field::content_type, "text/plain");
/* 196 */                                     res.set(http::field::content_length, "36");
/* 197 */                                     res.body() = "I like onions. They make people cry.";
/* 198 */                                 }, yc); // the coroutine should stay paused here until the task is done
/* 199 */
/* 200 */                                 http::async_write(iobuf, res, yc);
/* 201 */                                 iobuf.async_flush(yc);
/* 202 */
/* 203 */                                 cout << "I has res." << endl;
/* 204 */                             }
/* 205 */                         }
/* 206 */                     }
/* 207 */
/* 208 */                     peer->shutdown(peer->shutdown_both);
/* 209 */                 } catch (const exception& e) {
/* 210 */                     cout << "I has exception: " << e.what() << endl;
/* 211 */                 }
/* 212 */             });
/* 213 */         }
/* 214 */     });
/* 215 */
/* 216 */     for (auto& thrd : pool) {
/* 217 */         thrd = thread([&io](){ io.run(); });
/* 218 */     }
/* 219 */
/* 220 */     for (auto& thrd : pool) {
/* 221 */         thrd.join();
/* 222 */     }
/* 223 */ }
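In case someone wants to reproduce this: I build the app roughly like this (assuming the source file is named grumpycathttpd.cpp; exact Boost library names may vary by distribution):

g++ -std=c++14 -O2 grumpycathttpd.cpp -o grumpycathttpd -lboost_coroutine -lboost_context -lboost_system -lpthread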