Line data Source code
1 : /**
2 : Copyright (c) 2023-2024 Stappler LLC <admin@stappler.dev>
3 :
4 : Permission is hereby granted, free of charge, to any person obtaining a copy
5 : of this software and associated documentation files (the "Software"), to deal
6 : in the Software without restriction, including without limitation the rights
7 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 : copies of the Software, and to permit persons to whom the Software is
9 : furnished to do so, subject to the following conditions:
10 :
11 : The above copyright notice and this permission notice shall be included in
12 : all copies or substantial portions of the Software.
13 :
14 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20 : THE SOFTWARE.
21 : **/
22 :
23 : #include "XLStorageServer.h"
24 : #include "XLStorageComponent.h"
25 : #include "XLApplication.h"
26 : #include "SPValid.h"
27 : #include "SPThread.h"
28 : #include "SPSqlDriver.h"
29 : #include <typeindex>
30 :
31 : namespace STAPPLER_VERSIONIZED stappler::xenolith::storage {
32 :
33 : struct ServerComponentData : public db::AllocBase {
34 : db::pool_t *pool;
35 : ComponentContainer *container;
36 :
37 : db::Map<StringView, Component *> components;
38 : db::Map<std::type_index, Component *> typedComponents;
39 : db::Map<StringView, const db::Scheme *> schemes;
40 : };
41 :
42 : class ServerComponentLoader : public ComponentLoader {
43 : public:
44 : virtual ~ServerComponentLoader();
45 :
46 : ServerComponentLoader(Server::ServerData *, const db::Transaction &);
47 :
48 42 : virtual db::pool_t *getPool() const override { return _pool; }
49 21 : virtual const Server &getServer() const override { return *_server; }
50 21 : virtual const db::Transaction &getTransaction() const override { return *_transaction; }
51 :
52 : virtual void exportComponent(Component *) override;
53 :
54 : virtual const db::Scheme * exportScheme(const db::Scheme &) override;
55 :
56 : bool run(ComponentContainer *);
57 :
58 : protected:
59 : Server::ServerData *_data = nullptr;
60 : db::pool_t *_pool = nullptr;
61 : const Server *_server = nullptr;
62 : const db::Transaction *_transaction = nullptr;
63 : ServerComponentData *_components = nullptr;
64 : };
65 :
66 :
67 : struct Server::ServerData : public thread::ThreadInterface<Interface>, public db::ApplicationInterface {
68 : struct TaskCallback {
69 : Function<bool(const Server &, const db::Transaction &)> callback;
70 : Rc<Ref> ref;
71 :
72 1886 : TaskCallback() = default;
73 : TaskCallback(Function<bool(const Server &, const db::Transaction &)> &&cb)
74 : : callback(move(cb)) { }
75 1281 : TaskCallback(Function<bool(const Server &, const db::Transaction &)> &&cb, Ref *ref)
76 1281 : : callback(move(cb)), ref(ref) { }
77 : };
78 :
79 : memory::pool_t *serverPool = nullptr;
80 : memory::pool_t *threadPool = nullptr;
81 : memory::pool_t *asyncPool = nullptr;
82 : Application *application = nullptr;
83 : db::Map<StringView, StringView> params;
84 : db::Map<StringView, const db::Scheme *> predefinedSchemes;
85 : db::Map<ComponentContainer *, ServerComponentData *> components;
86 :
87 : db::String documentRoot;
88 : StringView serverName;
89 : std::thread thread;
90 : std::condition_variable condition;
91 : std::atomic_flag shouldQuit;
92 : Mutex mutexQueue;
93 : Mutex mutexFree;
94 : memory::PriorityQueue<TaskCallback> queue;
95 : db::sql::Driver *driver = nullptr;
96 : db::sql::Driver::Handle handle;
97 : Server *server = nullptr;
98 : uint64_t now = 0;
99 :
100 : mutable db::Vector<db::Function<void(const db::Transaction &)>> *asyncTasks = nullptr;
101 :
102 : db::BackendInterface::Config interfaceConfig;
103 :
104 : // accessed from main thread only, std memory model
105 : mem_std::Map<StringView, Rc<ComponentContainer>> appComponents;
106 :
107 : const db::Transaction *currentTransaction = nullptr;
108 :
109 : ServerData();
110 : virtual ~ServerData();
111 :
112 : bool init();
113 : bool execute(const TaskCallback &);
114 : void runAsync();
115 :
116 : virtual void threadInit() override;
117 : virtual bool worker() override;
118 : virtual void threadDispose() override;
119 :
120 : void handleHeartbeat();
121 :
122 : void addAsyncTask(const db::Callback<db::Function<void(const db::Transaction &)>(db::pool_t *)> &setupCb) const;
123 :
124 : bool addComponent(ComponentContainer *, const db::Transaction &);
125 : void removeComponent(ComponentContainer *, const db::Transaction &t);
126 :
127 : virtual void scheduleAyncDbTask(const db::Callback<db::Function<void(const db::Transaction &)>(db::pool_t *)> &setupCb) const override;
128 :
129 : virtual db::StringView getDocumentRoot() const override;
130 :
131 : virtual const db::Scheme *getFileScheme() const override;
132 : virtual const db::Scheme *getUserScheme() const override;
133 :
134 : virtual void pushErrorMessage(db::Value &&) const override;
135 : virtual void pushDebugMessage(db::Value &&) const override;
136 :
137 : virtual void initTransaction(db::Transaction &) const override;
138 : };
139 :
140 :
141 : XL_DECLARE_EVENT_CLASS(Server, onBroadcast)
142 :
143 84 : Server::~Server() { }
144 :
145 42 : bool Server::init(Application *app, const Value ¶ms) {
146 42 : auto pool = memory::pool::create();
147 :
148 42 : memory::pool::context ctx(pool);
149 :
150 42 : auto bytes = memory::pool::palloc(pool, sizeof(ServerData));
151 :
152 42 : _data = new (bytes) ServerData;
153 42 : _data->serverPool = pool;
154 42 : _data->application = app;
155 42 : _data->shouldQuit.test_and_set();
156 :
157 42 : StringView driver;
158 :
159 168 : for (auto &it : params.asDict()) {
160 126 : if (it.first == "driver") {
161 42 : driver = StringView(it.second.getString());
162 84 : } else if (it.first == "serverName") {
163 42 : _data->serverName = StringView(it.second.getString()).pdup(pool);
164 : } else {
165 42 : _data->params.emplace(StringView(it.first).pdup(pool), StringView(it.second.getString()).pdup(pool));
166 : }
167 : }
168 :
169 42 : if (driver.empty()) {
170 0 : driver = StringView("sqlite");
171 : }
172 :
173 42 : _data->driver = db::sql::Driver::open(pool, _data, driver);
174 42 : if (!_data->driver) {
175 0 : return false;
176 : }
177 :
178 42 : _data->server = this;
179 42 : return _data->init();
180 42 : }
181 :
182 21 : void Server::initialize(Application *) {
183 :
184 21 : }
185 :
186 21 : void Server::invalidate(Application *) {
187 21 : if (_data) {
188 42 : for (auto &it : _data->appComponents) {
189 21 : it.second->handleComponentsUnloaded(*this);
190 : }
191 :
192 21 : auto serverPool = _data->serverPool;
193 21 : _data->~ServerData();
194 21 : memory::pool::destroy(serverPool);
195 21 : _data = nullptr;
196 : }
197 21 : }
198 :
199 3846 : void Server::update(Application *, const UpdateTime &t) {
200 :
201 3846 : }
202 :
203 42 : Rc<ComponentContainer> Server::getComponentContainer(StringView key) const {
204 42 : auto it = _data->appComponents.find(key);
205 42 : if (it != _data->appComponents.end()) {
206 0 : return it->second;
207 : }
208 42 : return nullptr;
209 : }
210 :
211 42 : bool Server::addComponentContainer(const Rc<ComponentContainer> &comp) {
212 42 : if (getComponentContainer(comp->getName()) != nullptr) {
213 0 : log::error("storage::Server", "Component with name ", comp->getName(), " already loaded");
214 0 : return false;
215 : }
216 :
217 42 : perform([this, comp] (const Server &serv, const db::Transaction &t) -> bool {
218 42 : if (_data->addComponent(comp, t)) {
219 42 : _data->application->performOnMainThread([this, comp] {
220 42 : comp->handleComponentsLoaded(*this);
221 42 : }, this);
222 : }
223 42 : return true;
224 : });
225 42 : _data->appComponents.emplace(comp->getName(), comp);
226 42 : return true;
227 : }
228 :
229 21 : bool Server::removeComponentContainer(const Rc<ComponentContainer> &comp) {
230 21 : if (!_data) {
231 0 : return false;
232 : }
233 :
234 21 : auto it = _data->appComponents.find(comp->getName());
235 21 : if (it == _data->appComponents.end()) {
236 0 : log::error("storage::Server", "Component with name ", comp->getName(), " is not loaded");
237 0 : return false;
238 : }
239 :
240 21 : if (it->second != comp) {
241 0 : log::error("storage::Server", "Component you try to remove is not the same that was loaded");
242 0 : return false;
243 : }
244 :
245 21 : auto refId = _data->application->retain();
246 21 : auto selfRefId = retain();
247 21 : perform([this, comp, refId, selfRefId] (const Server &serv, const db::Transaction &t) -> bool {
248 21 : _data->removeComponent(comp, t);
249 21 : _data->application->release(refId);
250 21 : release(selfRefId);
251 21 : return true;
252 : }, comp);
253 21 : _data->appComponents.erase(it);
254 21 : comp->handleComponentsUnloaded(*this);
255 21 : return true;
256 : }
257 :
258 21 : bool Server::get(CoderSource key, DataCallback &&cb) const {
259 21 : if (!cb) {
260 0 : return false;
261 : }
262 :
263 21 : auto p = new DataCallback(move(cb));
264 42 : return perform([this, p, key = key.view().bytes<Interface>()] (const Server &serv, const db::Transaction &t) {
265 21 : auto d = t.getAdapter().get(key);
266 21 : _data->application->performOnMainThread([p, ret = xenolith::Value(d)] {
267 21 : (*p)(ret);
268 21 : delete p;
269 21 : });
270 21 : return true;
271 42 : });
272 : }
273 :
274 42 : bool Server::set(CoderSource key, Value &&data, DataCallback &&cb) const {
275 42 : if (cb) {
276 21 : auto p = new DataCallback(move(cb));
277 42 : return perform([this, p, key = key.view().bytes<Interface>(), data = move(data)] (const Server &serv, const db::Transaction &t) {
278 21 : auto d = t.getAdapter().get(key);
279 21 : t.getAdapter().set(key, data);
280 21 : _data->application->performOnMainThread([p, ret = xenolith::Value(d)] {
281 21 : (*p)(ret);
282 21 : delete p;
283 21 : });
284 21 : return true;
285 42 : });
286 : } else {
287 42 : return perform([key = key.view().bytes<Interface>(), data = move(data)] (const Server &serv, const db::Transaction &t) {
288 21 : t.getAdapter().set(key, data);
289 21 : return true;
290 21 : });
291 : }
292 : }
293 :
294 42 : bool Server::clear(CoderSource key, DataCallback &&cb) const {
295 42 : if (cb) {
296 21 : auto p = new DataCallback(move(cb));
297 42 : return perform([this, p, key = key.view().bytes<Interface>()] (const Server &serv, const db::Transaction &t) {
298 21 : auto d = t.getAdapter().get(key);
299 21 : t.getAdapter().clear(key);
300 21 : _data->application->performOnMainThread([p, ret = xenolith::Value(d)] {
301 21 : (*p)(ret);
302 21 : delete p;
303 21 : });
304 21 : return true;
305 42 : });
306 : } else {
307 42 : return perform([key = key.view().bytes<Interface>()] (const Server &serv, const db::Transaction &t) {
308 21 : t.getAdapter().clear(key);
309 21 : return true;
310 21 : });
311 : }
312 : }
313 :
314 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, uint64_t oid, db::UpdateFlags flags) const {
315 84 : if (!cb) {
316 0 : return false;
317 : }
318 :
319 84 : auto p = new DataCallback(move(cb));
320 168 : return perform([this, scheme = &scheme, oid, flags, p] (const Server &serv, const db::Transaction &t) {
321 84 : auto ret = scheme->get(t, oid, flags);
322 84 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
323 84 : (*p)(ret);
324 84 : delete p;
325 84 : });
326 84 : return true;
327 168 : });
328 : }
329 42 : bool Server::get(const Scheme &scheme, DataCallback &&cb, StringView alias, db::UpdateFlags flags) const {
330 42 : if (!cb) {
331 0 : return false;
332 : }
333 :
334 42 : auto p = new DataCallback(move(cb));
335 84 : return perform([this, scheme = &scheme, alias = alias.str<Interface>(), flags, p] (const Server &serv, const db::Transaction &t) {
336 42 : auto ret = scheme->get(t, alias, flags);
337 42 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
338 42 : (*p)(ret);
339 42 : delete p;
340 42 : });
341 42 : return true;
342 84 : });
343 : }
344 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, const Value &id, db::UpdateFlags flags) const {
345 84 : if (id.isDictionary()) {
346 21 : if (auto oid = id.getInteger("__oid")) {
347 21 : return get(scheme, move(cb), oid, flags);
348 : }
349 : } else {
350 63 : if ((id.isString() && stappler::valid::validateNumber(id.getString())) || id.isInteger()) {
351 42 : if (auto oid = id.getInteger()) {
352 42 : return get(scheme, move(cb), oid, flags);
353 : }
354 : }
355 :
356 21 : auto &str = id.getString();
357 21 : if (!str.empty()) {
358 21 : return get(scheme, move(cb), str, flags);
359 : }
360 : }
361 0 : return false;
362 : }
363 :
364 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, uint64_t oid, StringView field, db::UpdateFlags flags) const {
365 84 : if (!cb) {
366 0 : return false;
367 : }
368 :
369 84 : auto p = new DataCallback(move(cb));
370 168 : return perform([this, scheme = &scheme, oid, field = field.str<Interface>(), flags, p] (const Server &serv, const db::Transaction &t) {
371 84 : auto ret = scheme->get(t, oid, field, flags);
372 84 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
373 84 : (*p)(ret);
374 84 : delete p;
375 84 : });
376 84 : return true;
377 168 : });
378 : }
379 :
380 42 : bool Server::get(const Scheme &scheme, DataCallback &&cb, StringView alias, StringView field, db::UpdateFlags flags) const {
381 42 : if (!cb) {
382 0 : return false;
383 : }
384 :
385 42 : auto p = new DataCallback(move(cb));
386 84 : return perform([this, scheme = &scheme, alias = alias.str<Interface>(), field = field.str<Interface>(), flags, p] (const Server &serv, const db::Transaction &t) {
387 42 : auto ret = scheme->get(t, alias, field, flags);
388 42 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
389 42 : (*p)(ret);
390 42 : delete p;
391 42 : });
392 42 : return true;
393 84 : });
394 : }
395 :
396 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, const Value &id, StringView field, db::UpdateFlags flags) const {
397 84 : if (id.isDictionary()) {
398 21 : if (auto oid = id.getInteger("__oid")) {
399 21 : return get(scheme, move(cb), oid, field, flags);
400 : }
401 : } else {
402 63 : if ((id.isString() && stappler::valid::validateNumber(id.getString())) || id.isInteger()) {
403 42 : if (auto oid = id.getInteger()) {
404 42 : return get(scheme, move(cb), oid, field, flags);
405 : }
406 : }
407 :
408 21 : auto &str = id.getString();
409 21 : if (!str.empty()) {
410 21 : return get(scheme, move(cb), str, field, flags);
411 : }
412 : }
413 0 : return false;
414 : }
415 :
416 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, uint64_t oid, InitList<StringView> &&fields, db::UpdateFlags flags) const {
417 84 : Vector<const db::Field *> fieldsVec;
418 168 : for (auto &it : fields) {
419 84 : if (auto f = scheme.getField(it)) {
420 84 : mem_std::emplace_ordered(fieldsVec, f);
421 : }
422 : }
423 168 : return get(scheme, move(cb), oid, move(fieldsVec), flags);
424 84 : }
425 :
426 42 : bool Server::get(const Scheme &scheme, DataCallback &&cb, StringView alias, InitList<StringView> &&fields, db::UpdateFlags flags) const {
427 42 : Vector<const db::Field *> fieldsVec;
428 84 : for (auto &it : fields) {
429 42 : if (auto f = scheme.getField(it)) {
430 42 : mem_std::emplace_ordered(fieldsVec, f);
431 : }
432 : }
433 84 : return get(scheme, move(cb), alias, move(fieldsVec), flags);
434 42 : }
435 :
436 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, const Value &id, InitList<StringView> &&fields, db::UpdateFlags flags) const {
437 84 : if (id.isDictionary()) {
438 21 : if (auto oid = id.getInteger("__oid")) {
439 21 : return get(scheme, move(cb), oid, move(fields), flags);
440 : }
441 : } else {
442 63 : if ((id.isString() && stappler::valid::validateNumber(id.getString())) || id.isInteger()) {
443 42 : if (auto oid = id.getInteger()) {
444 42 : return get(scheme, move(cb), oid, move(fields), flags);
445 : }
446 : }
447 :
448 21 : auto &str = id.getString();
449 21 : if (!str.empty()) {
450 21 : return get(scheme, move(cb), str, move(fields), flags);
451 : }
452 : }
453 0 : return false;
454 : }
455 :
456 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, uint64_t oid, InitList<const char *> &&fields, db::UpdateFlags flags) const {
457 84 : Vector<const db::Field *> fieldsVec;
458 168 : for (auto &it : fields) {
459 84 : if (auto f = scheme.getField(it)) {
460 84 : mem_std::emplace_ordered(fieldsVec, f);
461 : }
462 : }
463 168 : return get(scheme, move(cb), oid, move(fieldsVec), flags);
464 84 : }
465 :
466 42 : bool Server::get(const Scheme &scheme, DataCallback &&cb, StringView alias, InitList<const char *> &&fields, db::UpdateFlags flags) const {
467 42 : Vector<const db::Field *> fieldsVec;
468 84 : for (auto &it : fields) {
469 42 : if (auto f = scheme.getField(it)) {
470 42 : mem_std::emplace_ordered(fieldsVec, f);
471 : }
472 : }
473 84 : return get(scheme, move(cb), alias, move(fieldsVec), flags);
474 42 : }
475 :
476 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, const Value &id, InitList<const char *> &&fields, db::UpdateFlags flags) const {
477 84 : if (id.isDictionary()) {
478 21 : if (auto oid = id.getInteger("__oid")) {
479 21 : return get(scheme, move(cb), oid, move(fields), flags);
480 : }
481 : } else {
482 63 : if ((id.isString() && stappler::valid::validateNumber(id.getString())) || id.isInteger()) {
483 42 : if (auto oid = id.getInteger()) {
484 42 : return get(scheme, move(cb), oid, move(fields), flags);
485 : }
486 : }
487 :
488 21 : auto &str = id.getString();
489 21 : if (!str.empty()) {
490 21 : return get(scheme, move(cb), str, move(fields), flags);
491 : }
492 : }
493 0 : return false;
494 : }
495 :
496 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, uint64_t oid, InitList<const db::Field *> &&fields, db::UpdateFlags flags) const {
497 84 : Vector<const db::Field *> fieldsVec;
498 168 : for (auto &it : fields) {
499 84 : mem_std::emplace_ordered(fieldsVec, it);
500 : }
501 168 : return get(scheme, move(cb), oid, move(fieldsVec), flags);
502 84 : }
503 :
504 42 : bool Server::get(const Scheme &scheme, DataCallback &&cb, StringView alias, InitList<const db::Field *> &&fields, db::UpdateFlags flags) const {
505 42 : Vector<const db::Field *> fieldsVec;
506 84 : for (auto &it : fields) {
507 42 : mem_std::emplace_ordered(fieldsVec, it);
508 : }
509 84 : return get(scheme, move(cb), alias, move(fieldsVec), flags);
510 42 : }
511 :
512 84 : bool Server::get(const Scheme &scheme, DataCallback &&cb, const Value &id, InitList<const db::Field *> &&fields, db::UpdateFlags flags) const {
513 84 : if (id.isDictionary()) {
514 21 : if (auto oid = id.getInteger("__oid")) {
515 21 : return get(scheme, move(cb), oid, move(fields), flags);
516 : }
517 : } else {
518 63 : if ((id.isString() && stappler::valid::validateNumber(id.getString())) || id.isInteger()) {
519 42 : if (auto oid = id.getInteger()) {
520 42 : return get(scheme, move(cb), oid, move(fields), flags);
521 : }
522 : }
523 :
524 21 : auto &str = id.getString();
525 21 : if (!str.empty()) {
526 21 : return get(scheme, move(cb), str, move(fields), flags);
527 : }
528 : }
529 0 : return false;
530 : }
531 :
532 : // returns Array with zero or more Dictionaries with object data or Null value
533 42 : bool Server::select(const Scheme &scheme, DataCallback &&cb, QueryCallback &&qcb, db::UpdateFlags flags) const {
534 42 : if (!cb) {
535 0 : return false;
536 : }
537 :
538 42 : if (qcb) {
539 21 : auto p = new DataCallback(move(cb));
540 21 : auto q = new QueryCallback(move(qcb));
541 42 : return perform([this, scheme = &scheme, p, q, flags] (const Server &serv, const db::Transaction &t) {
542 21 : db::Query query;
543 21 : (*q)(query);
544 21 : delete q;
545 21 : auto ret = scheme->select(t, query, flags);
546 21 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
547 21 : (*p)(ret);
548 21 : delete p;
549 21 : });
550 21 : return true;
551 42 : });
552 : } else {
553 21 : auto p = new DataCallback(move(cb));
554 42 : return perform([this, scheme = &scheme, p, flags] (const Server &serv, const db::Transaction &t) {
555 21 : auto ret = scheme->select(t, db::Query(), flags);
556 21 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
557 21 : (*p)(ret);
558 21 : delete p;
559 21 : });
560 21 : return true;
561 42 : });
562 : }
563 : }
564 :
565 42 : bool Server::create(const Scheme &scheme, Value &&data, DataCallback &&cb, db::UpdateFlags flags) const {
566 42 : return create(scheme, move(data), move(cb), flags, db::Conflict::None);
567 : }
568 :
569 21 : bool Server::create(const Scheme &scheme, Value &&data, DataCallback &&cb, db::Conflict::Flags conflict) const {
570 21 : return create(scheme, move(data), move(cb), db::UpdateFlags::None, conflict);
571 : }
572 :
573 84 : bool Server::create(const Scheme &scheme, Value &&data, DataCallback &&cb, db::UpdateFlags flags, db::Conflict::Flags conflict) const {
574 84 : if (cb) {
575 63 : auto p = new DataCallback(move(cb));
576 126 : return perform([this, scheme = &scheme, data = move(data), flags, conflict, p] (const Server &serv, const db::Transaction &t) {
577 63 : auto ret = scheme->create(t, data, flags | db::UpdateFlags::NoReturn, conflict);
578 63 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
579 63 : (*p)(ret);
580 63 : delete p;
581 63 : });
582 63 : return true;
583 126 : });
584 : } else {
585 42 : return perform([scheme = &scheme, data = move(data), flags, conflict] (const Server &serv, const db::Transaction &t) {
586 21 : scheme->create(t, data, flags | db::UpdateFlags::NoReturn, conflict);
587 21 : return true;
588 21 : });
589 : }
590 : }
591 :
592 42 : bool Server::update(const Scheme &scheme, uint64_t oid, Value &&data, DataCallback &&cb, db::UpdateFlags flags) const {
593 42 : if (cb) {
594 21 : auto p = new DataCallback(move(cb));
595 42 : return perform([this, scheme = &scheme, oid, data = move(data), flags, p] (const Server &serv, const db::Transaction &t) {
596 21 : db::Value patch(data);
597 21 : auto ret = scheme->update(t, oid, patch, flags);
598 21 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
599 21 : (*p)(ret);
600 21 : delete p;
601 21 : });
602 21 : return true;
603 42 : });
604 : } else {
605 42 : return perform([scheme = &scheme, oid, data = move(data), flags] (const Server &serv, const db::Transaction &t) {
606 21 : db::Value patch(data);
607 21 : scheme->update(t, oid, patch, flags | db::UpdateFlags::NoReturn);
608 21 : return true;
609 42 : });
610 : }
611 : }
612 :
613 42 : bool Server::update(const Scheme &scheme, const Value & obj, Value &&data, DataCallback &&cb, db::UpdateFlags flags) const {
614 42 : if (cb) {
615 21 : auto p = new DataCallback(move(cb));
616 42 : return perform([this, scheme = &scheme, obj, data = move(data), flags, p] (const Server &serv, const db::Transaction &t) {
617 21 : db::Value value(obj);
618 21 : db::Value patch(data);
619 21 : auto ret = scheme->update(t, value, patch, flags);
620 21 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
621 21 : (*p)(ret);
622 21 : delete p;
623 21 : });
624 21 : return true;
625 42 : });
626 : } else {
627 42 : return perform([scheme = &scheme, obj, data = move(data), flags] (const Server &serv, const db::Transaction &t) {
628 21 : db::Value value(obj);
629 21 : db::Value patch(data);
630 21 : scheme->update(t, value, patch, flags | db::UpdateFlags::NoReturn);
631 21 : return true;
632 42 : });
633 : }
634 : }
635 :
636 63 : bool Server::remove(const Scheme &scheme, uint64_t oid, Function<void(bool)> &&cb) const {
637 63 : if (cb) {
638 42 : auto p = new Function<void(bool)>(move(cb));
639 84 : return perform([this, scheme = &scheme, oid, p] (const Server &serv, const db::Transaction &t) {
640 42 : auto ret = scheme->remove(t, oid);
641 42 : _data->application->performOnMainThread([p, ret] {
642 42 : (*p)(ret);
643 42 : delete p;
644 42 : });
645 42 : return true;
646 42 : });
647 : } else {
648 42 : return perform([scheme = &scheme, oid] (const Server &serv, const db::Transaction &t) {
649 21 : scheme->remove(t, oid);
650 21 : return true;
651 21 : });
652 : }
653 : }
654 :
655 21 : bool Server::remove(const Scheme &scheme, const Value &obj, Function<void(bool)> &&cb) const {
656 21 : return remove(scheme, obj.getInteger("__oid"), move(cb));
657 : }
658 :
659 21 : bool Server::count(const Scheme &scheme, Function<void(size_t)> &&cb) const {
660 21 : if (cb) {
661 21 : auto p = new Function<void(size_t)>(move(cb));
662 42 : return perform([this, scheme = &scheme, p] (const Server &serv, const db::Transaction &t) {
663 21 : auto c = scheme->count(t);
664 21 : _data->application->performOnMainThread([p, c] {
665 21 : (*p)(c);
666 21 : delete p;
667 21 : });
668 21 : return true;
669 21 : });
670 : }
671 0 : return false;
672 : }
673 :
674 21 : bool Server::count(const Scheme &scheme, Function<void(size_t)> &&cb, QueryCallback &&qcb) const {
675 21 : if (qcb) {
676 21 : if (cb) {
677 21 : auto p = new Function<void(size_t)>(move(cb));
678 21 : auto q = new QueryCallback(move(qcb));
679 42 : return perform([this, scheme = &scheme, p, q] (const Server &serv, const db::Transaction &t) {
680 21 : db::Query query;
681 21 : (*q)(query);
682 21 : delete q;
683 21 : auto c = scheme->count(t, query);
684 21 : _data->application->performOnMainThread([p, c] {
685 21 : (*p)(c);
686 21 : delete p;
687 21 : });
688 21 : return true;
689 42 : });
690 : }
691 : } else {
692 0 : return count(scheme, move(cb));
693 : }
694 0 : return false;
695 : }
696 :
697 21 : bool Server::touch(const Scheme &scheme, uint64_t id) const {
698 42 : return perform([scheme = &scheme, id] (const Server &serv, const db::Transaction &t) {
699 21 : scheme->touch(t, id);
700 21 : return true;
701 42 : });
702 : }
703 :
704 21 : bool Server::touch(const Scheme &scheme, const Value & obj) const {
705 42 : return perform([scheme = &scheme, obj] (const Server &serv, const db::Transaction &t) {
706 21 : db::Value value(obj);
707 21 : scheme->touch(t, value);
708 21 : return true;
709 63 : });
710 : }
711 :
712 1281 : bool Server::perform(Function<bool(const Server &, const db::Transaction &)> &&cb, Ref *ref) const {
713 1281 : if (!_data) {
714 0 : return false;
715 : }
716 :
717 1281 : if (std::this_thread::get_id() == _data->thread.get_id()) {
718 0 : _data->execute(ServerData::TaskCallback(move(cb), ref));
719 : } else {
720 1281 : _data->queue.push(0, false, ServerData::TaskCallback(move(cb), ref));
721 1281 : _data->condition.notify_one();
722 : }
723 1281 : return true;
724 : }
725 :
726 21 : Application *Server::getApplication() const {
727 21 : return _data->application;
728 : }
729 :
730 252 : bool Server::get(const Scheme &scheme, DataCallback &&cb, uint64_t oid,
731 : Vector<const db::Field *> &&fields, db::UpdateFlags flags) const {
732 252 : if (!cb) {
733 0 : return false;
734 : }
735 :
736 252 : auto p = new DataCallback(move(cb));
737 504 : return perform([this, scheme = &scheme, oid, flags, p, fields = move(fields)] (const Server &serv, const db::Transaction &t) {
738 252 : auto ret = scheme->get(t, oid, fields, flags);
739 252 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
740 252 : (*p)(ret);
741 252 : delete p;
742 252 : });
743 252 : return true;
744 504 : });
745 : }
746 :
747 126 : bool Server::get(const Scheme &scheme, DataCallback &&cb, StringView alias,
748 : Vector<const db::Field *> &&fields, db::UpdateFlags flags) const {
749 126 : if (!cb) {
750 0 : return false;
751 : }
752 :
753 126 : auto p = new DataCallback(move(cb));
754 252 : return perform([this, scheme = &scheme, alias = alias.str<Interface>(), flags, p, fields = move(fields)] (const Server &serv, const db::Transaction &t) {
755 126 : auto ret = scheme->get(t, alias, fields, flags);
756 126 : _data->application->performOnMainThread([p, ret = xenolith::Value(ret)] {
757 126 : (*p)(ret);
758 126 : delete p;
759 126 : });
760 126 : return true;
761 252 : });
762 : }
763 :
764 42 : Server::ServerData::ServerData() {
765 42 : queue.setQueueLocking(mutexQueue);
766 42 : queue.setFreeLocking(mutexFree);
767 42 : documentRoot = filesystem::writablePath<db::Interface>();
768 42 : }
769 :
770 21 : Server::ServerData::~ServerData() {
771 21 : shouldQuit.clear();
772 21 : condition.notify_all();
773 21 : thread.join();
774 21 : }
775 :
776 42 : bool Server::ServerData::init() {
777 42 : thread = std::thread(ThreadInterface::workerThread, this, nullptr);
778 42 : return true;
779 : }
780 :
781 1281 : bool Server::ServerData::execute(const TaskCallback &task) {
782 1281 : if (currentTransaction) {
783 0 : if (!task.callback) {
784 0 : return false;
785 : }
786 0 : return task.callback(*server, *currentTransaction);
787 : }
788 :
789 1281 : bool ret = false;
790 :
791 1281 : memory::pool::push(threadPool);
792 :
793 1281 : driver->performWithStorage(handle, [&, this] (const db::Adapter &adapter) {
794 1281 : adapter.performWithTransaction([&, this] (const db::Transaction &t) {
795 1281 : currentTransaction = &t;
796 2562 : auto ret = task.callback(*server, t);
797 1281 : currentTransaction = nullptr;
798 1281 : return ret;
799 : });
800 1281 : });
801 :
802 1281 : memory::pool::pop();
803 1281 : memory::pool::clear(threadPool);
804 :
805 1281 : runAsync();
806 :
807 1281 : return ret;
808 : }
809 :
810 1323 : void Server::ServerData::runAsync() {
811 1323 : memory::pool::push(asyncPool);
812 :
813 1554 : while (asyncTasks && driver->isValid(handle)) {
814 231 : auto tmp = asyncTasks;
815 231 : asyncTasks = nullptr;
816 :
817 231 : driver->performWithStorage(handle, [&, this] (const db::Adapter &adapter) {
818 231 : adapter.performWithTransaction([&, this] (const db::Transaction &t) {
819 231 : auto &vec = *tmp;
820 231 : currentTransaction = &t;
821 462 : for (auto &it : vec) {
822 231 : it(t);
823 : }
824 231 : currentTransaction = nullptr;
825 231 : return true;
826 : });
827 231 : });
828 : }
829 :
830 1323 : memory::pool::pop();
831 1323 : memory::pool::clear(asyncPool);
832 1323 : }
833 :
834 42 : void Server::ServerData::threadInit() {
835 : //db::setStorageRoot(this);
836 :
837 42 : memory::pool::initialize();
838 42 : memory::pool::push(serverPool);
839 42 : handle = driver->connect(params);
840 41 : if (!handle.get()) {
841 0 : StringStream out;
842 0 : for (auto &it : params) {
843 0 : out << "\n\t" << it.first << ": " << it.second;
844 : }
845 0 : log::error("StorageServer", "Fail to initialize DB with params: ", out.str());
846 0 : }
847 41 : memory::pool::pop();
848 :
849 42 : asyncPool = memory::pool::create();
850 :
851 42 : threadPool = memory::pool::create();
852 42 : memory::pool::push(threadPool);
853 :
854 42 : driver->init(handle, db::Vector<db::StringView>());
855 :
856 42 : driver->performWithStorage(handle, [&, this] (const db::Adapter &adapter) {
857 42 : db::Scheme::initSchemes(predefinedSchemes);
858 42 : interfaceConfig.name = adapter.getDatabaseName();
859 42 : adapter.init(interfaceConfig, predefinedSchemes);
860 42 : });
861 :
862 42 : memory::pool::pop();
863 42 : memory::pool::clear(threadPool);
864 :
865 42 : runAsync();
866 :
867 42 : if (!serverName.empty()) {
868 42 : thread::ThreadInfo::setThreadInfo(serverName);
869 : }
870 :
871 41 : now = platform::clock(core::ClockType::Monotonic);
872 41 : }
873 :
874 1908 : bool Server::ServerData::worker() {
875 3816 : if (!shouldQuit.test_and_set()) {
876 21 : return false;
877 : }
878 :
879 1887 : auto t = platform::clock(core::ClockType::Monotonic);
880 1887 : if (t - now > TimeInterval::seconds(1).toMicros()) {
881 482 : now = t;
882 482 : handleHeartbeat();
883 : }
884 :
885 1886 : TaskCallback task;
886 : do {
887 1886 : queue.pop_direct([&] (memory::PriorityQueue<TaskCallback>::PriorityType, TaskCallback &&cb) {
888 1281 : task = move(cb);
889 1280 : });
890 : } while (0);
891 :
892 1886 : if (!task.callback) {
893 606 : std::unique_lock<std::mutex> lock(mutexQueue);
894 606 : if (!queue.empty(lock)) {
895 2 : return true;
896 : }
897 604 : condition.wait_for(lock, std::chrono::seconds(1));
898 583 : return true;
899 585 : }
900 :
901 1281 : if (!driver->isValid(handle)) {
902 0 : return false;
903 : }
904 :
905 1281 : execute(task);
906 1281 : return true;
907 1866 : }
908 :
909 21 : void Server::ServerData::threadDispose() {
910 21 : memory::pool::push(threadPool);
911 :
912 21 : while (!queue.empty()) {
913 0 : TaskCallback task;
914 : do {
915 0 : queue.pop_direct([&] (memory::PriorityQueue<TaskCallback>::PriorityType, TaskCallback &&cb) SP_COVERAGE_TRIVIAL {
916 : task = move(cb);
917 : });
918 : } while (0);
919 :
920 0 : if (task.callback) {
921 0 : execute(task);
922 : }
923 0 : }
924 :
925 21 : if (driver->isValid(handle)) {
926 21 : driver->performWithStorage(handle, [&, this] (const db::Adapter &adapter) {
927 21 : auto it = components.begin();
928 42 : while (it != components.end()) {
929 21 : adapter.performWithTransaction([&, this] (const db::Transaction &t) {
930 : do {
931 21 : memory::pool::context ctx(it->second->pool);
932 42 : for (auto &iit : it->second->components) {
933 21 : iit.second->handleChildRelease(*server, t);
934 21 : iit.second->~Component();
935 : }
936 :
937 21 : it->second->container->handleStorageDisposed(t);
938 21 : } while (0);
939 21 : return true;
940 : });
941 21 : memory::pool::destroy(it->second->pool);
942 21 : it = components.erase(it);
943 : }
944 21 : components.clear();
945 21 : });
946 : }
947 :
948 21 : memory::pool::pop();
949 :
950 21 : memory::pool::destroy(threadPool);
951 21 : memory::pool::destroy(asyncPool);
952 21 : memory::pool::terminate();
953 21 : }
954 :
955 482 : void Server::ServerData::handleHeartbeat() {
956 785 : for (auto &it : components) {
957 606 : for (auto &iit : it.second->components) {
958 303 : iit.second->handleHeartbeat(*server);
959 : }
960 : }
961 482 : }
962 :
963 252 : void Server::ServerData::addAsyncTask(const db::Callback<db::Function<void(const db::Transaction &)>(db::pool_t *)> &setupCb) const {
964 252 : memory::pool::push(asyncPool);
965 252 : if (!asyncTasks) {
966 252 : asyncTasks = new (asyncPool) db::Vector<db::Function<void(const db::Transaction &)>>;
967 : }
968 252 : asyncTasks->emplace_back(setupCb(asyncPool));
969 252 : memory::pool::pop();
970 252 : }
971 :
972 42 : bool Server::ServerData::addComponent(ComponentContainer *comp, const db::Transaction &t) {
973 42 : ServerComponentLoader loader(this, t);
974 :
975 42 : memory::pool::push(loader.getPool());
976 42 : comp->handleStorageInit(loader);
977 42 : memory::pool::pop();
978 :
979 84 : return loader.run(comp);
980 42 : }
981 :
982 21 : void Server::ServerData::removeComponent(ComponentContainer *comp, const db::Transaction &t) {
983 21 : auto cmpIt = components.find(comp);
984 21 : if (cmpIt == components.end()) {
985 0 : return;
986 : }
987 :
988 : do {
989 21 : memory::pool::context ctx(cmpIt->second->pool);
990 42 : for (auto &it : cmpIt->second->components) {
991 21 : it.second->handleChildRelease(*server, t);
992 21 : it.second->~Component();
993 : }
994 :
995 21 : cmpIt->second->container->handleStorageDisposed(t);
996 21 : } while (0);
997 :
998 21 : memory::pool::destroy(cmpIt->second->pool);
999 21 : components.erase(cmpIt);
1000 : }
1001 :
1002 252 : void Server::ServerData::scheduleAyncDbTask(const db::Callback<db::Function<void(const db::Transaction &)>(db::pool_t *)> &setupCb) const {
1003 252 : addAsyncTask(setupCb);
1004 252 : }
1005 :
1006 63 : db::StringView Server::ServerData::getDocumentRoot() const {
1007 63 : return documentRoot;
1008 : }
1009 :
1010 21 : const db::Scheme *Server::ServerData::getFileScheme() const {
1011 21 : return nullptr;
1012 : }
1013 :
1014 21 : const db::Scheme *Server::ServerData::getUserScheme() const {
1015 21 : return nullptr;
1016 : }
1017 :
1018 21 : void Server::ServerData::pushErrorMessage(db::Value &&val) const {
1019 21 : log::error("xenolith::Server", data::EncodeFormat::Pretty, val);
1020 21 : }
1021 :
1022 21 : void Server::ServerData::pushDebugMessage(db::Value &&val) const {
1023 21 : log::debug("xenolith::Server", data::EncodeFormat::Pretty, val);
1024 21 : }
1025 :
1026 1532 : void Server::ServerData::initTransaction(db::Transaction &t) const {
1027 3022 : for (auto &it : components) {
1028 2977 : for (auto &iit : it.second->components) {
1029 1485 : iit.second->handleStorageTransaction(t);
1030 : }
1031 : }
1032 1531 : }
1033 :
1034 42 : ServerComponentLoader::~ServerComponentLoader() {
1035 42 : if (_pool) {
1036 0 : memory::pool::destroy(_pool);
1037 0 : _pool = nullptr;
1038 : }
1039 42 : }
1040 :
1041 42 : ServerComponentLoader::ServerComponentLoader(Server::ServerData *data, const db::Transaction &t)
1042 42 : : _data(data), _pool(memory::pool::create(data->serverPool)), _server(data->server), _transaction(&t) {
1043 42 : memory::pool::context ctx(_pool);
1044 :
1045 42 : _components = new ServerComponentData;
1046 42 : _components->pool = _pool;
1047 42 : }
1048 :
1049 42 : void ServerComponentLoader::exportComponent(Component *comp) {
1050 42 : memory::pool::context ctx(_pool);
1051 :
1052 42 : _components->components.emplace(comp->getName(), comp);
1053 41 : }
1054 :
1055 84 : const db::Scheme * ServerComponentLoader::exportScheme(const db::Scheme &scheme) {
1056 84 : return _components->schemes.emplace(scheme.getName(), &scheme).first->second;
1057 : }
1058 :
1059 42 : bool ServerComponentLoader::run(ComponentContainer *comp) {
1060 42 : memory::pool::context ctx(_pool);
1061 :
1062 42 : _components->container = comp;
1063 42 : _data->components.emplace(comp, _components);
1064 :
1065 42 : db::Scheme::initSchemes(_components->schemes);
1066 42 : _transaction->getAdapter().init(_data->interfaceConfig, _components->schemes);
1067 :
1068 84 : for (auto &it : _components->components) {
1069 42 : it.second->handleChildInit(*_server, *_transaction);
1070 : }
1071 :
1072 42 : _pool = nullptr;
1073 42 : _components = nullptr;
1074 42 : return true;
1075 42 : }
1076 :
1077 : }
|