LCOV - code coverage report
Current view: top level - core/threads - SPThreadTaskQueue.cc (source / functions) Hit Total Coverage
Test: coverage.info Lines: 248 394 62.9 %
Date: 2024-05-12 00:16:13 Functions: 42 64 65.6 %

          Line data    Source code
       1             : /**
       2             : Copyright (c) 2019 Roman Katuntsev <sbkarr@stappler.org>
       3             : Copyright (c) 2023 Stappler LLC <admin@stappler.dev>
       4             : 
       5             : Permission is hereby granted, free of charge, to any person obtaining a copy
       6             : of this software and associated documentation files (the "Software"), to deal
       7             : in the Software without restriction, including without limitation the rights
       8             : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
       9             : copies of the Software, and to permit persons to whom the Software is
      10             : furnished to do so, subject to the following conditions:
      11             : 
      12             : The above copyright notice and this permission notice shall be included in
      13             : all copies or substantial portions of the Software.
      14             : 
      15             : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
      16             : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
      17             : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
      18             : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
      19             : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
      20             : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
      21             : THE SOFTWARE.
      22             : **/
      23             : 
      24             : #include "SPThreadTaskQueue.h"
      25             : #include "SPThread.h"
      26             : #include <chrono>
      27             : 
      28             : namespace STAPPLER_VERSIONIZED stappler::thread {
      29             : 
      30             : SPUNUSED static uint32_t getNextThreadId();
      31             : 
      32             : class Worker : public ThreadInterface<memory::StandartInterface> {
      33             : public:
      34             :         struct LocalQueue {
      35             :                 std::mutex mutexQueue;
      36             :                 std::mutex mutexFree;
      37             :                 memory::PriorityQueue<Rc<Task>> queue;
      38             : 
      39           0 :                 LocalQueue() {
      40           0 :                         queue.setFreeLocking(mutexFree);
      41           0 :                         queue.setQueueLocking(mutexQueue);
      42           0 :                 }
      43             :         };
      44             : 
      45             :         Worker(TaskQueue::WorkerContext *queue, uint32_t threadId, uint32_t workerId, StringView name);
      46             :         virtual ~Worker();
      47             : 
      48             : #if SP_REF_DEBUG
      49             :         virtual uint64_t retain() override;
      50             :         virtual void release(uint64_t) override;
      51             : #else
      52             :         uint64_t retain();
      53             :         void release(uint64_t);
      54             : #endif
      55             : 
      56             :         bool execute(Task *task);
      57             : 
      58             :         virtual void threadInit() override;
      59             :         virtual void threadDispose() override;
      60             :         virtual bool worker() override;
      61             : 
      62             :         std::thread &getThread();
      63           0 :         std::thread::id getThreadId() const { return _threadId; }
      64             : 
      65             :         void perform(Rc<Task> &&);
      66             : 
      67             : protected:
      68             :         uint64_t _queueRefId = 0;
      69             :         TaskQueue::WorkerContext *_queue = nullptr;
      70             :         LocalQueue *_local = nullptr;
      71             :         std::thread::id _threadId;
      72             :         std::atomic<int32_t> _refCount;
      73             :         std::atomic_flag _shouldQuit;
      74             : 
      75             :         memory::PoolFlags _flags = memory::PoolFlags::None;
      76             :         memory::pool_t *_pool = nullptr;
      77             : 
      78             :         uint32_t _managerId;
      79             :         uint32_t _workerId;
      80             :         StringView _name;
      81             :         std::thread _thread;
      82             : };
      83             : 
      84             : struct TaskQueue::WorkerContext {
      85             :         struct ExitCondition {
      86             :                 std::mutex mutex;
      87             :                 std::condition_variable condition;
      88             :         };
      89             : 
      90             :         memory::pool_t *pool;
      91             :         TaskQueue *queue;
      92             :         Flags flags;
      93             : 
      94             :         std::condition_variable_any *conditionAny = nullptr;
      95             :         std::condition_variable *conditionGeneral = nullptr;
      96             :         ExitCondition *exit = nullptr;
      97             : 
      98             :         std::atomic<bool> finalized;
      99             : 
     100             :         std::vector<Worker *> workers;
     101             : 
     102         100 :         WorkerContext(TaskQueue *queue, Flags flags) : queue(queue), flags(flags) {
     103         100 :                 pool = memory::pool::create(memory::app_root_pool);
     104             : 
     105         100 :                 finalized = false;
     106             : 
     107         100 :                 if ((flags & Flags::LocalQueue) != Flags::None) {
     108           0 :                         conditionAny = new std::condition_variable_any;
     109             :                 } else {
     110         100 :                         conditionGeneral = new std::condition_variable;
     111             :                 }
     112             : 
     113         100 :                 if ((flags & Flags::Cancelable) != Flags::None || (flags & Flags::Waitable) != Flags::None) {
     114         100 :                         exit = new ExitCondition;
     115             :                 }
     116         100 :         }
     117             : 
     118         100 :         ~WorkerContext() {
     119         100 :                 if (conditionAny) { delete conditionAny; }
     120         100 :                 if (conditionGeneral) { delete conditionGeneral; }
     121         100 :                 if (exit) { delete exit; }
     122         100 :                 if (pool) {
     123         100 :                         memory::pool::destroy(pool);
     124             :                 }
     125         100 :         }
     126             : 
     127     2568911 :         bool isWaitEnabled() const {
     128     2568911 :                 return (flags & Flags::Waitable) != Flags::None;
     129             :         }
     130             : 
     131     1073128 :         void wait(std::unique_lock<std::mutex> &lock) {
     132     1073128 :                 if (finalized.load() != true) {
     133     1073127 :                         if (conditionGeneral) {
     134     1073127 :                                 conditionGeneral->wait(lock);
     135           0 :                         } else if (conditionAny) {
     136           0 :                                 conditionAny->wait(lock);
     137             :                         }
     138             :                 }
     139     1073068 :         }
     140             : 
     141     1234566 :         void notify() {
     142     1234566 :                 if (conditionGeneral) {
     143     1234566 :                         conditionGeneral->notify_one();
     144           0 :                 } else if (conditionAny) {
     145           0 :                         conditionAny->notify_one();
     146             :                 }
     147     1234567 :         }
     148             : 
     149      871958 :         void notifyAll() {
     150      871958 :                 if (conditionGeneral) {
     151      871960 :                         conditionGeneral->notify_all();
     152           0 :                 } else if (conditionAny) {
     153           0 :                         conditionAny->notify_all();
     154             :                 }
     155      871960 :         }
     156             : 
     157     1782143 :         void notifyWait() {
     158     1782143 :                 if (exit) {
     159     1782155 :                         exit->condition.notify_one();
     160             :                 }
     161     1782507 :         }
     162             : 
     163           0 :         void notifyExit() {
     164           0 :                 if (exit) {
     165           0 :                         exit->condition.notify_one();
     166             :                 }
     167           0 :         }
     168             : 
     169           0 :         void finalize() {
     170           0 :                 finalized = true;
     171           0 :                 notifyAll();
     172           0 :         }
     173             : 
     174         100 :         void spawn(uint32_t threadId, uint32_t threadCount, StringView name) {
     175         350 :                 for (uint32_t i = 0; i < threadCount; i++) {
     176         250 :                         workers.push_back(new Worker(this, threadId, i, name.empty() ? queue->getName() : name));
     177             :                 }
     178         100 :         }
     179             : 
     180         100 :         void cancel() {
     181         350 :                 for (auto &it : workers) {
     182         250 :                         it->release(0);
     183             :                 }
     184             : 
     185         100 :                 notifyAll();
     186             : 
     187         350 :                 for (auto &it : workers) {
     188         250 :                         it->getThread().join();
     189         250 :                         delete it;
     190             :                 }
     191             : 
     192         100 :                 workers.clear();
     193         100 :         }
     194             : 
     195          64 :         void waitExit(TimeInterval iv) {
     196          64 :                 std::unique_lock<std::mutex> exitLock(exit->mutex);
     197          64 :                 exit->condition.wait_for(exitLock, std::chrono::microseconds(iv.toMicros()));
     198          64 :                 queue->update();
     199          64 :         }
     200             : 
     201           0 :         bool waitExternal(uint32_t *count) {
     202           0 :                 std::unique_lock<std::mutex> waitLock(exit->mutex);
     203           0 :                 exit->condition.wait(waitLock);
     204           0 :                 queue->update(count);
     205           0 :                 return true;
     206           0 :         }
     207             : 
     208      553118 :         bool waitExternal(TimeInterval iv, uint32_t *count) {
     209      553118 :                 std::unique_lock<std::mutex> waitLock(exit->mutex);
     210      553118 :                 auto ret = exit->condition.wait_for(waitLock, std::chrono::microseconds(iv.toMicros()), [&, this] {
     211     1093969 :                         return queue->getOutputCounter() > 0;
     212             :                 });
     213      552971 :                 if (!ret) {
     214        3165 :                         if (count) {
     215        1770 :                                 *count = 0;
     216             :                         }
     217        3165 :                         return false;
     218             :                 } else {
     219      549806 :                         waitLock.unlock();
     220      549901 :                         queue->update(count);
     221      549870 :                         return true;
     222             :                 }
     223      553035 :         }
     224             : 
     225         100 :         void lockExternal() {
     226         100 :                 if (exit) {
     227         100 :                         exit->mutex.lock();
     228             :                 }
     229         100 :         }
     230             : 
     231         100 :         void unlockExternal() {
     232         100 :                 if (exit) {
     233         100 :                         exit->mutex.unlock();
     234             :                 }
     235         100 :         }
     236             : };
     237             : 
     238             : class _SingleTaskWorker : public ThreadInterface<memory::StandartInterface> {
     239             : public:
     240           0 :         _SingleTaskWorker(const Rc<TaskQueue> &q, Rc<Task> &&task)
     241           0 :         : _queue(q), _task(std::move(task)), _managerId(getNextThreadId()) { }
     242             : 
     243           0 :         virtual ~_SingleTaskWorker() { }
     244             : 
     245           0 :         bool execute(Task *task) {
     246           0 :                 return task->execute();
     247             :         }
     248             : 
     249           0 :         virtual void threadInit() override {
     250           0 :                 ThreadInfo::setThreadInfo(_managerId, 0, "Worker", true);
     251           0 :         }
     252             : 
     253           0 :         virtual bool worker() override {
     254           0 :                 if (_task) {
     255           0 :                         memory::pool::initialize();
     256           0 :                         auto pool = memory::pool::create();
     257             : 
     258           0 :                         memory::pool::push(pool);
     259           0 :                         auto ret = execute(_task);
     260           0 :                         memory::pool::pop();
     261             : 
     262           0 :                         _task->setSuccessful(ret);
     263           0 :                         if (!_task->getCompleteTasks().empty()) {
     264           0 :                                 _queue->onMainThread(std::move(_task));
     265             :                         }
     266             : 
     267           0 :                         memory::pool::destroy(pool);
     268           0 :                         memory::pool::terminate();
     269             :                 }
     270             : 
     271           0 :                 delete this;
     272           0 :                 return false;
     273             :         }
     274             : 
     275             : protected:
     276             :         Rc<TaskQueue> _queue;
     277             :         Rc<Task> _task;
     278             :         uint32_t _managerId;
     279             : };
     280             : 
     281         100 : TaskQueue::TaskQueue(StringView name, std::function<void()> &&wakeup)
     282         100 : : _wakeup(move(wakeup)) {
     283         100 :         _inputQueue.setQueueLocking(_inputMutexQueue);
     284         100 :         _inputQueue.setFreeLocking(_inputMutexFree);
     285         100 :         if (!name.empty()) {
     286          50 :                 _name = name;
     287             :         }
     288             : 
     289         100 :         _outputQueue.reserve(2);
     290         100 :         _outputCallbacks.reserve(2);
     291         100 : }
     292             : 
     293         150 : TaskQueue::~TaskQueue() {
     294         100 :         if (_context) {
     295           0 :                 cancelWorkers();
     296           0 :                 update();
     297             :         }
     298             : 
     299         100 :         _inputQueue.foreach([&] (memory::PriorityQueue<Rc<Task>>::PriorityType p, const Rc<Task> &t) {
     300           0 :                 if (t) {
     301           0 :                         t->setSuccessful(false);
     302           0 :                         t->onComplete();
     303             :                 }
     304           0 :         });
     305         100 :         _inputQueue.clear();
     306         150 : }
     307             : 
     308           0 : void TaskQueue::finalize() {
     309           0 :         if (_context) {
     310           0 :                 _context->finalize();
     311             :         }
     312           0 : }
     313             : 
     314           0 : void TaskQueue::performAsync(Rc<Task> &&task) {
     315           0 :         if (task) {
     316           0 :                 _SingleTaskWorker *worker = new _SingleTaskWorker(this, std::move(task));
     317           0 :                 std::thread wThread(_SingleTaskWorker::workerThread, worker, this);
     318           0 :                 wThread.detach();
     319           0 :         }
     320           0 : }
     321             : 
     322     1234557 : void TaskQueue::perform(Rc<Task> &&task, bool first) {
     323     1234557 :         if (!task) {
     324           0 :                 return;
     325             :         }
     326             : 
     327     1234557 :         if (!task->prepare()) {
     328           0 :                 task->setSuccessful(false);
     329           0 :                 onMainThread(std::move(task));
     330           0 :                 return;
     331             :         }
     332             : 
     333     1234554 :         task->addRef(this);
     334             : 
     335     1234554 :         ++ _tasksCounter;
     336     1234560 :         _inputQueue.push(task->getPriority().get(), first, std::move(task));
     337     1234566 :         if (_context) {
     338     1234566 :                 _context->notify();
     339             :         }
     340             : }
     341             : 
     342       78701 : void TaskQueue::perform(std::function<void()> &&cb, Ref *ref, bool first) {
     343       78701 :         perform(Rc<Task>::create([fn = move(cb)] (const Task &) -> bool {
     344       78460 :                 fn();
     345       78658 :                 return true;
     346       78701 :         }, nullptr, ref), first);
     347       78701 : }
     348             : 
     349           0 : bool TaskQueue::perform(TaskMap &&tasks) {
     350           0 :         if (tasks.empty()) {
     351           0 :                 return false;
     352             :         }
     353             : 
     354           0 :         if (!_context || (_context->flags & Flags::LocalQueue) == Flags::None) {
     355           0 :                 return false;
     356             :         }
     357             : 
     358           0 :         for (auto &it : tasks) {
     359           0 :                 if (it.first > _context->workers.size()) {
     360           0 :                         continue;
     361             :                 }
     362             : 
     363           0 :                 Worker * w = _context->workers[it.first];
     364           0 :                 for (Rc<Task> &t : it.second) {
     365           0 :                         if (!t->prepare()) {
     366           0 :                                 t->setSuccessful(false);
     367           0 :                                 onMainThread(std::move(t));
     368             :                         } else {
     369           0 :                                 t->addRef(this);
     370           0 :                                 ++ _tasksCounter;
     371           0 :                                 w->perform(move(t));
     372             :                         }
     373             :                 }
     374             :         }
     375             : 
     376           0 :         _context->notifyAll();
     377           0 :         return true;
     378             : }
     379             : 
     380   215548815 : Rc<Task> TaskQueue::popTask(uint32_t idx) {
     381   215548815 :         Rc<Task> ret;
     382   215548097 :         _inputQueue.pop_direct([&] (memory::PriorityQueue<Rc<Task>>::PriorityType, Rc<Task> &&task) {
     383     1234417 :                 ret = move(task);
     384     1234401 :         });
     385   215935521 :         return ret;
     386           0 : }
     387             : 
     388     1143784 : void TaskQueue::update(uint32_t *count) {
     389     1143784 :     _outputMutex.lock();
     390             : 
     391     1143654 :         auto stack = std::move(_outputQueue);
     392     1142288 :         auto callbacks = std::move(_outputCallbacks);
     393             : 
     394     1142041 :         _outputQueue.clear();
     395     1141982 :         _outputCallbacks.clear();
     396             : 
     397     1141841 :         _outputCounter.store(0);
     398             : 
     399     1142762 :         _outputMutex.unlock();
     400             : 
     401     1143401 :         if (_context) {
     402     1143427 :                 memory::pool::push(_context->pool);
     403             :         }
     404             : 
     405     2368579 :         for (auto &task : stack) {
     406     1226543 :                 task->onComplete();
     407             :         }
     408             : 
     409     1617844 :         for (auto &task : callbacks) {
     410      476931 :                 task.first();
     411             :         }
     412             : 
     413     1141999 :         if (_context) {
     414     1144787 :                 memory::pool::pop();
     415     1144697 :                 memory::pool::clear(_context->pool);
     416             :         }
     417             : 
     418     1144879 :     if (count) {
     419      119758 :         *count += stack.size() + callbacks.size();
     420             :     }
     421             : 
     422     2289255 :     if (_context && _tasksCounter.load() > 0) {
     423      871859 :         _context->notifyAll();
     424             :     }
     425     1144844 : }
     426             : 
     427       71188 : void TaskQueue::onMainThread(Rc<Task> &&task) {
     428       71188 :     if (!task) {
     429           0 :         return;
     430             :     }
     431             : 
     432       71188 :     task->addRef(this);
     433             : 
     434       71188 :     _outputMutex.lock();
     435       71188 :     _outputQueue.push_back(std::move(task));
     436       71188 :     ++ _outputCounter;
     437       71188 :         _outputMutex.unlock();
     438             : 
     439       71188 :         if (_context && _context->isWaitEnabled()) {
     440       71163 :                 _context->notifyWait();
     441             :         }
     442             : 
     443       71188 :         if (_wakeup) {
     444           0 :                 _wakeup();
     445             :         }
     446             : 
     447      142376 :         if (_tasksCounter.load() == 0 && _context && !_context->isWaitEnabled()) {
     448           0 :                 _context->notifyExit();
     449             :         }
     450             : }
     451             : 
     452      476553 : void TaskQueue::onMainThread(std::function<void()> &&func, Ref *target) {
     453      476553 :     _outputMutex.lock();
     454      477029 :     _outputCallbacks.emplace_back(std::move(func), target);
     455      476926 :     ++ _outputCounter;
     456      477000 :         _outputMutex.unlock();
     457             : 
     458      476983 :         if (_context && _context->isWaitEnabled()) {
     459      476947 :                 _context->notifyWait();
     460             :         }
     461             : 
     462      477052 :         if (_wakeup) {
     463           0 :                 _wakeup();
     464             :         }
     465             : 
     466      954074 :         if (_tasksCounter.load() == 0 && _context && !_context->isWaitEnabled()) {
     467           0 :                 _context->notifyExit();
     468             :         }
     469      477028 : }
     470             : 
     471           0 : std::vector<std::thread::id> TaskQueue::getThreadIds() const {
     472           0 :         if (!_context) {
     473           0 :                 return std::vector<std::thread::id>();
     474             :         }
     475             : 
     476           0 :         std::vector<std::thread::id> ret;
     477           0 :         for (Worker *it : _context->workers) {
     478           0 :                 ret.emplace_back(it->getThreadId());
     479             :         }
     480           0 :         return ret;
     481           0 : }
     482             : 
     483        6340 : uint16_t TaskQueue::getThreadCount() const {
     484        6340 :         if (!_context) {
     485           0 :                 return 0;
     486             :         }
     487             : 
     488        6340 :         return uint16_t(_context->workers.size());
     489             : }
     490             : 
     491     1232072 : void TaskQueue::onMainThreadWorker(Rc<Task> &&task) {
     492     1232072 :     if (!task) {
     493           0 :         return;
     494             :     }
     495             : 
     496     1231982 :         if (!task->getCompleteTasks().empty()) {
     497     1153195 :                 _outputMutex.lock();
     498     1155666 :                 _outputQueue.push_back(std::move(task));
     499     1155666 :             ++ _outputCounter;
     500     1155666 :                 _outputMutex.unlock();
     501             : 
     502     1155637 :                 if (_context && _context->isWaitEnabled()) {
     503     1155619 :                         _context->notifyWait();
     504             :                 }
     505             : 
     506     1155611 :                 if (_wakeup) {
     507           0 :                         _wakeup();
     508             :                 }
     509             : 
     510     2311184 :                 if (_tasksCounter.fetch_sub(1) == 1 && _context && !_context->isWaitEnabled()) {
     511           0 :                         _context->notifyExit();
     512             :                 }
     513             :         } else {
     514       78659 :                 if (_context && _context->isWaitEnabled()) {
     515       78643 :                         _context->notifyWait();
     516             :                 }
     517             : 
     518      157720 :                 if (_tasksCounter.fetch_sub(1) == 1 && _context && !_context->isWaitEnabled()) {
     519           0 :                         _context->notifyExit();
     520             :                 }
     521             :         }
     522             : }
     523             : 
     524           0 : bool TaskQueue::spawnWorkers(Flags flags) {
     525           0 :         return spawnWorkers(flags, getNextThreadId(), uint16_t(std::thread::hardware_concurrency()), _name);
     526             : }
     527             : 
     528         100 : bool TaskQueue::spawnWorkers(Flags flags, uint32_t threadId, uint16_t threadCount, StringView name) {
     529         100 :         if (_context) {
     530           0 :                 return false;
     531             :         }
     532             : 
     533         100 :         if (threadId == maxOf<uint32_t>()) {
     534           0 :                 threadId = getNextThreadId();
     535             :         }
     536             : 
     537         100 :         _context = new WorkerContext(this, flags);
     538         100 :         _context->spawn(threadId, threadCount, name);
     539             : 
     540         100 :         return true;
     541             : }
     542             : 
     543         100 : bool TaskQueue::cancelWorkers() {
     544         100 :         if (!_context) {
     545           0 :                 return false;
     546             :         }
     547             : 
     548         100 :         _context->cancel();
     549         100 :         update();
     550         100 :         delete _context;
     551         100 :         _context = nullptr;
     552         100 :         return true;
     553             : }
     554             : 
     555           0 : void TaskQueue::performAll(Flags flags) {
     556           0 :         if (!_context) {
     557           0 :                 if (!spawnWorkers(flags | Flags::Cancelable)) {
     558           0 :                         return;
     559             :                 }
     560             :         }
     561           0 :         if (!waitForAll()) {
     562           0 :                 return;
     563             :         }
     564           0 :         cancelWorkers();
     565             : }
     566             : 
     567         200 : bool TaskQueue::waitForAll(TimeInterval iv) {
     568         200 :         if (!_context || (_context->flags & Flags::Cancelable) == Flags::None) {
     569          50 :                 return false;
     570             :         }
     571             : 
     572         150 :         update();
     573         428 :         while (_tasksCounter.load() != 0) {
     574          64 :                 _context->waitExit(iv);
     575             :         }
     576         150 :         return true;
     577             : }
     578             : 
     579           0 : bool TaskQueue::wait(uint32_t *count) {
     580           0 :         if (!_context || (_context->flags & Flags::Waitable) == Flags::None) {
     581           0 :                 return false;
     582             :         }
     583             : 
     584           0 :         return _context->waitExternal(count);
     585             : }
     586             : 
     587      553130 : bool TaskQueue::wait(TimeInterval iv, uint32_t *count) {
     588      553130 :         if (!_context || (_context->flags & Flags::Waitable) == Flags::None) {
     589           0 :                 return false;
     590             :         }
     591             : 
     592      553123 :         return _context->waitExternal(iv, count);
     593             : }
     594             : 
     595         100 : void TaskQueue::lock() {
     596         100 :         if (!_context) {
     597           0 :                 return;
     598             :         }
     599             : 
     600         100 :         _context->lockExternal();
     601             : }
     602             : 
     603         100 : void TaskQueue::unlock() {
     604         100 :         if (!_context) {
     605           0 :                 return;
     606             :         }
     607             : 
     608         100 :         _context->unlockExternal();
     609             : }
     610             : 
     611             : 
     612         250 : Worker::Worker(TaskQueue::WorkerContext *queue, uint32_t threadId, uint32_t workerId, StringView name)
     613         250 : : _queue(queue), _refCount(1), _shouldQuit(), _managerId(threadId), _workerId(workerId), _name(name) {
     614         250 :         _queueRefId = _queue->queue->retain();
     615         250 :         if ((queue->flags & TaskQueue::Flags::LocalQueue) != TaskQueue::Flags::None) {
     616           0 :                 _local = new LocalQueue;
     617             :         }
     618         250 :         _thread = std::thread(Worker::workerThread, this, queue->queue);
     619         250 : }
     620             : 
     621         500 : Worker::~Worker() {
     622         250 :         _queue->queue->release(_queueRefId);
     623         250 :         if (_local) {
     624           0 :                 delete _local;
     625           0 :                 _local = nullptr;
     626             :         }
     627         500 : }
     628             : 
     629           0 : uint64_t Worker::retain() {
     630           0 :         _refCount ++;
     631           0 :         return 0;
     632             : }
     633             : 
     634         250 : void Worker::release(uint64_t) {
     635         250 :         if (--_refCount <= 0) {
     636         250 :                 _shouldQuit.clear();
     637             :         }
     638         250 : }
     639             : 
     640     1234135 : bool Worker::execute(Task *task) {
     641     1234135 :         memory::pool::push(_pool);
     642     1233919 :         auto ret = task->execute();
     643     1232687 :         memory::pool::pop();
     644     1232518 :         memory::pool::clear(_pool);
     645     1232316 :         return ret;
     646             : }
     647             : 
     648         250 : void Worker::threadInit() {
     649         250 :         memory::pool::initialize();
     650         250 :         _pool = memory::pool::createTagged(_name.data(), _flags);
     651             : 
     652         250 :         _shouldQuit.test_and_set();
     653         250 :         _threadId = std::this_thread::get_id();
     654             : 
     655         250 :         ThreadInfo::setThreadInfo(_managerId, _workerId, _name, true);
     656         250 : }
     657             : 
     658         244 : void Worker::threadDispose() {
     659         244 :         memory::pool::destroy(_pool);
     660         245 :         memory::pool::terminate();
     661         246 : }
     662             : 
     663   215809983 : bool Worker::worker() {
     664   431619966 :         if (!_shouldQuit.test_and_set()) {
     665         249 :                 return false;
     666             :         } else {
     667   215809734 :                 memory::pool::clear(_pool);
     668             :         }
     669             : 
     670   215603156 :         Rc<Task> task;
     671   215575146 :         if (_local) {
     672           0 :                 _local->queue.pop_direct([&] (memory::PriorityQueue<Rc<Task>>::PriorityType, Rc<Task> &&t) {
     673           0 :                         task = move(t);
     674           0 :                 });
     675             :         }
     676             : 
     677   215575146 :         if (!task) {
     678   215580584 :                 task = _queue->queue->popTask(_workerId);
     679             :         }
     680             : 
     681   215884997 :         if (!task) {
     682   214661696 :                 if (_local) {
     683           0 :                         std::unique_lock<std::mutex> lock(_local->mutexQueue);
     684           0 :                         if (!_local->queue.empty(lock)) {
     685           0 :                                 return true;
     686             :                         }
     687           0 :                         if (_queue->queue->_tasksCounter.load() > 0) {
     688             :                                 // some task received after locking
     689           0 :                                 return true;
     690             :                         }
     691           0 :                         _queue->wait(lock);
     692           0 :                 } else {
     693   214661696 :                         std::unique_lock<std::mutex> lock(_queue->queue->_inputMutexQueue);
     694   429408476 :                         if (_queue->queue->_tasksCounter.load() > 0) {
     695             :                                 // some task received after locking
     696   213635930 :                                 return true;
     697             :                         }
     698     1064896 :                         _queue->wait(lock);
     699     1073060 :                         return true;
     700   214708990 :                 }
     701             :         }
     702             : 
     703     1217254 :         task->setSuccessful(execute(task));
     704     1232216 :         _queue->queue->onMainThreadWorker(std::move(task));
     705             : 
     706     1234341 :         return true;
     707   215943673 : }
     708             : 
     709         250 : std::thread &Worker::getThread() {
     710         250 :         return _thread;
     711             : }
     712             : 
     713           0 : void Worker::perform(Rc<Task> &&task) {
     714           0 :         if (_local) {
     715           0 :                 auto p = task->getPriority();
     716           0 :                 _local->queue.push(p.get(), false, move(task));
     717             :         }
     718           0 : }
     719             : 
     720             : }

Generated by: LCOV version 1.14