LCOV - code coverage report
Current view: top level - extra/webserver/unix - SPWebUnixConnectionWorker.cc (source / functions) Hit Total Coverage
Test: coverage.info Lines: 552 799 69.1 %
Date: 2024-05-12 00:16:13 Functions: 64 73 87.7 %

          Line data    Source code
       1             : /**
       2             :  Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
       3             : 
       4             :  Permission is hereby granted, free of charge, to any person obtaining a copy
       5             :  of this software and associated documentation files (the "Software"), to deal
       6             :  in the Software without restriction, including without limitation the rights
       7             :  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       8             :  copies of the Software, and to permit persons to whom the Software is
       9             :  furnished to do so, subject to the following conditions:
      10             : 
      11             :  The above copyright notice and this permission notice shall be included in
      12             :  all copies or substantial portions of the Software.
      13             : 
      14             :  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
      15             :  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      16             :  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
      17             :  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      18             :  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
      19             :  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
      20             :  THE SOFTWARE.
      21             :  **/
      22             : 
      23             : #include "SPWebUnixConfig.h"
      24             : #include "SPWebUnixConnectionWorker.h"
      25             : #include "SPWebUnixConnectionQueue.h"
      26             : #include "SPWebUnixRequest.h"
      27             : #include "SPWebRequestFilter.h"
      28             : #include "SPWebInputFilter.h"
      29             : 
      30             : #include <sys/signalfd.h>
      31             : #include <sys/eventfd.h>
      32             : #include <sys/ioctl.h>
      33             : #include <sys/sendfile.h>
      34             : #include <sys/types.h>
      35             : #include <sys/un.h>
      36             : #include <sys/socket.h>
      37             : #include <arpa/inet.h>
      38             : #include <netinet/in.h>
      39             : #include <unistd.h>
      40             : #include <fcntl.h>
      41             : 
      42             : namespace STAPPLER_VERSIONIZED stappler::web {
      43             : 
      44           0 : static StringView s_getSignalName(int sig) {
      45           0 :         switch (sig) {
      46           0 :         case SIGINT: return "SIGINT";
      47           0 :         case SIGILL: return "SIGILL";
      48           0 :         case SIGABRT: return "SIGABRT";
      49           0 :         case SIGFPE: return "SIGFPE";
      50           0 :         case SIGSEGV: return "SIGSEGV";
      51           0 :         case SIGTERM: return "SIGTERM";
      52           0 :         case SIGHUP: return "SIGHUP";
      53           0 :         case SIGQUIT: return "SIGQUIT";
      54           0 :         case SIGTRAP: return "SIGTRAP";
      55           0 :         case SIGKILL: return "SIGKILL";
      56           0 :         case SIGBUS: return "SIGBUS";
      57           0 :         case SIGSYS: return "SIGSYS";
      58           0 :         case SIGPIPE: return "SIGPIPE";
      59           0 :         case SIGALRM: return "SIGALRM";
      60           0 :         case SIGURG: return "SIGURG";
      61           0 :         case SIGSTOP: return "SIGSTOP";
      62           0 :         case SIGTSTP: return "SIGTSTP";
      63           0 :         case SIGCONT: return "SIGCONT";
      64           0 :         case SIGCHLD: return "SIGCHLD";
      65           0 :         case SIGTTIN: return "SIGTTIN";
      66           0 :         case SIGTTOU: return "SIGTTOU";
      67           0 :         case SIGPOLL: return "SIGPOLL";
      68           0 :         case SIGXCPU: return "SIGXCPU";
      69           0 :         case SIGXFSZ: return "SIGXFSZ";
      70           0 :         case SIGVTALRM: return "SIGVTALRM";
      71           0 :         case SIGPROF: return "SIGPROF";
      72           0 :         case SIGUSR1: return "SIGUSR1";
      73           0 :         case SIGUSR2: return "SIGUSR2";
      74           0 :         default: return "(unknown)";
      75             :         }
      76             :         return StringView();
      77             : }
      78             : 
      79         200 : ConnectionWorker::ConnectionWorker(ConnectionQueue *queue, UnixRoot *h, int socket, int pipe, int event)
      80         400 : : _queue(queue), _root(h)
      81         200 : , _inputClient(socket, EPOLLIN | EPOLLEXCLUSIVE)
      82         200 : , _cancelClient(pipe, EPOLLIN | EPOLLET)
      83         200 : , _eventClient(event, EPOLLIN | EPOLLET | EPOLLEXCLUSIVE)
      84         400 : ,_thread(ConnectionWorker::workerThread, this, queue) {
      85         200 :         _queue->retain();
      86         200 : }
      87             : 
      88         400 : ConnectionWorker::~ConnectionWorker() {
      89         200 :         _queue->release();
      90         400 : }
      91             : 
      92         197 : void ConnectionWorker::threadInit() {
      93         197 :         _threadAlloc = allocator::create();
      94         199 :         _threadPool = pool::create(_threadAlloc);
      95             : 
      96         200 :         _threadId = std::this_thread::get_id();
      97             : 
      98             :         sigset_t sigset;
      99         200 :         sigfillset(&sigset);
     100             : 
     101         200 :         _signalFd = ::signalfd(-1, &sigset, 0);
     102         200 :         ConnectionQueue::setNonblocking(_signalFd);
     103             : 
     104         200 :         _epollFd = epoll_create1(0);
     105             : 
     106         200 :         addClient(_inputClient);
     107         200 :         addClient(_cancelClient);
     108         200 :         addClient(_eventClient);
     109         200 : }
     110             : 
     111         181 : void ConnectionWorker::threadDispose() {
     112         181 :         if (_signalFd >= 0) {
     113         183 :                 close(_signalFd);
     114         195 :                 _signalFd = -1;
     115             :         }
     116         193 :         if (_epollFd >= 0) {
     117         195 :                 close(_epollFd);
     118         197 :                 _epollFd = -1;
     119             :         }
     120             : 
     121         195 :         pool::destroy(_threadPool);
     122         168 :         allocator::destroy(_threadAlloc);
     123         183 : }
     124             : 
     125         386 : bool ConnectionWorker::worker() {
     126         386 :         if (_shouldClose) {
     127         186 :                 return false;
     128             :         }
     129             : 
     130         200 :         while (poll(_epollFd)) {
     131             :                 struct signalfd_siginfo si;
     132           0 :                 int nr = ::read(_signalFd, &si, sizeof si);
     133           0 :                 while (nr == sizeof si) {
     134           0 :                         if (si.ssi_signo != SIGINT) {
     135           0 :                                 log::error("ConnectionWorker", "epoll_wait() exit with signal: ", si.ssi_signo, " ", s_getSignalName(si.ssi_signo));
     136             :                         }
     137           0 :                         nr = ::read(_signalFd, &si, sizeof si);
     138             :                 }
     139             :         }
     140             : 
     141         184 :         return true;
     142             : }
     143             : 
     144         200 : bool ConnectionWorker::poll(int epollFd) {
     145             :         std::array<struct epoll_event, ConnectionWorker::MaxEvents> _events;
     146             : 
     147        6807 :         while (!_shouldClose) {
     148        6619 :                 int nevents = epoll_wait(epollFd, _events.data(), ConnectionWorker::MaxEvents, -1);
     149        6617 :                 if (nevents == -1 && errno != EINTR) {
     150           0 :                         char buf[256] = { 0 };
     151           0 :                         log::error("ConnectionWorker", "epoll_wait() failed with errno ", errno, " (", strerror_r(errno, buf, 255), ")");
     152           0 :                         return false;
     153        6617 :                 } else if (errno == EINTR) {
     154           0 :                         return true;
     155             :                 }
     156             : 
     157             :                 /// process high-priority events
     158       13227 :                 for (int i = 0; i < nevents; i++) {
     159        6617 :                         Client *client = static_cast<Client *>(_events[i].data.ptr);
     160             : 
     161        6610 :                         if ((_events[i].events & EPOLLIN)) {
     162        6491 :                                 if (client == &_eventClient) {
     163         101 :                                         uint64_t value = 0;
     164         101 :                                         auto sz = read(_eventClient.fd, &value, sizeof(uint64_t));
     165         101 :                                         if (sz == 8 && value) {
     166         100 :                                                 auto ev = _queue->popTask();
     167         100 :                                                 if (value > 1) {
     168           2 :                                                         value -= 1;
     169             :                                                         // forward event to another worker
     170           2 :                                                         write(_eventClient.fd, &value, sizeof(uint64_t));
     171             :                                                 }
     172         100 :                                                 if (ev) {
     173         100 :                                                         runTask(ev);
     174             :                                                 }
     175             :                                         }
     176             :                                 }
     177             :                         }
     178             :                 }
     179             : 
     180       10116 :                 for (int i = 0; i < nevents; i++) {
     181        6609 :                         Client *client = static_cast<Client *>(_events[i].data.ptr);
     182        6606 :                         if ((_events[i].events & EPOLLERR)) {
     183           0 :                                 if (client == &_inputClient) {
     184           0 :                                         log::error("ConnectionWorker", "epoll error on server socket ", client->fd);
     185           0 :                                         _shouldClose = true;
     186             :                                 } else {
     187           0 :                                         log::error("ConnectionWorker", "epoll error on socket ", client->fd);
     188           0 :                                         client->gen->releaseClient(client);
     189             :                                 }
     190           0 :                                 continue;
     191             :                         }
     192             : 
     193        6608 :                         if ((_events[i].events & EPOLLIN)) {
     194        6487 :                                 if (client == &_inputClient) {
     195             :                                         char str[INET_ADDRSTRLEN + 1];
     196             : 
     197             :                                         struct sockaddr client_addr;
     198        3100 :                                         socklen_t client_addr_len = sizeof(client_addr);
     199        3100 :                                         int newClient = ::accept(_inputClient.fd, &client_addr, &client_addr_len);
     200        6200 :                                         while (newClient != -1) {
     201        3100 :                                                 struct sockaddr_in* pV4Addr = (struct sockaddr_in*)&client_addr;
     202        3100 :                                                 struct in_addr ipAddr = pV4Addr->sin_addr;
     203             : 
     204        3100 :                                                 memset(str, 0, INET_ADDRSTRLEN + 1);
     205        3100 :                                                 inet_ntop( AF_INET, &ipAddr, str, INET_ADDRSTRLEN );
     206             : 
     207        3100 :                                                 pushFd(epollFd, newClient, StringView(str), pV4Addr->sin_port);
     208        3100 :                                                 newClient = ::accept(_inputClient.fd, &client_addr, &client_addr_len);
     209             :                                         }
     210        3100 :                                         if (errno == EAGAIN || errno == EWOULDBLOCK) {
     211             :                                                 // we processed all of the connections
     212             :                                                 break;
     213             :                                         } else {
     214           0 :                                                 log::error("ConnectionWorker", "accept() failed");
     215           0 :                                                 break;
     216             :                                         }
     217        3387 :                                 } else if (client == &_cancelClient) {
     218             :                                         //onError("Received end signal");
     219         189 :                                         _shouldClose = true;
     220        3198 :                                 } else if (client == &_eventClient) {
     221             :                                         // do nothing
     222             :                                 } else {
     223        3100 :                                         client->performRead();
     224             :                                 }
     225             :                         }
     226             : 
     227        3504 :                         if ((_events[i].events & EPOLLOUT) && !client->system) {
     228        3218 :                                 client->performWrite();
     229             :                         }
     230             : 
     231        3502 :                         if ((_events[i].events & EPOLLHUP) || (_events[i].events & EPOLLRDHUP)) {
     232           0 :                                 if (client != &_eventClient && client != &_cancelClient) {
     233           0 :                                         removeClient(*client);
     234             :                                 }
     235             :                         }
     236             : 
     237        3508 :                         if (!client->system && !client->valid) {
     238           0 :                                 client->shutdownAll();
     239             :                         }
     240             : 
     241        3508 :                         if (client->shutdownReadSend && client->shutdownWriteSend) {
     242        3100 :                                 removeClient(*client);
     243             :                         }
     244             :                 }
     245             :         }
     246             : 
     247         188 :         if (_shouldClose) {
     248         184 :                 auto gen = _generation;
     249         233 :                 while (gen) {
     250          49 :                         gen->releaseAll();
     251          49 :                         gen = _generation->prev;
     252             :                 }
     253             :         }
     254             : 
     255         188 :         return !_shouldClose;
     256             : }
     257             : 
     258         100 : void ConnectionWorker::runTask(AsyncTask *task) {
     259         100 :         auto host = task->getHost();
     260         100 :         perform([&] {
     261         100 :                 AsyncTask::run(task);
     262         100 :         }, task->pool(), config::TAG_HOST, host.getController());
     263         100 :         _queue->releaseTask(task);
     264         100 : }
     265             : 
     266        3100 : UnixRequestController *ConnectionWorker::readRequest(Client *client, BufferChain &chain) {
     267        3100 :         RequestInfo info;
     268             : 
     269        3100 :         StringView str;
     270        3100 :         pool_t *tmpPool = nullptr;
     271             : 
     272        3100 :         if (chain.isSingle()) {
     273        3025 :                 str = StringView((const char *)chain.front->buf, chain.getBytesRead());
     274             :         } else {
     275          75 :                 tmpPool = pool::create(client->pool);
     276             : 
     277          75 :                 auto data = chain.extract(tmpPool, 0, std::min(chain.getBytesRead(), config::UNIX_MAX_REQUEST_LINE));
     278          75 :                 str = data.toStringView();
     279             :         }
     280             : 
     281        3100 :         auto requestLineSize = RequestFilter::readRequestLine(str, info);
     282        3100 :         if (requestLineSize == 0) {
     283           0 :                 if (tmpPool) {
     284           0 :                         pool::destroy(tmpPool);
     285             :                 }
     286           0 :                 client->cancelWithResult(HTTP_INTERNAL_SERVER_ERROR);
     287           0 :                 return nullptr;
     288             :         }
     289             : 
     290        3100 :         UnixRequestController *cfg = nullptr;
     291        3100 :         auto p = pool::create(_threadPool);
     292        3100 :         perform([&] {
     293        3100 :                 cfg = new (p) UnixRequestController(p, info.clone(p), client);
     294        3100 :         }, p);
     295             : 
     296        3100 :         if (tmpPool) {
     297          75 :                 pool::destroy(tmpPool);
     298             :         }
     299             : 
     300        3100 :         client->bytesRead = chain.getBytesRead();
     301        3100 :         chain.releaseEmpty();
     302             : 
     303        3100 :         return cfg;
     304        3100 : }
     305             : 
     306       20150 : Status ConnectionWorker::parseRequestHeader(UnixRequestController *cfg, Client *client, BufferChain &chain) {
     307       20150 :         StringView name;
     308       20150 :         StringView value;
     309             : 
     310       20150 :         BytesView str;
     311       20150 :         pool_t *tmpPool = nullptr;
     312       20150 :         size_t headerLen = chain.getBytesRead() - client->bytesRead;
     313             : 
     314       20150 :         if (chain.isSingle()) {
     315       19550 :                 str = BytesView(chain.front->buf + (client->bytesRead - chain.front->absolute), headerLen);
     316             :         } else {
     317         600 :                 tmpPool = pool::create(client->pool);
     318             : 
     319         600 :                 str = chain.extract(tmpPool, client->bytesRead, std::min(headerLen, config::UNIX_MAX_HEADER_LINE));
     320             :         }
     321             : 
     322       20150 :         if (str.size() == 2 && str.equals((const uint8_t *)"\r\n", 2)) {
     323        3100 :                 client->bytesRead += headerLen;
     324        3100 :                 chain.releaseEmpty();
     325        3100 :                 return DONE;
     326             :         }
     327             : 
     328       17050 :         auto headerLineLength = RequestFilter::readRequestHeader(str, name, value);
     329       17050 :         if (headerLineLength == 0) {
     330           0 :                 if (tmpPool) {
     331           0 :                         pool::destroy(tmpPool);
     332             :                 }
     333           0 :                 return HTTP_INTERNAL_SERVER_ERROR;
     334             :         }
     335             : 
     336       17050 :         perform([&] {
     337       17050 :                 cfg->setRequestHeader(name, value);
     338       17050 :         }, cfg->getPool());
     339             : 
     340       17050 :         if (tmpPool) {
     341         525 :                 pool::destroy(tmpPool);
     342             :         }
     343             : 
     344       17050 :         client->bytesRead = chain.getBytesRead();
     345       17050 :         chain.releaseEmpty();
     346       17050 :         return OK;
     347             : }
     348             : 
     349        3100 : Status ConnectionWorker::processRequest(UnixRequestController *req) {
     350        6200 :         return perform([&, this] {
     351        3100 :                 auto ret = _root->processRequest(req);
     352        3100 :                 switch (ret) {
     353        1975 :                 case DONE:
     354        1975 :                         break;
     355           0 :                 case DECLINED:
     356           0 :                         break;
     357        1125 :                 case SUSPENDED:
     358             :                 case OK:
     359             :                 default:
     360        1125 :                         break;
     361             :                 }
     362        3100 :                 return ret;
     363        6200 :         }, req->getPool(), config::TAG_REQUEST, req);
     364             : }
     365             : 
     366        3100 : void ConnectionWorker::pushFd(int epollFd, int fd, StringView addr, uint16_t port) {
     367        3100 :         if (!_generation) {
     368          50 :                 _generation = makeGeneration();
     369             :         }
     370             : 
     371        3100 :         auto c = _generation->pushFd(fd, addr, port);
     372        3100 :         if (addClient(*c)) {
     373        3100 :                 ++ _fdCount;
     374             :         } else {
     375           0 :                 c->gen->releaseClient(c);
     376             :         }
     377        3100 : }
     378             : 
     379        3699 : bool ConnectionWorker::addClient(Client &client) {
     380        3699 :         if (client.fd < 0) {
     381           0 :                 return false;
     382             :         }
     383             : 
     384        3699 :         int err = ::epoll_ctl(_epollFd, EPOLL_CTL_ADD, client.fd, &client.event);
     385        3697 :         if (err == -1) {
     386           0 :                 char buf[256] = { 0 };
     387           0 :                 log::error("ConnectionWorker", "Failed to add client epoll_ctl("
     388           0 :                                 , client.fd,  ", EPOLL_CTL_ADD): ",  strerror_r(errno, buf, 255));
     389           0 :                 return false;
     390             :         }
     391        3697 :         return true;
     392             : }
     393             : 
     394        3100 : void ConnectionWorker::removeClient(Client &client) {
     395        3100 :         if (client.fd < 0) {
     396           0 :                 return;
     397             :         }
     398             : 
     399        3100 :         int err = ::epoll_ctl(_epollFd, EPOLL_CTL_DEL, client.fd, &client.event);
     400        3100 :         if (err == -1) {
     401           0 :                 char buf[256] = { 0 };
     402           0 :                 log::error("ConnectionWorker", "Failed to remove client epoll_ctl("
     403           0 :                                 , client.fd,  ", EPOLL_CTL_DEL): ",  strerror_r(errno, buf, 255));
     404             :         }
     405             : 
     406        3100 :         client.release();
     407             : }
     408             : 
     409        6425 : ConnectionWorker::Buffer *ConnectionWorker::Buffer::create(pool_t *p, size_t abs) {
     410        6425 :         auto requestSize = config::UNIX_CLIENT_BUFFER_SIZE;
     411        6425 :         auto block = pool::palloc(p, requestSize);
     412             : 
     413        6425 :         auto b = new (block) Buffer();
     414             : 
     415        6425 :         b->next = nullptr;
     416        6425 :         b->pool = p;
     417        6425 :         b->buf = (uint8_t *)block + sizeof(Buffer);
     418        6425 :         b->capacity = requestSize - sizeof(Buffer);
     419        6425 :         b->size = 0;
     420        6425 :         b->offset = 0;
     421        6425 :         b->absolute = abs;
     422             : 
     423        6425 :         return b;
     424             : }
     425             : 
     426          50 : ConnectionWorker::Buffer *ConnectionWorker::Buffer::create(pool_t *p, StringView path, off_t rangeStart, size_t rangeLen, size_t abs) {
     427          50 :         if (!filesystem::exists(path)) {
     428           0 :                 return nullptr;
     429             :         }
     430             : 
     431          50 :         auto requestSize = config::UNIX_CLIENT_BUFFER_SIZE;
     432          50 :         auto block = pool::palloc(p, requestSize);
     433             : 
     434          50 :         auto b = new (block) Buffer();
     435             : 
     436          50 :         b->next = nullptr;
     437          50 :         b->pool = p;
     438          50 :         b->buf = (uint8_t *)block + sizeof(Buffer);
     439          50 :         b->capacity = requestSize - sizeof(Buffer);
     440          50 :         b->offset = rangeStart;
     441          50 :         b->absolute = abs;
     442          50 :         b->flags |= IsOutFile;
     443             : 
     444          50 :         BufferFile *file = new (b->buf) BufferFile;
     445          50 :         filesystem::stat(path, file->stat);
     446          50 :         file->fd = ::open(path.data(), O_RDONLY);
     447          50 :         file->extraBuffer = b->buf + sizeof(BufferFile);
     448          50 :         b->capacity -= sizeof(BufferFile);
     449             : 
     450          50 :         b->size = std::min(rangeLen, file->stat.size - rangeStart);
     451             : 
     452          50 :         return b;
     453             : }
     454             : 
     455        5825 : void ConnectionWorker::Buffer::release() {
     456        5825 :         if (auto f = getFile()) {
     457          50 :                 if (f->fd >= 0) {
     458          50 :                         ::close(f->fd);
     459          50 :                         f->fd = -1;
     460             :                 }
     461             :         }
     462        5825 :         pool::free(pool, this, capacity + sizeof(this));
     463        5825 : }
     464             : 
     465           0 : StringView ConnectionWorker::Buffer::str() const {
     466           0 :         if (isOutFile()) {
     467           0 :                 return StringView();
     468             :         }
     469             : 
     470           0 :         return StringView(reinterpret_cast<const char *>(buf + offset), size - offset);
     471             : }
     472             : 
     473     2443225 : size_t ConnectionWorker::Buffer::availableForWrite() const {
     474     2443225 :         if (isOutFile()) {
     475           0 :                 return 0;
     476             :         }
     477             : 
     478     2443225 :         return capacity - size;
     479             : }
     480             : 
     481      107375 : size_t ConnectionWorker::Buffer::availableForRead() const {
     482      107375 :         return size - offset;
     483             : }
     484             : 
     485     1224675 : uint8_t *ConnectionWorker::Buffer::writeTarget() const {
     486     1224675 :         if (isOutFile()) {
     487           0 :                 return nullptr;
     488             :         }
     489             : 
     490     1224675 :         return buf + size;
     491             : }
     492             : 
     493       48075 : uint8_t *ConnectionWorker::Buffer::readSource() const {
     494       48075 :         if (isOutFile()) {
     495          50 :                 return nullptr;
     496             :         }
     497             : 
     498       48025 :         return buf + offset;
     499             : }
     500             : 
     501     1221375 : size_t ConnectionWorker::Buffer::write(const uint8_t *b, size_t s) {
     502     1221375 :         if (isOutFile()) {
     503           0 :                 return 0;
     504             :         }
     505             : 
     506     1221375 :         auto writeSize = std::min(availableForWrite(), s);
     507     1221375 :         memcpy(writeTarget(), b, writeSize);
     508     1221375 :         size += writeSize;
     509     1221375 :         return writeSize;
     510             : }
     511             : 
     512        9000 : ConnectionWorker::BufferFile *ConnectionWorker::Buffer::getFile() const {
     513        9000 :         if (isOutFile()) {
     514         100 :                 return (ConnectionWorker::BufferFile *)buf;
     515             :         }
     516        8900 :         return nullptr;
     517             : }
     518             : 
     519     2531625 : bool ConnectionWorker::BufferChain::isEos() const {
     520     2531625 :         if (eos || (back && (back->flags & Buffer::Eos) != Buffer::None)) {
     521         675 :                 return true;
     522             :         }
     523     2530950 :         return false;
     524             : }
     525             : 
     526       30750 : bool ConnectionWorker::BufferChain::empty() const {
     527       30750 :         if (!back || back->availableForRead() == 0) {
     528         525 :                 return true;
     529             :         }
     530       30225 :         return false;
     531             : }
     532             : 
     533        3100 : size_t ConnectionWorker::BufferChain::size() const {
     534        3100 :         size_t ret = 0;
     535        3100 :         auto b = front;
     536        6275 :         while (b) {
     537        3175 :                 ret += b->size;
     538        3175 :                 b = b->next;
     539             :         }
     540        3100 :         return ret;
     541             : }
     542             : 
     543     1224675 : ConnectionWorker::Buffer *ConnectionWorker::BufferChain::getWriteTarget(pool_t *pool) {
     544     1224675 :         if (back && back->availableForWrite() > 0) {
     545     1218250 :                 return back;
     546             :         } else {
     547        6425 :                 auto b = Buffer::create(pool, back ? back->absolute + back->capacity : 0);
     548             : 
     549        6425 :                 if (back) {
     550         300 :                         back->next = b;
     551         300 :                         back = b;
     552         300 :                         tail = &(back->next);
     553             :                 } else {
     554        6125 :                         front = back = b;
     555        6125 :                         tail = &(back->next);
     556             :                 }
     557             : 
     558        6425 :                 return b;
     559             :         }
     560             : }
     561             : 
     562     1224550 : bool ConnectionWorker::BufferChain::write(pool_t *p, const uint8_t *buf, size_t size, Buffer::Flags flags) {
     563     1224550 :         Buffer *targetBuffer = nullptr;
     564     1224550 :         size_t written = 0;
     565             : 
     566     1224550 :         if (isEos()) {
     567         675 :                 return false;
     568             :         }
     569             : 
     570     1223875 :         if (size == 0 && flags != Buffer::Flags::None) {
     571        2600 :                 if (back) {
     572        2575 :                         back->flags |= flags;
     573             :                 } else {
     574          25 :                         if ((flags & Buffer::Eos) != Buffer::None) {
     575          25 :                                 eos = true;
     576             :                         }
     577             :                 }
     578        2600 :                 return true;
     579             :         }
     580             : 
     581     2442650 :         while (size > 0) {
     582     1221375 :                 targetBuffer = getWriteTarget(p);
     583             : 
     584     1221375 :                 written = targetBuffer->write(buf, size);
     585     1221375 :                 buf += written; size -= written;
     586             :         }
     587             : 
     588     1221275 :         if (targetBuffer && flags != Buffer::Flags::None) {
     589         450 :                 targetBuffer->flags |= flags;
     590             :         }
     591             : 
     592     1221275 :         return true;
     593             : }
     594             : 
     595          50 : bool ConnectionWorker::BufferChain::write(Buffer *newBuf) {
     596          50 :         if (back) {
     597           0 :                 back->next = newBuf;
     598           0 :                 back = newBuf;
     599             :         } else {
     600          50 :                 front = back = newBuf;
     601             :         }
     602          50 :         tail = &(back->next);
     603          50 :         return true;
     604             : }
     605             : 
     606           0 : bool ConnectionWorker::BufferChain::write(BufferChain &chain) {
     607           0 :         if (back) {
     608           0 :                 back->next = chain.front;
     609           0 :                 back = chain.back;
     610             :         } else {
     611           0 :                 front = chain.front;
     612           0 :                 back = chain.back;
     613             :         }
     614           0 :         tail = &(back->next);
     615             : 
     616           0 :         chain.front = chain.back = nullptr;
     617           0 :         chain.tail = nullptr;
     618             : 
     619           0 :         return true;
     620             : }
     621             : 
     622        3100 : bool ConnectionWorker::BufferChain::readFromFd(pool_t *p, int fd) {
     623        3100 :         int sz = 0;
     624        3100 :         ::ioctl (fd, FIONREAD, &sz);
     625             : 
     626        6400 :         while (sz > 0) {
     627        3300 :                 Buffer *targetBuffer = getWriteTarget(p);
     628             : 
     629        3300 :                 sz = ::read(fd, targetBuffer->writeTarget(), targetBuffer->availableForWrite());
     630        3300 :                 if (sz > 0) {
     631        3300 :                         targetBuffer->size += sz;
     632             : 
     633        3300 :                         sz = ::read(fd, nullptr, 0);
     634        3300 :                         if (sz == 0) {
     635        3300 :                                 ::ioctl (fd, FIONREAD, &sz);
     636             :                         }
     637             :                 }
     638             : 
     639        3300 :                 if (sz == -1) {
     640           0 :                         if (errno != EAGAIN && errno != EWOULDBLOCK) {
     641           0 :                                 char tmp[256] = { 0 };
     642           0 :                                 log::error("ConnectionWorker", "Fail to read from client: ", strerror_r(errno, tmp, 255));
     643           0 :                                 return false;
     644             :                         }
     645             :                 }
     646             :         }
     647             : 
     648        3100 :         return true;
     649             : }
     650             : 
     651       26975 : Status ConnectionWorker::BufferChain::read(const Callback<int(const Buffer *, const uint8_t *, size_t)> &cb, bool release) {
     652       26975 :         auto buf = front;
     653       29725 :         while (buf) {
     654       27275 :                 auto size = buf->availableForRead();
     655       27275 :                 auto ret = cb(buf, buf->readSource(), size);
     656       53900 :                 while (size > 0 && ret > 0) {
     657       26625 :                         buf->offset += ret;
     658       26625 :                         size = buf->availableForRead();
     659       26625 :                         if (size > 0) {
     660       20800 :                                 ret = cb(buf, buf->readSource(), size);
     661             :                         }
     662             :                 }
     663             : 
     664       27275 :                 if (size == 0) {
     665        5825 :                         if ((buf->flags & Buffer::Eos) != Buffer::None) {
     666        3075 :                                 eos = true;
     667             :                         }
     668        5825 :                         if (release) {
     669        3375 :                                 auto f = front;
     670        3375 :                                 front = front->next;
     671        3375 :                                 if (&f->next ==tail) {
     672        3075 :                                         tail = nullptr;
     673             :                                 }
     674        3375 :                                 if (f == back) {
     675        3075 :                                         back = nullptr;
     676             :                                 }
     677        3375 :                                 f->release();
     678        3375 :                                 buf = front;
     679             :                         } else {
     680        2450 :                                 buf = buf->next;
     681             :                         }
     682             :                 }
     683             : 
     684       27275 :                 if (ret == DONE || eos) {
     685       24525 :                         return DONE;
     686        2750 :                 } else if (ret == SUSPENDED) {
     687           0 :                         return SUSPENDED;
     688        2750 :                 } else if (ret == DECLINED) {
     689           0 :                         return DECLINED;
     690             :                 }
     691             :         }
     692        2450 :         return OK;
     693             : }
     694             : 
     695        3075 : Status ConnectionWorker::BufferChain::writeToFd(int fd, size_t &bytesWritten) {
     696        6150 :         return read([&] (const Buffer *source, const uint8_t *buf, size_t size) {
     697        3175 :                 ssize_t ret = 0;
     698        3175 :                 if (auto f = source->getFile()) {
     699          50 :                         off_t offset = source->offset;
     700          50 :                         ret = ::sendfile(fd, f->fd, &offset, source->size);
     701             :                 } else {
     702        3125 :                         ret = ::write(fd, buf, size);
     703             :                 }
     704        3175 :                 if (ret >= 0) {
     705        3175 :                         bytesWritten += ret;
     706        3175 :                         return int(ret);
     707             :                 } else {
     708           0 :                         if (errno != EAGAIN && errno != EWOULDBLOCK) {
     709           0 :                                 return int(SUSPENDED);
     710             :                         }
     711           0 :                         return int(DECLINED);
     712             :                 }
     713        6150 :         }, true);
     714             : }
     715             : 
     716       44075 : size_t ConnectionWorker::BufferChain::getBytesRead() const {
     717       44075 :         if (front) {
     718       44075 :                 return front->absolute + front->offset;
     719             :         }
     720           0 :         return 0;
     721             : }
     722             : 
     723         675 : BytesView ConnectionWorker::BufferChain::extract(pool_t *pool, size_t initOffset, size_t blockSize) const {
     724         675 :         if (!front) {
     725           0 :                 return BytesView();
     726             :         }
     727             : 
     728         675 :         if (initOffset + blockSize > getBytesRead()) {
     729           0 :                 return BytesView();
     730             :         }
     731             : 
     732         675 :         if (front && front->absolute > initOffset) {
     733           0 :                 return BytesView();
     734             :         }
     735             : 
     736         675 :         size_t targetSize = blockSize;
     737         675 :         size_t writeSize = blockSize;
     738         675 :         auto block = (uint8_t *)pool::alloc(pool, targetSize);
     739         675 :         auto target = block;
     740             : 
     741         675 :         Buffer *first = front;
     742         675 :         while (first->absolute + first->capacity < initOffset) {
     743           0 :                 first = first->next;
     744             :         }
     745             : 
     746         675 :         initOffset -= first->absolute;
     747             : 
     748        1350 :         while (writeSize > 0 && first) {
     749         675 :                 auto copySize = std::min(first->capacity - initOffset, writeSize);
     750         675 :                 ::memcpy(target, first->buf + initOffset, copySize);
     751         675 :                 initOffset = 0;
     752         675 :                 writeSize -= copySize;
     753         675 :                 target += copySize;
     754         675 :                 first = first->next;
     755             :         }
     756             : 
     757         675 :         return BytesView(block, targetSize);
     758             : }
     759             : 
     760             : /*void ConnectionWorker::BufferChain::release(size_t offset) {
     761             :         while (front && offset > 0) {
     762             :                 if (front->size < front->offset + offset) {
     763             :                         offset -= (front->size - front->offset);
     764             : 
     765             :                         auto f = front;
     766             :                         front = front->next;
     767             :                         if (&f->next ==tail) {
     768             :                                 tail = nullptr;
     769             :                         }
     770             :                         if (f == back) {
     771             :                                 back = nullptr;
     772             :                         }
     773             :                         f->release();
     774             :                 } else {
     775             :                         front->offset += offset;
     776             :                         offset = 0;
     777             :                 }
     778             :         }
     779             : }*/
     780             : 
     781       23250 : void ConnectionWorker::BufferChain::releaseEmpty() {
     782       25700 :         while (front && front->availableForRead() == 0) {
     783        2450 :                 auto f = front;
     784        2450 :                 front = front->next;
     785        2450 :                 if (&f->next ==tail) {
     786        2450 :                         tail = nullptr;
     787             :                 }
     788        2450 :                 if (f == back) {
     789        2450 :                         back = nullptr;
     790             :                 }
     791        2450 :                 f->release();
     792             :         }
     793       23250 : }
     794             : 
     795           0 : void ConnectionWorker::BufferChain::clear() {
     796           0 :         while (front) {
     797           0 :                 auto f = front;
     798           0 :                 front = front->next;
     799           0 :                 if (&f->next ==tail) {
     800           0 :                         tail = nullptr;
     801             :                 }
     802           0 :                 if (f == back) {
     803           0 :                         back = nullptr;
     804             :                 }
     805           0 :                 f->release();
     806             :         }
     807           0 : }
     808             : 
     809        3100 : ConnectionWorker::Client::Client(Generation *g, pool_t *p, int ifd, StringView inAddr, uint16_t inPort)
     810        3100 : : gen(g), pool(p), addr(inAddr.pdup(p)), port(inPort) {
     811        3100 :         memset(&event, 0, sizeof(event));
     812        3100 :         event.data.ptr = this;
     813        3100 :         event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
     814        3100 :         fd = ifd;
     815             : 
     816        3100 :         ConnectionQueue::setNonblocking(fd);
     817        3100 : }
     818             : 
     819         600 : ConnectionWorker::Client::Client(int f, int mode) {
     820         600 :         memset(&event, 0, sizeof(event));
     821         600 :         fd = f;
     822         600 :         event.data.ptr = this;
     823         600 :         event.events = mode;
     824         600 :         system = true;
     825         600 : }
     826             : 
     827        3100 : void ConnectionWorker::Client::shutdownRead() {
     828        3100 :         if (!shutdownReadSend) {
     829        3100 :                 shutdownReadSend = true;
     830        3100 :                 ::shutdown(event.data.fd, SHUT_RD);
     831             :         }
     832        3100 : }
     833             : 
     834        3100 : void ConnectionWorker::Client::shutdownWrite() {
     835        3100 :         if (!shutdownWriteSend) {
     836        3100 :                 shutdownWriteSend = true;
     837        3100 :                 ::shutdown(event.data.fd, SHUT_WR);
     838             :         }
     839        3100 : }
     840             : 
     841           0 : void ConnectionWorker::Client::shutdownAll() {
     842           0 :         if (!shutdownReadSend && !shutdownWriteSend) {
     843           0 :                 shutdownWriteSend = true;
     844           0 :                 shutdownReadSend = true;
     845           0 :                 ::shutdown(event.data.fd, SHUT_RDWR);
     846             :         } else {
     847           0 :                 shutdownRead();
     848           0 :                 shutdownWrite();
     849             :         }
     850           0 : }
     851             : 
     852        3100 : void ConnectionWorker::Client::release() {
     853        3100 :         if (request) {
     854        3100 :                 auto p = request->getPool();
     855        3100 :                 request->finalize();
     856        3100 :                 pool::destroy(p);
     857        3100 :                 request = nullptr;
     858             :         }
     859             : 
     860        3100 :         close(event.data.fd);
     861        3100 :         if (gen) {
     862        3100 :                 gen->releaseClient(this);
     863             :         }
     864             : 
     865        3100 :         if (pool) {
     866        3100 :                 pool::destroy(pool);
     867             :         }
     868        3100 : }
     869             : 
     870        3100 : bool ConnectionWorker::Client::performRead() {
     871        3100 :         if (shutdownReadSend) {
     872           0 :                 return false;
     873             :         }
     874             : 
     875        3100 :         input.readFromFd(pool, fd);
     876             : 
     877        3100 :         if (input) {
     878        3100 :                 auto ret = runInputFilter(input);
     879        3100 :                 switch (ret) {
     880           0 :                 case OK:
     881             :                 case SUSPENDED:
     882           0 :                         break;
     883           0 :                 case DECLINED:
     884           0 :                         shutdownRead();
     885           0 :                         break;
     886        2625 :                 case DONE:
     887        2625 :                         perform([&] {
     888        2625 :                                 request->submitResponse(ret);
     889        2625 :                         }, request->getPool(), config::TAG_REQUEST, request);
     890        2625 :                         shutdownRead();
     891        2625 :                         break;
     892         475 :                 default:
     893         475 :                         if (request) {
     894         475 :                                 perform([&] {
     895         475 :                                         request->submitResponse(ret);
     896         475 :                                 }, request->getPool(), config::TAG_REQUEST, request);
     897             :                         } else {
     898           0 :                                 cancelWithResult(ret);
     899             :                         }
     900         475 :                         shutdownRead();
     901         475 :                         return false;
     902             :                 }
     903             :         }
     904             : 
     905        2625 :         return true;
     906             : }
     907             : 
     908        3218 : bool ConnectionWorker::Client::performWrite() {
     909        3218 :         if (!output || !valid) {
     910        3218 :                 return true;
     911             :         }
     912             : 
     913           0 :         auto ret = output.writeToFd(fd, bytesSent);
     914           0 :         switch (ret) {
     915           0 :         case SUSPENDED:
     916           0 :                 return true;
     917           0 :         case DONE: // eos found
     918             :         case DECLINED: // unrecoverable error
     919           0 :                 shutdownWrite();
     920           0 :                 return false;
     921           0 :         default:
     922           0 :                 break;
     923             :         }
     924             : 
     925           0 :         return true;
     926             : }
     927             : 
     928        3075 : bool ConnectionWorker::Client::write(BufferChain &target, BufferChain &source) {
     929        3075 :         if (output.isEos() || shutdownWriteSend) {
     930           0 :                 return false;
     931             :         }
     932             : 
     933        3075 :         if (!target) {
     934        3075 :                 auto ret = source.writeToFd(fd, bytesSent);
     935        3075 :                 switch (ret) {
     936        3075 :                 case DONE:
     937        3075 :                         shutdownWrite();
     938        3075 :                         return true;
     939             :                         break;
     940           0 :                 case OK:
     941             :                 case SUSPENDED:
     942           0 :                         return true;
     943             :                         break;
     944           0 :                 case DECLINED:
     945           0 :                         shutdownWrite();
     946           0 :                         return false;
     947             :                         break;
     948           0 :                 default:
     949           0 :                         break;
     950             :                 }
     951             : 
     952           0 :                 if (source) {
     953           0 :                         target.write(source);
     954             :                 }
     955             :         } else {
     956           0 :                 target.write(source);
     957             :         }
     958           0 :         return true;
     959             : }
     960             : 
     961     1303950 : bool ConnectionWorker::Client::write(BufferChain &chain, const uint8_t *buf, size_t size, Buffer::Flags flags) {
     962     1303950 :         if (output.isEos() || shutdownWriteSend) {
     963           0 :                 return false;
     964             :         }
     965             : 
     966     1303950 :         if (!chain && &chain == &output) {
     967       82200 :                 auto ret = ::write(fd, buf, size);
     968      164325 :                 while (ret > 0) {
     969       82125 :                         buf += ret; size -= ret;
     970       82125 :                         if (size > 0) {
     971           0 :                                 ret = ::write(fd, buf, size);
     972             :                         } else {
     973       82125 :                                 ret = 0;
     974             :                         }
     975             :                 }
     976             : 
     977       82200 :                 if (ret < 0) {
     978           0 :                         if (errno != EAGAIN && errno != EWOULDBLOCK) {
     979           0 :                                 log::error("ConnectionWorker", "fail to write to client");
     980           0 :                                 size = 0;
     981           0 :                                 flags = Buffer::Flags::Eos;
     982             :                         }
     983             :                 } else {
     984       82200 :                         bytesSent += ret;
     985             :                 }
     986             :         }
     987             : 
     988     1303950 :         if (size > 0) {
     989     1220825 :                 return chain.write(pool, buf, size, flags);
     990       83125 :         } else if (size == 0 && flags != Buffer::None) {
     991         650 :                 if (&chain == &output && (flags & Buffer::Eos) != Buffer::None) {
     992          25 :                         shutdownWrite();
     993          25 :                         return true;
     994             :                 } else {
     995         625 :                         return chain.write(pool, nullptr, 0, flags);
     996             :                 }
     997             :         }
     998       82475 :         return true;
     999             : }
    1000             : 
    1001       82175 : bool ConnectionWorker::Client::write(BufferChain &chain, StringView data, Buffer::Flags flags) {
    1002       82175 :         return write(chain, (const uint8_t *)data.data(), data.size(), flags);
    1003             : }
    1004             : 
    1005           0 : bool ConnectionWorker::Client::write(BufferChain &chain, BytesView data, Buffer::Flags flags) {
    1006           0 :         return write(chain, data.data(), data.size(), flags);
    1007             : }
    1008             : 
    1009          50 : bool ConnectionWorker::Client::writeFile(BufferChain &chain, StringView filename, size_t offset, size_t size, Buffer::Flags flags) {
    1010          50 :         if (output.isEos() || shutdownWriteSend) {
    1011           0 :                 return false;
    1012             :         }
    1013             : 
    1014          50 :         auto buf = Buffer::create(pool, filename, offset, size, bytesSent);
    1015          50 :         buf->flags |= flags;
    1016          50 :         if (!buf) {
    1017           0 :                 return false;
    1018             :         }
    1019             : 
    1020          50 :         if (!chain.write(buf)) {
    1021           0 :                 buf->release();
    1022           0 :                 return false;
    1023             :         }
    1024             : 
    1025          50 :         if (!chain && &chain == &output) {
    1026           0 :                 auto ret = chain.writeToFd(fd, bytesSent);
    1027           0 :                 switch (ret) {
    1028           0 :                 case DONE:
    1029             :                 case SUSPENDED:
    1030           0 :                         return true;
    1031             :                         break;
    1032           0 :                 case DECLINED:
    1033             :                         // eos found
    1034           0 :                         shutdownWrite();
    1035           0 :                         return false;
    1036             :                         break;
    1037           0 :                 default:
    1038           0 :                         break;
    1039             :                 }
    1040             :         }
    1041             : 
    1042          50 :         return true;
    1043             : }
    1044             : 
    1045        3100 : Status ConnectionWorker::Client::runInputFilter(BufferChain &chain) {
    1046        3100 :         if (shutdownReadSend) {
    1047           0 :                 return DECLINED;
    1048             :         }
    1049       23900 :         while (!chain.empty()) {
    1050       23900 :                 switch (requestState) {
    1051        3100 :                 case RequestLine: {
    1052        3100 :                         auto ret = checkForReqeust(chain);
    1053             :                         switch (ret) {
    1054           0 :                         case DECLINED:
    1055           0 :                                 return HTTP_REQUEST_ENTITY_TOO_LARGE;
    1056             :                                 break;
    1057        3100 :                         case DONE:
    1058        3100 :                                 request = gen->worker->readRequest(this, chain);
    1059        3100 :                                 if (!request) {
    1060           0 :                                         shutdownRead();
    1061           0 :                                         return HTTP_BAD_REQUEST;
    1062             :                                 } else {
    1063        3100 :                                         requestState = RequestHeaders;
    1064             :                                 }
    1065        3100 :                                 break;
    1066           0 :                         default:
    1067           0 :                                 break;
    1068             :                         }
    1069        3100 :                         break;
    1070             :                 }
    1071       20150 :                 case RequestHeaders: {
    1072       20150 :                         auto ret = checkForHeader(chain);
    1073             :                         switch (ret) {
    1074           0 :                         case DECLINED:
    1075           0 :                                 return HTTP_REQUEST_ENTITY_TOO_LARGE;
    1076             :                                 break;
    1077       20150 :                         case DONE:
    1078       20150 :                                 ret = gen->worker->parseRequestHeader(request, this, chain);
    1079             :                                 switch (ret) {
    1080           0 :                                 case DECLINED:
    1081           0 :                                         return HTTP_INTERNAL_SERVER_ERROR;
    1082             :                                         break;
    1083       17050 :                                 case OK:
    1084       17050 :                                         break;
    1085        3100 :                                 case DONE:
    1086        3100 :                                         requestState = RequestProcess;
    1087        3100 :                                         break;
    1088           0 :                                 default:
    1089           0 :                                         return ret;
    1090             :                                         break;
    1091             :                                 }
    1092       20150 :                                 break;
    1093           0 :                         default:
    1094           0 :                                 break;
    1095             :                         }
    1096       20150 :                         break;
    1097             :                 }
    1098           0 :                 case RequestProcess: {
    1099           0 :                         return HTTP_INTERNAL_SERVER_ERROR;
    1100             :                         break;
    1101             :                 }
    1102         650 :                 case RequestInput: {
    1103         650 :                         auto ret = request->processInput(chain);
    1104             :                         switch (ret) {
    1105           0 :                         case DECLINED:
    1106           0 :                                 requestState = ReqeustInvalid;
    1107           0 :                                 chain.clear();
    1108           0 :                                 return HTTP_BAD_REQUEST;
    1109             :                                 break;
    1110           0 :                         case OK:
    1111             :                         case SUSPENDED:
    1112           0 :                                 break;
    1113         650 :                         case DONE:
    1114         650 :                                 if (!response.empty()) {
    1115         625 :                                         write(response, nullptr, 0, Buffer::Flags::Eos);
    1116             :                                 }
    1117         650 :                                 requestState = ReqeustInvalid;
    1118         650 :                                 return DONE;
    1119             :                                 break;
    1120           0 :                         default:
    1121           0 :                                 requestState = ReqeustInvalid;
    1122           0 :                                 chain.clear();
    1123           0 :                                 return ret;
    1124             :                                 break;
    1125             :                         }
    1126           0 :                         break;
    1127             :                 }
    1128           0 :                 case ReqeustClosed:
    1129           0 :                         requestState = ReqeustInvalid;
    1130           0 :                         return DONE;
    1131             :                         break;
    1132           0 :                 case ReqeustInvalid:
    1133           0 :                         return DECLINED;
    1134             :                         break;
    1135             :                 }
    1136       23250 :                 if (requestState == RequestProcess) {
    1137        3100 :                         auto ret = gen->worker->processRequest(request);
    1138        3100 :                         switch (ret) {
    1139         650 :                         case OK:
    1140             :                         case SUSPENDED:
    1141         650 :                                 if (request->getInputFilter() && request->getInfo().contentLength > 0) {
    1142         650 :                                         perform([&] {
    1143         650 :                                                 ret = request->getInputFilter()->init();
    1144         650 :                                         }, request->getInputFilter()->getPool());
    1145         650 :                                         if (ret == OK) {
    1146         650 :                                                 requestState = RequestInput;
    1147             :                                         }
    1148             :                                 }
    1149         650 :                                 break;
    1150        2450 :                         default:
    1151        2450 :                                 return ret;
    1152             :                                 break;
    1153             :                         }
    1154             :                 }
    1155             :         }
    1156           0 :         return SUSPENDED;
    1157             : }
    1158             : 
    1159        3100 : Status ConnectionWorker::Client::checkForReqeust(BufferChain &chain) {
    1160        3100 :         bool found = false;
    1161        6200 :         return chain.read([&] (const Buffer *buf, const uint8_t *b, size_t s) {
    1162        6200 :                 if (found) {
    1163        3100 :                         return int(DONE);
    1164             :                 }
    1165             : 
    1166        3100 :                 if (buf->absolute > config::UNIX_MAX_REQUEST_LINE) {
    1167           0 :                         return int(DECLINED);
    1168             :                 }
    1169             : 
    1170        3100 :                 StringView r((const char *)b, std::min(s, config::UNIX_MAX_REQUEST_LINE - buf->absolute));
    1171        3100 :                 r.skipUntil<StringView::Chars<'\n'>>();
    1172        3100 :                 if (r.is('\n')) {
    1173        3100 :                         found = true;
    1174        3100 :                         ++ r;
    1175        3100 :                         return int((const uint8_t *)r.data() - b);
    1176             :                 }
    1177             : 
    1178           0 :                 if (buf->absolute + s >= config::UNIX_MAX_REQUEST_LINE) {
    1179           0 :                         return int(DECLINED);
    1180             :                 }
    1181             : 
    1182           0 :                 return int((const uint8_t *)r.data() - b);
    1183        6200 :         }, false);
    1184             : }
    1185             : 
    1186       20150 : Status ConnectionWorker::Client::checkForHeader(BufferChain &chain) {
    1187       20150 :         bool found = false;
    1188       20150 :         auto ret = chain.read([&, this] (const Buffer *buf, const uint8_t *b, size_t s) {
    1189       37850 :                 if (found) {
    1190       17700 :                         return int(DONE);
    1191             :                 }
    1192             : 
    1193       20150 :                 if (buf->absolute > config::UNIX_MAX_HEADER_LINE + bytesRead) {
    1194           0 :                         return int(DECLINED);
    1195             :                 }
    1196             : 
    1197       20150 :                 StringView r((const char *)b, std::min(s, config::UNIX_MAX_REQUEST_LINE - buf->absolute));
    1198       20150 :                 r.skipUntil<StringView::Chars<'\n'>>();
    1199       20150 :                 if (r.is('\n')) {
    1200       20150 :                         found = true;
    1201       20150 :                         ++ r;
    1202       20150 :                         return int((const uint8_t *)r.data() - b);
    1203             :                 }
    1204             : 
    1205           0 :                 if ((buf->absolute + s) >= config::UNIX_MAX_REQUEST_LINE + bytesRead) {
    1206           0 :                         return int(DECLINED);
    1207             :                 }
    1208             : 
    1209           0 :                 return int((const uint8_t *)r.data() - b);
    1210             :         }, false);
    1211       20150 :         if (ret == OK) {
    1212        2450 :                 return found ? DONE : OK;
    1213             :         }
    1214       17700 :         return ret;
    1215             : }
    1216             : 
    1217           0 : void ConnectionWorker::Client::cancelWithResult(Status status) {
    1218           0 :         if (output.isEos() || shutdownWriteSend) {
    1219           0 :                 return;
    1220             :         }
    1221             : 
    1222           0 :         perform([&, this] {
    1223           0 :                 Time date = Time::now();
    1224             :                 Value result {
    1225           0 :                         pair("date", Value(date.toMicros())),
    1226           0 :                         pair("status", Value(toInt(status)))
    1227           0 :                 };
    1228             : 
    1229           0 :                 auto data = data::write<Interface>(result, data::EncodeFormat::Json);
    1230             : 
    1231           0 :                 StringView crlf("\r\n");
    1232           0 :                 StringView statusLine = getStatusLine(status);
    1233           0 :                 if (statusLine.empty()) {
    1234           0 :                         statusLine = getStatusLine(HTTP_INTERNAL_SERVER_ERROR);
    1235             :                 }
    1236             : 
    1237           0 :                 sp_time_exp_t xt(date);
    1238           0 :                 char dateBuf[30] = { 0 };
    1239           0 :                 xt.encodeRfc822(dateBuf);
    1240             : 
    1241           0 :                 auto outFn = [&, this] (StringView str) {
    1242           0 :                         write(output, str);
    1243           0 :                 };
    1244             : 
    1245           0 :                 auto out = Callback<void(StringView)>(outFn);
    1246             : 
    1247           0 :                 out << StringView("HTTP/1.1 ") << statusLine << crlf;
    1248           0 :                 out << StringView("Date: ") << dateBuf << crlf;
    1249           0 :                 out << StringView("Connection: close\r\n");
    1250           0 :                 out << StringView("Server: ") << gen->worker->getRoot()->getServerNameLine() << crlf;
    1251           0 :                 out << StringView("Content-Length: ") << data.size() << crlf;
    1252           0 :                 out << StringView("Content-Type: application/json; charset=utf-8\r\n");
    1253           0 :                 out << crlf;
    1254             : 
    1255           0 :                 write(output, data, Buffer::Flags::Eos);
    1256           0 :         }, pool);
    1257           0 :         shutdownWrite();
    1258             : }
    1259             : 
    1260          50 : ConnectionWorker::Generation::Generation(ConnectionWorker *w, pool_t *p)
    1261          50 : : pool(p), worker(w) { }
    1262             : 
    1263        3100 : ConnectionWorker::Client *ConnectionWorker::Generation::pushFd(int fd, StringView addr, uint16_t port) {
    1264        3100 :         auto clientPool = pool::create(pool);
    1265        3100 :         ConnectionWorker::Client *ret = nullptr;
    1266             : 
    1267        3100 :         auto memBlock = pool::palloc(clientPool, sizeof(Client));
    1268        3100 :         ret = new (memBlock) Client(this, clientPool, fd, addr, port);
    1269             : 
    1270        3100 :         ret->next = active;
    1271        3100 :         ret->prev = nullptr;
    1272        3100 :         if (active) { active->prev = ret; }
    1273        3100 :         active = ret;
    1274             : 
    1275        3100 :         ++ activeClients;
    1276             : 
    1277        3100 :         return ret;
    1278             : }
    1279             : 
    1280        3100 : void ConnectionWorker::Generation::releaseClient(Client *client) {
    1281        3100 :         if (client == active) {
    1282        3100 :                 active = client->next;
    1283        3100 :                 if (active) { active->prev = nullptr; }
    1284             :         } else {
    1285           0 :                 if (client->prev) { client->prev->next = client->next; }
    1286           0 :                 if (client->next) { client->next->prev = client->prev; }
    1287             :         }
    1288             : 
    1289        3100 :         -- activeClients;
    1290        3100 : }
    1291             : 
    1292          50 : void ConnectionWorker::Generation::releaseAll() {
    1293          49 :         while (active) {
    1294           0 :                 releaseClient(active);
    1295             :         }
    1296          49 : }
    1297             : 
    1298          50 : ConnectionWorker::Generation *ConnectionWorker::makeGeneration() {
    1299          50 :         auto p = pool::create(_threadPool);
    1300          50 :         return new (p) Generation(this, p);
    1301             : }
    1302             : 
    1303             : }

Generated by: LCOV version 1.14