Line data Source code
1 : /**
2 : Copyright (c) 2024 Stappler LLC <admin@stappler.dev>
3 :
4 : Permission is hereby granted, free of charge, to any person obtaining a copy
5 : of this software and associated documentation files (the "Software"), to deal
6 : in the Software without restriction, including without limitation the rights
7 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 : copies of the Software, and to permit persons to whom the Software is
9 : furnished to do so, subject to the following conditions:
10 :
11 : The above copyright notice and this permission notice shall be included in
12 : all copies or substantial portions of the Software.
13 :
14 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 : THE SOFTWARE.
21 : **/
22 :
23 : #include "SPWebUnixWebsocket.h"
24 :
25 : namespace STAPPLER_VERSIONIZED stappler::web {
26 :
27 : class UnixRequestController;
28 :
29 25 : UnixWebsocketSim::~UnixWebsocketSim() {
30 25 : memory::pool::destroy(_pool);
31 25 : memory::allocator::destroy(_alloc);
32 25 : }
33 :
34 25 : void UnixWebsocketSim::attachRequest(allocator_t *a, pool_t *p, UnixRequestController *c) {
35 25 : _alloc = a;
36 25 : _pool = p;
37 25 : _request = c;
38 25 : }
39 :
40 25 : void UnixWebsocketSim::attachSocket(UnixWebsocketConnectionSim *sock) {
41 25 : _socket = sock;
42 25 : }
43 :
44 0 : bool UnixWebsocketSim::read(WebsocketFrameType t, const uint8_t *bytes, size_t count) {
45 0 : return false;
46 : }
47 :
48 1375 : bool UnixWebsocketSim::write(BytesView frame) {
49 1375 : _socket->read(frame);
50 1375 : return true;
51 : }
52 :
53 100 : void UnixWebsocketSim::send(const mem_std::Value &data) {
54 100 : _socket->receive(data);
55 100 : }
56 :
57 0 : void UnixWebsocketSim::onStarted() { }
58 :
59 0 : void UnixWebsocketSim::onEnded() { }
60 :
61 25 : UnixWebsocketConnectionSim::UnixWebsocketConnectionSim(allocator_t *a, pool_t *p, HostController *c, UnixWebsocketSim *sim)
62 25 : : WebsocketConnection(a, p, c), _sim(sim), _reader(c->getRoot(), p), _writer(p) {
63 25 : _commonReader = &_reader;
64 25 : _commonWriter = &_writer;
65 25 : }
66 :
67 2850 : bool UnixWebsocketConnectionSim::write(WebsocketFrameType t, const uint8_t *bytes, size_t count) {
68 2850 : _sim->read(t, bytes, count);
69 2850 : return true;
70 : }
71 :
72 25 : bool UnixWebsocketConnectionSim::run(WebsocketHandler *h, const Callback<void()> &beginCb, const Callback<void()> &endCb) {
73 25 : _enabled = true;
74 25 : _handler = h;
75 25 : _shouldTerminate.test_and_set();
76 :
77 25 : _sim->onStarted();
78 25 : perform([&] {
79 25 : h->handleBegin();
80 25 : }, _pool, config::TAG_WEBSOCKET, this);
81 :
82 25 : beginCb();
83 548 : while (_shouldTerminate.test_and_set()) {
84 249 : std::unique_lock lock(_waitMutex);
85 249 : if (_inputFrames.empty() && _inputValues.empty()) {
86 200 : _waitCond.wait(lock);
87 : }
88 :
89 1624 : for (auto &it : _inputFrames) {
90 1375 : if (!processFrame(it)) {
91 25 : _shouldTerminate.clear();
92 : }
93 : }
94 :
95 349 : for (auto &it : _inputValues) {
96 100 : if (!_handler->handleMessage(it)) {
97 0 : _shouldTerminate.clear();
98 : }
99 : }
100 :
101 249 : _inputFrames.clear();
102 :
103 249 : _inputValues.clear();
104 :
105 249 : lock.unlock();
106 :
107 249 : auto p = memory::pool::create(_pool);
108 249 : if (_handler) {
109 249 : perform([&, this] {
110 249 : _handler->sendPendingNotifications(p);
111 249 : }, p);
112 : }
113 249 : memory::pool::destroy(p);
114 249 : }
115 :
116 25 : _enabled = false;
117 :
118 25 : endCb();
119 :
120 25 : _group.waitForAll();
121 25 : perform([&] {
122 25 : h->handleEnd();
123 25 : }, _pool, config::TAG_WEBSOCKET, this);
124 25 : _sim->onEnded();
125 :
126 25 : return true;
127 : }
128 :
129 0 : void UnixWebsocketConnectionSim::wakeup() {
130 0 : _waitCond.notify_one();
131 0 : }
132 :
133 1375 : void UnixWebsocketConnectionSim::read(BytesView frame) {
134 1375 : std::unique_lock lock(_waitMutex);
135 1375 : _inputFrames.emplace_back(frame.bytes<memory::StandartInterface>());
136 1375 : _waitCond.notify_one();
137 1375 : }
138 :
139 100 : void UnixWebsocketConnectionSim::receive(const mem_std::Value &val) {
140 100 : std::unique_lock lock(_waitMutex);
141 100 : _inputValues.emplace_back(val);
142 100 : _waitCond.notify_one();
143 100 : }
144 :
145 1375 : bool UnixWebsocketConnectionSim::processFrame(BytesView frame) {
146 1375 : auto p = memory::pool::create(_pool);
147 1375 : WebsocketFrameReader reader(_reader.root, p);
148 5450 : while (!frame.empty()) {
149 4100 : size_t len = reader.getRequiredBytes();
150 4100 : uint8_t *buf = reader.prepare(len);
151 :
152 4100 : auto sub = frame.readBytes(len);
153 4100 : memcpy(buf, sub.data(), sub.size());
154 4100 : reader.save(buf, len);
155 :
156 4100 : if (reader.isControlReady()) {
157 25 : if (reader.type == WebsocketFrameType::Close) {
158 25 : _clientCloseCode = WebsocketStatusCode(reader.buffer.get<BytesViewNetwork>().readUnsigned16());
159 25 : reader.popFrame();
160 25 : memory::pool::destroy(p);
161 25 : return false;
162 0 : } else if (reader.type == WebsocketFrameType::Ping) {
163 0 : write(WebsocketFrameType::Pong, nullptr, 0);
164 0 : reader.popFrame();
165 0 : memory::pool::destroy(p);
166 0 : return true;
167 : }
168 4075 : } else if (reader.isFrameReady()) {
169 1350 : auto ret = perform([&, this] {
170 1350 : if (_handler && !_handler->handleFrame(reader.type, reader.frame.buffer)) {
171 0 : return false;
172 : }
173 1350 : return true;
174 : }, reader.pool, config::TAG_WEBSOCKET, this);
175 1350 : reader.popFrame();
176 1350 : if (!ret) {
177 0 : memory::pool::destroy(p);
178 0 : return false;
179 : }
180 : }
181 : }
182 1350 : memory::pool::destroy(p);
183 1350 : return true;
184 1375 : }
185 :
186 : }
|