Line data Source code
1 : /**
2 : Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
3 :
4 : Permission is hereby granted, free of charge, to any person obtaining a copy
5 : of this software and associated documentation files (the "Software"), to deal
6 : in the Software without restriction, including without limitation the rights
7 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 : copies of the Software, and to permit persons to whom the Software is
9 : furnished to do so, subject to the following conditions:
10 :
11 : The above copyright notice and this permission notice shall be included in
12 : all copies or substantial portions of the Software.
13 :
14 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 : THE SOFTWARE.
21 : **/
22 :
23 : #include "SPWebUnixConnectionQueue.h"
24 : #include "SPWebUnixConnectionWorker.h"
25 :
26 : #include <sys/types.h>
27 : #include <sys/eventfd.h>
28 : #include <sys/un.h>
29 : #include <sys/socket.h>
30 : #include <arpa/inet.h>
31 : #include <netinet/in.h>
32 : #include <unistd.h>
33 : #include <fcntl.h>
34 :
35 : namespace STAPPLER_VERSIONIZED stappler::web {
36 :
37 3375 : bool ConnectionQueue::setNonblocking(int fd) {
38 3375 : int flags = fcntl(fd, F_GETFL, 0);
39 3375 : if (flags == -1) {
40 0 : std::cout << toString("fcntl() fail to get flags for ", fd);
41 0 : return false;
42 : }
43 3375 : if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
44 0 : std::cout << toString("fcntl() setNonblocking failed for ", fd);
45 0 : return false;
46 : }
47 :
48 3375 : return true;
49 : }
50 :
51 50 : ConnectionQueue::~ConnectionQueue() {
52 25 : if (_pipe[0] > -1) { close(_pipe[0]); _pipe[0] = -1; }
53 25 : if (_pipe[1] > -1) { close(_pipe[1]); _pipe[1] = -1; }
54 25 : if (_eventFd > -1) { close(_eventFd); _eventFd = -1; }
55 25 : if (_socket > -1) { close(_socket); _socket = -1; }
56 :
57 25 : if (!_socketPath.empty()) {
58 0 : filesystem::native::unlink_fn(_socketPath);
59 : }
60 50 : }
61 :
62 25 : ConnectionQueue::ConnectionQueue(UnixRoot *r, pool_t *p, uint16_t w, UnixRoot::Config &&v)
63 25 : : _root(r), _originPool(p), _nWorkers(w), _config(move(v)) {
64 25 : _eventFd = ::eventfd(0, EFD_NONBLOCK);
65 25 : _taskCounter.store(0);
66 :
67 25 : _inputQueue.setQueueLocking(_inputMutexQueue);
68 25 : _inputQueue.setFreeLocking(_inputMutexFree);
69 25 : }
70 :
71 25 : bool ConnectionQueue::run() {
72 25 : pool::context ctx(_originPool);
73 :
74 25 : auto addr = StringView(_config.listen);
75 25 : if (addr.starts_with("/")) {
76 0 : _socket = openUnixSocket(addr);
77 25 : } else if (addr.starts_with("unix:")) {
78 0 : _socket = openUnixSocket(addr.sub("unix:"_len));
79 : } else {
80 25 : auto a = addr.readUntil<StringView::Chars<':'>>();
81 25 : if (addr.is(':')) {
82 25 : ++ addr;
83 25 : auto port = addr.readInteger(10).get(80);
84 25 : _socket = openNetworkSocket(a, uint16_t(port));
85 : } else {
86 0 : _socket = openNetworkSocket(a, uint16_t(80));
87 : }
88 : }
89 :
90 25 : if (_socket < 0) {
91 0 : return false;
92 : }
93 :
94 25 : _workers.reserve(_nWorkers);
95 25 : if (pipe2(_pipe, O_NONBLOCK) == 0) {
96 25 : setNonblocking(_pipe[0]);
97 25 : setNonblocking(_pipe[1]);
98 :
99 25 : retainSignals();
100 :
101 225 : for (uint32_t i = 0; i < _nWorkers; i++) {
102 200 : ConnectionWorker *worker = new (_originPool) ConnectionWorker(this, _root, _socket, _pipe[0], _eventFd);
103 200 : _workers.push_back(worker);
104 : }
105 25 : return true;
106 : }
107 :
108 0 : return false;
109 25 : }
110 :
111 25 : void ConnectionQueue::cancel() {
112 25 : _finalized = true;
113 25 : write(_pipe[1], "END!", 4);
114 :
115 225 : for (auto &it : _workers) {
116 200 : if (it->thread().joinable()) {
117 200 : it->thread().join();
118 200 : delete it;
119 : }
120 : }
121 :
122 50 : while (_sigCounter) {
123 25 : releaseSignals();
124 : }
125 25 : }
126 :
127 25 : void ConnectionQueue::retainSignals() {
128 25 : if (_sigCounter ++ == 0) {
129 25 : memset(&_sharedSigAction, 0, sizeof(_sharedSigAction));
130 25 : _sharedSigAction.sa_handler = SIG_IGN;
131 25 : sigemptyset(&_sharedSigAction.sa_mask);
132 25 : ::sigaction(SIGUSR1, &_sharedSigAction, &_sharedSigOldUsr1Action);
133 25 : ::sigaction(SIGUSR2, &_sharedSigAction, &_sharedSigOldUsr2Action);
134 25 : ::sigaction(SIGPIPE, &_sharedSigAction, &_sharedSigOldPipeAction);
135 :
136 : ::sigset_t mask;
137 25 : ::sigemptyset(&mask);
138 25 : ::sigaddset(&mask, SIGUSR1);
139 25 : ::sigaddset(&mask, SIGUSR2);
140 25 : ::sigaddset(&mask, SIGPIPE);
141 25 : ::sigprocmask(SIG_BLOCK, &mask, &_oldmask);
142 : }
143 25 : }
144 :
145 25 : void ConnectionQueue::releaseSignals() {
146 25 : if (_sigCounter -- == 1) {
147 25 : ::sigaction(SIGUSR1, &_sharedSigOldUsr1Action, nullptr);
148 25 : ::sigaction(SIGUSR2, &_sharedSigOldUsr2Action, nullptr);
149 25 : ::sigaction(SIGPIPE, &_sharedSigOldPipeAction, nullptr);
150 25 : ::sigprocmask(SIG_SETMASK, &_oldmask, nullptr);
151 : }
152 25 : }
153 :
154 200 : void ConnectionQueue::retain() {
155 200 : _refCount ++;
156 200 : }
157 :
158 225 : void ConnectionQueue::release() {
159 225 : if (--_refCount <= 0) {
160 25 : delete this;
161 : }
162 225 : }
163 :
164 100 : void ConnectionQueue::pushTask(AsyncTask *task) {
165 100 : if (auto g = task->getGroup()) {
166 0 : g->onAdded(task);
167 : }
168 100 : uint64_t value = 1;
169 :
170 100 : _inputQueue.push(task->getPriority(), false, task);
171 :
172 100 : ++ _taskCounter;
173 100 : write(_eventFd, &value, sizeof(uint64_t));
174 100 : }
175 :
176 100 : AsyncTask * ConnectionQueue::popTask() {
177 100 : AsyncTask *task = nullptr;
178 :
179 100 : _inputQueue.pop_direct([&] (int32_t, AsyncTask *t) {
180 100 : task = t;
181 100 : });
182 :
183 100 : return task;
184 : }
185 :
186 100 : void ConnectionQueue::releaseTask(AsyncTask *task) {
187 100 : if (!task->getGroup()) {
188 100 : AsyncTask::destroy(task);
189 : } else {
190 0 : task->getGroup()->onPerformed(task);
191 : }
192 100 : -- _taskCounter;
193 100 : }
194 :
195 0 : bool ConnectionQueue::hasTasks() {
196 0 : return _taskCounter.load();
197 : }
198 :
199 0 : int ConnectionQueue::openUnixSocket(StringView addr) {
200 0 : if (filesystem::native::access_fn(addr, filesystem::Access::Exists)) {
201 : // try unlink;
202 0 : if (!filesystem::native::unlink_fn(addr)) {
203 0 : log::error("Root:Socket", "Socket file exists, fail to unlink: ", addr);
204 0 : return -1;
205 : }
206 : }
207 :
208 0 : int socket = ::socket(AF_UNIX, SOCK_STREAM, 0);
209 0 : if (socket == -1) {
210 0 : log::error("Root:Socket", "Fail to open socket: ", addr);
211 0 : return -1;
212 : }
213 :
214 0 : int enable = 1;
215 0 : if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) {
216 0 : log::error("Root:Socket", "Fail to set socket option: ", addr);
217 0 : close(socket);
218 0 : return -1;
219 : }
220 :
221 : struct sockaddr_un sockaddr;
222 0 : memset(&sockaddr, 0, sizeof(struct sockaddr_un));
223 0 : sockaddr.sun_family = AF_UNIX;
224 0 : strncpy(sockaddr.sun_path, addr.data(), addr.size());
225 :
226 0 : if (::bind(socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr)) < 0) {
227 0 : log::error("Root:Socket", "Fail to bind socket: ", addr);
228 0 : close(socket);
229 0 : return -1;
230 : }
231 :
232 0 : if (!setNonblocking(socket)) {
233 0 : log::error("Root:Socket", "Fail to set socket nonblock: ", addr);
234 0 : filesystem::native::unlink_fn(addr);
235 0 : close(socket);
236 0 : return -1;
237 : }
238 :
239 0 : if (::listen(socket, SOMAXCONN) < 0) {
240 0 : log::error("Root:Socket", "Fail to listen on socket: ", addr);
241 0 : filesystem::native::unlink_fn(addr);
242 0 : close(socket);
243 0 : return -1;
244 : }
245 :
246 0 : _socketPath = addr;
247 :
248 0 : return socket;
249 : }
250 :
251 25 : int ConnectionQueue::openNetworkSocket(StringView addr, uint16_t port) {
252 25 : int socket = ::socket(AF_INET, SOCK_STREAM, 0);
253 25 : if (socket == -1) {
254 0 : log::error("Root:Socket", "Fail to open socket");
255 0 : return -1;
256 : }
257 :
258 25 : int enable = 1;
259 25 : if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) == -1) {
260 0 : log::error("Root:Socket", "Fail to set socket option");
261 0 : close(socket);
262 0 : return -1;
263 : }
264 :
265 : struct sockaddr_in sockaddr;
266 25 : memset(&addr, 0, sizeof(addr));
267 25 : sockaddr.sin_family = AF_INET;
268 25 : sockaddr.sin_addr.s_addr = !addr.empty() ? inet_addr(addr.data()) : htonl(INADDR_LOOPBACK);
269 25 : sockaddr.sin_port = htons(port);
270 25 : if (::bind(socket, (struct sockaddr *) &sockaddr, sizeof(sockaddr)) < 0) {
271 0 : log::error("Root:Socket", "Fail to bind socket");
272 0 : close(socket);
273 0 : return -1;
274 : }
275 :
276 25 : if (!setNonblocking(socket)) {
277 0 : log::error("Root:Socket", "Fail to set socket nonblock");
278 0 : close(socket);
279 0 : return -1;
280 : }
281 :
282 25 : if (::listen(socket, SOMAXCONN) < 0) {
283 0 : log::error("Root:Socket", "Fail to listen on socket");
284 0 : close(socket);
285 0 : return -1;
286 : }
287 25 : return socket;
288 : }
289 :
290 : }
|