LCOV - code coverage report
Current view: top level - extra/webserver/unix - SPWebUnixConnectionQueue.cc (source / functions) Hit Total Coverage
Test: coverage.info Lines: 109 170 64.1 %
Date: 2024-05-12 00:16:13 Functions: 15 17 88.2 %

          Line data    Source code
       1             : /**
       2             :  Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
       3             : 
       4             :  Permission is hereby granted, free of charge, to any person obtaining a copy
       5             :  of this software and associated documentation files (the "Software"), to deal
       6             :  in the Software without restriction, including without limitation the rights
       7             :  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       8             :  copies of the Software, and to permit persons to whom the Software is
       9             :  furnished to do so, subject to the following conditions:
      10             : 
      11             :  The above copyright notice and this permission notice shall be included in
      12             :  all copies or substantial portions of the Software.
      13             : 
      14             :  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
      15             :  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      16             :  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
      17             :  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      18             :  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
      19             :  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
      20             :  THE SOFTWARE.
      21             :  **/
      22             : 
      23             : #include "SPWebUnixConnectionQueue.h"
      24             : #include "SPWebUnixConnectionWorker.h"
      25             : 
      26             : #include <sys/types.h>
      27             : #include <sys/eventfd.h>
      28             : #include <sys/un.h>
      29             : #include <sys/socket.h>
      30             : #include <arpa/inet.h>
      31             : #include <netinet/in.h>
      32             : #include <unistd.h>
      33             : #include <fcntl.h>
      34             : 
      35             : namespace STAPPLER_VERSIONIZED stappler::web {
      36             : 
      37        3375 : bool ConnectionQueue::setNonblocking(int fd) {
      38        3375 :         int flags = fcntl(fd, F_GETFL, 0);
      39        3375 :         if (flags == -1) {
      40           0 :                 std::cout << toString("fcntl() fail to get flags for ", fd);
      41           0 :                 return false;
      42             :         }
      43        3375 :         if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
      44           0 :                 std::cout << toString("fcntl() setNonblocking failed for ", fd);
      45           0 :                 return false;
      46             :         }
      47             : 
      48        3375 :         return true;
      49             : }
      50             : 
      51          50 : ConnectionQueue::~ConnectionQueue() {
      52          25 :         if (_pipe[0] > -1) { close(_pipe[0]); _pipe[0] = -1; }
      53          25 :         if (_pipe[1] > -1) { close(_pipe[1]); _pipe[1] = -1; }
      54          25 :         if (_eventFd > -1) { close(_eventFd); _eventFd = -1; }
      55          25 :         if (_socket > -1) { close(_socket); _socket = -1; }
      56             : 
      57          25 :         if (!_socketPath.empty()) {
      58           0 :                 filesystem::native::unlink_fn(_socketPath);
      59             :         }
      60          50 : }
      61             : 
      62          25 : ConnectionQueue::ConnectionQueue(UnixRoot *r, pool_t *p, uint16_t w, UnixRoot::Config &&v)
      63          25 : : _root(r), _originPool(p), _nWorkers(w), _config(move(v)) {
      64          25 :         _eventFd = ::eventfd(0, EFD_NONBLOCK);
      65          25 :         _taskCounter.store(0);
      66             : 
      67          25 :         _inputQueue.setQueueLocking(_inputMutexQueue);
      68          25 :         _inputQueue.setFreeLocking(_inputMutexFree);
      69          25 : }
      70             : 
      71          25 : bool ConnectionQueue::run() {
      72          25 :         pool::context ctx(_originPool);
      73             : 
      74          25 :         auto addr = StringView(_config.listen);
      75          25 :         if (addr.starts_with("/")) {
      76           0 :                 _socket = openUnixSocket(addr);
      77          25 :         } else if (addr.starts_with("unix:")) {
      78           0 :                 _socket = openUnixSocket(addr.sub("unix:"_len));
      79             :         } else {
      80          25 :                 auto a = addr.readUntil<StringView::Chars<':'>>();
      81          25 :                 if (addr.is(':')) {
      82          25 :                         ++ addr;
      83          25 :                         auto port = addr.readInteger(10).get(80);
      84          25 :                         _socket = openNetworkSocket(a, uint16_t(port));
      85             :                 } else {
      86           0 :                         _socket = openNetworkSocket(a, uint16_t(80));
      87             :                 }
      88             :         }
      89             : 
      90          25 :         if (_socket < 0) {
      91           0 :                 return false;
      92             :         }
      93             : 
      94          25 :         _workers.reserve(_nWorkers);
      95          25 :         if (pipe2(_pipe, O_NONBLOCK) == 0) {
      96          25 :                 setNonblocking(_pipe[0]);
      97          25 :                 setNonblocking(_pipe[1]);
      98             : 
      99          25 :                 retainSignals();
     100             : 
     101         225 :                 for (uint32_t i = 0; i < _nWorkers; i++) {
     102         200 :                         ConnectionWorker *worker = new (_originPool) ConnectionWorker(this, _root, _socket, _pipe[0], _eventFd);
     103         200 :                         _workers.push_back(worker);
     104             :                 }
     105          25 :                 return true;
     106             :         }
     107             : 
     108           0 :         return false;
     109          25 : }
     110             : 
     111          25 : void ConnectionQueue::cancel() {
     112          25 :         _finalized = true;
     113          25 :         write(_pipe[1], "END!", 4);
     114             : 
     115         225 :         for (auto &it : _workers) {
     116         200 :                 if (it->thread().joinable()) {
     117         200 :                         it->thread().join();
     118         200 :                         delete it;
     119             :                 }
     120             :         }
     121             : 
     122          50 :         while (_sigCounter) {
     123          25 :                 releaseSignals();
     124             :         }
     125          25 : }
     126             : 
     127          25 : void ConnectionQueue::retainSignals() {
     128          25 :         if (_sigCounter ++ == 0) {
     129          25 :                 memset(&_sharedSigAction, 0, sizeof(_sharedSigAction));
     130          25 :                 _sharedSigAction.sa_handler = SIG_IGN;
     131          25 :                 sigemptyset(&_sharedSigAction.sa_mask);
     132          25 :                 ::sigaction(SIGUSR1, &_sharedSigAction, &_sharedSigOldUsr1Action);
     133          25 :                 ::sigaction(SIGUSR2, &_sharedSigAction, &_sharedSigOldUsr2Action);
     134          25 :                 ::sigaction(SIGPIPE, &_sharedSigAction, &_sharedSigOldPipeAction);
     135             : 
     136             :                 ::sigset_t mask;
     137          25 :                 ::sigemptyset(&mask);
     138          25 :                 ::sigaddset(&mask, SIGUSR1);
     139          25 :                 ::sigaddset(&mask, SIGUSR2);
     140          25 :                 ::sigaddset(&mask, SIGPIPE);
     141          25 :                 ::sigprocmask(SIG_BLOCK, &mask, &_oldmask);
     142             :         }
     143          25 : }
     144             : 
     145          25 : void ConnectionQueue::releaseSignals() {
     146          25 :         if (_sigCounter -- == 1) {
     147          25 :                 ::sigaction(SIGUSR1, &_sharedSigOldUsr1Action, nullptr);
     148          25 :                 ::sigaction(SIGUSR2, &_sharedSigOldUsr2Action, nullptr);
     149          25 :                 ::sigaction(SIGPIPE, &_sharedSigOldPipeAction, nullptr);
     150          25 :                 ::sigprocmask(SIG_SETMASK, &_oldmask, nullptr);
     151             :         }
     152          25 : }
     153             : 
     154         200 : void ConnectionQueue::retain() {
     155         200 :         _refCount ++;
     156         200 : }
     157             : 
     158         225 : void ConnectionQueue::release() {
     159         225 :         if (--_refCount <= 0) {
     160          25 :                 delete this;
     161             :         }
     162         225 : }
     163             : 
     164         100 : void ConnectionQueue::pushTask(AsyncTask *task) {
     165         100 :         if (auto g = task->getGroup()) {
     166           0 :                 g->onAdded(task);
     167             :         }
     168         100 :         uint64_t value = 1;
     169             : 
     170         100 :         _inputQueue.push(task->getPriority(), false, task);
     171             : 
     172         100 :         ++ _taskCounter;
     173         100 :         write(_eventFd, &value, sizeof(uint64_t));
     174         100 : }
     175             : 
     176         100 : AsyncTask * ConnectionQueue::popTask() {
     177         100 :         AsyncTask *task = nullptr;
     178             : 
     179         100 :         _inputQueue.pop_direct([&] (int32_t, AsyncTask *t) {
     180         100 :                 task = t;
     181         100 :         });
     182             : 
     183         100 :         return task;
     184             : }
     185             : 
     186         100 : void ConnectionQueue::releaseTask(AsyncTask *task) {
     187         100 :         if (!task->getGroup()) {
     188         100 :                 AsyncTask::destroy(task);
     189             :         } else {
     190           0 :                 task->getGroup()->onPerformed(task);
     191             :         }
     192         100 :         -- _taskCounter;
     193         100 : }
     194             : 
     195           0 : bool ConnectionQueue::hasTasks() {
     196           0 :         return _taskCounter.load();
     197             : }
     198             : 
     199           0 : int ConnectionQueue::openUnixSocket(StringView addr) {
     200           0 :         if (filesystem::native::access_fn(addr, filesystem::Access::Exists)) {
     201             :                 // try unlink;
     202           0 :                 if (!filesystem::native::unlink_fn(addr)) {
     203           0 :                         log::error("Root:Socket", "Socket file exists, fail to unlink: ", addr);
     204           0 :                         return -1;
     205             :                 }
     206             :         }
     207             : 
     208           0 :         int socket = ::socket(AF_UNIX, SOCK_STREAM, 0);
     209           0 :         if (socket == -1) {
     210           0 :                 log::error("Root:Socket", "Fail to open socket: ", addr);
     211           0 :                 return -1;
     212             :         }
     213             : 
     214           0 :         int enable = 1;
     215           0 :         if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) {
     216           0 :                 log::error("Root:Socket", "Fail to set socket option: ", addr);
     217           0 :                 close(socket);
     218           0 :                 return -1;
     219             :         }
     220             : 
     221             :         struct sockaddr_un sockaddr;
     222           0 :         memset(&sockaddr, 0, sizeof(struct sockaddr_un));
     223           0 :         sockaddr.sun_family = AF_UNIX;
     224           0 :         strncpy(sockaddr.sun_path, addr.data(), addr.size());
     225             : 
     226           0 :         if (::bind(socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr)) < 0) {
     227           0 :                 log::error("Root:Socket", "Fail to bind socket: ", addr);
     228           0 :                 close(socket);
     229           0 :                 return -1;
     230             :         }
     231             : 
     232           0 :         if (!setNonblocking(socket)) {
     233           0 :                 log::error("Root:Socket", "Fail to set socket nonblock: ", addr);
     234           0 :                 filesystem::native::unlink_fn(addr);
     235           0 :                 close(socket);
     236           0 :                 return -1;
     237             :         }
     238             : 
     239           0 :         if (::listen(socket, SOMAXCONN) < 0) {
     240           0 :                 log::error("Root:Socket", "Fail to listen on socket: ", addr);
     241           0 :                 filesystem::native::unlink_fn(addr);
     242           0 :                 close(socket);
     243           0 :                 return -1;
     244             :         }
     245             : 
     246           0 :         _socketPath = addr;
     247             : 
     248           0 :         return socket;
     249             : }
     250             : 
     251          25 : int ConnectionQueue::openNetworkSocket(StringView addr, uint16_t port) {
     252          25 :         int socket = ::socket(AF_INET, SOCK_STREAM, 0);
     253          25 :         if (socket == -1) {
     254           0 :                 log::error("Root:Socket", "Fail to open socket");
     255           0 :                 return -1;
     256             :         }
     257             : 
     258          25 :         int enable = 1;
     259          25 :         if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) {
     260           0 :                 log::error("Root:Socket", "Fail to set socket option");
     261           0 :                 close(socket);
     262           0 :                 return -1;
     263             :         }
     264             : 
     265             :         struct sockaddr_in sockaddr;
     266          25 :         memset(&addr, 0, sizeof(addr));
     267          25 :         sockaddr.sin_family = AF_INET;
     268          25 :         sockaddr.sin_addr.s_addr = !addr.empty() ? inet_addr(addr.data()) : htonl(INADDR_LOOPBACK);
     269          25 :         sockaddr.sin_port = htons(port);
     270          25 :         if (::bind(socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr)) < 0) {
     271           0 :                 log::error("Root:Socket", "Fail to bind socket");
     272           0 :                 close(socket);
     273           0 :                 return -1;
     274             :         }
     275             : 
     276          25 :         if (!setNonblocking(socket)) {
     277           0 :                 log::error("Root:Socket", "Fail to set socket nonblock");
     278           0 :                 close(socket);
     279           0 :                 return -1;
     280             :         }
     281             : 
     282          25 :         if (::listen(socket, SOMAXCONN) < 0) {
     283           0 :                 log::error("Root:Socket", "Fail to listen on socket");
     284           0 :                 close(socket);
     285           0 :                 return -1;
     286             :         }
     287          25 :         return socket;
     288             : }
     289             : 
     290             : }

Generated by: LCOV version 1.14