Line data Source code
1 : /**
2 : Copyright (c) 2019-2022 Roman Katuntsev <sbkarr@stappler.org>
3 : Copyright (c) 2023-2024 Stappler LLC <admin@stappler.dev>
4 :
5 : Permission is hereby granted, free of charge, to any person obtaining a copy
6 : of this software and associated documentation files (the "Software"), to deal
7 : in the Software without restriction, including without limitation the rights
8 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 : copies of the Software, and to permit persons to whom the Software is
10 : furnished to do so, subject to the following conditions:
11 :
12 : The above copyright notice and this permission notice shall be included in
13 : all copies or substantial portions of the Software.
14 :
15 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 : THE SOFTWARE.
22 : **/
23 :
24 : #include "SPPqDriver.h"
25 : #include "SPPqHandle.h"
26 : #include "SPSqlHandle.h"
27 : #include "SPDso.h"
28 : #include "SPDbFieldExtensions.h"
29 :
30 : namespace STAPPLER_VERSIONIZED stappler::db::pq {
31 :
32 : constexpr static auto LIST_DB_TYPES = "SELECT oid, typname, typcategory FROM pg_type WHERE typcategory = 'B'"
33 : " OR typcategory = 'D' OR typcategory = 'I' OR typcategory = 'N' OR typcategory = 'S' OR typcategory = 'U';";
34 :
35 : enum ConnStatusType {
36 : CONNECTION_OK,
37 : CONNECTION_BAD,
38 : };
39 :
40 : enum ExecStatusType {
41 : PGRES_EMPTY_QUERY = 0,
42 : PGRES_COMMAND_OK,
43 : PGRES_TUPLES_OK,
44 : PGRES_COPY_OUT,
45 : PGRES_COPY_IN,
46 : PGRES_BAD_RESPONSE,
47 : PGRES_NONFATAL_ERROR,
48 : PGRES_FATAL_ERROR,
49 : PGRES_COPY_BOTH,
50 : PGRES_SINGLE_TUPLE
51 : };
52 :
53 : enum PGTransactionStatusType {
54 : PQTRANS_IDLE,
55 : PQTRANS_ACTIVE,
56 : PQTRANS_INTRANS,
57 : PQTRANS_INERROR,
58 : PQTRANS_UNKNOWN
59 : };
60 :
61 : struct pgNotify {
62 : char *relname; /* notification condition name */
63 : int be_pid; /* process ID of notifying server process */
64 : char *extra; /* notification parameter */
65 : /* Fields below here are private to libpq; apps should not use 'em */
66 : pgNotify *next; /* list link */
67 : };
68 :
69 : struct DriverSym : AllocBase {
70 : using PQnoticeProcessor = void (*) (void *arg, const char *message);
71 : using PQresultStatusType = ExecStatusType (*) (const void *res);
72 : using PQconnectdbParamsType = void * (*) (const char * const *keywords, const char * const *values, int expand_dbname);
73 : using PQfinishType = void (*) (void *conn);
74 : using PQfformatType = int (*) (const void *res, int field_num);
75 : using PQgetisnullType = int (*) (const void *res, int tup_num, int field_num);
76 : using PQgetvalueType = char *(*) (const void *res, int tup_num, int field_num);
77 : using PQgetlengthType = int (*) (const void *res, int tup_num, int field_num);
78 : using PQfnameType = char *(*) (const void *res, int field_num);
79 : using PQftypeType = unsigned int (*) (const void *res, int field_num);
80 : using PQntuplesType = int (*) (const void *res);
81 : using PQnfieldsType = int (*) (const void *res);
82 : using PQcmdTuplesType = char *(*) (void *res);
83 : using PQresStatusType = char *(*) (ExecStatusType status);
84 : using PQresultErrorMessageType = char *(*) (const void *res);
85 : using PQclearType = void (*) (void *res);
86 : using PQexecType = void *(*) (void *conn, const char *query);
87 : using PQexecParamsType = void *(*) (void *conn, const char *command, int nParams, const void *paramTypes,
88 : const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat);
89 : using PQsendQueryType = int (*) (void *conn, const char *query);
90 : using PQstatusType = ConnStatusType (*) (void *conn);
91 : using PQerrorMessageType = char * (*) (const void *conn);
92 : using PQresetType = void (*) (void *conn);
93 : using PQtransactionStatusType = PGTransactionStatusType (*) (void *conn);
94 : using PQsetnonblockingType = int (*) (void *conn, int arg);
95 : using PQsocketType = int (*) (const void *conn);
96 : using PQconsumeInputType = int (*) (void *conn);
97 : using PQnotifiesType = pgNotify *(*) (void *conn);
98 : using PQfreememType = void (*) (void *ptr);
99 : using PQisBusyType = int (*) (void *conn);
100 : using PQgetResultType = void *(*) (void *conn);
101 : using PQsetNoticeProcessorType = void (*) (void *conn, PQnoticeProcessor, void *);
102 :
103 25 : DriverSym(StringView n, Dso &&d) : name(n), ptr(move(d)) {
104 25 : this->PQresultStatus = ptr.sym<DriverSym::PQresultStatusType>("PQresultStatus");
105 25 : this->PQconnectdbParams = ptr.sym<DriverSym::PQconnectdbParamsType>("PQconnectdbParams");
106 25 : this->PQfinish = ptr.sym<DriverSym::PQfinishType>("PQfinish");
107 25 : this->PQfformat = ptr.sym<DriverSym::PQfformatType>("PQfformat");
108 25 : this->PQgetisnull = ptr.sym<DriverSym::PQgetisnullType>("PQgetisnull");
109 25 : this->PQgetvalue = ptr.sym<DriverSym::PQgetvalueType>("PQgetvalue");
110 25 : this->PQgetlength = ptr.sym<DriverSym::PQgetlengthType>("PQgetlength");
111 25 : this->PQfname = ptr.sym<DriverSym::PQfnameType>("PQfname");
112 25 : this->PQftype = ptr.sym<DriverSym::PQftypeType>("PQftype");
113 25 : this->PQntuples = ptr.sym<DriverSym::PQntuplesType>("PQntuples");
114 25 : this->PQnfields = ptr.sym<DriverSym::PQnfieldsType>("PQnfields");
115 25 : this->PQcmdTuples = ptr.sym<DriverSym::PQcmdTuplesType>("PQcmdTuples");
116 25 : this->PQresStatus = ptr.sym<DriverSym::PQresStatusType>("PQresStatus");
117 25 : this->PQresultErrorMessage = ptr.sym<DriverSym::PQresultErrorMessageType>("PQresultErrorMessage");
118 25 : this->PQclear = ptr.sym<DriverSym::PQclearType>("PQclear");
119 25 : this->PQexec = ptr.sym<DriverSym::PQexecType>("PQexec");
120 25 : this->PQexecParams = ptr.sym<DriverSym::PQexecParamsType>("PQexecParams");
121 25 : this->PQsendQuery = ptr.sym<DriverSym::PQsendQueryType>("PQsendQuery");
122 25 : this->PQstatus = ptr.sym<DriverSym::PQstatusType>("PQstatus");
123 25 : this->PQerrorMessage = ptr.sym<DriverSym::PQerrorMessageType>("PQerrorMessage");
124 25 : this->PQreset = ptr.sym<DriverSym::PQresetType>("PQreset");
125 25 : this->PQtransactionStatus = ptr.sym<DriverSym::PQtransactionStatusType>("PQtransactionStatus");
126 25 : this->PQsetnonblocking = ptr.sym<DriverSym::PQsetnonblockingType>("PQsetnonblocking");
127 25 : this->PQsocket = ptr.sym<DriverSym::PQsocketType>("PQsocket");
128 25 : this->PQconsumeInput = ptr.sym<DriverSym::PQconsumeInputType>("PQconsumeInput");
129 25 : this->PQnotifies = ptr.sym<DriverSym::PQnotifiesType>("PQnotifies");
130 25 : this->PQfreemem = ptr.sym<DriverSym::PQfreememType>("PQfreemem");
131 25 : this->PQisBusy = ptr.sym<DriverSym::PQisBusyType>("PQisBusy");
132 25 : this->PQgetResult = ptr.sym<DriverSym::PQgetResultType>("PQgetResult");
133 25 : this->PQsetNoticeProcessor = ptr.sym<DriverSym::PQsetNoticeProcessorType>("PQsetNoticeProcessor");
134 25 : }
135 :
136 50 : ~DriverSym() { }
137 :
138 25 : explicit operator bool () const {
139 25 : void **begin = (void **)&this->PQconnectdbParams;
140 25 : void **end = (void **)&this->PQsetNoticeProcessor + 1;
141 775 : while (begin != end) {
142 750 : if (*begin == nullptr) {
143 0 : return false;
144 : }
145 750 : ++ begin;
146 : }
147 25 : return true;
148 : }
149 :
150 25 : DriverSym(DriverSym &&) = default;
151 : DriverSym &operator=(DriverSym &&) = default;
152 :
153 : StringView name;
154 : Dso ptr;
155 : PQconnectdbParamsType PQconnectdbParams = nullptr;
156 : PQfinishType PQfinish = nullptr;
157 : PQresultStatusType PQresultStatus = nullptr;
158 : PQfformatType PQfformat = nullptr;
159 : PQgetisnullType PQgetisnull = nullptr;
160 : PQgetvalueType PQgetvalue = nullptr;
161 : PQgetlengthType PQgetlength = nullptr;
162 : PQfnameType PQfname = nullptr;
163 : PQftypeType PQftype = nullptr;
164 : PQntuplesType PQntuples = nullptr;
165 : PQnfieldsType PQnfields = nullptr;
166 : PQcmdTuplesType PQcmdTuples = nullptr;
167 : PQresStatusType PQresStatus = nullptr;
168 : PQresultErrorMessageType PQresultErrorMessage = nullptr;
169 : PQclearType PQclear = nullptr;
170 : PQexecType PQexec = nullptr;
171 : PQexecParamsType PQexecParams = nullptr;
172 : PQsendQueryType PQsendQuery = nullptr;
173 : PQstatusType PQstatus = nullptr;
174 : PQerrorMessageType PQerrorMessage = nullptr;
175 : PQresetType PQreset = nullptr;
176 : PQtransactionStatusType PQtransactionStatus = nullptr;
177 : PQsetnonblockingType PQsetnonblocking = nullptr;
178 : PQsocketType PQsocket = nullptr;
179 : PQconsumeInputType PQconsumeInput = nullptr;
180 : PQnotifiesType PQnotifies = nullptr;
181 : PQfreememType PQfreemem = nullptr;
182 : PQisBusyType PQisBusy = nullptr;
183 : PQgetResultType PQgetResult = nullptr;
184 : PQsetNoticeProcessorType PQsetNoticeProcessor = nullptr;
185 : uint32_t refCount = 1;
186 : };
187 :
188 : struct DriverHandle {
189 : void *conn;
190 : const Driver *driver;
191 : Time ctime;
192 : pool_t *pool;
193 : };
194 :
195 : struct PgDriverLibStorage {
196 : std::mutex s_driverMutex;
197 : std::map<std::string, DriverSym, std::less<void>> s_driverLibs;
198 :
199 : static PgDriverLibStorage *getInstance();
200 :
201 100 : DriverSym *openLib(StringView lib) {
202 100 : std::unique_lock<std::mutex> lock(s_driverMutex);
203 :
204 100 : auto target = lib.str<stappler::memory::StandartInterface>();
205 100 : auto it = s_driverLibs.find(target);
206 100 : if (it != s_driverLibs.end()) {
207 25 : ++ it->second.refCount;
208 25 : return &it->second;
209 : }
210 :
211 75 : if (auto d = Dso(target)) {
212 25 : DriverSym syms(target, move(d));
213 25 : if (syms) {
214 25 : auto ret = s_driverLibs.emplace(target, move(syms)).first;
215 25 : ret->second.name = ret->first;
216 25 : return &ret->second;
217 : }
218 100 : }
219 :
220 50 : return nullptr;
221 100 : }
222 :
223 50 : void closeLib(DriverSym *sym) {
224 50 : std::unique_lock<std::mutex> lock(s_driverMutex);
225 50 : if (sym->refCount == 1) {
226 25 : s_driverLibs.erase(sym->name.str<stappler::memory::StandartInterface>());
227 : } else {
228 25 : -- sym->refCount;
229 : }
230 50 : }
231 : };
232 :
233 : static PgDriverLibStorage *s_libStorage;
234 : SPUNUSED static String pg_numeric_to_string(BytesViewNetwork r);
235 :
236 150 : PgDriverLibStorage *PgDriverLibStorage::getInstance() {
237 150 : if (!s_libStorage) {
238 25 : s_libStorage = new PgDriverLibStorage;
239 : }
240 150 : return s_libStorage;
241 : }
242 :
243 25 : void Driver_noticeMessage(void *arg, const char *message) {
244 : // std::cout << "Notice: " << message << "\n";
245 : // Silence libpq notices
246 25 : }
247 :
248 300 : static void Driver_insert_sorted(Vector<Pair<uint32_t, BackendInterface::StorageType>> & vec, uint32_t oid, BackendInterface::StorageType type) {
249 300 : auto it = std::upper_bound(vec.begin(), vec.end(), oid, [] (uint32_t l, const Pair<uint32_t, BackendInterface::StorageType> &r) -> bool {
250 625 : return l < r.first;
251 300 : });
252 300 : vec.emplace(it, oid, type);
253 300 : }
254 :
255 1100 : static void Driver_insert_sorted(Vector<Pair<uint32_t, String>> & vec, uint32_t oid, StringView type) {
256 1100 : auto it = std::upper_bound(vec.begin(), vec.end(), oid, [] (uint32_t l, const Pair<uint32_t, String> &r) -> bool {
257 4375 : return l < r.first;
258 1100 : });
259 1100 : vec.emplace(it, oid, type.str<Interface>());
260 1100 : }
261 :
262 25 : bool Driver::init(Handle handle, const Vector<StringView> &dbs) {
263 25 : if (_init) {
264 0 : return true;
265 : }
266 :
267 25 : auto conn = getConnection(handle);
268 25 : Vector<StringView> toCreate(dbs);
269 25 : if (!dbs.empty()) {
270 0 : auto res = exec(conn, "SELECT datname FROM pg_database;");
271 :
272 0 : for (size_t i = 0; i < getNTuples(res); ++ i) {
273 0 : auto name = StringView(getValue(res, i, 0), getLength(res, i, 0));
274 0 : auto it = std::find(toCreate.begin(), toCreate.end(), name);
275 0 : if (it != toCreate.end()) {
276 0 : toCreate.erase(it);
277 : }
278 : }
279 :
280 0 : clearResult(res);
281 :
282 0 : if (!toCreate.empty()) {
283 0 : for (auto &it : toCreate) {
284 0 : StringStream query;
285 0 : query << "CREATE DATABASE " << it << ";";
286 0 : auto q = query.weak().data();
287 0 : auto res = exec(conn, q);
288 0 : clearResult(res);
289 0 : }
290 : }
291 : }
292 :
293 25 : ResultCursor result(this, exec(conn, LIST_DB_TYPES));
294 :
295 25 : db::sql::Result res(&result);
296 25 : pool::push(_storageTypes.get_allocator());
297 1425 : for (auto it : res) {
298 1400 : auto tid = it.toInteger(0);
299 1400 : auto tname = it.at(1);
300 1400 : if (tname == "bool") {
301 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Bool);
302 1375 : } else if (tname == "bytea") {
303 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Bytes);
304 1350 : } else if (tname == "char") {
305 0 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Char);
306 1350 : } else if (tname == "int8") {
307 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Int8);
308 1325 : } else if (tname == "int4") {
309 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Int4);
310 1300 : } else if (tname == "int2") {
311 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Int2);
312 1275 : } else if (tname == "float4") {
313 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Float4);
314 1250 : } else if (tname == "float8") {
315 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Float8);
316 1225 : } else if (tname == "varchar") {
317 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::VarChar);
318 1200 : } else if (tname == "text") {
319 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Text);
320 1175 : } else if (tname == "name") {
321 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Text);
322 1150 : } else if (tname == "numeric") {
323 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::Numeric);
324 1125 : } else if (tname == "tsvector") {
325 25 : Driver_insert_sorted(_storageTypes, uint32_t(tid), BackendInterface::StorageType::TsVector);
326 : } else {
327 1100 : Driver_insert_sorted(_customTypes, uint32_t(tid), tname);
328 : }
329 : }
330 25 : pool::pop();
331 :
332 25 : _init = true;
333 25 : return true;
334 25 : }
335 :
336 375 : void Driver::performWithStorage(Handle handle, const Callback<void(const db::Adapter &)> &cb) const {
337 375 : auto targetPool = pool::acquire();
338 :
339 375 : db::pq::Handle h(this, handle);
340 375 : db::Adapter storage(&h, _application);
341 375 : pool::userdata_set((void *)&h, config::STORAGE_INTERFACE_KEY.data(), nullptr, targetPool);
342 :
343 375 : cb(storage);
344 :
345 375 : auto stack = stappler::memory::pool::get<db::Transaction::Stack>(targetPool, config::STORAGE_TRANSACTION_STACK_KEY);
346 375 : if (stack) {
347 350 : for (auto &it : stack->stack) {
348 0 : if (it->adapter == storage) {
349 0 : it->adapter = db::Adapter(nullptr, _application);
350 0 : _application->error("Root", "Incomplete transaction found");
351 : }
352 : }
353 : }
354 375 : pool::userdata_set((void *)nullptr, storage.getTransactionKey().data(), nullptr, targetPool);
355 375 : pool::userdata_set((void *)nullptr, config::STORAGE_INTERFACE_KEY.data(), nullptr, targetPool);
356 375 : }
357 :
358 0 : BackendInterface *Driver::acquireInterface(Handle handle, pool_t *pool) const {
359 0 : BackendInterface *ret = nullptr;
360 0 : pool::push(pool);
361 0 : ret = new (pool) db::pq::Handle(this, handle);
362 0 : pool::pop();
363 0 : return ret;
364 : }
365 :
366 25 : Driver::Handle Driver::connect(const Map<StringView, StringView> ¶ms) const {
367 25 : auto p = pool::create(pool::acquire());
368 : Driver::Handle rec;
369 25 : pool::push(p);
370 : do {
371 25 : Vector<const char *> keywords; keywords.reserve(params.size());
372 25 : Vector<const char *> values; values.reserve(params.size());
373 :
374 150 : for (auto &it : params) {
375 125 : if (it.first == "host"
376 100 : || it.first == "hostaddr"
377 100 : || it.first == "port"
378 100 : || it.first == "dbname"
379 75 : || it.first == "user"
380 50 : || it.first == "password"
381 25 : || it.first == "passfile"
382 25 : || it.first == "channel_binding"
383 25 : || it.first == "connect_timeout"
384 25 : || it.first == "client_encoding"
385 25 : || it.first == "options"
386 25 : || it.first == "application_name"
387 25 : || it.first == "fallback_application_name"
388 25 : || it.first == "keepalives"
389 25 : || it.first == "keepalives_idle"
390 25 : || it.first == "keepalives_interval"
391 25 : || it.first == "keepalives_count"
392 25 : || it.first == "tcp_user_timeout"
393 25 : || it.first == "replication"
394 25 : || it.first == "gssencmode"
395 25 : || it.first == "sslmode"
396 25 : || it.first == "requiressl"
397 25 : || it.first == "sslcompression"
398 25 : || it.first == "sslcert"
399 25 : || it.first == "sslkey"
400 25 : || it.first == "sslpassword"
401 25 : || it.first == "sslrootcert"
402 25 : || it.first == "sslcrl"
403 25 : || it.first == "requirepeer"
404 25 : || it.first == "ssl_min_protocol_version"
405 25 : || it.first == "ssl_max_protocol_version"
406 25 : || it.first == "krbsrvname"
407 25 : || it.first == "gsslib"
408 25 : || it.first == "service"
409 225 : || it.first == "target_session_attrs") {
410 100 : keywords.emplace_back(it.first.data());
411 100 : values.emplace_back(it.second.data());
412 50 : } else if (it.first != "driver" && it.first == "nmin" && it.first == "nkeep"
413 50 : && it.first == "nmax" && it.first == "exptime" && it.first == "persistent") {
414 0 : std::cout << "[pq::Driver] unknown connection parameter: " << it.first << "=" << it.second << "\n";
415 : }
416 : }
417 :
418 25 : keywords.emplace_back(nullptr);
419 25 : values.emplace_back(nullptr);
420 :
421 25 : rec = doConnect(keywords.data(), values.data(), 0);
422 25 : } while (0);
423 25 : pool::pop();
424 :
425 25 : if (!rec.get()) {
426 0 : pool::destroy(p);
427 : }
428 25 : return rec;
429 : }
430 :
431 0 : void Driver::finish(Handle h) const {
432 0 : auto db = (DriverHandle *)h.get();
433 0 : if (db && db->pool) {
434 0 : pool::destroy(db->pool);
435 : }
436 0 : }
437 :
438 25 : bool Driver::isValid(Handle handle) const {
439 25 : if (!handle.get()) {
440 0 : return false;
441 : }
442 :
443 25 : auto conn = getConnection(handle);
444 25 : if (conn.get()) {
445 25 : return isValid(conn);
446 : }
447 0 : return false;
448 : }
449 :
450 25 : bool Driver::isValid(Connection conn) const {
451 25 : if (_handle->PQstatus(conn.get()) != CONNECTION_OK) {
452 0 : _handle->PQreset(conn.get());
453 0 : if (_handle->PQstatus(conn.get()) != CONNECTION_OK) {
454 0 : return false;
455 : }
456 : }
457 25 : return true;
458 : }
459 :
460 0 : bool Driver::isIdle(Connection conn) const {
461 0 : return getTransactionStatus(conn) == TransactionStatus::Idle;
462 : }
463 :
464 0 : Time Driver::getConnectionTime(Handle handle) const {
465 0 : auto db = (DriverHandle *)handle.get();
466 0 : return db->ctime;
467 : }
468 :
469 0 : int Driver::listenForNotifications(Handle handle) const {
470 0 : auto conn = getConnection(handle).get();
471 :
472 0 : auto query = toString("LISTEN ", config::BROADCAST_CHANNEL_NAME, ";");
473 0 : int querySent = _handle->PQsendQuery(conn, query.data());
474 0 : if (querySent == 0) {
475 0 : std::cout << "[Postgres]: " << _handle->PQerrorMessage(conn) << "\n";
476 0 : return -1;
477 : }
478 :
479 0 : if (_handle->PQsetnonblocking(conn, 1) == -1) {
480 0 : std::cout << "[Postgres]: " << _handle->PQerrorMessage(conn) << "\n";
481 0 : return -1;
482 : } else {
483 0 : return _handle->PQsocket(conn);
484 : }
485 0 : }
486 :
487 0 : bool Driver::consumeNotifications(Handle handle, const Callback<void(StringView)> &cb) const {
488 0 : auto conn = getConnection(handle).get();
489 :
490 0 : auto connStatusType = _handle->PQstatus(conn);
491 0 : if (connStatusType == CONNECTION_BAD) {
492 0 : return false;
493 : }
494 :
495 0 : int rc = _handle->PQconsumeInput(conn);
496 0 : if (rc == 0) {
497 0 : std::cout << "[Postgres]: " << _handle->PQerrorMessage(conn) << "\n";
498 0 : return false;
499 : }
500 : pgNotify *notify;
501 0 : while ((notify = _handle->PQnotifies(conn)) != NULL) {
502 0 : cb(notify->relname);
503 0 : _handle->PQfreemem(notify);
504 : }
505 0 : if (_handle->PQisBusy(conn) == 0) {
506 : void *result;
507 0 : while ((result = _handle->PQgetResult(conn)) != NULL) {
508 0 : _handle->PQclear(result);
509 : }
510 : }
511 0 : return true;
512 : }
513 :
514 0 : Driver::TransactionStatus Driver::getTransactionStatus(Connection conn) const {
515 0 : auto ret = _handle->PQtransactionStatus(conn.get());
516 0 : switch (ret) {
517 0 : case PQTRANS_IDLE: return TransactionStatus::Idle; break;
518 0 : case PQTRANS_ACTIVE: return TransactionStatus::Active; break;
519 0 : case PQTRANS_INTRANS: return TransactionStatus::InTrans; break;
520 0 : case PQTRANS_INERROR: return TransactionStatus::InError; break;
521 0 : case PQTRANS_UNKNOWN: return TransactionStatus::Unknown; break;
522 : }
523 0 : return TransactionStatus::Unknown;
524 : }
525 :
526 5775 : Driver::Status Driver::getStatus(Result res) const {
527 5775 : auto err = _handle->PQresultStatus(res.get());
528 5775 : switch (err) {
529 0 : case PGRES_EMPTY_QUERY: return Driver::Status::Empty; break;
530 1475 : case PGRES_COMMAND_OK: return Driver::Status::CommandOk; break;
531 4300 : case PGRES_TUPLES_OK: return Driver::Status::TuplesOk; break;
532 0 : case PGRES_COPY_OUT: return Driver::Status::CopyOut; break;
533 0 : case PGRES_COPY_IN: return Driver::Status::CopyIn; break;
534 0 : case PGRES_BAD_RESPONSE: return Driver::Status::BadResponse; break;
535 0 : case PGRES_NONFATAL_ERROR: return Driver::Status::NonfatalError; break;
536 0 : case PGRES_FATAL_ERROR: return Driver::Status::FatalError; break;
537 0 : case PGRES_COPY_BOTH: return Driver::Status::CopyBoth; break;
538 0 : case PGRES_SINGLE_TUPLE: return Driver::Status::SingleTuple; break;
539 0 : default: break;
540 : }
541 0 : return Driver::Status::Empty;
542 : }
543 :
544 28125 : bool Driver::isBinaryFormat(Result res, size_t field) const {
545 28125 : return _handle->PQfformat(res.get(), field) != 0;
546 : }
547 :
548 18125 : bool Driver::isNull(Result res, size_t row, size_t field) const {
549 18125 : return _handle->PQgetisnull(res.get(), row, field);
550 : }
551 :
552 28125 : char *Driver::getValue(Result res, size_t row, size_t field) const {
553 28125 : return _handle->PQgetvalue(res.get(), row, field);
554 : }
555 :
556 25850 : size_t Driver::getLength(Result res, size_t row, size_t field) const {
557 25850 : return size_t(_handle->PQgetlength(res.get(), row, field));
558 : }
559 :
560 19175 : char *Driver::getName(Result res, size_t field) const {
561 19175 : return _handle->PQfname(res.get(), field);
562 : }
563 :
564 8750 : unsigned int Driver::getType(Result res, size_t field) const {
565 8750 : return _handle->PQftype(res.get(), field);
566 : }
567 :
568 5775 : size_t Driver::getNTuples(Result res) const {
569 5775 : return size_t(_handle->PQntuples(res.get()));
570 : }
571 :
572 30650 : size_t Driver::getNFields(Result res) const {
573 30650 : return size_t(_handle->PQnfields(res.get()));
574 : }
575 :
576 175 : size_t Driver::getCmdTuples(Result res) const {
577 175 : return stappler::StringToNumber<size_t>(_handle->PQcmdTuples(res.get()));
578 : }
579 :
580 0 : char *Driver::getStatusMessage(Status st) const {
581 0 : switch (st) {
582 0 : case Status::Empty: return _handle->PQresStatus(PGRES_EMPTY_QUERY); break;
583 0 : case Status::CommandOk: return _handle->PQresStatus(PGRES_COMMAND_OK); break;
584 0 : case Status::TuplesOk: return _handle->PQresStatus(PGRES_TUPLES_OK); break;
585 0 : case Status::CopyOut: return _handle->PQresStatus(PGRES_COPY_OUT); break;
586 0 : case Status::CopyIn: return _handle->PQresStatus(PGRES_COPY_IN); break;
587 0 : case Status::BadResponse: return _handle->PQresStatus(PGRES_BAD_RESPONSE); break;
588 0 : case Status::NonfatalError: return _handle->PQresStatus(PGRES_NONFATAL_ERROR); break;
589 0 : case Status::FatalError: return _handle->PQresStatus(PGRES_FATAL_ERROR); break;
590 0 : case Status::CopyBoth: return _handle->PQresStatus(PGRES_COPY_BOTH); break;
591 0 : case Status::SingleTuple: return _handle->PQresStatus(PGRES_SINGLE_TUPLE); break;
592 : }
593 0 : return nullptr;
594 : }
595 :
596 0 : char *Driver::getResultErrorMessage(Result res) const {
597 0 : return _handle->PQresultErrorMessage(res.get());
598 : }
599 :
600 5800 : void Driver::clearResult(Result res) const {
601 5800 : if (_dbCtrl) {
602 0 : _dbCtrl(true);
603 : }
604 5800 : _handle->PQclear(res.get());
605 5800 : }
606 :
607 1350 : Driver::Result Driver::exec(Connection conn, const char *query) const {
608 1350 : if (_dbCtrl) {
609 0 : _dbCtrl(false);
610 : }
611 1350 : return Driver::Result(_handle->PQexec(conn.get(), query));
612 : }
613 :
614 4450 : Driver::Result Driver::exec(Connection conn, const char *command, int nParams, const char *const *paramValues,
615 : const int *paramLengths, const int *paramFormats, int resultFormat) const {
616 4450 : if (_dbCtrl) {
617 0 : _dbCtrl(false);
618 : }
619 4450 : return Driver::Result(_handle->PQexecParams(conn.get(), command, nParams, nullptr, paramValues, paramLengths, paramFormats, resultFormat));
620 : }
621 :
622 9150 : BackendInterface::StorageType Driver::getTypeById(uint32_t oid) const {
623 9150 : auto it = std::lower_bound(_storageTypes.begin(), _storageTypes.end(), oid, [] (const Pair<uint32_t, BackendInterface::StorageType> &l, uint32_t r) -> bool {
624 31725 : return l.first < r;
625 9150 : });
626 9150 : if (it != _storageTypes.end() && it->first == oid) {
627 9125 : return it->second;
628 : }
629 25 : return BackendInterface::StorageType::Unknown;
630 : }
631 :
632 0 : StringView Driver::getTypeNameById(uint32_t oid) const {
633 0 : auto it = std::lower_bound(_customTypes.begin(), _customTypes.end(), oid, [] (const Pair<uint32_t, String> &l, uint32_t r) -> bool {
634 0 : return l.first < r;
635 0 : });
636 0 : if (it != _customTypes.end() && it->first == oid) {
637 0 : return it->second;
638 : }
639 0 : return StringView();
640 : }
641 :
642 100 : Driver::~Driver() { }
643 :
644 50 : Driver *Driver::open(pool_t *pool, ApplicationInterface *app, StringView path, const void *external) {
645 50 : auto ret = new (pool::acquire()) Driver(pool, app, path, external);
646 50 : if (ret->_handle) {
647 50 : return ret;
648 : }
649 0 : return nullptr;
650 : }
651 :
652 50 : Driver::Driver(pool_t *pool, ApplicationInterface *app, StringView path, const void *external)
653 50 : : sql::Driver(pool, app), _external(external) {
654 50 : StringView name;
655 50 : if (path.empty() || path == "pgsql") {
656 : #if WIN32
657 : name = StringView("libpq.dll");
658 : #else
659 50 : name = StringView("libpq.so");
660 : #endif
661 : }
662 :
663 50 : if (auto l = PgDriverLibStorage::getInstance()->openLib(name)) {
664 0 : _handle = l;
665 :
666 0 : pool::cleanup_register(pool::acquire(), [this] {
667 0 : PgDriverLibStorage::getInstance()->closeLib(_handle);
668 0 : _handle = nullptr;
669 0 : });
670 : } else {
671 : #if WIN32
672 : name = StringView("libpq.5.dll");
673 : #else
674 50 : name = StringView("libpq.so.5");
675 : #endif
676 50 : if (auto l = PgDriverLibStorage::getInstance()->openLib(name)) {
677 50 : _handle = l;
678 :
679 50 : pool::cleanup_register(pool::acquire(), [this] {
680 50 : PgDriverLibStorage::getInstance()->closeLib(_handle);
681 50 : _handle = nullptr;
682 50 : });
683 : }
684 : }
685 :
686 50 : if (_handle) {
687 50 : auto it = _customFields.emplace(FieldIntArray::FIELD_NAME);
688 50 : if (!FieldIntArray::registerForPostgres(it.first->second)) {
689 0 : _customFields.erase(it.first);
690 : }
691 :
692 50 : it = _customFields.emplace(FieldBigIntArray::FIELD_NAME);
693 50 : if (!FieldBigIntArray::registerForPostgres(it.first->second)) {
694 0 : _customFields.erase(it.first);
695 : }
696 :
697 50 : it = _customFields.emplace(FieldPoint::FIELD_NAME);
698 50 : if (!FieldPoint::registerForPostgres(it.first->second)) {
699 0 : _customFields.erase(it.first);
700 : }
701 :
702 50 : it = _customFields.emplace(FieldTextArray::FIELD_NAME);
703 50 : if (!FieldTextArray::registerForPostgres(it.first->second)) {
704 0 : _customFields.erase(it.first);
705 : }
706 : }
707 50 : }
708 :
709 5775 : ResultCursor::ResultCursor(const Driver *d, Driver::Result res) : driver(d), result(res) {
710 5775 : err = result.get() ? driver->getStatus(result) : Driver::Status::FatalError;
711 5775 : nrows = driver->getNTuples(result);
712 5775 : }
713 :
714 5775 : ResultCursor::~ResultCursor() {
715 5775 : clear();
716 5775 : }
717 :
718 28125 : bool ResultCursor::isBinaryFormat(size_t field) const {
719 28125 : return driver->isBinaryFormat(result, field) != 0;
720 : }
721 :
722 18125 : bool ResultCursor::isNull(size_t field) const {
723 18125 : return driver->isNull(result, currentRow, field);
724 : }
725 :
726 10150 : StringView ResultCursor::toString(size_t field) const {
727 10150 : if (isBinaryFormat(field)) {
728 8750 : auto t = driver->getType(result, field);
729 8750 : auto s = driver->getTypeById(t);
730 8750 : switch (s) {
731 0 : case BackendInterface::StorageType::Unknown:
732 0 : driver->getApplicationInterface()->error("DB", "Unknown type conversion", Value(driver->getTypeNameById(t)));
733 0 : return StringView();
734 : break;
735 0 : case BackendInterface::StorageType::TsVector:
736 0 : return StringView();
737 : break;
738 0 : case BackendInterface::StorageType::Bool:
739 0 : return StringView(toString(toBool(field))).pdup();
740 : break;
741 0 : case BackendInterface::StorageType::Char:
742 0 : break;
743 0 : case BackendInterface::StorageType::Float4:
744 : case BackendInterface::StorageType::Float8:
745 0 : return StringView(toString(toDouble(field))).pdup();
746 : break;
747 0 : case BackendInterface::StorageType::Int2:
748 : case BackendInterface::StorageType::Int4:
749 : case BackendInterface::StorageType::Int8:
750 0 : return StringView(toString(toInteger(field))).pdup();
751 : break;
752 8750 : case BackendInterface::StorageType::Text:
753 : case BackendInterface::StorageType::VarChar:
754 8750 : return StringView(driver->getValue(result, currentRow, field), driver->getLength(result, currentRow, field));
755 : break;
756 0 : case BackendInterface::StorageType::Numeric: {
757 0 : stappler::BytesViewNetwork r((const uint8_t *)driver->getValue(result, currentRow, field),
758 0 : driver->getLength(result, currentRow, field));
759 0 : auto str = pg_numeric_to_string(r);
760 0 : return StringView(str).pdup();
761 : break;
762 0 : }
763 0 : case BackendInterface::StorageType::Bytes:
764 0 : return StringView(base16::encode<Interface>(toBytes(field))).pdup();
765 : break;
766 : }
767 0 : return StringView();
768 : } else {
769 1400 : return StringView(driver->getValue(result, currentRow, field), driver->getLength(result, currentRow, field));
770 : }
771 : }
772 :
773 5275 : BytesView ResultCursor::toBytes(size_t field) const {
774 5275 : if (isBinaryFormat(field)) {
775 5275 : return BytesView((uint8_t *)driver->getValue(result, currentRow, field), driver->getLength(result, currentRow, field));
776 : } else {
777 0 : auto val = driver->getValue(result, currentRow, field);
778 0 : auto len = driver->getLength(result, currentRow, field);
779 0 : if (len > 2 && memcmp(val, "\\x", 2) == 0) {
780 0 : auto d = new Bytes(stappler::base16::decode<Interface>(stappler::CoderSource(val + 2, len - 2)));
781 0 : return BytesView(*d);
782 : }
783 0 : return BytesView((uint8_t *)val, len);
784 : }
785 : }
786 9150 : int64_t ResultCursor::toInteger(size_t field) const {
787 9150 : if (isBinaryFormat(field)) {
788 15500 : stappler::BytesViewNetwork r((const uint8_t *)driver->getValue(result, currentRow, field),
789 7750 : driver->getLength(result, currentRow, field));
790 7750 : switch (r.size()) {
791 0 : case 1: return r.readUnsigned(); break;
792 0 : case 2: return r.readUnsigned16(); break;
793 1375 : case 4: return r.readUnsigned32(); break;
794 6375 : case 8: return r.readUnsigned64(); break;
795 0 : default: break;
796 : }
797 0 : return 0;
798 : } else {
799 1400 : auto val = driver->getValue(result, currentRow, field);
800 1400 : return stappler::StringToNumber<int64_t>(val, nullptr, 0);
801 : }
802 : }
803 1500 : double ResultCursor::toDouble(size_t field) const {
804 1500 : if (isBinaryFormat(field)) {
805 3000 : stappler::BytesViewNetwork r((const uint8_t *)driver->getValue(result, currentRow, field),
806 1500 : driver->getLength(result, currentRow, field));
807 1500 : switch (r.size()) {
808 0 : case 2: return r.readFloat16(); break;
809 750 : case 4: return r.readFloat32(); break;
810 750 : case 8: return r.readFloat64(); break;
811 0 : default: break;
812 : }
813 0 : return 0;
814 : } else {
815 0 : auto val = driver->getValue(result, currentRow, field);
816 0 : return stappler::StringToNumber<double>(val, nullptr, 0);
817 : }
818 : }
819 875 : bool ResultCursor::toBool(size_t field) const {
820 875 : auto val = driver->getValue(result, currentRow, field);
821 875 : if (!isBinaryFormat(field)) {
822 0 : if (val) {
823 0 : if (*val == 'T' || *val == 't' || *val == 'y') {
824 0 : return true;
825 : }
826 : }
827 0 : return false;
828 : } else {
829 875 : return val && *val != 0;
830 : }
831 : }
832 0 : Value ResultCursor::toTypedData(size_t field) const {
833 0 : auto t = driver->getType(result, field);
834 0 : auto s = driver->getTypeById(t);
835 0 : switch (s) {
836 0 : case BackendInterface::StorageType::Unknown:
837 0 : driver->getApplicationInterface()->error("DB", "Unknown type conversion", Value(driver->getTypeNameById(t)));
838 0 : return Value();
839 : break;
840 0 : case BackendInterface::StorageType::TsVector:
841 0 : return Value();
842 : break;
843 0 : case BackendInterface::StorageType::Bool:
844 0 : return Value(toBool(field));
845 : break;
846 0 : case BackendInterface::StorageType::Char:
847 0 : break;
848 0 : case BackendInterface::StorageType::Float4:
849 : case BackendInterface::StorageType::Float8:
850 0 : return Value(toDouble(field));
851 : break;
852 0 : case BackendInterface::StorageType::Int2:
853 : case BackendInterface::StorageType::Int4:
854 : case BackendInterface::StorageType::Int8:
855 0 : return Value(toInteger(field));
856 : break;
857 0 : case BackendInterface::StorageType::Text:
858 : case BackendInterface::StorageType::VarChar:
859 0 : return Value(toString(field));
860 : break;
861 0 : case BackendInterface::StorageType::Numeric: {
862 0 : stappler::BytesViewNetwork r((const uint8_t *)driver->getValue(result, currentRow, field),
863 0 : driver->getLength(result, currentRow, field));
864 0 : auto str = pg_numeric_to_string(r);
865 :
866 0 : auto v = StringView(str).readDouble();
867 0 : if (v.valid()) {
868 0 : return Value(v.get());
869 : } else {
870 0 : return Value(str);
871 : }
872 : break;
873 0 : }
874 0 : case BackendInterface::StorageType::Bytes:
875 0 : return Value(toBytes(field).bytes<Interface>());
876 : break;
877 : }
878 0 : return Value();
879 : }
880 :
881 3000 : Value ResultCursor::toCustomData(size_t field, const FieldCustom *f) const {
882 3000 : auto info = driver->getCustomFieldInfo(f->getDriverTypeName());
883 3000 : if (!info) {
884 0 : return Value();
885 : }
886 3000 : return info->readFromStorage(*f, *this, field);
887 : }
888 :
889 1175 : int64_t ResultCursor::toId() const {
890 1175 : if (isBinaryFormat(0)) {
891 1175 : stappler::BytesViewNetwork r((const uint8_t *)driver->getValue(result, 0, 0), driver->getLength(result, 0, 0));
892 1175 : switch (r.size()) {
893 0 : case 1: return int64_t(r.readUnsigned()); break;
894 0 : case 2: return int64_t(r.readUnsigned16()); break;
895 25 : case 4: return int64_t(r.readUnsigned32()); break;
896 1150 : case 8: return int64_t(r.readUnsigned64()); break;
897 0 : default: break;
898 : }
899 0 : return 0;
900 : } else {
901 0 : auto val = driver->getValue(result, 0, 0);
902 0 : return stappler::StringToNumber<int64_t>(val, nullptr, 0);
903 : }
904 : }
905 19175 : StringView ResultCursor::getFieldName(size_t field) const {
906 19175 : auto ptr = driver->getName(result, field);
907 19175 : if (ptr) {
908 19175 : return StringView(ptr);
909 : }
910 0 : return StringView();
911 : }
912 11525 : bool ResultCursor::isSuccess() const {
913 11525 : return result.get() && pgsql_is_success(err);
914 : }
915 17125 : bool ResultCursor::isEmpty() const {
916 17125 : return nrows - currentRow <= 0;
917 : }
918 0 : bool ResultCursor::isEnded() const {
919 0 : return currentRow >= nrows;
920 : }
921 30650 : size_t ResultCursor::getFieldsCount() const {
922 30650 : return driver->getNFields(result);
923 : }
924 175 : size_t ResultCursor::getAffectedRows() const {
925 175 : return driver->getCmdTuples(result);
926 : }
927 2550 : size_t ResultCursor::getRowsHint() const {
928 2550 : return nrows;
929 : }
930 6425 : bool ResultCursor::next() {
931 6425 : if (!isEmpty()) {
932 6425 : ++ currentRow;
933 6425 : return !isEmpty();
934 : }
935 0 : return false;
936 : }
937 0 : void ResultCursor::reset() {
938 0 : currentRow = 0;
939 0 : }
940 0 : Value ResultCursor::getInfo() const {
941 : return Value({
942 0 : stappler::pair("error", Value(stappler::toInt(err))),
943 0 : stappler::pair("status", Value(driver->getStatusMessage(err))),
944 0 : stappler::pair("desc", Value(result.get() ? driver->getResultErrorMessage(result) : "Fatal database error")),
945 0 : });
946 : }
947 10375 : void ResultCursor::clear() {
948 10375 : if (result.get()) {
949 5775 : driver->clearResult(result);
950 5775 : result = Driver::Result(nullptr);
951 : }
952 10375 : }
953 :
954 5750 : Driver::Status ResultCursor::getError() const {
955 5750 : return err;
956 : }
957 :
958 : }
959 :
960 : namespace STAPPLER_VERSIONIZED stappler::db::pq {
961 :
962 : /* HTTPD ap_dbd_t mimic */
963 : struct DriverConnectionHandle {
964 : void *connection;
965 : };
966 :
967 : struct DriverExternalHandle {
968 : DriverConnectionHandle *handle;
969 : void *driver;
970 : };
971 :
972 25 : Driver::Handle Driver::doConnect(const char * const *keywords, const char * const *values, int expand_dbname) const {
973 25 : if (_external) {
974 0 : log::error("pq::Driver", "Driver in external mode can not do connection by itself");
975 0 : return Driver::Handle(nullptr);
976 : }
977 :
978 25 : auto p = pool::acquire();
979 25 : auto h = (DriverHandle *)pool::palloc(p, sizeof(DriverHandle));
980 25 : h->pool = p;
981 25 : h->driver = this;
982 25 : h->conn = _handle->PQconnectdbParams(keywords, values, expand_dbname);
983 :
984 25 : if (h->conn) {
985 25 : if (_handle->PQstatus(h->conn) != CONNECTION_OK) {
986 0 : _handle->PQfinish(h->conn);
987 0 : return Driver::Handle(nullptr);
988 : }
989 25 : _handle->PQsetNoticeProcessor(h->conn, Driver_noticeMessage, (void *)this);
990 :
991 25 : pool::cleanup_register(p, [h = (DriverSym *)_handle, ret = h] {
992 25 : if (ret->conn) {
993 25 : h->PQfinish(ret->conn);
994 25 : ret->conn = nullptr;
995 : }
996 25 : });
997 :
998 25 : return Driver::Handle(h);
999 : }
1000 0 : return Driver::Handle(nullptr);
1001 : }
1002 :
1003 450 : Driver::Connection Driver::getConnection(Handle _h) const {
1004 450 : if (_external) {
1005 0 : auto h = (DriverExternalHandle *)_h.get();
1006 0 : if (h->driver == _external) {
1007 0 : return Driver::Connection(h->handle->connection);
1008 : }
1009 : } else {
1010 450 : auto h = (DriverHandle *)_h.get();
1011 450 : return Driver::Connection(h->conn);
1012 : }
1013 0 : return Driver::Connection(nullptr);
1014 : }
1015 :
1016 : }
|