root/release/4/postgresql/trunk/postgresql.scm

Revision 17477, 40.7 KB (checked in by sjamaan, 3 weeks ago)

Implement new COPY API

Line 
1;;; Bindings to the PostgreSQL C library
2;;
3;; Copyright (C) 2008-2009 Peter Bex
4;; Copyright (C) 2004 Johannes Grødem <johs@copyleft.no>
5;; Redistribution and use in source and binary forms, with or without
6;; modification, is permitted.
7;;
8;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS
9;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
10;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
11;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE
12;; LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
13;; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
14;; OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
15;; BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
16;; LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
17;; (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
18;; USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
19;; DAMAGE.
20
21(module postgresql
22 (update-type-parsers! default-type-parsers
23  char-parser bool-parser bytea-parser numeric-parser
24  update-type-unparsers! default-type-unparsers
25  bool-unparser
26 
27  connect reset-connection disconnect connection?
28 
29  query query* with-transaction in-transaction?
30 
31  result? clear-result! row-count column-count
32  column-index column-name column-names column-format
33  column-type column-type-modifier table-oid table-column-index
34  value-at row-values row-alist column-values affected-rows inserted-oid
35
36  invalid-oid
37 
38  escape-string escape-bytea unescape-bytea
39
40  put-copy-data put-copy-end get-copy-data
41 
42  row-fold row-fold* row-fold-right row-fold-right*
43  row-for-each row-for-each* row-map row-map*
44  column-fold column-fold* column-fold-right column-fold-right*
45  column-for-each column-for-each* column-map column-map*
46  copy-query-fold copy-query*-fold copy-query-fold-right copy-query*-fold-right
47  copy-query-for-each copy-query*-for-each copy-query-map copy-query*-map
48  call-with-output-copy-query call-with-output-copy-query*
49  with-output-to-copy-query with-output-to-copy-query*)
50
51(import chicken scheme foreign)
52
53(require-extension srfi-1 srfi-4 srfi-13 srfi-18 srfi-69
54                   extras data-structures ports sql-null)
55
;; Pull in the libpq client declarations that the bindings in this file use.
(foreign-declare "#include <libpq-fe.h>")

;; Values of the C enum PostgresPollingStatusType; compared against the
;; results of the polling procedures in wait-for-connection!.
(define-foreign-type pg-polling-status (enum "PostgresPollingStatusType"))
(define-foreign-variable PGRES_POLLING_FAILED pg-polling-status
  "PGRES_POLLING_FAILED")
(define-foreign-variable PGRES_POLLING_READING pg-polling-status
  "PGRES_POLLING_READING")
(define-foreign-variable PGRES_POLLING_WRITING pg-polling-status
  "PGRES_POLLING_WRITING")
(define-foreign-variable PGRES_POLLING_OK pg-polling-status
  "PGRES_POLLING_OK")

;; Values of the C enum ExecStatusType; compared against the result of
;; PQresultStatus in get-last-result.
(define-foreign-type pg-exec-status (enum "ExecStatusType"))
(define-foreign-variable PGRES_EMPTY_QUERY pg-exec-status
  "PGRES_EMPTY_QUERY")
(define-foreign-variable PGRES_COMMAND_OK pg-exec-status
  "PGRES_COMMAND_OK")
(define-foreign-variable PGRES_TUPLES_OK pg-exec-status
  "PGRES_TUPLES_OK")
(define-foreign-variable PGRES_COPY_OUT pg-exec-status
  "PGRES_COPY_OUT")
(define-foreign-variable PGRES_COPY_IN pg-exec-status
  "PGRES_COPY_IN")
(define-foreign-variable PGRES_BAD_RESPONSE pg-exec-status
  "PGRES_BAD_RESPONSE")
(define-foreign-variable PGRES_NONFATAL_ERROR pg-exec-status
  "PGRES_NONFATAL_ERROR")
(define-foreign-variable PGRES_FATAL_ERROR pg-exec-status
  "PGRES_FATAL_ERROR")
73
;; Connection handles are passed around as untyped C pointers.
;(define-foreign-type pgconn* (c-pointer "PGconn"))
(define-foreign-type pgconn* c-pointer)

;; Direct bindings to the libpq connection-control functions.
(define PQconnectStart
  (foreign-lambda pgconn* "PQconnectStart" (const c-string)))
(define PQconnectPoll
  (foreign-lambda pg-polling-status "PQconnectPoll" pgconn*))
(define PQresetStart
  (foreign-lambda bool "PQresetStart" pgconn*))
(define PQresetPoll
  (foreign-lambda pg-polling-status "PQresetPoll" pgconn*))
(define PQfinish
  (foreign-lambda void "PQfinish" pgconn*))
(define PQstatus
  (foreign-lambda (enum "ConnStatusType") "PQstatus" (const pgconn*)))
(define PQerrorMessage
  (foreign-lambda c-string "PQerrorMessage" (const pgconn*)))

;; Object identifiers are represented as unsigned integers.
;(define-foreign-type oid "Oid")
(define-foreign-type oid unsigned-int)

;; The C constant InvalidOid, exported so callers can compare against it.
(define invalid-oid (foreign-value "InvalidOid" oid))

;; Bindings used while waiting for a query reply (see buffer-available-input!).
(define PQisBusy
  (foreign-lambda bool "PQisBusy" pgconn*))
(define PQconsumeInput
  (foreign-lambda bool "PQconsumeInput" pgconn*))
92
;; Result handles are typed pointers to PGresult.
(define-foreign-type pgresult* (c-pointer "PGresult"))

;; Fetching results and inspecting their status / error information.
(define PQgetResult
  (foreign-lambda pgresult* "PQgetResult" pgconn*))
(define PQresultStatus
  (foreign-lambda pg-exec-status "PQresultStatus" (const pgresult*)))
(define PQresultErrorMessage
  (foreign-lambda c-string "PQresultErrorMessage" (const pgresult*)))
(define PQresultErrorField
  (foreign-lambda c-string "PQresultErrorField" (const pgresult*) int))

;; Releasing results and querying their shape and column metadata.
(define PQclear
  (foreign-lambda void "PQclear" pgresult*))
(define PQntuples
  (foreign-lambda int "PQntuples" (const pgresult*)))
(define PQnfields
  (foreign-lambda int "PQnfields" (const pgresult*)))
(define PQfname
  (foreign-lambda c-string "PQfname" (const pgresult*) int))
(define PQfnumber
  (foreign-lambda int "PQfnumber" (const pgresult*) (const c-string)))
(define PQftable
  (foreign-lambda oid "PQftable" (const pgresult*) int))
(define PQftablecol
  (foreign-lambda int "PQftablecol" (const pgresult*) int))
(define PQfformat
  (foreign-lambda int "PQfformat" (const pgresult*) int))
(define PQftype
  (foreign-lambda oid "PQftype" (const pgresult*) int))
(define PQfmod
  (foreign-lambda int "PQfmod" (const pgresult*) int))
(define PQgetisnull
  (foreign-lambda bool "PQgetisnull" (const pgresult*) int int))
(define PQcmdTuples
  (foreign-lambda nonnull-c-string "PQcmdTuples" pgresult*))
(define PQoidValue
  (foreign-lambda oid "PQoidValue" pgresult*))

;; Sending COPY data; the blob argument is handed to C as a raw buffer
;; together with its length.
(define PQputCopyData
  (foreign-lambda int "PQputCopyData" pgconn* blob int))
(define PQputCopyEnd
  (foreign-lambda int "PQputCopyEnd" pgconn* (const c-string)))

;; TODO: Create a real callback system?
;; A do-nothing notice receiver; installed on every new connection by
;; `connect' so that libpq notices are discarded.
(foreign-declare "static void nullNoticeReceiver(void *arg, const PGresult *res){ }")
119
;; (define-foreign-int NAME) expands into a foreign variable NAME of type
;; int whose C expression is "(int) NAME".
(define-syntax define-foreign-int
  (er-macro-transformer
   (lambda (form rename compare)
     (let* ((c-name (cadr form))
            (c-expression (conc "(int) " c-name)))
       ;; cannot rename define-foreign-variable; it's a really special form
       (list 'define-foreign-variable c-name 'int c-expression)))))

;; Field codes accepted by PQresultErrorField (used in get-last-result).
(define-foreign-int PG_DIAG_SEVERITY)
(define-foreign-int PG_DIAG_SQLSTATE)
(define-foreign-int PG_DIAG_MESSAGE_PRIMARY)
(define-foreign-int PG_DIAG_MESSAGE_DETAIL)
(define-foreign-int PG_DIAG_MESSAGE_HINT)
(define-foreign-int PG_DIAG_STATEMENT_POSITION)
(define-foreign-int PG_DIAG_CONTEXT)
(define-foreign-int PG_DIAG_SOURCE_FILE)
(define-foreign-int PG_DIAG_SOURCE_LINE)
(define-foreign-int PG_DIAG_SOURCE_FUNCTION)
136
;; Signal a composite (exn postgresql) condition.
;;   loc     - location symbol stored in the exn part
;;   message - error message string
;;   args    - remaining arguments, stored as the exn `arguments' property
(define (postgresql-error loc message . args)
  (let ((condition (make-pg-condition loc message args: args)))
    (signal condition)))
139
;; Build (but do not signal) a composite condition made of an `exn' part
;; (location, message, arguments) and a `postgresql' part carrying the
;; diagnostic fields given as keyword arguments.  Keywords that are not
;; supplied end up as #f properties (args defaults to the empty list).
(define (make-pg-condition loc message #!key (args '()) severity
                           error-class error-code message-detail
                           message-hint statement-position context
                           source-file source-line
                           source-function)
  (let ((exn-part
         (make-property-condition
          'exn
          'location loc
          'message message
          'arguments args))
        (pg-part
         (make-property-condition
          'postgresql
          'severity severity
          'error-class error-class
          'error-code error-code
          'message-detail message-detail
          'message-hint message-hint
          'statement-position statement-position
          'context context
          'source-file source-file
          'source-line source-line
          ;; Might break not-terribly-old versions of postgresql
          ;;'internal-position internal-position 'internal-query internal-query
          'source-function source-function)))
    (make-composite-condition exn-part pg-part)))
156
157;;;;;;;;;;;;;;;;;;;;;;;;
158;;;; Type parsers
159;;;;;;;;;;;;;;;;;;;;;;;;
160
;; Parse a textual value into a character: the first character of STR.
(define (char-parser str)
  (let ((first-char (string-ref str 0)))
    first-char))
162
;; Parse a textual boolean: #t exactly when STR is "t", #f otherwise.
(define (bool-parser str)
  (string=? "t" str))
164
;; Parse STR with string->number; signals a postgresql error when STR is
;; not a valid number.
(define (numeric-parser str)
  (let ((number (string->number str)))
    (if number
        number
        (postgresql-error 'numeric-parser "Unable to parse number" str))))
168
;; Parse an escaped bytea value into a u8vector: unescape the text,
;; convert it to a blob and view that blob as a u8vector.
(define (bytea-parser str)
  (let* ((unescaped (unescape-bytea str))
         (bytes (string->blob unescaped)))
    (blob->u8vector/shared bytes)))
171
;; Parameter holding the default alist of (type-name . parser) pairs.
;; `connect' uses its value when no explicit type-parsers are passed.
(define default-type-parsers
  (make-parameter
   (list (cons "text" identity)
         (cons "bytea" bytea-parser)
         (cons "char" char-parser)
         (cons "bpchar" identity)
         (cons "bool" bool-parser)
         (cons "int8" numeric-parser)
         (cons "int4" numeric-parser)
         (cons "int2" numeric-parser)
         (cons "float4" numeric-parser)
         (cons "float8" numeric-parser)
         (cons "numeric" numeric-parser)
         (cons "oid" numeric-parser))))
186
187;;;;;;;;;;;;;;;;;;;;;;;
188;;;; Type unparsers
189;;;;;;;;;;;;;;;;;;;;;;;
190
;; Turn a Scheme truth value into its SQL text: "TRUE" for any true
;; value, "FALSE" for #f.
(define (bool-unparser b)
  (cond (b "TRUE")
        (else "FALSE")))
193
;; Parameter holding the default alist of (predicate . unparser) pairs.
;; query* applies the unparser of the first entry whose predicate accepts
;; a parameter value, so the order of the entries matters.
(define default-type-unparsers
  (make-parameter
   (list (cons string? identity)
         (cons u8vector? u8vector->blob/shared)
         (cons char? string)
         (cons boolean? bool-unparser)
         (cons number? number->string))))
201
;; Retrieve type-oids from PostgreSQL:
;;
;; Installs NEW-TYPE-PARSERS (or, when omitted, the connection's current
;; type parsers) on CONN and rebuilds the connection's oid -> parser hash
;; table by looking up the oid of every named type in pg_type.
(define (update-type-parsers! conn #!optional new-type-parsers)
  (let ((type-parsers (or new-type-parsers (pg-connection-type-parsers conn)))
        (ht (make-hash-table)))
    ;; Set the parsers now, so that we will be retrieving raw data
    (pg-connection-oid-parsers-set! conn ht)
    (pg-connection-type-parsers-set! conn type-parsers)
    (unless (null? type-parsers)   ; empty IN () clause is not allowed
      (row-for-each*
       (lambda (oid typname)
         ;; The oid arrives as text because the fresh table has no parsers yet.
         (and-let* ((entry (assoc typname type-parsers)))
           (hash-table-set! ht (string->number oid) (cdr entry))))
       (query*
        conn
        (conc "SELECT oid, typname FROM pg_type WHERE typname IN "
              "('" (string-intersperse
                    (map (lambda (p) (escape-string conn (car p)))
                         type-parsers) "', '") "')"))))))
221
;; Replace the alist of (predicate . unparser) pairs stored on CONN.
(define update-type-unparsers!
  (lambda (conn new-type-unparsers)
    (pg-connection-type-unparsers-set! conn new-type-unparsers)))
224
225;;;;;;;;;;;;;;;;;;;;
226;;;; Connections
227;;;;;;;;;;;;;;;;;;;;
228
;; Connection record.
(define-record pg-connection
  ptr                ; libpq connection pointer; set to #f by `disconnect'
  type-parsers       ; alist of (type-name . parser)
  oid-parsers        ; hash table mapping type oid -> parser
  type-unparsers     ; alist of (predicate . unparser)
  transaction-depth) ; nesting depth maintained by with-transaction

;; Exported predicate for connection objects.
(define connection? pg-connection?)
234
;; Return the result of PQsocket for the connection's libpq pointer.
(define (pgsql-connection->fd conn)
  (let ((socket-of (foreign-lambda int PQsocket pgconn*)))
    (socket-of (pg-connection-ptr conn))))
237
;; Drive a non-blocking connect or reset to completion.
;;
;; POLL-FUNCTION (PQconnectPoll or PQresetPoll) is called repeatedly on
;; the connection pointer; while it reports that it needs to read or
;; write, the current thread waits for the socket to become ready.
;; Returns an unspecified value on success.  On failure the connection is
;; disconnected and a postgresql error is signalled.
(define (wait-for-connection! conn poll-function)
  (let ((conn-ptr (pg-connection-ptr conn)))
    (let loop ((result (poll-function conn-ptr)))
      (cond ((= result PGRES_POLLING_OK) (void))
            ((= result PGRES_POLLING_FAILED)
             ;; Grab the message before PQfinish destroys the connection.
             (let ((error-message (PQerrorMessage conn-ptr)))
               (disconnect conn)
               (postgresql-error 'connect
                                 (conc "Polling Postgres database failed. "
                                       error-message) conn)))
            ((member result (list PGRES_POLLING_WRITING PGRES_POLLING_READING))
             ;; The socket must be looked up afresh on every iteration:
             ;; libpq warns that it may change between polling calls.
             (thread-wait-for-i/o! (pgsql-connection->fd conn)
                                   (if (= PGRES_POLLING_READING result)
                                       #:input
                                       #:output))
             (loop (poll-function conn-ptr)))
            (else
             (postgresql-error 'connect "Unknown status code!" conn))))))
256
;; Convert an alist of (keyword . value) pairs into a connection string of
;; the form "key1='value1' key2='value2'".  Values are converted with
;; ->string, and backslashes and single quotes in them are escaped with a
;; backslash.  Keys are inserted verbatim.
(define (alist->connection-spec alist)
  (define (quote-value value)
    (string-translate* (->string value)
                       '(("\\" . "\\\\") ("'" . "\\'"))))
  (define (subspec->string subspec)
    ;; the key had better not contain [ =\']
    (sprintf "~A='~A'" (car subspec) (quote-value (cdr subspec))))
  (string-join (map subspec->string alist)))
265
;; Open a connection to a PostgreSQL server and return a connection object.
;;
;;   connection-spec - a connection string, or an alist that is converted
;;                     with alist->connection-spec
;;   type-parsers    - alist of (type-name . parser); defaults to the
;;                     current value of default-type-parsers
;;   type-unparsers  - alist of (predicate . unparser); defaults to the
;;                     current value of default-type-unparsers
;;
;; Signals a postgresql error when the connection cannot be allocated or
;; established.
(define (connect connection-spec
                 #!optional
                 (type-parsers (default-type-parsers))
                 (type-unparsers (default-type-unparsers)))
  ;; We don't want libpq to piss in our stderr stream
  (define silence-notices!
    (foreign-lambda* void ((pgconn* conn))
      "PQsetNoticeReceiver(conn, nullNoticeReceiver, NULL);"))
  (let* ((spec-string (if (string? connection-spec)
                          connection-spec
                          (alist->connection-spec connection-spec)))
         (conn-ptr (PQconnectStart spec-string)))
    (cond
     ((not conn-ptr)
      (postgresql-error 'connect
                        "Unable to allocate a Postgres connection structure."
                        spec-string))
     ((= (PQstatus conn-ptr) (foreign-value "CONNECTION_BAD" int))
      ;; Fetch the message first; PQfinish releases the connection.
      (let ((error-message (PQerrorMessage conn-ptr)))
        (PQfinish conn-ptr)
        (postgresql-error 'connect
                          (conc "Connection to Postgres database failed: "
                                error-message)
                          spec-string)))
     (else
      (let ((conn (make-pg-connection conn-ptr
                                      type-parsers
                                      (make-hash-table)
                                      type-unparsers
                                      0)))
        (silence-notices! conn-ptr)
        (wait-for-connection! conn PQconnectPoll)
        (set-finalizer! conn disconnect)
        ;; Retrieve type-information from PostgreSQL metadata for use by
        ;; the various value-parsers.
        (update-type-parsers! conn)
        conn)))))
298
;; Reset CONNECTION via PQresetStart/PQresetPoll.  When the reset cannot
;; even be started, the connection is disconnected and a postgresql error
;; is signalled.
(define (reset-connection connection)
  (let ((conn-ptr (pg-connection-ptr connection)))
    (cond
     ((PQresetStart conn-ptr) ;; Update oid-parsers?
      (wait-for-connection! connection PQresetPoll))
     (else
      ;; Read the message before disconnect finishes the connection.
      (let ((error-message (PQerrorMessage conn-ptr)))
        (disconnect connection)
        (postgresql-error 'reset-connection
                          (conc "Reset of connection failed " error-message)
                          connection))))))
308
;; Close CONNECTION.  Does nothing when the connection's pointer is
;; already #f, so it is safe to call more than once.  Always returns an
;; unspecified value.
(define (disconnect connection)
  (let ((conn-ptr (pg-connection-ptr connection)))
    (when conn-ptr
      ;; Clear the record fields first, then release the libpq connection.
      (pg-connection-ptr-set! connection #f)
      (pg-connection-type-parsers-set! connection #f)
      (pg-connection-oid-parsers-set! connection #f)
      (PQfinish conn-ptr))
    (void)))
316
317;;;;;;;;;;;;;;;
318;;;; Results
319;;;;;;;;;;;;;;;
320
;; Result record.
(define-record pg-result
  ptr            ; libpq result pointer; set to #f by clear-result!
  value-parsers) ; vector with one parser per column, or #f (COPY results)

;; Exported predicate for result objects.
(define result? pg-result?)
323
;; Release the libpq result held by RESULT.  Returns #f without doing
;; anything when the result was already cleared.
(define (clear-result! result)
  (let ((result-ptr (pg-result-ptr result)))
    (and result-ptr
         (begin
           (pg-result-ptr-set! result #f)
           (PQclear result-ptr)))))
328
;; Number of rows in RESULT, as reported by PQntuples.
(define (row-count result)
  (let ((result-ptr (pg-result-ptr result)))
    (PQntuples result-ptr)))
331
;; Number of columns in RESULT, as reported by PQnfields.
(define (column-count result)
  (let ((result-ptr (pg-result-ptr result)))
    (PQnfields result-ptr)))
334
;; Helper procedures for bounds checking; so we can distinguish between
;; out of bounds and nonexistent columns, and signal it.

;; Signal a postgresql error (attributed to LOCATION) unless INDEX is a
;; valid zero-based column index for RESULT.  Negative indexes are
;; rejected as well as indexes past the last column.
(define (check-column-index! result index location)
  (when (or (< index 0)
            (>= index (column-count result)))
    (postgresql-error
     location (sprintf "Result column ~A out of bounds" index) result index)))
341
;; Signal a postgresql error (attributed to LOCATION) unless INDEX is a
;; valid zero-based row index for RESULT.  Negative indexes are rejected
;; as well as indexes past the last row.
(define (check-row-index! result index location)
  (when (or (< index 0)
            (>= index (row-count result)))
    (postgresql-error
     location (sprintf "Result row ~A out of bounds" index) result index)))
346
;; Name of column INDEX of RESULT, as a symbol.  Signals a postgresql
;; error when INDEX is out of bounds.
(define (column-name result index)
  (check-column-index! result index 'column-name)
  (let ((name (PQfname (pg-result-ptr result) index)))
    (string->symbol name)))
350
;; List of all column names of RESULT as symbols, in column order.
(define (column-names result)
  (let ((ptr (pg-result-ptr result)))
    (map (lambda (column)
           (string->symbol (PQfname ptr column)))
         (iota (column-count result)))))
359
;; Zero-based index of the column called NAME (a symbol) in RESULT, or #f
;; when PQfnumber reports a negative number for it.
(define (column-index result name)
  (let* ((ptr (pg-result-ptr result))
         (idx (PQfnumber ptr (symbol->string name))))
    (if (negative? idx) #f idx)))
363
;; Oid reported by PQftable for column INDEX of RESULT, or #f when that
;; is the invalid oid.  Signals a postgresql error when INDEX is out of
;; bounds.
(define (table-oid result index)
  (check-column-index! result index 'table-oid)
  (let ((oid (PQftable (pg-result-ptr result) index)))
    (if (= oid invalid-oid) #f oid)))
368
;; Fixes the off-by-1 unexpectedness in libpq/the protocol to make it more
;; consistent with the rest of Scheme.  However, this is inconsistent with
;; almost all other PostgreSQL interfaces...
;;
;; Returns the value of PQftablecol for column INDEX minus one, or #f
;; when PQftablecol does not return a positive number.
(define (table-column-index result index)
  (check-column-index! result index 'table-column-index)
  (let ((idx (PQftablecol (pg-result-ptr result) index)))
    (if (positive? idx)
        (- idx 1)
        #f)))
376
;; Mapping between libpq format codes and the symbols used by this API.
(define format-table
  '((0 . text)
    (1 . binary)))
379
;; Translate a numeric format code into its symbol via format-table;
;; signals a postgresql error for unknown codes.
(define (format->symbol format)
  (cond ((assq format format-table) => cdr)
        (else (postgresql-error 'format->symbol "Unknown format" format))))
383
;; Translate a format symbol (text or binary) into its numeric format
;; code via format-table; signals a postgresql error for unknown symbols.
(define (symbol->format symbol)
  (or (and-let* ((res (rassoc symbol format-table eq?)))
        (car res))
      ;; Report this procedure as the error location.
      (postgresql-error 'symbol->format "Unknown format" symbol)))
388
;; Format of column INDEX of RESULT as a symbol (see format-table).
;; Signals a postgresql error when INDEX is out of bounds.
(define (column-format result index)
  (check-column-index! result index 'column-format)
  (let ((code (PQfformat (pg-result-ptr result) index)))
    (format->symbol code)))
392
;; Type oid of column INDEX of RESULT, as reported by PQftype.  Signals a
;; postgresql error when INDEX is out of bounds.
(define (column-type result index)
  (check-column-index! result index 'column-type)
  (let ((ptr (pg-result-ptr result)))
    (PQftype ptr index)))
396
;; This is really not super-useful as it requires intimate knowledge
;; about the internal implementations of types in PostgreSQL.
;;
;; Returns the value of PQfmod for column INDEX of RESULT, or #f when it
;; is negative.  Signals a postgresql error when INDEX is out of bounds.
(define (column-type-modifier result index)
  ;; Report this procedure as the error location.
  (check-column-index! result index 'column-type-modifier)
  (let ((mod (PQfmod (pg-result-ptr result) index)))
    (and (>= mod 0) mod)))
403
;; Unchecked version, for speed
;;
;; Fetch the value at COLUMN/ROW of RESULT without bounds checking.
;; Returns (sql-null) for NULL fields.  Binary-format columns yield a
;; blob; text-format columns yield a string which, unless RAW is true, is
;; passed through the column's value parser.
;;
;; The string/blob is allocated on the Scheme side and the field data is
;; copied into it.  Building the object in C with C_alloc and returning
;; it is not safe: that memory lives on the C stack and may be reused
;; once the foreign procedure has returned.
(define (value-at* result column row #!key raw)
  (let ((ptr (pg-result-ptr result)))
    (if (PQgetisnull ptr row column)
        (sql-null)
        (let* ((len ((foreign-lambda int "PQgetlength" pgresult* int int)
                     ptr row column))
               (binary? (= 1 (PQfformat ptr column)))
               (value (if binary? (make-blob len) (make-string len))))
          ((foreign-lambda*
            void ((scheme-pointer dst) (pgresult* res)
                  (int row) (int col) (int len))
            "memcpy(dst, PQgetvalue(res, row, col), (size_t)len);")
           value ptr row column len)
          (if (or raw binary?)
              value
              ((vector-ref (pg-result-value-parsers result) column) value))))))
422
;; Bounds-checked access to a single value of RESULT; COLUMN and ROW
;; default to 0.  The row index is checked before the column index.  See
;; value-at* for the meaning of RAW and the shape of the returned value.
(define (value-at result #!optional (column 0) (row 0) #!key raw)
  (let ((location 'value))
    (check-row-index! result row location)
    (check-column-index! result column location)
    (value-at* result column row raw: raw)))
427
;; List of all values in row ROW (default 0) of RESULT, in column order.
;; Signals a postgresql error when ROW is out of bounds.
(define (row-values result #!optional (row 0) #!key raw)
  (check-row-index! result row 'row)
  ;; Walk the columns from last to first so that consing yields the
  ;; values in column order.
  (do ((column (sub1 (column-count result)) (sub1 column))
       (values-so-far '()
                      (cons (value-at* result column row raw: raw)
                            values-so-far)))
      ((< column 0) values-so-far)))
436
;; List of all values in column COLUMN (default 0) of RESULT, in row
;; order.  Signals a postgresql error when COLUMN is out of bounds.
(define (column-values result #!optional (column 0) #!key raw)
  (check-column-index! result column 'column)
  ;; Walk the rows from last to first so that consing yields the values
  ;; in row order.
  (do ((row (sub1 (row-count result)) (sub1 row))
       (values-so-far '()
                      (cons (value-at* result column row raw: raw)
                            values-so-far)))
      ((< row 0) values-so-far)))
445
;; (define (row-alist result #!optional (row 0) #!key raw)
;;   (map cons (column-names result) (row-values result row raw: raw)))
;;
;; Alist of (column-name-symbol . value) pairs for row ROW (default 0) of
;; RESULT, in column order.  Signals a postgresql error when ROW is out
;; of bounds.
(define (row-alist result #!optional (row 0) #!key raw)
  (define (name-of column)
    (string->symbol (PQfname (pg-result-ptr result) column)))
  (check-row-index! result row 'row-alist)
  ;; Walk the columns from last to first so that consing yields the
  ;; pairs in column order.
  (do ((column (sub1 (column-count result)) (sub1 column))
       (pairs '()
              (cons (cons (name-of column)
                          (value-at* result column row raw: raw))
                    pairs)))
      ((< column 0) pairs)))
458
459;;; TODO: Do we want/need PQnparams and PQparamtype bindings?
460
;; The string returned by PQcmdTuples for RESULT, converted with
;; string->number (so #f when that string is not a number).
(define (affected-rows result)
  (let ((count-text (PQcmdTuples (pg-result-ptr result))))
    (string->number count-text)))
463
;; The oid reported by PQoidValue for RESULT, or #f when that is the
;; invalid oid.
(define (inserted-oid result)
  (let ((oid (PQoidValue (pg-result-ptr result))))
    (if (= oid invalid-oid) #f oid)))
467
468
469;;;;;;;;;;;;;;;;;;;;;;;;
470;;;; Query procedures
471;;;;;;;;;;;;;;;;;;;;;;;;
472
;; Buffer all available input, yielding if nothing is available:
;;
;; Repeatedly consumes input on CONN; while libpq still reports the
;; connection as busy, the current thread waits for the socket to become
;; readable.  Signals a postgresql error when consuming input fails.
(define (buffer-available-input! conn)
  (let ((conn-ptr (pg-connection-ptr conn))
        (conn-fd (pgsql-connection->fd conn)))
    (let loop ()
      (cond
       ((not (PQconsumeInput conn-ptr))
        (postgresql-error 'buffer-available-input!
                          (conc "Error reading reply from server. "
                                (PQerrorMessage conn-ptr))
                          conn))
       ((PQisBusy conn-ptr)
        (thread-wait-for-i/o! conn-fd #:input)
        (loop))))))
486
;; Build a vector with one parser per column of PQRESULT.  Each parser is
;; looked up by the column's type oid in CONN's oid-parsers table;
;; columns of unknown type get `identity'.
(define (make-value-parsers conn pqresult)
  (let* ((nfields (PQnfields pqresult))
         (parsers (make-vector nfields)))
    (let fill ((col 0))
      (if (= col nfields)
          parsers
          (begin
            (vector-set! parsers col
                         (hash-table-ref/default
                          (pg-connection-oid-parsers conn)
                          (PQftype pqresult col)
                          identity))
            (fill (+ col 1)))))))
496
;; Collect the result pointers from the last query.
;;
;; A pgresult represents an entire resultset and is always read into memory
;; all at once.
;;
;; Waits for the server's reply on CONN, then returns a result object
;; wrapping the first libpq result.  Error results are converted into a
;; signalled (exn postgresql) condition carrying the diagnostic fields.
(define (get-last-result conn)
  (buffer-available-input! conn)
  (let* ((conn-ptr (pg-connection-ptr conn))
         (result (PQgetResult conn-ptr))
         (status (PQresultStatus result)))

    ;; Read out all remaining results (including the given one).
    ;; TODO: Is this really needed? libpq does it (in pqExecFinish),
    ;; but ostensibly only to concatenate the error messages for
    ;; each query.  OTOH, maybe we want to do that, too.
    (define (clean-results! pending)
      (let drain ((pending pending))
        (when pending
          (PQclear pending)
          (drain (PQgetResult conn-ptr)))))

    (define (status-among? . candidates)
      (member status candidates))

    (define (error-field code)
      (PQresultErrorField result code))

    ;; Wrap the libpq result in a record that is cleared on finalization.
    (define (wrap-result value-parsers)
      (let ((result-obj (make-pg-result result value-parsers)))
        (set-finalizer! result-obj clear-result!)
        result-obj))

    ;; Collect the server's diagnostics while the result is still alive.
    (define (server-error-condition)
      (let ((sqlstate (error-field PG_DIAG_SQLSTATE))
            (maybe-severity (error-field PG_DIAG_SEVERITY))
            (maybe-position (error-field PG_DIAG_STATEMENT_POSITION)))
        (make-pg-condition
         'get-last-result
         (PQresultErrorMessage result)
         severity:           (and maybe-severity
                                  (string->symbol
                                   (string-downcase maybe-severity)))
         error-class:        (and sqlstate (string-take sqlstate 2))
         error-code:         sqlstate
         message-detail:     (error-field PG_DIAG_MESSAGE_DETAIL)
         message-hint:       (error-field PG_DIAG_MESSAGE_HINT)
         statement-position: (and maybe-position
                                  (string->number maybe-position))
         context:            (error-field PG_DIAG_CONTEXT)
         source-file:        (error-field PG_DIAG_SOURCE_FILE)
         source-line:        (error-field PG_DIAG_SOURCE_LINE)
         source-function:    (error-field PG_DIAG_SOURCE_FUNCTION))))

    (cond
     ((not result)
      (postgresql-error
       'get-last-result
       "Internal error! No result object available from server"
       conn))
     ((status-among? PGRES_BAD_RESPONSE PGRES_FATAL_ERROR
                     PGRES_NONFATAL_ERROR)
      ;; Build the condition before the result is released.
      (let ((condition (server-error-condition)))
        (clean-results! result)
        (signal condition)))
     ((status-among? PGRES_COPY_OUT PGRES_COPY_IN)
      ;; These are weird; A COPY puts the connection in "copy mode".
      ;; As long as it's in "copy mode", pqgetresult will return the
      ;; same result every time you call it, so don't try to call
      ;; clean-results!
      (wrap-result #f))
     ((status-among? PGRES_EMPTY_QUERY PGRES_COMMAND_OK
                     PGRES_TUPLES_OK)
      (let* ((result-obj (wrap-result (make-value-parsers conn result)))
             (trailing-result (PQgetResult conn-ptr)))
        (when trailing-result
          (clean-results! trailing-result)
          (postgresql-error 'get-last-result
                            (conc "Internal error! Unexpected extra "
                                  "results after first query result")
                            conn))
        result-obj))
     (else
      (postgresql-error 'get-last-result
                        (conc "Internal error! Unknown status code: "
                              status)
                        conn)))))
572
;; Convenience wrapper around query*: the query parameters are given as
;; rest arguments instead of as a list.
(define (query conn query . params)
  (let ((param-list params))
    (query* conn query param-list)))
575
;; Send QUERY with the list PARAMS to the server and return its result.
;;
;;   params - list of parameter values.  Unless RAW is true each value is
;;            converted by the first matching unparser of the connection;
;;            after conversion every value must be a string, a blob or
;;            sql-null
;;   format - requested result format, `text' (default) or `binary'
;;   raw    - when true, parameters are passed on without unparsing
;;
;; Signals a postgresql error for unsupported parameter values or when
;; the query cannot be sent.
(define (query* conn query #!optional (params '()) #!key (format 'text) raw)
  (let ((unparsers (pg-connection-type-unparsers conn)))

    ;; Apply the first unparser whose predicate accepts X; values without
    ;; a matching unparser are returned unchanged.
    (define (unparse x)
      (let ((entry (find (lambda (candidate) ((car candidate) x))
                         unparsers)))
        (if entry
            ((cdr entry) x)
            x)))

    ;; Convert one parameter to what the C code expects: a string, a blob
    ;; or #f (for sql-null).
    (define (param->wire p)
      (let ((obj (if raw p (unparse p))))
        (unless (or (string? obj) (blob? obj) (sql-null? obj))
          (postgresql-error
           'query*
           (sprintf "Param value is not a string, sql-null or blob: ~S" p)
           conn query params format))
        (if (sql-null? obj) #f obj)))

    ;; See if the parameter checking can be moved into C
    (define send-query
      (foreign-lambda*
       bool ((pgconn* conn) (nonnull-c-string query)
             (int num) (scheme-object params) (int resfmt))
       "int res = 0, i = 0, *lens = NULL;"
       "char **vals = NULL;"
       "int *fmts = NULL;"
       "C_word obj, cons;"
       "if (num > 0) {"
       "    vals = C_malloc(num * sizeof(char *));"
       "    lens = C_malloc(num * sizeof(int));"
       "    fmts = C_malloc(num * sizeof(int));"
       "}"
       "for (i=0,cons=params; i < num; ++i,cons=C_u_i_cdr(cons)) {"
       "    obj = C_u_i_car(cons);"
       "    if (obj == C_SCHEME_FALSE) {"
       "        fmts[i] = 0; /* don't care */"
       "        lens[i] = 0;"
       "        vals[i] = NULL;"
       "    } else if (C_header_bits(obj) == C_BYTEVECTOR_TYPE) {"
       "        fmts[i] = 1; /* binary */"
       "        lens[i] = C_header_size(obj);"
       "        vals[i] = C_c_string(obj);"
       "    } else {"
       "        /* text needs to be copied; it expects ASCIIZ */"
       "        fmts[i] = 0; /* text */"
       "        lens[i] = C_header_size(obj);"
       "        vals[i] = malloc(lens[i] + 1);"
       "        memcpy(vals[i], C_c_string(obj), lens[i]);"
       "        vals[i][lens[i]] = '\\0';"
       "    }"
       "}"
       "res = PQsendQueryParams((PGconn *)conn, query, num, NULL,"
       "                        (const char * const *)vals, lens, fmts, resfmt);"
       "for (i=0,cons=params; i < num; ++i,cons=C_u_i_cdr(cons)) {"
       "    obj = C_u_i_car(cons);"
       "    if (!C_immediatep(obj) && C_header_bits(obj) == C_STRING_TYPE)"
       "        free(vals[i]); /* Clear copied strings only */"
       "}"
       "if (num > 0) {"
       "    free(fmts);"
       "    free(lens);"
       "    free(vals);"
       "}"
       "C_return(res);"))

    ;; Check all params and ensure they are proper values
    (let ((wire-params (map param->wire params)))
      (if (send-query (pg-connection-ptr conn) query
                      (length wire-params) wire-params
                      (symbol->format format))
          (get-last-result conn)
          (postgresql-error 'query*
                            (conc "Unable to send query to server. "
                                  (PQerrorMessage (pg-connection-ptr conn)))
                            conn query wire-params format)))))
648
649;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
650;;;; Transaction management
651;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
652
;; Run THUNK inside a transaction on CONN and return its result.
;;
;; At depth 0 a real transaction is started with BEGIN; nested calls use
;; a savepoint named after the current depth.  The transaction (or
;; savepoint) is committed/released when THUNK returns a true value and
;; rolled back when it returns #f.  When an exception escapes, the
;; transaction is rolled back and the exception is re-raised.
(define (with-transaction conn thunk)
  (let* ((old-depth (pg-connection-transaction-depth conn))
         (outermost? (= old-depth 0)))

    ;; Run OUTER-SQL at the top level, or NESTED-PREFIX followed by the
    ;; depth (i.e. the savepoint name) inside an enclosing transaction.
    ;; We do not *need* to give savepoints unique names,
    ;; but it aids debugging and we know the depth anyway.
    (define (run! outer-sql nested-prefix)
      (if outermost?
          (query conn outer-sql)
          (query conn (conc nested-prefix old-depth))))

    (define (rollback!)
      (run! "ROLLBACK" "ROLLBACK TO SAVEPOINT s_"))

    (define (commit!)
      (run! "COMMIT" "RELEASE SAVEPOINT s_"))

    (define (restore-depth!)
      (pg-connection-transaction-depth-set! conn old-depth))

    (run! "BEGIN" "SAVEPOINT s_")
    (pg-connection-transaction-depth-set! conn (add1 old-depth))
    ;; TODO: Add a warning mechanism (using dynamic-wind) for when the
    ;; user tries to jump into/out of transactions with continuations?
    (handle-exceptions exn
      (begin
        (restore-depth!)
        (rollback!)
        (raise exn))
      (let ((res (thunk)))
        (restore-depth!)
        (if res
            (commit!)
            (rollback!))
        res))))
682
;; True when CONN's transaction depth is greater than zero, i.e. while a
;; with-transaction body is running on it.
(define (in-transaction? conn)
  (positive? (pg-connection-transaction-depth conn)))
685
686;;;;;;;;;;;;;;;;;;;;
687;;;; COPY support
688;;;;;;;;;;;;;;;;;;;;
689
;; Send DATA (a blob, string or u8vector) to the server as COPY input on
;; CONN.  While libpq reports that the data could not be queued yet (0),
;; the current thread waits for the socket to become writable and tries
;; again.  Signals a postgresql error for unsupported data or when libpq
;; reports a failure.
(define (put-copy-data conn data)
  (let* ((data (cond
                ((blob? data) data)
                ((string? data) (string->blob data))
                ((u8vector? data) (u8vector->blob/shared data))
                (else (postgresql-error
                       'put-copy-data "Expected a blob, string or u8vector"
                       conn data))))
         (len (blob-size data))
         (conn-ptr (pg-connection-ptr conn))
         (conn-fd (pgsql-connection->fd conn)))
    (define (send!)
      (PQputCopyData conn-ptr data len))
    (let loop ((res (send!)))
      (case res
        ((0)
         (thread-wait-for-i/o! conn-fd #:output)
         (loop (send!)))
        ((1) (void))
        ((-1)
         (postgresql-error 'put-copy-data
                           (conc "Error putting COPY data. "
                                 (PQerrorMessage conn-ptr))
                           conn))
        (else
         (postgresql-error
          'put-copy-data
          (conc "Internal error! Unexpected return value: " res)
          conn))))))
715
;; Finish sending COPY input on CONN and return the result of the COPY
;; command.  ERROR-MESSAGE, when given, is passed on to PQputCopyEnd.
;; While libpq reports 0 the current thread waits for the socket to
;; become writable and tries again.  Signals a postgresql error when
;; libpq reports a failure.
(define (put-copy-end conn #!optional error-message)
  (let ((conn-ptr (pg-connection-ptr conn))
        (conn-fd (pgsql-connection->fd conn)))
    (define (finish!)
      (PQputCopyEnd conn-ptr error-message))
    (let loop ((res (finish!)))
      (case res
        ((0)
         (thread-wait-for-i/o! conn-fd #:output)
         (loop (finish!)))
        ((1) (get-last-result conn))
        ((-1)
         (postgresql-error 'put-copy-end
                           (conc "Error ending put COPY data. "
                                 (PQerrorMessage conn-ptr))
                           conn error-message))
        (else
         (postgresql-error 'put-copy-end
                           (conc "Internal error! Unexpected return value: " res)
                           conn))))))
733
;; Fetch the next chunk of COPY data from the server over CONN.
;;
;; FORMAT is passed through symbol->format; when the resulting code is 1
;; the received string is converted to a bytevector in place, otherwise
;; it is returned as a string.
;;
;; PQgetCopyData is called in asynchronous mode (third argument 1) and
;; its status decides what happens:
;;   > 0  a chunk was received; it is returned.
;;   0    nothing available yet: wait for the socket to become readable,
;;        let libpq consume the pending input and retry.
;;   -1   the COPY is finished; the value of (get-last-result conn) is
;;        returned.
;;   -2   an error occurred; a postgresql-error is signalled.
;; Any other status signals an "internal error" postgresql-error.
;;
;; NOTE(review): the Scheme string is allocated with C_alloc inside a
;; foreign-safe-lambda*; confirm this is safe with respect to CHICKEN's
;; rules for stack-allocated data returned from foreign lambdas.
(define (get-copy-data conn #!key (format 'text))
  (let* ((conn-ptr (pg-connection-ptr conn))
         (conn-fd (pgsql-connection->fd conn))
         ;; In async mode PQgetCopyData does not read from the socket
         ;; itself, so input must be consumed explicitly between retries.
         (consume-input (foreign-lambda bool PQconsumeInput pgconn*)))
    (let loop ()
      (let-location ((res int))
        (let ((data ((foreign-safe-lambda*
                      scheme-object ((pgconn* conn) ((c-pointer int) res)
                                     (int format))
                      "C_word fin = C_SCHEME_FALSE, *str; char *buf = NULL; "
                      "*res = PQgetCopyData(conn, &buf, 1); "
                      "if (buf != NULL) { /* res is length */ "
                      "  str = C_alloc(C_bytestowords(*res + sizeof(C_header))); "
                      "  fin = C_string(&str, *res, buf); "
                      "  if (format == 1) "
                      "    C_string_to_bytevector(fin);"
                      "  PQfreemem(buf); "
                      "} "
                      "C_return(fin);")
                     conn-ptr (location res) (symbol->format format))))
          (cond
           ((> res 0) data)
           ((= res 0)
            (thread-wait-for-i/o! conn-fd #:input)
            ;; Without this the next PQgetCopyData call would see the same
            ;; (empty) buffer again and we would spin forever.
            (if (consume-input conn-ptr)
                (loop)
                (postgresql-error 'get-copy-data
                                  (conc "Error reading COPY data. "
                                        (PQerrorMessage conn-ptr))
                                  conn)))
           ((= res -1) (get-last-result conn))
           ((= res -2) (postgresql-error 'get-copy-data
                                         (conc "Error getting COPY data. "
                                               (PQerrorMessage conn-ptr))
                                         conn))
           (else (postgresql-error
                  'get-copy-data
                  (conc "Internal error! Unexpected return value: " res)
                  conn))))))))
765
766;;;;;;;;;;;;;;;;;;;;;;
767;;;; Value escaping
768;;;;;;;;;;;;;;;;;;;;;;
769
;; Escape STR for use inside an SQL string literal on connection CONN,
;; using PQescapeStringConn.  Returns the escaped string, or signals a
;; postgresql-error (including libpq's error message for the connection)
;; when escaping fails or the temporary buffer cannot be allocated.
(define (escape-string conn str)
  (define %escape-string-conn
    ;; This could be more efficient by copying straight into a Scheme object.
    ;; Now it's being copied by PQescapeStringConn, and Chicken copies it again.
    ;; This can allocate up to twice as much memory than the string actually
    ;; uses; in extreme cases this could be a problem.
    (foreign-lambda* c-string* ((pgconn* conn) (c-string from) (int flen))
                     "int err = 0; char *to;"
                     "/* size computed in size_t to avoid int overflow */"
                     "to = malloc(sizeof(char) * ((size_t)flen * 2 + 1));"
                     "if (to == NULL)"
                     "        C_return(NULL);"
                     "PQescapeStringConn((PGconn *)conn, to, from, (size_t)flen, &err);"
                     "if (err) {"
                     "        free(to);"
                     "        C_return(NULL);"
                     "}"
                     "C_return(to);"))
  ;; PQerrorMessage needs the raw connection pointer, not the connection
  ;; record, so fetch the pointer once and use it for both calls.
  (let ((conn-ptr (pg-connection-ptr conn)))
    (or (%escape-string-conn conn-ptr str (string-length str))
        (postgresql-error 'escape-string
                          (conc "String escaping failed. "
                                (PQerrorMessage conn-ptr)) conn str))))
789
;; Escape the binary data in STR for use as a bytea value on connection
;; CONN, using PQescapeByteaConn.  Returns the escaped data as a string,
;; or signals a postgresql-error (including libpq's error message for the
;; connection) when libpq returns NULL.
;;
;; NOTE(review): the result is allocated with C_alloc inside a
;; foreign-safe-lambda*; confirm this is safe with respect to CHICKEN's
;; rules for stack-allocated data returned from foreign lambdas.
(define (escape-bytea conn str)
  (define %escape-bytea-conn
    ;; This must copy because libpq returns a malloced ptr...
    (foreign-safe-lambda* scheme-object ((pgconn* conn)
                                         ;; not copied/NUL interpreted:
                                         ((const unsigned-c-string*) from)
                                         (int flen))
                     "size_t tolen=0; C_word res, *fin; unsigned char *esc;"
                     "esc = PQescapeByteaConn((PGconn *)conn, from, (size_t)flen, &tolen);"
                     "if (esc == NULL)"
                     "    C_return(C_SCHEME_FALSE);"
                     "fin = C_alloc(C_bytestowords(tolen + sizeof(C_header)));"
                     "/* tolen includes the resulting NUL byte */"
                     "res = C_string(&fin, tolen - 1, (char *)esc);"
                     "PQfreemem(esc);"
                     "C_return(res);"))
  ;; PQerrorMessage needs the raw connection pointer, not the connection
  ;; record, so fetch the pointer once and use it for both calls.
  (let ((conn-ptr (pg-connection-ptr conn)))
    (or (%escape-bytea-conn conn-ptr str (string-length str))
        (postgresql-error 'escape-bytea
                          (conc "Byte array escaping failed. "
                                (PQerrorMessage conn-ptr)) conn str))))
810
;; Convert the escaped bytea representation in STR back to the raw data
;; using PQunescapeBytea.  Returns the unescaped data as a string, or
;; signals a postgresql-error when libpq returns NULL.
(define (unescape-bytea str)
  (let* ((%unescape-bytea
          ;; This must copy because libpq returns a malloced ptr...
          (foreign-safe-lambda* scheme-object (((const unsigned-c-string*) from))
                     "size_t tolen=0; C_word res, *fin; unsigned char *unesc;"
                     "unesc = PQunescapeBytea(from, &tolen);"
                     "if (unesc == NULL)"
                     "    C_return(C_SCHEME_FALSE);"
                     "fin = C_alloc(C_bytestowords(tolen + sizeof(C_header)));"
                     "res = C_string(&fin, tolen, (char *)unesc);"
                     "PQfreemem(unesc);"
                     "C_return(res);"
                     ))
         (unescaped (%unescape-bytea str)))
    (if unescaped
        unescaped
        (postgresql-error 'unescape-bytea
                          "Byte array unescaping failed (out of memory?)" str))))
827
828
829;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
830;;;; High-level interface
831;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
832
;; Build a left fold over the items of a result.
;;
;; ITEM-COUNT is called with the result and must return the number of
;; items; EXTRACT-ITEM is called with the result and an index.  The
;; returned procedure takes KONS, KNIL and RESULT and calls
;; (kons item seed) for index 0 up to (but excluding) the item count,
;; threading the seed through and returning the final seed.
(define (make-result-fold item-count extract-item)
  (lambda (kons knil result)
    (let ((limit (item-count result)))
      (do ((idx 0 (add1 idx))
           (acc knil (kons (extract-item result idx) acc)))
          ((= idx limit) acc)))))
841
;; Left fold over the rows of a result; KONS receives the list of values
;; of a row and the seed.
(define row-fold (make-result-fold row-count row-values))

;; Like row-fold, but KONS receives the values of the row as separate
;; arguments, followed by the seed as the last argument.
(define (row-fold* kons knil result)
  (define (spread row acc)
    (apply kons (append row (list acc))))
  (row-fold spread knil result))
846
;; Left fold over the columns of a result; KONS receives the list of
;; values of a column and the seed.
(define column-fold (make-result-fold column-count column-values))

;; Like column-fold, but KONS receives the values of the column as
;; separate arguments, followed by the seed as the last argument.
(define (column-fold* kons knil result)
  (define (spread column acc)
    (apply kons (append column (list acc))))
  (column-fold spread knil result))
851
852
;; Build a right fold over the items of a result.
;;
;; ITEM-COUNT is called with the result and must return the number of
;; items; EXTRACT-ITEM is called with the result and an index.  The
;; returned procedure takes KONS, KNIL and RESULT and calls
;; (kons item seed) starting at the last item and walking back to index
;; 0, threading the seed through and returning the final seed.
(define (make-result-fold-right item-count extract-item)
  (lambda (kons knil result)
    (do ((remaining (item-count result) (sub1 remaining))
         (acc knil (kons (extract-item result (sub1 remaining)) acc)))
        ((= remaining 0) acc))))
860
;; Right fold over the rows of a result; KONS receives the list of values
;; of a row and the seed.
(define row-fold-right (make-result-fold-right row-count row-values))

;; Like row-fold-right, but KONS receives the values of the row as
;; separate arguments, followed by the seed as the last argument.
(define (row-fold-right* kons knil result)
  (define (spread row acc)
    (apply kons (append row (list acc))))
  (row-fold-right spread knil result))
865
;; Right fold over the columns of a result; KONS receives the list of
;; values of a column and the seed.
(define column-fold-right (make-result-fold-right column-count column-values))

;; Like column-fold-right, but KONS receives the values of the column as
;; separate arguments, followed by the seed as the last argument.
(define (column-fold-right* kons knil result)
  (define (spread column acc)
    (apply kons (append column (list acc))))
  (column-fold-right spread knil result))
870
871
;; Call PROC on the list of values of every row of RESULT, first row
;; first.  The return value is unspecified.
(define (row-for-each proc result)
  (define (visit row ignored)
    (proc row))
  (row-fold visit #f result)
  (void))

;; Like row-for-each, but PROC receives the values of the row as
;; separate arguments.
(define (row-for-each* proc result)
  (define (visit row ignored)
    (apply proc row))
  (row-fold visit #f result)
  (void))
878
;; Call PROC on the list of values of every column of RESULT, first
;; column first.  The return value is unspecified.
(define (column-for-each proc result)
  (define (visit column ignored)
    (proc column))
  (column-fold visit #f result)
  (void))

;; Like column-for-each, but PROC receives the values of the column as
;; separate arguments.
(define (column-for-each* proc result)
  (define (visit column ignored)
    (apply proc column))
  (column-fold visit #f result)
  (void))
885
;; As with the standard Scheme map, the order in which the procedure is
;; applied is not defined.  These procedures take advantage of that: they
;; walk the result set from the last item to the first (a right fold), so
;; the output list is built in order and no reverse! is needed.

;; Collect (proc row) for every row, where row is the list of its values.
(define (row-map proc res)
  (define (collect row tail)
    (cons (proc row) tail))
  (row-fold-right collect '() res))

;; Like row-map, but PROC receives the row's values as separate arguments.
(define (row-map* proc res)
  (define (collect row tail)
    (cons (apply proc row) tail))
  (row-fold-right collect '() res))

;; Collect (proc col) for every column, where col is the list of its values.
(define (column-map proc res)
  (define (collect col tail)
    (cons (proc col) tail))
  (column-fold-right collect '() res))

;; Like column-map, but PROC receives the column's values as separate
;; arguments.
(define (column-map* proc res)
  (define (collect col tail)
    (cons (apply proc col) tail))
  (column-fold-right collect '() res))
897
;; Return the symbol 'binary when RESULT is not #f and PQbinaryTuples
;; reports true for its result pointer; return 'text in every other case.
(define (result-format result)
  (let ((binary-tuples? (foreign-lambda bool PQbinaryTuples pgresult*)))
    (cond
     ((not result) 'text)
     ((binary-tuples? (pg-result-ptr result)) 'binary)
     (else 'text))))
902
;; Run QUERY (with PARAMS) on CONN through query* and left-fold KONS over
;; the chunks subsequently returned by get-copy-data, starting from KNIL.
;; Chunks are fetched in the format reported by result-format for the
;; query's result.  Folding stops, and the seed is returned, as soon as
;; get-copy-data returns a result object instead of data.
;;
;; If an exception is raised while folding, the remaining COPY data is
;; read and thrown away until a result object shows up, and then the
;; original exception is re-raised.
(define (copy-query*-fold kons knil conn query
                          #!optional (params '()) #!key (format 'text) raw)
  (let* ((result (query* conn query params format: format raw: raw))
         (data-format (result-format result)))
    (define (next-chunk)
      (get-copy-data conn format: data-format))
    (define (drain-then-reraise exn)
      (let drain ()
        (if (result? (get-copy-data conn))
            (raise exn)
            (drain))))
    (handle-exceptions exn
      (drain-then-reraise exn)
      (let loop ((chunk (next-chunk))
                 (acc knil))
        (if (result? chunk)
            acc
            ;; Call kons before fetching the next chunk: chunks can be
            ;; _very_ big, and this ordering allows one to be GCed.
            (let ((updated (kons chunk acc)))
              (loop (next-chunk) updated)))))))

;; Convenience variant of copy-query*-fold that takes the query
;; parameters as rest arguments.
(define (copy-query-fold kons knil conn query . params)
  (copy-query*-fold kons knil conn query params))
919
920
;; Right-fold counterpart of copy-query*-fold.  Slow and memory-hungry
;; when there is a lot of data; it exists mainly for completeness.
;;
;; Every chunk returned by get-copy-data is combined as
;; (kons chunk rest), where rest is the fold of all later chunks and
;; KNIL stands in once get-copy-data returns a result object.  If an
;; exception is raised, the remaining COPY data is read and discarded
;; until a result object shows up, and then the exception is re-raised.
(define (copy-query*-fold-right kons knil conn query
                                #!optional (params '()) #!key (format 'text) raw)
  (let* ((result (query* conn query params format: format raw: raw))
         (data-format (result-format result)))
    (define (next-chunk)
      (get-copy-data conn format: data-format))
    (define (drain-then-reraise exn)
      (let drain ()
        (if (result? (get-copy-data conn))
            (raise exn)
            (drain))))
    (handle-exceptions exn
      (drain-then-reraise exn)
      (let loop ((chunk (next-chunk)))
        (if (result? chunk)
            knil
            (kons chunk (loop (next-chunk))))))))

;; Convenience variant of copy-query*-fold-right that takes the query
;; parameters as rest arguments.
(define (copy-query-fold-right kons knil conn query . params)
  (copy-query*-fold-right kons knil conn query params))
935
936
;; Run QUERY on CONN and return the list of (proc chunk) for every chunk
;; of COPY data, in the order the chunks were received.  The list is
;; accumulated back-to-front by copy-query*-fold and reversed in place
;; at the end.
(define (copy-query*-map proc conn query
                         #!optional (params '()) #!key (format 'text) raw)
  (define (collect chunk acc)
    (cons (proc chunk) acc))
  (let ((reversed (copy-query*-fold collect '() conn query params
                                    format: format raw: raw)))
    (reverse! reversed)))

;; Convenience variant of copy-query*-map that takes the query
;; parameters as rest arguments.
(define (copy-query-map proc conn query . params)
  (copy-query*-map proc conn query params))
944
945
;; Run QUERY on CONN and call PROC on every chunk of COPY data, in the
;; order the chunks are received.  The return value is unspecified.
(define (copy-query*-for-each proc conn query
                              #!optional (params '()) #!key (format 'text) raw)
  (define (visit chunk ignored)
    (proc chunk))
  (copy-query*-fold visit #f conn query params format: format raw: raw)
  (void))

;; Convenience variant of copy-query*-for-each that takes the query
;; parameters as rest arguments.
(define (copy-query-for-each proc conn query . params)
  (copy-query*-for-each proc conn query params))
954
;; The name is a little odd, but it is consistent with the other
;; procedures in this family.
;;
;; Run QUERY (with PARAMS) on CONN, then call PROC with an output port.
;; Everything written to the port is sent with put-copy-data; closing the
;; port calls put-copy-end and marks the copy as closed.  When PROC
;; returns without the port having been closed, put-copy-end is called
;; for it.  The values returned by PROC are returned.
;;
;; When an exception is raised and the copy was not closed yet,
;; put-copy-end is called with an error message before the original
;; exception is re-raised when that call itself raises.
(define (call-with-output-copy-query*
         proc conn query #!optional (params '()) #!key (format 'text) raw)
  (query* conn query params format: format raw: raw)
  (let* ((closed? #f)
         (send-chunk (lambda (data) (put-copy-data conn data)))
         (finish-copy (lambda () (put-copy-end conn) (set! closed? #t)))
         (output-port (make-output-port send-chunk finish-copy)))
    (define (run-proc)
      (proc output-port))
    (define (finish-and-return . results)
      (unless closed? (put-copy-end conn))
      (apply values results))
    (handle-exceptions exn
      (cond
       (closed? (raise exn))
       (else
        (handle-exceptions _
          (raise exn)
          ;; Previously written data will be discarded to guarantee atomicity
          (put-copy-end conn "Chicken PostgreSQL egg -- forcing error"))))
      (call-with-values run-proc finish-and-return))))

;; Convenience variant of call-with-output-copy-query* that takes the
;; query parameters as rest arguments.
(define (call-with-output-copy-query proc conn query . params)
  (call-with-output-copy-query* proc conn query params))
977
;; Like call-with-output-copy-query*, but instead of passing the port to
;; a procedure, THUNK is run through with-output-to-port on the copy
;; query's output port.
(define (with-output-to-copy-query*
         thunk conn query #!optional (params '()) #!key (format 'text) raw)
  (define (run-with-port port)
    (with-output-to-port port thunk))
  (call-with-output-copy-query* run-with-port conn query params
                                format: format raw: raw))

;; Convenience variant of with-output-to-copy-query* that takes the
;; query parameters as rest arguments.
(define (with-output-to-copy-query thunk conn query . params)
  (with-output-to-copy-query* thunk conn query params))
985
986)
Note: See TracBrowser for help on using the browser.