Line data Source code
1 : /**
2 : Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
3 :
4 : Permission is hereby granted, free of charge, to any person obtaining a copy
5 : of this software and associated documentation files (the "Software"), to deal
6 : in the Software without restriction, including without limitation the rights
7 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 : copies of the Software, and to permit persons to whom the Software is
9 : furnished to do so, subject to the following conditions:
10 :
11 : The above copyright notice and this permission notice shall be included in
12 : all copies or substantial portions of the Software.
13 :
14 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 : THE SOFTWARE.
21 : **/
22 :
23 : #ifndef EXTRA_WEBSERVER_UNIX_SPWEBUNIXCONNECTIONWORKER_H_
24 : #define EXTRA_WEBSERVER_UNIX_SPWEBUNIXCONNECTIONWORKER_H_
25 :
26 : #include "SPWebUnixRoot.h"
27 : #include "SPWebRequestController.h"
28 : #include "SPThread.h"
29 :
30 : #include <sys/epoll.h>
31 :
32 : namespace STAPPLER_VERSIONIZED stappler::web {
33 :
34 : class UnixRequestController; // forward declaration: this header only uses it through pointers
35 : class ConnectionQueue; // forward declaration: stored as ConnectionWorker::_queue and passed to its constructor
36 :
37 : class ConnectionWorker : public thread::ThreadInterface<Interface> { // Worker built on thread::ThreadInterface; apart from a few inline accessors this header only declares the API
38 : public:
39 : struct Buffer; // forward declarations of the nested types defined below
40 : struct Client;
41 : struct Generation;
42 : // File-related data handed out by Buffer::getFile(); presumably meaningful only for buffers flagged IsOutFile — TODO confirm
43 : struct BufferFile {
44 : filesystem::Stat stat;
45 : int fd = -1; // defaults to -1, like the other descriptor members in this header
46 : uint8_t *extraBuffer = nullptr; // previously had no initializer; null by default like every other pointer member here
47 : };
48 : // One link of a BufferChain: a byte buffer with capacity/size/offset bookkeeping and a `next` pointer
49 : struct Buffer : AllocBase {
50 : enum Flags { // bit flags (distinct powers of two); SP_DEFINE_ENUM_AS_MASK is applied to this enum after the class
51 : None = 0,
52 : Eos = 1 << 0, // presumably "end of stream" — TODO confirm
53 : IsOutFile = 1 << 1, // the bit tested by isOutFile()
54 : };
55 :
56 : Buffer *next = nullptr; // following buffer in the chain, or nullptr
57 : pool_t *pool = nullptr;
58 :
59 : uint8_t *buf = nullptr;
60 : size_t capacity = 0;
61 : size_t size = 0;
62 : size_t offset = 0;
63 : size_t absolute = 0; // NOTE(review): meaning not visible in this header — confirm against the implementation
64 : Flags flags = Flags::None;
65 : // Factory overloads: one takes a pool and an optional size, the other additionally takes a file path and a byte range
66 : static Buffer *create(pool_t *, size_t = 0);
67 : static Buffer *create(pool_t *, StringView path, off_t rangeStart, size_t rangeLen = maxOf<size_t>(), size_t = 0);
68 :
69 : void release();
70 :
71 : StringView str() const;
72 :
73 : size_t availableForWrite() const;
74 : size_t availableForRead() const;
75 :
76 : uint8_t *writeTarget() const;
77 : uint8_t *readSource() const;
78 :
79 : size_t write(const uint8_t *, size_t);
80 :
81 : BufferFile *getFile() const;
82 : // True when the IsOutFile bit is set in `flags`
83 4946350 : bool isOutFile() const { return (flags & IsOutFile) != None; }
84 : };
85 : // Linked sequence of Buffer objects tracked through front/back pointers
86 : struct BufferChain {
87 : Buffer *front = nullptr;
88 : Buffer *back = nullptr;
89 : Buffer **tail = nullptr; // presumably the link slot where the next buffer gets appended — TODO confirm
90 :
91 : bool eos = false;
92 : // True when the chain has at least one buffer (front is set)
93 1313393 : explicit operator bool() const { return front != nullptr; }
94 : // True when the chain holds exactly one buffer: front is set and is also the back
95 23250 : bool isSingle() const { return front != nullptr && front == back; }
96 :
97 : bool isEos() const;
98 :
99 : bool empty() const;
100 :
101 : size_t size() const;
102 :
103 : Buffer *getWriteTarget(pool_t *p);
104 : // Append overloads: raw bytes (with optional flags), a single Buffer, or another chain
105 : bool write(pool_t *, const uint8_t *, size_t, Buffer::Flags flags = Buffer::None);
106 : bool write(Buffer *);
107 : bool write(BufferChain &);
108 : bool readFromFd(pool_t *, int);
109 :
110 : Status read(const Callback<int(const Buffer *, const uint8_t *, size_t)> &, bool release);
111 : Status writeToFd(int, size_t &);
112 :
113 : size_t getBytesRead() const;
114 :
115 : BytesView extract(pool_t *, size_t initOffset, size_t blockSize) const;
116 :
117 : void releaseEmpty();
118 : void clear();
119 : };
120 : // Per-descriptor connection state: buffer chains, peer address/port, epoll event record and request-parsing state
121 : struct Client : AllocBase {
122 : enum RequestReadState { // NOTE(review): "Reqeust" is misspelled in the last two enumerators; kept as-is because the names are part of the public interface
123 : RequestLine,
124 : RequestHeaders,
125 : RequestProcess,
126 : RequestInput,
127 : ReqeustClosed,
128 : ReqeustInvalid,
129 : };
130 :
131 : Client *next = nullptr; // prev/next links to neighbouring clients
132 : Client *prev = nullptr;
133 :
134 : Generation *gen = nullptr;
135 : pool_t *pool = nullptr;
136 :
137 : BufferChain input;
138 : BufferChain output;
139 : BufferChain response;
140 :
141 : StringView addr;
142 : uint16_t port = 0;
143 :
144 : int fd = -1;
145 : struct epoll_event event; // NOTE(review): no default initializer — confirm the constructors or init() fill it before use
146 : bool system = false; // presumably marks the worker's own service descriptors (_inputClient, _cancelClient, _eventClient) — TODO confirm
147 : bool valid = true;
148 : bool shutdownReadSend = false; // presumably records that shutdownRead() was already issued — TODO confirm
149 : bool shutdownWriteSend = false; // presumably records that shutdownWrite() was already issued — TODO confirm
150 :
151 : RequestReadState requestState = RequestReadState::RequestLine;
152 : UnixRequestController *request = nullptr;
153 : size_t bytesSent = 0;
154 : size_t bytesRead = 0;
155 : // Two constructors: one bound to a Generation/pool with a peer address, one taking only a descriptor and a mode
156 : Client(Generation *, pool_t *, int, StringView addr, uint16_t port);
157 : Client(int fd, int mode);
158 :
159 : void init(int);
160 : void shutdownRead();
161 : void shutdownWrite();
162 : void shutdownAll();
163 : void release();
164 :
165 : bool performRead();
166 : bool performWrite();
167 : // Write overloads; each takes a BufferChain plus the data source (another chain, raw bytes, StringView or BytesView)
168 : bool write(BufferChain &, BufferChain &);
169 : bool write(BufferChain &, const uint8_t *, size_t, Buffer::Flags = Buffer::None);
170 : bool write(BufferChain &, StringView, Buffer::Flags = Buffer::None);
171 : bool write(BufferChain &, BytesView, Buffer::Flags = Buffer::None);
172 : bool writeFile(BufferChain &, StringView filename, size_t offset = 0, size_t size = maxOf<size_t>(), Buffer::Flags = Buffer::None);
173 :
174 : Status runInputFilter(BufferChain &);
175 : // NOTE(review): "checkForReqeust" is misspelled; kept unchanged because callers depend on the name
176 : Status checkForReqeust(BufferChain &);
177 : Status checkForHeader(BufferChain &);
178 :
179 : void cancelWithResult(Status);
180 : };
181 : // Group of clients sharing one pool; generations link to each other through prev/next
182 : struct Generation : AllocBase {
183 : Generation *prev = nullptr;
184 : Generation *next = nullptr;
185 : Client *active = nullptr; // presumably the head of this generation's client list — TODO confirm
186 : size_t activeClients = 0;
187 :
188 : pool_t *pool = nullptr;
189 : ConnectionWorker *worker = nullptr;
190 : bool endOfLife = false;
191 :
192 : Generation(ConnectionWorker *, pool_t *);
193 :
194 : Client *pushFd(int, StringView addr, uint16_t port);
195 : void releaseClient(Client *);
196 : void releaseAll();
197 : };
198 :
199 : static constexpr size_t MaxEvents = 16; // presumably the number of epoll events handled per poll() pass — TODO confirm
200 : // Takes the owning queue, the root and three descriptors (socket, pipe, event)
201 : ConnectionWorker(ConnectionQueue *queue, UnixRoot *, int socket, int pipe, int event);
202 : ~ConnectionWorker();
203 : // thread::ThreadInterface overrides
204 : virtual void threadInit() override;
205 : virtual void threadDispose() override;
206 : virtual bool worker() override;
207 :
208 : bool poll(int);
209 : // Returns the stored _root pointer as a Root *
210 0 : Root *getRoot() const { return _root; }
211 : // Gives mutable access to the std::thread member
212 400 : std::thread & thread() { return _thread; }
213 :
214 : void runTask(AsyncTask *);
215 : // Request-handling steps; all of them are defined outside this header
216 : UnixRequestController *readRequest(Client *, BufferChain &chain);
217 : Status parseRequestHeader(UnixRequestController *, Client *, BufferChain &chain);
218 : Status processRequest(UnixRequestController *);
219 :
220 : protected:
221 : Generation *makeGeneration();
222 : void pushFd(int epollFd, int fd, StringView addr, uint16_t port);
223 :
224 : bool addClient(Client &);
225 : void removeClient(Client &);
226 :
227 : ConnectionQueue *_queue; // NOTE(review): no default initializer; presumably set from the constructor's `queue` argument
228 : std::thread::id _threadId;
229 :
230 : UnixRoot *_root = nullptr;
231 : allocator_t *_threadAlloc = nullptr;
232 : pool_t *_threadPool = nullptr;
233 : // Client records embedded in the worker itself; presumably wrap the socket/pipe/event descriptors given to the constructor — TODO confirm
234 : Client _inputClient;
235 : Client _cancelClient;
236 : Client _eventClient;
237 :
238 : bool _shouldClose = false;
239 : int _epollFd = -1;
240 : int _signalFd = -1;
241 :
242 : size_t _fdCount = 0;
243 :
244 : Generation *_generation = nullptr;
245 :
246 : std::thread _thread;
247 : };
248 :
249 : SP_DEFINE_ENUM_AS_MASK(ConnectionWorker::Buffer::Flags) // presumably provides the bitwise mask operators for Buffer::Flags — confirm against the macro definition
250 :
251 : }
252 :
253 : #endif /* EXTRA_WEBSERVER_UNIX_SPWEBUNIXCONNECTIONWORKER_H_ */
|