vendredi 1 février 2019

void boost::coroutines::detail::push_coroutine_impl<void>::push(): Assertion `! is_running()' failed

Subject

This proof-of-concept app uses Boost.Asio with coroutines to serve HTTP requests. Once a request has been read completely, the connection handler hands the actual request handling off to a separate thread pool for CPU-bound work, and the connection handler coroutine is paused. Once the response is ready, that coroutine is resumed to send it.

Problem

While being tested with wrk -d 30 -c 100 -t 100 http://127.0.0.1:8910/ the app fails sooner or later with:

grumpycathttpd: /usr/include/boost/coroutine/detail/push_coroutine_impl.hpp:258: void boost::coroutines::detail::push_coroutine_impl<void>::push(): Assertion `! is_running()' failed.

My Questions

  1. Is the app's concept (ASIO thread pool with coroutines + CPU-bound thread pool with queue) possible/realistic at all?
  2. If yes: What exactly am I doing wrong while pausing/resuming the coroutine? Should I not use async_completion, completion_handler and asio_handler_invoke the way the Boost libraries themselves do?

Code

AFAIK I should post the app's code – so here it is... I hope it doesn't matter that there are 223 lines.

IMO the important lines are 160, 193-198, 92-102 and 106-109.

/* 1 */ #include <condition_variable>
/* 2 */ using std::condition_variable;
/* 3 */ 
/* 4 */ #include <exception>
/* 5 */ using std::exception;
/* 6 */ 
/* 7 */ #include <functional>
/* 8 */ using std::function;
/* 9 */ 
/* 10 */ #include <iostream>
/* 11 */ using std::cout;
/* 12 */ using std::endl;
/* 13 */ 
/* 14 */ #include <limits>
/* 15 */ using std::numeric_limits;
/* 16 */ 
/* 17 */ #include <memory>
/* 18 */ using std::make_shared;
/* 19 */ using std::shared_ptr;
/* 20 */ 
/* 21 */ #include <mutex>
/* 22 */ using std::mutex;
/* 23 */ using std::unique_lock;
/* 24 */ 
/* 25 */ #include <queue>
/* 26 */ using std::queue;
/* 27 */ 
/* 28 */ #include <thread>
/* 29 */ using std::thread;
/* 30 */ 
/* 31 */ #include <utility>
/* 32 */ using std::move;
/* 33 */ 
/* 34 */ #include <vector>
/* 35 */ using std::vector;
/* 36 */ 
/* 37 */ #include <boost/asio/async_result.hpp>
/* 38 */ using boost::asio::async_completion;
/* 39 */ 
/* 40 */ #include <boost/asio/buffer.hpp>
/* 41 */ using boost::asio::mutable_buffer;
/* 42 */ 
/* 43 */ #include <boost/asio/buffered_stream.hpp>
/* 44 */ using boost::asio::buffered_stream;
/* 45 */ 
/* 46 */ #include <boost/asio/handler_invoke_hook.hpp>
/* 47 */ using boost::asio::asio_handler_invoke;
/* 48 */ 
/* 49 */ #include <boost/asio/io_service.hpp>
/* 50 */ using boost::asio::io_service;
/* 51 */ 
/* 52 */ #include <boost/asio/ip/tcp.hpp>
/* 53 */ using boost::asio::ip::tcp;
/* 54 */ 
/* 55 */ #include <boost/asio/spawn.hpp>
/* 56 */ using boost::asio::spawn;
/* 57 */ 
/* 58 */ #include <boost/asio/yield.hpp>
/* 59 */ using boost::asio::yield_context;
/* 60 */ 
/* 61 */ #include <boost/beast/core.hpp>
/* 62 */ namespace beast = boost::beast;
/* 63 */ 
/* 64 */ #include <boost/beast/http.hpp>
/* 65 */ namespace http = beast::http;
/* 66 */ 
/* 67 */ #include <boost/system/error_code.hpp>
/* 68 */ using boost::system::error_code;
/* 69 */ 
/* 70 */ 
// work_queue: a pool of worker threads that executes queued tasks and, after
// each task has run, posts that task's completion callback to an io_service.
/* 71 */ class work_queue {
/* 72 */ public:
            // Keeps a reference to `io` and starts one worker per
            // thread::hardware_concurrency(); every worker executes run().
            // NOTE(review): hardware_concurrency() is permitted to return 0, which
            // would leave the pool without workers -- confirm for the target platform.
/* 73 */    work_queue(io_service& io) : io(io), pool((vector<thread>::size_type)(thread::hardware_concurrency())), stop(false) {
/* 74 */        for (auto& thrd : pool) {
/* 75 */            thrd = thread([this](){ run(); });
/* 76 */        }
/* 77 */    }
/* 78 */ 
            // Sets `stop` under the mutex, wakes all workers, then joins them.
/* 79 */    ~work_queue() {
/* 80 */        {
/* 81 */            unique_lock<mutex> ul (mtx);
/* 82 */            stop = true;
/* 83 */            cond_var.notify_all();
/* 84 */        }
/* 85 */ 
/* 86 */        for (auto& thrd : pool) {
/* 87 */            thrd.join();
/* 88 */        }
/* 89 */    }
/* 90 */ 
            // Queues `task` for execution on a worker thread and arranges for the
            // completion handler derived from `handler` to be invoked afterwards.
            //
            // task:    the work to run on a worker thread.
            // handler: completion token for the signature void(error_code); main()
            //          passes a yield_context here.
/* 91 */    template<class Handler>
/* 92 */    void async_run(function<void()> task, Handler&& handler) {
/* 93 */        async_completion<Handler, void(error_code)> init(handler);
/* 94 */ 
/* 95 */        {
                    // The handler is copied into a shared_ptr so the lambda stored in the
                    // std::function-based on_done slot can own it.
/* 96 */            auto completion_handler (make_shared<decltype(init.completion_handler)>(init.completion_handler));
/* 97 */            unique_lock<mutex> ul (mtx);
                    // NOTE(review): asio_handler_invoke is called here with the function
                    // only and no handler-context argument, so presumably the default hook
                    // is selected and the handler is simply called on whichever thread runs
                    // on_done, without going through any strand/executor associated with the
                    // coroutine. This looks related to the reported `! is_running()'
                    // assertion -- TODO confirm against the Boost.Asio documentation.
/* 98 */            tasks.emplace(enqueued_task({move(task), [completion_handler](){ asio_handler_invoke(*completion_handler); }}));
/* 99 */            cond_var.notify_all();
/* 100 */       }
/* 101 */ 
                // With a yield_context token this is presumably the point where the calling
                // coroutine is suspended until the completion handler runs -- TODO confirm.
/* 102 */       init.result.get();
/* 103 */   }
/* 104 */ 
/* 105 */ private:
            // One queue entry: the work itself plus the callback that run() posts to
            // the io_service once the work has been executed.
/* 106 */   struct enqueued_task {
/* 107 */       function<void()> task;
/* 108 */       function<void()> on_done;
/* 109 */   };
/* 110 */ 
            // mtx guards `tasks` and `stop`; cond_var is signalled when a task is
            // queued and when the destructor requests shutdown.
/* 111 */   mutex mtx;
/* 112 */   condition_variable cond_var;
/* 113 */   io_service& io;
/* 114 */   queue<enqueued_task> tasks;
/* 115 */   vector<thread> pool;
/* 116 */   bool stop;
/* 117 */ 
            // Worker loop. The mutex is held except while a task is being executed and
            // its on_done callback is being posted.
/* 118 */   void run() {
/* 119 */       unique_lock<mutex> ul (mtx);
/* 120 */ 
/* 121 */       while (!stop) {
/* 122 */           while (!tasks.empty()) {
/* 123 */               auto task (move(tasks.front()));
/* 124 */               tasks.pop();
/* 125 */ 
/* 126 */               ul.unlock();
/* 127 */ 
                        // Anything thrown by the task is swallowed; on_done is posted
                        // regardless of whether the task threw.
/* 128 */               try {
/* 129 */                   task.task();
/* 130 */               } catch (...) {
/* 131 */               }
/* 132 */ 
                        // The completion callback is handed to the io_service, so it runs
                        // on one of the threads calling io.run(), not on this worker.
/* 133 */               io.post(move(task.on_done));
/* 134 */ 
/* 135 */               ul.lock();
/* 136 */           }
/* 137 */ 
                    // NOTE(review): `stop` is not re-checked between draining the queue and
                    // waiting. If the destructor set `stop` and notified while this worker
                    // had the mutex released for a task, this wait would presumably never be
                    // woken by that notification -- verify shutdown behaviour.
/* 138 */           cond_var.wait(ul);
/* 139 */       }
/* 140 */   }
/* 141 */ };
/* 142 */ 
// Entry point: opens a TCP acceptor on port 8910, spawns an accepting coroutine
// that spawns one coroutine per connection, and runs the io_service on a pool
// of threads.
/* 143 */ int main() {
            // One slot per thread::hardware_concurrency(); the threads themselves are
            // created near the end of main() and each calls io.run().
/* 144 */   vector<thread> pool ((vector<thread>::size_type)(thread::hardware_concurrency()));
/* 145 */   io_service io;
/* 146 */   work_queue wq (io);
/* 147 */   tcp::acceptor acceptor (io);
/* 148 */   tcp::endpoint endpoint (tcp::v6(), 8910);
/* 149 */ 
/* 150 */   acceptor.open(endpoint.protocol());
/* 151 */   acceptor.set_option(tcp::acceptor::reuse_address(true));
/* 152 */   acceptor.bind(endpoint);
            // Requests the largest backlog an int can express.
/* 153 */   acceptor.listen(numeric_limits<int>::max());
/* 154 */ 
            // Accept loop: runs as a coroutine and never leaves the for(;;) on its own.
/* 155 */   spawn(acceptor.get_io_context(), [&acceptor, &wq](yield_context yc) {
/* 156 */       for (;;) {
/* 157 */           shared_ptr<tcp::socket> peer (new tcp::socket(acceptor.get_io_context()));
/* 158 */           acceptor.async_accept(*peer, yc);
/* 159 */ 
                    // Per-connection coroutine. `peer` is captured by value, so the socket
                    // stays alive for as long as this lambda object exists.
                    // NOTE(review): spawn is given the io_context here rather than the
                    // parent yield_context; how the new coroutine is serialized relative
                    // to handlers posted from work_queue should be checked against the
                    // Boost.Asio spawn documentation -- TODO confirm.
/* 160 */           spawn(acceptor.get_io_context(), [peer, &wq](yield_context yc) {
/* 161 */               try {
/* 162 */                   {
/* 163 */                       auto remote (peer->remote_endpoint());
/* 164 */                       cout << "I has conn: [" << remote.address().to_string() << "]:" << remote.port() << endl;
/* 165 */                   }
/* 166 */ 
/* 167 */                   buffered_stream<decltype(*peer)> iobuf (*peer);
/* 168 */ 
/* 169 */                   iobuf.async_fill(yc);
/* 170 */ 
/* 171 */                   if (iobuf.in_avail() > 0) {
/* 172 */                       char first_char;
/* 173 */ 
                                // Look at the first buffered byte via peek() to choose
                                // between the two protocol branches below.
/* 174 */                       {
/* 175 */                           mutable_buffer first_char_buf (&first_char, 1);
/* 176 */                           iobuf.peek(first_char_buf);
/* 177 */                       }
/* 178 */ 
                                // A leading ASCII digit selects the JSON-RPC branch, which
                                // only prints a message; anything else is handled as HTTP.
/* 179 */                       if ('0' <= first_char && first_char <= '9') {
/* 180 */                           cout << "I has JSON-RPC!" << endl;
/* 181 */                       } else {
/* 182 */                           beast::flat_buffer buf;
/* 183 */ 
                                    // Request/response loop with no break statement; it
                                    // presumably ends only when one of the calls below
                                    // throws, which lands in the catch further down.
/* 184 */                           for (;;) {
/* 185 */                               http::request<http::string_body> req;
/* 186 */ 
/* 187 */                               http::async_read(iobuf, buf, req, yc);
/* 188 */ 
/* 189 */                               cout << "I has req: '" << req.body() << '\'' << endl;
/* 190 */ 
/* 191 */                               http::response<http::string_body> res;
/* 192 */ 
                                        // Builds the response on a work_queue thread. req
                                        // and res are captured by reference, so this relies
                                        // on async_run not returning before the task has
                                        // run -- assumes the coroutine is suspended inside
                                        // async_run until completion; TODO confirm.
/* 193 */                               wq.async_run([&req, &res](){
/* 194 */                                   res.result(http::status::internal_server_error);
/* 195 */                                   res.set(http::field::content_type, "text/plain");
/* 196 */                                   res.set(http::field::content_length, "36");
/* 197 */                                   res.body() = "I like onions. They make people cry.";
/* 198 */                               }, yc);
/* 199 */ 
/* 200 */                               http::async_write(iobuf, res, yc);
/* 201 */                               iobuf.async_flush(yc);
/* 202 */ 
/* 203 */                               cout << "I has res." << endl;
/* 204 */                           }
/* 205 */                       }
/* 206 */                   }
/* 207 */ 
/* 208 */                   peer->shutdown(peer->shutdown_both);
/* 209 */               } catch (const exception& e) {
                            // Only exceptions derived from std::exception are caught and
                            // logged here.
/* 210 */                   cout << "I has exception: " << e.what() << endl;
/* 211 */               }
/* 212 */           });
/* 213 */       }
/* 214 */   });
/* 215 */ 
            // Every pool thread runs the same io_service, so handlers posted to `io`
            // may execute on any of these threads.
/* 216 */   for (auto& thrd : pool) {
/* 217 */       thrd = thread([&io](){ io.run(); });
/* 218 */   }
/* 219 */ 
/* 220 */   for (auto& thrd : pool) {
/* 221 */       thrd.join();
/* 222 */   }
/* 223 */ }

Aucun commentaire:

Enregistrer un commentaire