Line data Source code
1 : /**
2 : Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
3 :
4 : Permission is hereby granted, free of charge, to any person obtaining a copy
5 : of this software and associated documentation files (the "Software"), to deal
6 : in the Software without restriction, including without limitation the rights
7 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 : copies of the Software, and to permit persons to whom the Software is
9 : furnished to do so, subject to the following conditions:
10 :
11 : The above copyright notice and this permission notice shall be included in
12 : all copies or substantial portions of the Software.
13 :
14 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 : THE SOFTWARE.
21 : **/
22 :
23 : #include "SPWebUnixConfig.h"
24 : #include "SPWebUnixConnectionWorker.h"
25 : #include "SPWebUnixConnectionQueue.h"
26 : #include "SPWebUnixRequest.h"
27 : #include "SPWebRequestFilter.h"
28 : #include "SPWebInputFilter.h"
29 :
30 : #include <sys/signalfd.h>
31 : #include <sys/eventfd.h>
32 : #include <sys/ioctl.h>
33 : #include <sys/sendfile.h>
34 : #include <sys/types.h>
35 : #include <sys/un.h>
36 : #include <sys/socket.h>
37 : #include <arpa/inet.h>
38 : #include <netinet/in.h>
39 : #include <unistd.h>
40 : #include <fcntl.h>
41 :
42 : namespace STAPPLER_VERSIONIZED stappler::web {
43 :
44 0 : static StringView s_getSignalName(int sig) {
45 0 : switch (sig) {
46 0 : case SIGINT: return "SIGINT";
47 0 : case SIGILL: return "SIGILL";
48 0 : case SIGABRT: return "SIGABRT";
49 0 : case SIGFPE: return "SIGFPE";
50 0 : case SIGSEGV: return "SIGSEGV";
51 0 : case SIGTERM: return "SIGTERM";
52 0 : case SIGHUP: return "SIGHUP";
53 0 : case SIGQUIT: return "SIGQUIT";
54 0 : case SIGTRAP: return "SIGTRAP";
55 0 : case SIGKILL: return "SIGKILL";
56 0 : case SIGBUS: return "SIGBUS";
57 0 : case SIGSYS: return "SIGSYS";
58 0 : case SIGPIPE: return "SIGPIPE";
59 0 : case SIGALRM: return "SIGALRM";
60 0 : case SIGURG: return "SIGURG";
61 0 : case SIGSTOP: return "SIGSTOP";
62 0 : case SIGTSTP: return "SIGTSTP";
63 0 : case SIGCONT: return "SIGCONT";
64 0 : case SIGCHLD: return "SIGCHLD";
65 0 : case SIGTTIN: return "SIGTTIN";
66 0 : case SIGTTOU: return "SIGTTOU";
67 0 : case SIGPOLL: return "SIGPOLL";
68 0 : case SIGXCPU: return "SIGXCPU";
69 0 : case SIGXFSZ: return "SIGXFSZ";
70 0 : case SIGVTALRM: return "SIGVTALRM";
71 0 : case SIGPROF: return "SIGPROF";
72 0 : case SIGUSR1: return "SIGUSR1";
73 0 : case SIGUSR2: return "SIGUSR2";
74 0 : default: return "(unknown)";
75 : }
76 : return StringView();
77 : }
78 :
79 200 : ConnectionWorker::ConnectionWorker(ConnectionQueue *queue, UnixRoot *h, int socket, int pipe, int event)
80 400 : : _queue(queue), _root(h)
81 200 : , _inputClient(socket, EPOLLIN | EPOLLEXCLUSIVE)
82 200 : , _cancelClient(pipe, EPOLLIN | EPOLLET)
83 200 : , _eventClient(event, EPOLLIN | EPOLLET | EPOLLEXCLUSIVE)
84 400 : ,_thread(ConnectionWorker::workerThread, this, queue) {
85 200 : _queue->retain();
86 200 : }
87 :
88 400 : ConnectionWorker::~ConnectionWorker() {
89 200 : _queue->release();
90 400 : }
91 :
92 197 : void ConnectionWorker::threadInit() {
93 197 : _threadAlloc = allocator::create();
94 199 : _threadPool = pool::create(_threadAlloc);
95 :
96 200 : _threadId = std::this_thread::get_id();
97 :
98 : sigset_t sigset;
99 200 : sigfillset(&sigset);
100 :
101 200 : _signalFd = ::signalfd(-1, &sigset, 0);
102 200 : ConnectionQueue::setNonblocking(_signalFd);
103 :
104 200 : _epollFd = epoll_create1(0);
105 :
106 200 : addClient(_inputClient);
107 200 : addClient(_cancelClient);
108 200 : addClient(_eventClient);
109 200 : }
110 :
111 181 : void ConnectionWorker::threadDispose() {
112 181 : if (_signalFd >= 0) {
113 183 : close(_signalFd);
114 195 : _signalFd = -1;
115 : }
116 193 : if (_epollFd >= 0) {
117 195 : close(_epollFd);
118 197 : _epollFd = -1;
119 : }
120 :
121 195 : pool::destroy(_threadPool);
122 168 : allocator::destroy(_threadAlloc);
123 183 : }
124 :
125 386 : bool ConnectionWorker::worker() {
126 386 : if (_shouldClose) {
127 186 : return false;
128 : }
129 :
130 200 : while (poll(_epollFd)) {
131 : struct signalfd_siginfo si;
132 0 : int nr = ::read(_signalFd, &si, sizeof si);
133 0 : while (nr == sizeof si) {
134 0 : if (si.ssi_signo != SIGINT) {
135 0 : log::error("ConnectionWorker", "epoll_wait() exit with signal: ", si.ssi_signo, " ", s_getSignalName(si.ssi_signo));
136 : }
137 0 : nr = ::read(_signalFd, &si, sizeof si);
138 : }
139 : }
140 :
141 184 : return true;
142 : }
143 :
144 200 : bool ConnectionWorker::poll(int epollFd) {
145 : std::array<struct epoll_event, ConnectionWorker::MaxEvents> _events;
146 :
147 6807 : while (!_shouldClose) {
148 6619 : int nevents = epoll_wait(epollFd, _events.data(), ConnectionWorker::MaxEvents, -1);
149 6617 : if (nevents == -1 && errno != EINTR) {
150 0 : char buf[256] = { 0 };
151 0 : log::error("ConnectionWorker", "epoll_wait() failed with errno ", errno, " (", strerror_r(errno, buf, 255), ")");
152 0 : return false;
153 6617 : } else if (errno == EINTR) {
154 0 : return true;
155 : }
156 :
157 : /// process high-priority events
158 13227 : for (int i = 0; i < nevents; i++) {
159 6617 : Client *client = static_cast<Client *>(_events[i].data.ptr);
160 :
161 6610 : if ((_events[i].events & EPOLLIN)) {
162 6491 : if (client == &_eventClient) {
163 101 : uint64_t value = 0;
164 101 : auto sz = read(_eventClient.fd, &value, sizeof(uint64_t));
165 101 : if (sz == 8 && value) {
166 100 : auto ev = _queue->popTask();
167 100 : if (value > 1) {
168 2 : value -= 1;
169 : // forward event to another worker
170 2 : write(_eventClient.fd, &value, sizeof(uint64_t));
171 : }
172 100 : if (ev) {
173 100 : runTask(ev);
174 : }
175 : }
176 : }
177 : }
178 : }
179 :
180 10116 : for (int i = 0; i < nevents; i++) {
181 6609 : Client *client = static_cast<Client *>(_events[i].data.ptr);
182 6606 : if ((_events[i].events & EPOLLERR)) {
183 0 : if (client == &_inputClient) {
184 0 : log::error("ConnectionWorker", "epoll error on server socket ", client->fd);
185 0 : _shouldClose = true;
186 : } else {
187 0 : log::error("ConnectionWorker", "epoll error on socket ", client->fd);
188 0 : client->gen->releaseClient(client);
189 : }
190 0 : continue;
191 : }
192 :
193 6608 : if ((_events[i].events & EPOLLIN)) {
194 6487 : if (client == &_inputClient) {
195 : char str[INET_ADDRSTRLEN + 1];
196 :
197 : struct sockaddr client_addr;
198 3100 : socklen_t client_addr_len = sizeof(client_addr);
199 3100 : int newClient = ::accept(_inputClient.fd, &client_addr, &client_addr_len);
200 6200 : while (newClient != -1) {
201 3100 : struct sockaddr_in* pV4Addr = (struct sockaddr_in*)&client_addr;
202 3100 : struct in_addr ipAddr = pV4Addr->sin_addr;
203 :
204 3100 : memset(str, 0, INET_ADDRSTRLEN + 1);
205 3100 : inet_ntop( AF_INET, &ipAddr, str, INET_ADDRSTRLEN );
206 :
207 3100 : pushFd(epollFd, newClient, StringView(str), pV4Addr->sin_port);
208 3100 : newClient = ::accept(_inputClient.fd, &client_addr, &client_addr_len);
209 : }
210 3100 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
211 : // we processed all of the connections
212 : break;
213 : } else {
214 0 : log::error("ConnectionWorker", "accept() failed");
215 0 : break;
216 : }
217 3387 : } else if (client == &_cancelClient) {
218 : //onError("Received end signal");
219 189 : _shouldClose = true;
220 3198 : } else if (client == &_eventClient) {
221 : // do nothing
222 : } else {
223 3100 : client->performRead();
224 : }
225 : }
226 :
227 3504 : if ((_events[i].events & EPOLLOUT) && !client->system) {
228 3218 : client->performWrite();
229 : }
230 :
231 3502 : if ((_events[i].events & EPOLLHUP) || (_events[i].events & EPOLLRDHUP)) {
232 0 : if (client != &_eventClient && client != &_cancelClient) {
233 0 : removeClient(*client);
234 : }
235 : }
236 :
237 3508 : if (!client->system && !client->valid) {
238 0 : client->shutdownAll();
239 : }
240 :
241 3508 : if (client->shutdownReadSend && client->shutdownWriteSend) {
242 3100 : removeClient(*client);
243 : }
244 : }
245 : }
246 :
247 188 : if (_shouldClose) {
248 184 : auto gen = _generation;
249 233 : while (gen) {
250 49 : gen->releaseAll();
251 49 : gen = _generation->prev;
252 : }
253 : }
254 :
255 188 : return !_shouldClose;
256 : }
257 :
258 100 : void ConnectionWorker::runTask(AsyncTask *task) {
259 100 : auto host = task->getHost();
260 100 : perform([&] {
261 100 : AsyncTask::run(task);
262 100 : }, task->pool(), config::TAG_HOST, host.getController());
263 100 : _queue->releaseTask(task);
264 100 : }
265 :
266 3100 : UnixRequestController *ConnectionWorker::readRequest(Client *client, BufferChain &chain) {
267 3100 : RequestInfo info;
268 :
269 3100 : StringView str;
270 3100 : pool_t *tmpPool = nullptr;
271 :
272 3100 : if (chain.isSingle()) {
273 3025 : str = StringView((const char *)chain.front->buf, chain.getBytesRead());
274 : } else {
275 75 : tmpPool = pool::create(client->pool);
276 :
277 75 : auto data = chain.extract(tmpPool, 0, std::min(chain.getBytesRead(), config::UNIX_MAX_REQUEST_LINE));
278 75 : str = data.toStringView();
279 : }
280 :
281 3100 : auto requestLineSize = RequestFilter::readRequestLine(str, info);
282 3100 : if (requestLineSize == 0) {
283 0 : if (tmpPool) {
284 0 : pool::destroy(tmpPool);
285 : }
286 0 : client->cancelWithResult(HTTP_INTERNAL_SERVER_ERROR);
287 0 : return nullptr;
288 : }
289 :
290 3100 : UnixRequestController *cfg = nullptr;
291 3100 : auto p = pool::create(_threadPool);
292 3100 : perform([&] {
293 3100 : cfg = new (p) UnixRequestController(p, info.clone(p), client);
294 3100 : }, p);
295 :
296 3100 : if (tmpPool) {
297 75 : pool::destroy(tmpPool);
298 : }
299 :
300 3100 : client->bytesRead = chain.getBytesRead();
301 3100 : chain.releaseEmpty();
302 :
303 3100 : return cfg;
304 3100 : }
305 :
306 20150 : Status ConnectionWorker::parseRequestHeader(UnixRequestController *cfg, Client *client, BufferChain &chain) {
307 20150 : StringView name;
308 20150 : StringView value;
309 :
310 20150 : BytesView str;
311 20150 : pool_t *tmpPool = nullptr;
312 20150 : size_t headerLen = chain.getBytesRead() - client->bytesRead;
313 :
314 20150 : if (chain.isSingle()) {
315 19550 : str = BytesView(chain.front->buf + (client->bytesRead - chain.front->absolute), headerLen);
316 : } else {
317 600 : tmpPool = pool::create(client->pool);
318 :
319 600 : str = chain.extract(tmpPool, client->bytesRead, std::min(headerLen, config::UNIX_MAX_HEADER_LINE));
320 : }
321 :
322 20150 : if (str.size() == 2 && str.equals((const uint8_t *)"\r\n", 2)) {
323 3100 : client->bytesRead += headerLen;
324 3100 : chain.releaseEmpty();
325 3100 : return DONE;
326 : }
327 :
328 17050 : auto headerLineLength = RequestFilter::readRequestHeader(str, name, value);
329 17050 : if (headerLineLength == 0) {
330 0 : if (tmpPool) {
331 0 : pool::destroy(tmpPool);
332 : }
333 0 : return HTTP_INTERNAL_SERVER_ERROR;
334 : }
335 :
336 17050 : perform([&] {
337 17050 : cfg->setRequestHeader(name, value);
338 17050 : }, cfg->getPool());
339 :
340 17050 : if (tmpPool) {
341 525 : pool::destroy(tmpPool);
342 : }
343 :
344 17050 : client->bytesRead = chain.getBytesRead();
345 17050 : chain.releaseEmpty();
346 17050 : return OK;
347 : }
348 :
349 3100 : Status ConnectionWorker::processRequest(UnixRequestController *req) {
350 6200 : return perform([&, this] {
351 3100 : auto ret = _root->processRequest(req);
352 3100 : switch (ret) {
353 1975 : case DONE:
354 1975 : break;
355 0 : case DECLINED:
356 0 : break;
357 1125 : case SUSPENDED:
358 : case OK:
359 : default:
360 1125 : break;
361 : }
362 3100 : return ret;
363 6200 : }, req->getPool(), config::TAG_REQUEST, req);
364 : }
365 :
366 3100 : void ConnectionWorker::pushFd(int epollFd, int fd, StringView addr, uint16_t port) {
367 3100 : if (!_generation) {
368 50 : _generation = makeGeneration();
369 : }
370 :
371 3100 : auto c = _generation->pushFd(fd, addr, port);
372 3100 : if (addClient(*c)) {
373 3100 : ++ _fdCount;
374 : } else {
375 0 : c->gen->releaseClient(c);
376 : }
377 3100 : }
378 :
379 3699 : bool ConnectionWorker::addClient(Client &client) {
380 3699 : if (client.fd < 0) {
381 0 : return false;
382 : }
383 :
384 3699 : int err = ::epoll_ctl(_epollFd, EPOLL_CTL_ADD, client.fd, &client.event);
385 3697 : if (err == -1) {
386 0 : char buf[256] = { 0 };
387 0 : log::error("ConnectionWorker", "Failed to add client epoll_ctl("
388 0 : , client.fd, ", EPOLL_CTL_ADD): ", strerror_r(errno, buf, 255));
389 0 : return false;
390 : }
391 3697 : return true;
392 : }
393 :
394 3100 : void ConnectionWorker::removeClient(Client &client) {
395 3100 : if (client.fd < 0) {
396 0 : return;
397 : }
398 :
399 3100 : int err = ::epoll_ctl(_epollFd, EPOLL_CTL_DEL, client.fd, &client.event);
400 3100 : if (err == -1) {
401 0 : char buf[256] = { 0 };
402 0 : log::error("ConnectionWorker", "Failed to remove client epoll_ctl("
403 0 : , client.fd, ", EPOLL_CTL_DEL): ", strerror_r(errno, buf, 255));
404 : }
405 :
406 3100 : client.release();
407 : }
408 :
409 6425 : ConnectionWorker::Buffer *ConnectionWorker::Buffer::create(pool_t *p, size_t abs) {
410 6425 : auto requestSize = config::UNIX_CLIENT_BUFFER_SIZE;
411 6425 : auto block = pool::palloc(p, requestSize);
412 :
413 6425 : auto b = new (block) Buffer();
414 :
415 6425 : b->next = nullptr;
416 6425 : b->pool = p;
417 6425 : b->buf = (uint8_t *)block + sizeof(Buffer);
418 6425 : b->capacity = requestSize - sizeof(Buffer);
419 6425 : b->size = 0;
420 6425 : b->offset = 0;
421 6425 : b->absolute = abs;
422 :
423 6425 : return b;
424 : }
425 :
426 50 : ConnectionWorker::Buffer *ConnectionWorker::Buffer::create(pool_t *p, StringView path, off_t rangeStart, size_t rangeLen, size_t abs) {
427 50 : if (!filesystem::exists(path)) {
428 0 : return nullptr;
429 : }
430 :
431 50 : auto requestSize = config::UNIX_CLIENT_BUFFER_SIZE;
432 50 : auto block = pool::palloc(p, requestSize);
433 :
434 50 : auto b = new (block) Buffer();
435 :
436 50 : b->next = nullptr;
437 50 : b->pool = p;
438 50 : b->buf = (uint8_t *)block + sizeof(Buffer);
439 50 : b->capacity = requestSize - sizeof(Buffer);
440 50 : b->offset = rangeStart;
441 50 : b->absolute = abs;
442 50 : b->flags |= IsOutFile;
443 :
444 50 : BufferFile *file = new (b->buf) BufferFile;
445 50 : filesystem::stat(path, file->stat);
446 50 : file->fd = ::open(path.data(), O_RDONLY);
447 50 : file->extraBuffer = b->buf + sizeof(BufferFile);
448 50 : b->capacity -= sizeof(BufferFile);
449 :
450 50 : b->size = std::min(rangeLen, file->stat.size - rangeStart);
451 :
452 50 : return b;
453 : }
454 :
455 5825 : void ConnectionWorker::Buffer::release() {
456 5825 : if (auto f = getFile()) {
457 50 : if (f->fd >= 0) {
458 50 : ::close(f->fd);
459 50 : f->fd = -1;
460 : }
461 : }
462 5825 : pool::free(pool, this, capacity + sizeof(this));
463 5825 : }
464 :
465 0 : StringView ConnectionWorker::Buffer::str() const {
466 0 : if (isOutFile()) {
467 0 : return StringView();
468 : }
469 :
470 0 : return StringView(reinterpret_cast<const char *>(buf + offset), size - offset);
471 : }
472 :
473 2443225 : size_t ConnectionWorker::Buffer::availableForWrite() const {
474 2443225 : if (isOutFile()) {
475 0 : return 0;
476 : }
477 :
478 2443225 : return capacity - size;
479 : }
480 :
481 107375 : size_t ConnectionWorker::Buffer::availableForRead() const {
482 107375 : return size - offset;
483 : }
484 :
485 1224675 : uint8_t *ConnectionWorker::Buffer::writeTarget() const {
486 1224675 : if (isOutFile()) {
487 0 : return nullptr;
488 : }
489 :
490 1224675 : return buf + size;
491 : }
492 :
493 48075 : uint8_t *ConnectionWorker::Buffer::readSource() const {
494 48075 : if (isOutFile()) {
495 50 : return nullptr;
496 : }
497 :
498 48025 : return buf + offset;
499 : }
500 :
501 1221375 : size_t ConnectionWorker::Buffer::write(const uint8_t *b, size_t s) {
502 1221375 : if (isOutFile()) {
503 0 : return 0;
504 : }
505 :
506 1221375 : auto writeSize = std::min(availableForWrite(), s);
507 1221375 : memcpy(writeTarget(), b, writeSize);
508 1221375 : size += writeSize;
509 1221375 : return writeSize;
510 : }
511 :
512 9000 : ConnectionWorker::BufferFile *ConnectionWorker::Buffer::getFile() const {
513 9000 : if (isOutFile()) {
514 100 : return (ConnectionWorker::BufferFile *)buf;
515 : }
516 8900 : return nullptr;
517 : }
518 :
519 2531625 : bool ConnectionWorker::BufferChain::isEos() const {
520 2531625 : if (eos || (back && (back->flags & Buffer::Eos) != Buffer::None)) {
521 675 : return true;
522 : }
523 2530950 : return false;
524 : }
525 :
526 30750 : bool ConnectionWorker::BufferChain::empty() const {
527 30750 : if (!back || back->availableForRead() == 0) {
528 525 : return true;
529 : }
530 30225 : return false;
531 : }
532 :
533 3100 : size_t ConnectionWorker::BufferChain::size() const {
534 3100 : size_t ret = 0;
535 3100 : auto b = front;
536 6275 : while (b) {
537 3175 : ret += b->size;
538 3175 : b = b->next;
539 : }
540 3100 : return ret;
541 : }
542 :
543 1224675 : ConnectionWorker::Buffer *ConnectionWorker::BufferChain::getWriteTarget(pool_t *pool) {
544 1224675 : if (back && back->availableForWrite() > 0) {
545 1218250 : return back;
546 : } else {
547 6425 : auto b = Buffer::create(pool, back ? back->absolute + back->capacity : 0);
548 :
549 6425 : if (back) {
550 300 : back->next = b;
551 300 : back = b;
552 300 : tail = &(back->next);
553 : } else {
554 6125 : front = back = b;
555 6125 : tail = &(back->next);
556 : }
557 :
558 6425 : return b;
559 : }
560 : }
561 :
562 1224550 : bool ConnectionWorker::BufferChain::write(pool_t *p, const uint8_t *buf, size_t size, Buffer::Flags flags) {
563 1224550 : Buffer *targetBuffer = nullptr;
564 1224550 : size_t written = 0;
565 :
566 1224550 : if (isEos()) {
567 675 : return false;
568 : }
569 :
570 1223875 : if (size == 0 && flags != Buffer::Flags::None) {
571 2600 : if (back) {
572 2575 : back->flags |= flags;
573 : } else {
574 25 : if ((flags & Buffer::Eos) != Buffer::None) {
575 25 : eos = true;
576 : }
577 : }
578 2600 : return true;
579 : }
580 :
581 2442650 : while (size > 0) {
582 1221375 : targetBuffer = getWriteTarget(p);
583 :
584 1221375 : written = targetBuffer->write(buf, size);
585 1221375 : buf += written; size -= written;
586 : }
587 :
588 1221275 : if (targetBuffer && flags != Buffer::Flags::None) {
589 450 : targetBuffer->flags |= flags;
590 : }
591 :
592 1221275 : return true;
593 : }
594 :
595 50 : bool ConnectionWorker::BufferChain::write(Buffer *newBuf) {
596 50 : if (back) {
597 0 : back->next = newBuf;
598 0 : back = newBuf;
599 : } else {
600 50 : front = back = newBuf;
601 : }
602 50 : tail = &(back->next);
603 50 : return true;
604 : }
605 :
606 0 : bool ConnectionWorker::BufferChain::write(BufferChain &chain) {
607 0 : if (back) {
608 0 : back->next = chain.front;
609 0 : back = chain.back;
610 : } else {
611 0 : front = chain.front;
612 0 : back = chain.back;
613 : }
614 0 : tail = &(back->next);
615 :
616 0 : chain.front = chain.back = nullptr;
617 0 : chain.tail = nullptr;
618 :
619 0 : return true;
620 : }
621 :
622 3100 : bool ConnectionWorker::BufferChain::readFromFd(pool_t *p, int fd) {
623 3100 : int sz = 0;
624 3100 : ::ioctl (fd, FIONREAD, &sz);
625 :
626 6400 : while (sz > 0) {
627 3300 : Buffer *targetBuffer = getWriteTarget(p);
628 :
629 3300 : sz = ::read(fd, targetBuffer->writeTarget(), targetBuffer->availableForWrite());
630 3300 : if (sz > 0) {
631 3300 : targetBuffer->size += sz;
632 :
633 3300 : sz = ::read(fd, nullptr, 0);
634 3300 : if (sz == 0) {
635 3300 : ::ioctl (fd, FIONREAD, &sz);
636 : }
637 : }
638 :
639 3300 : if (sz == -1) {
640 0 : if (errno != EAGAIN && errno != EWOULDBLOCK) {
641 0 : char tmp[256] = { 0 };
642 0 : log::error("ConnectionWorker", "Fail to read from client: ", strerror_r(errno, tmp, 255));
643 0 : return false;
644 : }
645 : }
646 : }
647 :
648 3100 : return true;
649 : }
650 :
651 26975 : Status ConnectionWorker::BufferChain::read(const Callback<int(const Buffer *, const uint8_t *, size_t)> &cb, bool release) {
652 26975 : auto buf = front;
653 29725 : while (buf) {
654 27275 : auto size = buf->availableForRead();
655 27275 : auto ret = cb(buf, buf->readSource(), size);
656 53900 : while (size > 0 && ret > 0) {
657 26625 : buf->offset += ret;
658 26625 : size = buf->availableForRead();
659 26625 : if (size > 0) {
660 20800 : ret = cb(buf, buf->readSource(), size);
661 : }
662 : }
663 :
664 27275 : if (size == 0) {
665 5825 : if ((buf->flags & Buffer::Eos) != Buffer::None) {
666 3075 : eos = true;
667 : }
668 5825 : if (release) {
669 3375 : auto f = front;
670 3375 : front = front->next;
671 3375 : if (&f->next ==tail) {
672 3075 : tail = nullptr;
673 : }
674 3375 : if (f == back) {
675 3075 : back = nullptr;
676 : }
677 3375 : f->release();
678 3375 : buf = front;
679 : } else {
680 2450 : buf = buf->next;
681 : }
682 : }
683 :
684 27275 : if (ret == DONE || eos) {
685 24525 : return DONE;
686 2750 : } else if (ret == SUSPENDED) {
687 0 : return SUSPENDED;
688 2750 : } else if (ret == DECLINED) {
689 0 : return DECLINED;
690 : }
691 : }
692 2450 : return OK;
693 : }
694 :
695 3075 : Status ConnectionWorker::BufferChain::writeToFd(int fd, size_t &bytesWritten) {
696 6150 : return read([&] (const Buffer *source, const uint8_t *buf, size_t size) {
697 3175 : ssize_t ret = 0;
698 3175 : if (auto f = source->getFile()) {
699 50 : off_t offset = source->offset;
700 50 : ret = ::sendfile(fd, f->fd, &offset, source->size);
701 : } else {
702 3125 : ret = ::write(fd, buf, size);
703 : }
704 3175 : if (ret >= 0) {
705 3175 : bytesWritten += ret;
706 3175 : return int(ret);
707 : } else {
708 0 : if (errno != EAGAIN && errno != EWOULDBLOCK) {
709 0 : return int(SUSPENDED);
710 : }
711 0 : return int(DECLINED);
712 : }
713 6150 : }, true);
714 : }
715 :
716 44075 : size_t ConnectionWorker::BufferChain::getBytesRead() const {
717 44075 : if (front) {
718 44075 : return front->absolute + front->offset;
719 : }
720 0 : return 0;
721 : }
722 :
723 675 : BytesView ConnectionWorker::BufferChain::extract(pool_t *pool, size_t initOffset, size_t blockSize) const {
724 675 : if (!front) {
725 0 : return BytesView();
726 : }
727 :
728 675 : if (initOffset + blockSize > getBytesRead()) {
729 0 : return BytesView();
730 : }
731 :
732 675 : if (front && front->absolute > initOffset) {
733 0 : return BytesView();
734 : }
735 :
736 675 : size_t targetSize = blockSize;
737 675 : size_t writeSize = blockSize;
738 675 : auto block = (uint8_t *)pool::alloc(pool, targetSize);
739 675 : auto target = block;
740 :
741 675 : Buffer *first = front;
742 675 : while (first->absolute + first->capacity < initOffset) {
743 0 : first = first->next;
744 : }
745 :
746 675 : initOffset -= first->absolute;
747 :
748 1350 : while (writeSize > 0 && first) {
749 675 : auto copySize = std::min(first->capacity - initOffset, writeSize);
750 675 : ::memcpy(target, first->buf + initOffset, copySize);
751 675 : initOffset = 0;
752 675 : writeSize -= copySize;
753 675 : target += copySize;
754 675 : first = first->next;
755 : }
756 :
757 675 : return BytesView(block, targetSize);
758 : }
759 :
760 : /*void ConnectionWorker::BufferChain::release(size_t offset) {
761 : while (front && offset > 0) {
762 : if (front->size < front->offset + offset) {
763 : offset -= (front->size - front->offset);
764 :
765 : auto f = front;
766 : front = front->next;
767 : if (&f->next ==tail) {
768 : tail = nullptr;
769 : }
770 : if (f == back) {
771 : back = nullptr;
772 : }
773 : f->release();
774 : } else {
775 : front->offset += offset;
776 : offset = 0;
777 : }
778 : }
779 : }*/
780 :
781 23250 : void ConnectionWorker::BufferChain::releaseEmpty() {
782 25700 : while (front && front->availableForRead() == 0) {
783 2450 : auto f = front;
784 2450 : front = front->next;
785 2450 : if (&f->next ==tail) {
786 2450 : tail = nullptr;
787 : }
788 2450 : if (f == back) {
789 2450 : back = nullptr;
790 : }
791 2450 : f->release();
792 : }
793 23250 : }
794 :
795 0 : void ConnectionWorker::BufferChain::clear() {
796 0 : while (front) {
797 0 : auto f = front;
798 0 : front = front->next;
799 0 : if (&f->next ==tail) {
800 0 : tail = nullptr;
801 : }
802 0 : if (f == back) {
803 0 : back = nullptr;
804 : }
805 0 : f->release();
806 : }
807 0 : }
808 :
809 3100 : ConnectionWorker::Client::Client(Generation *g, pool_t *p, int ifd, StringView inAddr, uint16_t inPort)
810 3100 : : gen(g), pool(p), addr(inAddr.pdup(p)), port(inPort) {
811 3100 : memset(&event, 0, sizeof(event));
812 3100 : event.data.ptr = this;
813 3100 : event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
814 3100 : fd = ifd;
815 :
816 3100 : ConnectionQueue::setNonblocking(fd);
817 3100 : }
818 :
819 600 : ConnectionWorker::Client::Client(int f, int mode) {
820 600 : memset(&event, 0, sizeof(event));
821 600 : fd = f;
822 600 : event.data.ptr = this;
823 600 : event.events = mode;
824 600 : system = true;
825 600 : }
826 :
827 3100 : void ConnectionWorker::Client::shutdownRead() {
828 3100 : if (!shutdownReadSend) {
829 3100 : shutdownReadSend = true;
830 3100 : ::shutdown(event.data.fd, SHUT_RD);
831 : }
832 3100 : }
833 :
834 3100 : void ConnectionWorker::Client::shutdownWrite() {
835 3100 : if (!shutdownWriteSend) {
836 3100 : shutdownWriteSend = true;
837 3100 : ::shutdown(event.data.fd, SHUT_WR);
838 : }
839 3100 : }
840 :
841 0 : void ConnectionWorker::Client::shutdownAll() {
842 0 : if (!shutdownReadSend && !shutdownWriteSend) {
843 0 : shutdownWriteSend = true;
844 0 : shutdownReadSend = true;
845 0 : ::shutdown(event.data.fd, SHUT_RDWR);
846 : } else {
847 0 : shutdownRead();
848 0 : shutdownWrite();
849 : }
850 0 : }
851 :
852 3100 : void ConnectionWorker::Client::release() {
853 3100 : if (request) {
854 3100 : auto p = request->getPool();
855 3100 : request->finalize();
856 3100 : pool::destroy(p);
857 3100 : request = nullptr;
858 : }
859 :
860 3100 : close(event.data.fd);
861 3100 : if (gen) {
862 3100 : gen->releaseClient(this);
863 : }
864 :
865 3100 : if (pool) {
866 3100 : pool::destroy(pool);
867 : }
868 3100 : }
869 :
870 3100 : bool ConnectionWorker::Client::performRead() {
871 3100 : if (shutdownReadSend) {
872 0 : return false;
873 : }
874 :
875 3100 : input.readFromFd(pool, fd);
876 :
877 3100 : if (input) {
878 3100 : auto ret = runInputFilter(input);
879 3100 : switch (ret) {
880 0 : case OK:
881 : case SUSPENDED:
882 0 : break;
883 0 : case DECLINED:
884 0 : shutdownRead();
885 0 : break;
886 2625 : case DONE:
887 2625 : perform([&] {
888 2625 : request->submitResponse(ret);
889 2625 : }, request->getPool(), config::TAG_REQUEST, request);
890 2625 : shutdownRead();
891 2625 : break;
892 475 : default:
893 475 : if (request) {
894 475 : perform([&] {
895 475 : request->submitResponse(ret);
896 475 : }, request->getPool(), config::TAG_REQUEST, request);
897 : } else {
898 0 : cancelWithResult(ret);
899 : }
900 475 : shutdownRead();
901 475 : return false;
902 : }
903 : }
904 :
905 2625 : return true;
906 : }
907 :
908 3218 : bool ConnectionWorker::Client::performWrite() {
909 3218 : if (!output || !valid) {
910 3218 : return true;
911 : }
912 :
913 0 : auto ret = output.writeToFd(fd, bytesSent);
914 0 : switch (ret) {
915 0 : case SUSPENDED:
916 0 : return true;
917 0 : case DONE: // eos found
918 : case DECLINED: // unrecoverable error
919 0 : shutdownWrite();
920 0 : return false;
921 0 : default:
922 0 : break;
923 : }
924 :
925 0 : return true;
926 : }
927 :
928 3075 : bool ConnectionWorker::Client::write(BufferChain &target, BufferChain &source) {
929 3075 : if (output.isEos() || shutdownWriteSend) {
930 0 : return false;
931 : }
932 :
933 3075 : if (!target) {
934 3075 : auto ret = source.writeToFd(fd, bytesSent);
935 3075 : switch (ret) {
936 3075 : case DONE:
937 3075 : shutdownWrite();
938 3075 : return true;
939 : break;
940 0 : case OK:
941 : case SUSPENDED:
942 0 : return true;
943 : break;
944 0 : case DECLINED:
945 0 : shutdownWrite();
946 0 : return false;
947 : break;
948 0 : default:
949 0 : break;
950 : }
951 :
952 0 : if (source) {
953 0 : target.write(source);
954 : }
955 : } else {
956 0 : target.write(source);
957 : }
958 0 : return true;
959 : }
960 :
961 1303950 : bool ConnectionWorker::Client::write(BufferChain &chain, const uint8_t *buf, size_t size, Buffer::Flags flags) {
962 1303950 : if (output.isEos() || shutdownWriteSend) {
963 0 : return false;
964 : }
965 :
966 1303950 : if (!chain && &chain == &output) {
967 82200 : auto ret = ::write(fd, buf, size);
968 164325 : while (ret > 0) {
969 82125 : buf += ret; size -= ret;
970 82125 : if (size > 0) {
971 0 : ret = ::write(fd, buf, size);
972 : } else {
973 82125 : ret = 0;
974 : }
975 : }
976 :
977 82200 : if (ret < 0) {
978 0 : if (errno != EAGAIN && errno != EWOULDBLOCK) {
979 0 : log::error("ConnectionWorker", "fail to write to client");
980 0 : size = 0;
981 0 : flags = Buffer::Flags::Eos;
982 : }
983 : } else {
984 82200 : bytesSent += ret;
985 : }
986 : }
987 :
988 1303950 : if (size > 0) {
989 1220825 : return chain.write(pool, buf, size, flags);
990 83125 : } else if (size == 0 && flags != Buffer::None) {
991 650 : if (&chain == &output && (flags & Buffer::Eos) != Buffer::None) {
992 25 : shutdownWrite();
993 25 : return true;
994 : } else {
995 625 : return chain.write(pool, nullptr, 0, flags);
996 : }
997 : }
998 82475 : return true;
999 : }
1000 :
1001 82175 : bool ConnectionWorker::Client::write(BufferChain &chain, StringView data, Buffer::Flags flags) {
1002 82175 : return write(chain, (const uint8_t *)data.data(), data.size(), flags);
1003 : }
1004 :
1005 0 : bool ConnectionWorker::Client::write(BufferChain &chain, BytesView data, Buffer::Flags flags) {
1006 0 : return write(chain, data.data(), data.size(), flags);
1007 : }
1008 :
1009 50 : bool ConnectionWorker::Client::writeFile(BufferChain &chain, StringView filename, size_t offset, size_t size, Buffer::Flags flags) {
1010 50 : if (output.isEos() || shutdownWriteSend) {
1011 0 : return false;
1012 : }
1013 :
1014 50 : auto buf = Buffer::create(pool, filename, offset, size, bytesSent);
1015 50 : buf->flags |= flags;
1016 50 : if (!buf) {
1017 0 : return false;
1018 : }
1019 :
1020 50 : if (!chain.write(buf)) {
1021 0 : buf->release();
1022 0 : return false;
1023 : }
1024 :
1025 50 : if (!chain && &chain == &output) {
1026 0 : auto ret = chain.writeToFd(fd, bytesSent);
1027 0 : switch (ret) {
1028 0 : case DONE:
1029 : case SUSPENDED:
1030 0 : return true;
1031 : break;
1032 0 : case DECLINED:
1033 : // eos found
1034 0 : shutdownWrite();
1035 0 : return false;
1036 : break;
1037 0 : default:
1038 0 : break;
1039 : }
1040 : }
1041 :
1042 50 : return true;
1043 : }
1044 :
1045 3100 : Status ConnectionWorker::Client::runInputFilter(BufferChain &chain) {
1046 3100 : if (shutdownReadSend) {
1047 0 : return DECLINED;
1048 : }
1049 23900 : while (!chain.empty()) {
1050 23900 : switch (requestState) {
1051 3100 : case RequestLine: {
1052 3100 : auto ret = checkForReqeust(chain);
1053 : switch (ret) {
1054 0 : case DECLINED:
1055 0 : return HTTP_REQUEST_ENTITY_TOO_LARGE;
1056 : break;
1057 3100 : case DONE:
1058 3100 : request = gen->worker->readRequest(this, chain);
1059 3100 : if (!request) {
1060 0 : shutdownRead();
1061 0 : return HTTP_BAD_REQUEST;
1062 : } else {
1063 3100 : requestState = RequestHeaders;
1064 : }
1065 3100 : break;
1066 0 : default:
1067 0 : break;
1068 : }
1069 3100 : break;
1070 : }
1071 20150 : case RequestHeaders: {
1072 20150 : auto ret = checkForHeader(chain);
1073 : switch (ret) {
1074 0 : case DECLINED:
1075 0 : return HTTP_REQUEST_ENTITY_TOO_LARGE;
1076 : break;
1077 20150 : case DONE:
1078 20150 : ret = gen->worker->parseRequestHeader(request, this, chain);
1079 : switch (ret) {
1080 0 : case DECLINED:
1081 0 : return HTTP_INTERNAL_SERVER_ERROR;
1082 : break;
1083 17050 : case OK:
1084 17050 : break;
1085 3100 : case DONE:
1086 3100 : requestState = RequestProcess;
1087 3100 : break;
1088 0 : default:
1089 0 : return ret;
1090 : break;
1091 : }
1092 20150 : break;
1093 0 : default:
1094 0 : break;
1095 : }
1096 20150 : break;
1097 : }
1098 0 : case RequestProcess: {
1099 0 : return HTTP_INTERNAL_SERVER_ERROR;
1100 : break;
1101 : }
1102 650 : case RequestInput: {
1103 650 : auto ret = request->processInput(chain);
1104 : switch (ret) {
1105 0 : case DECLINED:
1106 0 : requestState = ReqeustInvalid;
1107 0 : chain.clear();
1108 0 : return HTTP_BAD_REQUEST;
1109 : break;
1110 0 : case OK:
1111 : case SUSPENDED:
1112 0 : break;
1113 650 : case DONE:
1114 650 : if (!response.empty()) {
1115 625 : write(response, nullptr, 0, Buffer::Flags::Eos);
1116 : }
1117 650 : requestState = ReqeustInvalid;
1118 650 : return DONE;
1119 : break;
1120 0 : default:
1121 0 : requestState = ReqeustInvalid;
1122 0 : chain.clear();
1123 0 : return ret;
1124 : break;
1125 : }
1126 0 : break;
1127 : }
1128 0 : case ReqeustClosed:
1129 0 : requestState = ReqeustInvalid;
1130 0 : return DONE;
1131 : break;
1132 0 : case ReqeustInvalid:
1133 0 : return DECLINED;
1134 : break;
1135 : }
1136 23250 : if (requestState == RequestProcess) {
1137 3100 : auto ret = gen->worker->processRequest(request);
1138 3100 : switch (ret) {
1139 650 : case OK:
1140 : case SUSPENDED:
1141 650 : if (request->getInputFilter() && request->getInfo().contentLength > 0) {
1142 650 : perform([&] {
1143 650 : ret = request->getInputFilter()->init();
1144 650 : }, request->getInputFilter()->getPool());
1145 650 : if (ret == OK) {
1146 650 : requestState = RequestInput;
1147 : }
1148 : }
1149 650 : break;
1150 2450 : default:
1151 2450 : return ret;
1152 : break;
1153 : }
1154 : }
1155 : }
1156 0 : return SUSPENDED;
1157 : }
1158 :
1159 3100 : Status ConnectionWorker::Client::checkForReqeust(BufferChain &chain) {
1160 3100 : bool found = false;
1161 6200 : return chain.read([&] (const Buffer *buf, const uint8_t *b, size_t s) {
1162 6200 : if (found) {
1163 3100 : return int(DONE);
1164 : }
1165 :
1166 3100 : if (buf->absolute > config::UNIX_MAX_REQUEST_LINE) {
1167 0 : return int(DECLINED);
1168 : }
1169 :
1170 3100 : StringView r((const char *)b, std::min(s, config::UNIX_MAX_REQUEST_LINE - buf->absolute));
1171 3100 : r.skipUntil<StringView::Chars<'\n'>>();
1172 3100 : if (r.is('\n')) {
1173 3100 : found = true;
1174 3100 : ++ r;
1175 3100 : return int((const uint8_t *)r.data() - b);
1176 : }
1177 :
1178 0 : if (buf->absolute + s >= config::UNIX_MAX_REQUEST_LINE) {
1179 0 : return int(DECLINED);
1180 : }
1181 :
1182 0 : return int((const uint8_t *)r.data() - b);
1183 6200 : }, false);
1184 : }
1185 :
1186 20150 : Status ConnectionWorker::Client::checkForHeader(BufferChain &chain) {
1187 20150 : bool found = false;
1188 20150 : auto ret = chain.read([&, this] (const Buffer *buf, const uint8_t *b, size_t s) {
1189 37850 : if (found) {
1190 17700 : return int(DONE);
1191 : }
1192 :
1193 20150 : if (buf->absolute > config::UNIX_MAX_HEADER_LINE + bytesRead) {
1194 0 : return int(DECLINED);
1195 : }
1196 :
1197 20150 : StringView r((const char *)b, std::min(s, config::UNIX_MAX_REQUEST_LINE - buf->absolute));
1198 20150 : r.skipUntil<StringView::Chars<'\n'>>();
1199 20150 : if (r.is('\n')) {
1200 20150 : found = true;
1201 20150 : ++ r;
1202 20150 : return int((const uint8_t *)r.data() - b);
1203 : }
1204 :
1205 0 : if ((buf->absolute + s) >= config::UNIX_MAX_REQUEST_LINE + bytesRead) {
1206 0 : return int(DECLINED);
1207 : }
1208 :
1209 0 : return int((const uint8_t *)r.data() - b);
1210 : }, false);
1211 20150 : if (ret == OK) {
1212 2450 : return found ? DONE : OK;
1213 : }
1214 17700 : return ret;
1215 : }
1216 :
1217 0 : void ConnectionWorker::Client::cancelWithResult(Status status) {
1218 0 : if (output.isEos() || shutdownWriteSend) {
1219 0 : return;
1220 : }
1221 :
1222 0 : perform([&, this] {
1223 0 : Time date = Time::now();
1224 : Value result {
1225 0 : pair("date", Value(date.toMicros())),
1226 0 : pair("status", Value(toInt(status)))
1227 0 : };
1228 :
1229 0 : auto data = data::write<Interface>(result, data::EncodeFormat::Json);
1230 :
1231 0 : StringView crlf("\r\n");
1232 0 : StringView statusLine = getStatusLine(status);
1233 0 : if (statusLine.empty()) {
1234 0 : statusLine = getStatusLine(HTTP_INTERNAL_SERVER_ERROR);
1235 : }
1236 :
1237 0 : sp_time_exp_t xt(date);
1238 0 : char dateBuf[30] = { 0 };
1239 0 : xt.encodeRfc822(dateBuf);
1240 :
1241 0 : auto outFn = [&, this] (StringView str) {
1242 0 : write(output, str);
1243 0 : };
1244 :
1245 0 : auto out = Callback<void(StringView)>(outFn);
1246 :
1247 0 : out << StringView("HTTP/1.1 ") << statusLine << crlf;
1248 0 : out << StringView("Date: ") << dateBuf << crlf;
1249 0 : out << StringView("Connection: close\r\n");
1250 0 : out << StringView("Server: ") << gen->worker->getRoot()->getServerNameLine() << crlf;
1251 0 : out << StringView("Content-Length: ") << data.size() << crlf;
1252 0 : out << StringView("Content-Type: application/json; charset=utf-8\r\n");
1253 0 : out << crlf;
1254 :
1255 0 : write(output, data, Buffer::Flags::Eos);
1256 0 : }, pool);
1257 0 : shutdownWrite();
1258 : }
1259 :
1260 50 : ConnectionWorker::Generation::Generation(ConnectionWorker *w, pool_t *p)
1261 50 : : pool(p), worker(w) { }
1262 :
1263 3100 : ConnectionWorker::Client *ConnectionWorker::Generation::pushFd(int fd, StringView addr, uint16_t port) {
1264 3100 : auto clientPool = pool::create(pool);
1265 3100 : ConnectionWorker::Client *ret = nullptr;
1266 :
1267 3100 : auto memBlock = pool::palloc(clientPool, sizeof(Client));
1268 3100 : ret = new (memBlock) Client(this, clientPool, fd, addr, port);
1269 :
1270 3100 : ret->next = active;
1271 3100 : ret->prev = nullptr;
1272 3100 : if (active) { active->prev = ret; }
1273 3100 : active = ret;
1274 :
1275 3100 : ++ activeClients;
1276 :
1277 3100 : return ret;
1278 : }
1279 :
1280 3100 : void ConnectionWorker::Generation::releaseClient(Client *client) {
1281 3100 : if (client == active) {
1282 3100 : active = client->next;
1283 3100 : if (active) { active->prev = nullptr; }
1284 : } else {
1285 0 : if (client->prev) { client->prev->next = client->next; }
1286 0 : if (client->next) { client->next->prev = client->prev; }
1287 : }
1288 :
1289 3100 : -- activeClients;
1290 3100 : }
1291 :
1292 50 : void ConnectionWorker::Generation::releaseAll() {
1293 49 : while (active) {
1294 0 : releaseClient(active);
1295 : }
1296 49 : }
1297 :
1298 50 : ConnectionWorker::Generation *ConnectionWorker::makeGeneration() {
1299 50 : auto p = pool::create(_threadPool);
1300 50 : return new (p) Generation(this, p);
1301 : }
1302 :
1303 : }
|