diff --git a/collects/db/TODO b/collects/db/TODO index 2a282d5855..355b23459f 100644 --- a/collects/db/TODO +++ b/collects/db/TODO @@ -62,10 +62,6 @@ Misc - SQL_CURSOR_{COMMIT,ROLLBACK}_BEHAVIOR - check that commit/rollback doesn't delete pstmts! - SQL_MAX_{CATALOG,COLUMN,IDENTIFIER,SCHEMA_NAME,TABLE_NAME}_NAME (min 128) -- cursors? (Olin thinks cursors are important. I'm not sure.) - - how do people want to use cursors? - - how about implicit support only in 'in-query'? - - add evt versions of functions - for query functions (?) - connection-pool-lease-evt diff --git a/collects/db/private/generic/connect-util.rkt b/collects/db/private/generic/connect-util.rkt index a4f89af078..372f0dd2d4 100644 --- a/collects/db/private/generic/connect-util.rkt +++ b/collects/db/private/generic/connect-util.rkt @@ -63,8 +63,9 @@ (connected?) (disconnect) (get-dbsystem) - (query fsym stmt) + (query fsym stmt cursor?) (prepare fsym stmt close-on-exec?) + (fetch/cursor fsym cursor fetch-size) (get-base) (free-statement stmt need-lock?) (transaction-status fsym) @@ -177,7 +178,8 @@ (define-forward (#f #f (connected?)) (#t '_ (get-dbsystem)) - (#t '_ (query fsym stmt)) + (#t '_ (query fsym stmt cursor?)) + (#t '_ (fetch/cursor fsym stmt fetch-size)) (#t '_ (start-transaction fsym isolation cwt?)) (#f (void) (end-transaction fsym mode cwt?)) (#f #f (transaction-status fsym)) @@ -336,8 +338,9 @@ (define-forward define/public (get-dbsystem) - (query fsym stmt) + (query fsym stmt cursor?) (prepare fsym stmt close-on-exec?) + (fetch/cursor fsym stmt fetch-size) (get-base) (free-statement stmt need-lock?) (transaction-status fsym) diff --git a/collects/db/private/generic/functions.rkt b/collects/db/private/generic/functions.rkt index 6d70b4f304..96b1aabc19 100644 --- a/collects/db/private/generic/functions.rkt +++ b/collects/db/private/generic/functions.rkt @@ -2,6 +2,7 @@ (require (for-syntax racket/base) racket/vector racket/class + racket/promise "interfaces.rkt" (only-in "sql-data.rkt" sql-null sql-null?)) (provide (all-defined-out)) @@ -73,7 +74,7 @@ ;; query1 : connection symbol Statement -> QueryResult (define (query1 c fsym stmt) - (send c query fsym stmt)) + (send c query fsym stmt #f)) ;; query/rows : connection symbol Statement nat/#f -> rows-result (define (query/rows c fsym sql want-columns) @@ -86,6 +87,17 @@ got-columns (if (= got-columns 1) "column" "columns") want-columns sql))) result)) +(define (query/cursor c fsym sql want-columns) + (let ([result (send c query fsym sql #t)]) + (unless (cursor-result? result) + (uerror fsym "query did not return cursor: ~e" sql)) + (let ([got-columns (length (cursor-result-headers result))]) + (when (and want-columns (not (= got-columns want-columns))) + (uerror fsym "query returned ~a ~a (expected ~a): ~e" + got-columns (if (= got-columns 1) "column" "columns") + want-columns sql))) + result)) + (define (rows-result->row fsym rs sql maybe-row? one-column?) (define rows (rows-result-rows rs)) (cond [(null? rows) @@ -192,16 +204,8 @@ ;; ======================================== -(define (in-query c stmt . args) - (let ([rows (in-query-helper #f c stmt args)]) - (make-do-sequence - (lambda () - (values (lambda (p) (vector->values (car p))) - cdr - rows - pair? - (lambda _ #t) - (lambda _ #t)))))) +(define (in-query c stmt #:fetch [fetch-size +inf.0] . args) + (apply in-query-helper #f c stmt #:fetch fetch-size args)) (define-sequence-syntax in-query* (lambda () #'in-query) @@ -209,25 +213,55 @@ (syntax-case stx () [[(var ...) (in-query c stmt arg ...)] #'[(var ...) - (:do-in ([(rows) (in-query-helper (length '(var ...)) c stmt (list arg ...))]) - (void) ;; outer check - ([rows rows]) ;; loop inits - (pair? rows) ;; pos guard - ([(var ...) (vector->values (car rows))]) ;; inner bindings - #t ;; pre guard - #t ;; post guard - ((cdr rows)))]] ;; loop args + (in-query-helper (length '(var ...)) c stmt arg ...)]] [_ #f]))) -(define (in-query-helper vars c stmt args) +(define (in-query-helper vars c stmt + #:fetch [fetch-size +inf.0] + . args) ;; Not protected by contract (unless (connection? c) (apply raise-type-error 'in-query "connection" 0 c stmt args)) (unless (statement? stmt) (apply raise-type-error 'in-query "statement" 1 c stmt args)) + (unless (or (exact-positive-integer? fetch-size) (eqv? fetch-size +inf.0)) + (raise-type-error 'in-query "positive integer or +inf.0" fetch-size)) (let* ([check (or vars 'rows)] [stmt (compose-statement 'in-query c stmt args check)]) - (rows-result-rows (query/rows c 'in-query stmt vars)))) + (cond [(eqv? fetch-size +inf.0) + (in-list/vector->values + (rows-result-rows + (query/rows c 'in-query stmt vars)))] + [else + (let ([cursor (query/cursor c 'in-query stmt vars)]) + (in-list-generator/vector->values + (lambda () (send c fetch/cursor 'in-query cursor fetch-size))))]))) + +(define (in-list/vector->values vs) + (make-do-sequence + (lambda () + (values (lambda (p) (vector->values (car p))) + cdr + vs + pair? #f #f)))) + +(define (in-list-generator/vector->values fetch-proc) + ;; fetch-proc : symbol nat -> (U list #f) + ;; state = #f | (cons vector (U state (promise-of state))) + + ;; more-promise : -> (promise-of state) + (define (more-promise) + (delay (let ([more (fetch-proc)]) + ;; note: improper append, list onto promise + (and more (append more (more-promise)))))) + + (make-do-sequence + (lambda () + (values (lambda (p) (vector->values (car p))) + (lambda (p) + (let ([next (cdr p)]) (if (promise? next) (force next) next))) + (force (more-promise)) + pair? #f #f)))) ;; ======================================== diff --git a/collects/db/private/generic/interfaces.rkt b/collects/db/private/generic/interfaces.rkt index 777965db90..97ca96f562 100644 --- a/collects/db/private/generic/interfaces.rkt +++ b/collects/db/private/generic/interfaces.rkt @@ -12,6 +12,7 @@ (struct-out simple-result) (struct-out rows-result) + (struct-out cursor-result) init-private @@ -33,6 +34,7 @@ get-dbsystem ;; -> dbsystem<%> query ;; symbol statement -> QueryResult prepare ;; symbol preparable boolean -> prepared-statement<%> + fetch/cursor ;; symbol cursor nat -> #f or (listof vector) get-base ;; -> connection<%> or #f (#f means base isn't fixed) list-tables ;; symbol symbol -> (listof string) @@ -71,6 +73,9 @@ get-close-on-exec? ;; -> boolean after-exec ;; boolean -> void (for close-on-exec) + get-stmt ;; -> string/#f + get-stmt-type ;; -> symbol/#f + get-param-count ;; -> nat or #f get-param-typeids ;; -> (listof typeid) @@ -110,6 +115,10 @@ (struct simple-result (info) #:transparent) (struct rows-result (headers rows) #:transparent) +;; A cursor-result is +;; - (cursor-result Header prepared-statement ???) +(struct cursor-result (headers pst extra)) + ;; A Header is (listof FieldInfo) ;; A FieldInfo is an alist, contents dbsys-dependent diff --git a/collects/db/private/generic/place-client.rkt b/collects/db/private/generic/place-client.rkt index 292c0f0277..68c60e16cd 100644 --- a/collects/db/private/generic/place-client.rkt +++ b/collects/db/private/generic/place-client.rkt @@ -70,14 +70,17 @@ (define/public (get-dbsystem) (error 'get-dbsystem "not implemented")) (define/public (get-base) this) - (define/public (query fsym stmt) + (define/public (query fsym stmt cursor?) (call 'query fsym (match stmt [(? string?) (list 'string stmt)] [(statement-binding pst params) - (list 'statement-binding (send pst get-handle) params)]))) + (list 'statement-binding (send pst get-handle) params)]) + cursor?)) (define/public (prepare fsym stmt close-on-exec?) (call 'prepare fsym stmt close-on-exec?)) + (define/public (fetch/cursor fsym cursor fetch-size) + (call 'fetch/cursor fsym (cursor-result-extra cursor) fetch-size)) (define/public (transaction-status fsym) (call 'transaction-status fsym)) (define/public (start-transaction fsym iso cwt?) @@ -101,6 +104,8 @@ (simple-result y)] [(list 'rows-result h rows) (rows-result h rows)] + [(list 'cursor-result info handle) + (cursor-result info #f handle)] [(list 'prepared-statement handle close-on-exec? param-typeids result-dvecs) (new prepared-statement% (handle handle) diff --git a/collects/db/private/generic/place-server.rkt b/collects/db/private/generic/place-server.rkt index 6c172edb40..cc9bce68df 100644 --- a/collects/db/private/generic/place-server.rkt +++ b/collects/db/private/generic/place-server.rkt @@ -78,8 +78,9 @@ server -> client: (or (list 'values result ...) channel) (super-new) - (define pstmt-table (make-hash)) ;; int => prepared-statement - (define pstmt-counter 0) + ;; FIXME: need to collect cursors, too + (define table (make-hash)) ;; int => prepared-statement/cursor-result + (define counter 0) (define/public (serve) (serve1) @@ -96,10 +97,12 @@ server -> client: (or (list 'values result ...) (send connection disconnect) (set! connection #f)] [(list 'free-statement pstmt-index need-lock?) - (send connection free-statement (hash-ref pstmt-table pstmt-index) need-lock?) - (hash-remove! pstmt-table pstmt-index)] - [(list 'query fsym stmt) - (send connection query fsym (sexpr->statement stmt))] + (send connection free-statement (hash-ref table pstmt-index) need-lock?) + (hash-remove! table pstmt-index)] + [(list 'query fsym stmt cursor?) + (send connection query fsym (sexpr->statement stmt) cursor?)] + [(list 'fetch/cursor fsym cursor-index fetch-size) + (send connection fetch/cursor fsym (hash-ref table cursor-index) fetch-size)] [msg (define-syntax-rule (forward-methods (method arg ...) ...) (match msg @@ -120,7 +123,7 @@ server -> client: (or (list 'values result ...) (match x [(list 'string s) s] [(list 'statement-binding pstmt-index args) - (statement-binding (hash-ref pstmt-table pstmt-index) args)])) + (statement-binding (hash-ref table pstmt-index) args)])) (define/private (result->sexpr x) (match x @@ -128,12 +131,16 @@ server -> client: (or (list 'values result ...) (list 'simple-result y)] [(rows-result h rows) (list 'rows-result h rows)] + [(cursor-result h pst extra) + (let ([index (begin (set! counter (add1 counter)) counter)]) + (hash-set! table index x) + (list 'cursor-result h index))] ;; FIXME: Assumes prepared-statement is concrete class, not interface. [(? (lambda (x) (is-a? x prepared-statement%))) - (let ([pstmt-index (begin (set! pstmt-counter (add1 pstmt-counter)) pstmt-counter)]) - (hash-set! pstmt-table pstmt-index x) + (let ([index (begin (set! counter (add1 counter)) counter)]) + (hash-set! table index x) (list 'prepared-statement - pstmt-index + index (get-field close-on-exec? x) (get-field param-typeids x) (get-field result-dvecs x)))] diff --git a/collects/db/private/generic/prepared.rkt b/collects/db/private/generic/prepared.rkt index 0281d3a8be..46eadeb6ea 100644 --- a/collects/db/private/generic/prepared.rkt +++ b/collects/db/private/generic/prepared.rkt @@ -14,6 +14,7 @@ close-on-exec? ;; boolean param-typeids ;; (listof typeid) result-dvecs ;; (listof vector), layout depends on dbsys + [stmt #f] ;; string/#f [stmt-type #f]) ;; usually symbol or #f (see classify-*-sql) (define owner (make-weak-box -owner)) @@ -30,6 +31,9 @@ (when close-on-exec? ;; indicates ad-hoc prepared statement (finalize need-lock?))) + (define/public (get-stmt) stmt) + (define/public (get-stmt-type) stmt-type) + (define/public (get-param-count) (length param-typeids)) (define/public (get-param-typeids) param-typeids) @@ -42,8 +46,6 @@ (define/public (get-result-types) (send dbsystem describe-typeids result-typeids)) - (define/public (get-stmt-type) stmt-type) - ;; checktype is either #f, 'rows, or exact-positive-integer (define/public (check-results fsym checktype obj) (cond [(eq? checktype 'rows) @@ -61,6 +63,8 @@ [else (void)])) (define/public (check-owner fsym c obj) + (unless handle + (error fsym "prepared statement is closed")) (unless (eq? c (weak-box-value owner)) (error fsym "prepared statement owned by another connection: ~e" obj))) diff --git a/collects/db/private/mysql/connection.rkt b/collects/db/private/mysql/connection.rkt index c36d2680ae..8f6ff681b0 100644 --- a/collects/db/private/mysql/connection.rkt +++ b/collects/db/private/mysql/connection.rkt @@ -250,7 +250,7 @@ ;; Set connection to use utf8 encoding (define/private (after-connect) - (query 'mysql-connect "set names 'utf8'") + (query 'mysql-connect "set names 'utf8'" #f) (void)) @@ -258,20 +258,20 @@ ;; == Query - ;; query : symbol Statement -> QueryResult - (define/public (query fsym stmt) - (check-valid-tx-status fsym) + ;; query : symbol Statement boolean -> QueryResult + (define/public (query fsym stmt cursor?) (let ([result (call-with-lock fsym (lambda () - (let* ([stmt (check-statement fsym stmt)] + (check-valid-tx-status fsym) + (let* ([stmt (check-statement fsym stmt cursor?)] [stmt-type (cond [(statement-binding? stmt) (send (statement-binding-pst stmt) get-stmt-type)] [(string? stmt) (classify-my-sql stmt)])]) (check-statement/tx fsym stmt-type) - (begin0 (query1 fsym stmt #t) + (begin0 (query1 fsym stmt cursor? #t) (when #f ;; DISABLED! ;; For some reason, *really* slow; the concurrent tests slow ;; down by over an order of magnitude when this is enabled. @@ -279,42 +279,49 @@ (query1:process-result fsym result))) ;; query1 : symbol Statement -> QueryResult - (define/private (query1 fsym stmt warnings?) + (define/private (query1 fsym stmt cursor? warnings?) (let ([wbox (and warnings? (box 0))]) (fresh-exchange) - (query1:enqueue stmt) - (begin0 (query1:collect fsym (not (string? stmt)) wbox) + (query1:enqueue stmt cursor?) + (begin0 (query1:collect fsym stmt (not (string? stmt)) cursor? wbox) (when (and warnings? (not (zero? (unbox wbox)))) (fetch-warnings fsym))))) - ;; check-statement : symbol any -> statement-binding - (define/private (check-statement fsym stmt) + ;; check-statement : symbol any boolean -> statement-binding + ;; For cursor, need to clone pstmt, because only one cursor can be + ;; open for a statement at a time. (Could delay clone until + ;; needed, but that seems more complicated.) + (define/private (check-statement fsym stmt cursor?) (cond [(statement-binding? stmt) (let ([pst (statement-binding-pst stmt)]) (send pst check-owner fsym this stmt) (for ([typeid (in-list (send pst get-result-typeids))]) (unless (supported-result-typeid? typeid) (error/unsupported-type fsym typeid))) - stmt)] + (cond [cursor? + (let ([pst* (prepare1 fsym (send pst get-stmt) #f)]) + (statement-binding pst* (statement-binding-params stmt)))] + [else stmt]))] [(and (string? stmt) (force-prepare-sql? fsym stmt)) - (let ([pst (prepare1 fsym stmt #t)]) - (check-statement fsym (send pst bind fsym null)))] + (let ([pst (prepare1 fsym stmt (not cursor?))]) + (check-statement fsym (send pst bind fsym null) #f))] [else stmt])) ;; query1:enqueue : statement -> void - (define/private (query1:enqueue stmt) + (define/private (query1:enqueue stmt cursor?) (cond [(statement-binding? stmt) (let* ([pst (statement-binding-pst stmt)] [id (send pst get-handle)] [params (statement-binding-params stmt)] [null-map (map sql-null? params)]) (send-message - (make-execute-packet id null null-map params)))] + (let ([flags (if cursor? '(cursor/read-only) '())]) + (make-execute-packet id flags null-map params))))] [else ;; string (send-message (make-command-packet 'query stmt))])) ;; query1:collect : symbol bool -> QueryResult stream - (define/private (query1:collect fsym binary? wbox) + (define/private (query1:collect fsym stmt binary? cursor? wbox) (let ([r (recv fsym 'result)]) (match r [(struct ok-packet (affected-rows insert-id status warnings message)) @@ -324,9 +331,12 @@ (status . ,status) (message . ,message)))] [(struct result-set-header-packet (fields extra)) - (let* ([field-dvecs (query1:get-fields fsym binary?)] - [rows (query1:get-rows fsym field-dvecs binary? wbox)]) - (vector 'rows field-dvecs rows))]))) + (let* ([field-dvecs (query1:get-fields fsym binary?)]) + (if cursor? + (vector 'cursor field-dvecs (statement-binding-pst stmt)) + (vector 'rows + field-dvecs + (query1:get-rows fsym field-dvecs binary? wbox #f))))]))) (define/private (query1:get-fields fsym binary?) (let ([r (recv fsym 'field)]) @@ -336,16 +346,18 @@ [(struct eof-packet (warning status)) null]))) - (define/private (query1:get-rows fsym field-dvecs binary? wbox) + (define/private (query1:get-rows fsym field-dvecs binary? wbox end-box) ;; Note: binary? should always be #t, unless force-prepare-sql? misses something. (let ([r (recv fsym (if binary? 'binary-data 'data) field-dvecs)]) (match r [(struct row-data-packet (data)) - (cons data (query1:get-rows fsym field-dvecs binary? wbox))] + (cons data (query1:get-rows fsym field-dvecs binary? wbox end-box))] [(struct binary-row-data-packet (data)) - (cons data (query1:get-rows fsym field-dvecs binary? wbox))] + (cons data (query1:get-rows fsym field-dvecs binary? wbox end-box))] [(struct eof-packet (warnings status)) (when wbox (set-box! wbox warnings)) + (when (and end-box (bitwise-bit-set? status 7)) ;; 'last-row-sent + (set-box! end-box #t)) null]))) (define/private (query1:process-result fsym result) @@ -353,7 +365,31 @@ [(vector 'rows field-dvecs rows) (rows-result (map field-dvec->field-info field-dvecs) rows)] [(vector 'command command-info) - (simple-result command-info)])) + (simple-result command-info)] + [(vector 'cursor field-dvecs pst) + (cursor-result (map field-dvec->field-info field-dvecs) + pst + (list field-dvecs (box #f)))])) + + ;; == Cursor + + (define/public (fetch/cursor fsym cursor fetch-size) + (let ([pst (cursor-result-pst cursor)] + [extra (cursor-result-extra cursor)]) + (send pst check-owner fsym this pst) + (let ([field-dvecs (car extra)] + [end-box (cadr extra)]) + (call-with-lock fsym + (lambda () + (cond [(unbox end-box) + #f] + [else + (let ([wbox (box 0)]) + (fresh-exchange) + (send-message (make-fetch-packet (send pst get-handle) fetch-size)) + (begin0 (query1:get-rows fsym field-dvecs #t wbox end-box) + (when (not (zero? (unbox wbox))) + (fetch-warnings fsym))))])))))) ;; == Prepare @@ -379,6 +415,7 @@ (close-on-exec? close-on-exec?) (param-typeids (map field-dvec->typeid param-dvecs)) (result-dvecs field-dvecs) + (stmt stmt) (stmt-type (classify-my-sql stmt)) (owner this)))]))) @@ -407,7 +444,7 @@ (define/private (fetch-warnings fsym) (unless (eq? notice-handler void) - (let ([result (query1 fsym "SHOW WARNINGS" #f)]) + (let ([result (query1 fsym "SHOW WARNINGS" #f #f)]) (define (find-index name dvecs) (for/or ([dvec (in-list dvecs)] [i (in-naturals)]) @@ -435,28 +472,28 @@ (define/override (start-transaction* fsym isolation) (cond [(eq? isolation 'nested) (let ([savepoint (generate-name)]) - (query1 fsym (format "SAVEPOINT ~a" savepoint) #t) + (query1 fsym (format "SAVEPOINT ~a" savepoint) #f #t) savepoint)] [else (let ([isolation-level (isolation-symbol->string isolation)]) (when isolation-level - (query1 fsym (format "SET TRANSACTION ISOLATION LEVEL ~a" isolation-level) #t)) - (query1 fsym "START TRANSACTION" #t) + (query1 fsym (format "SET TRANSACTION ISOLATION LEVEL ~a" isolation-level) #f #t)) + (query1 fsym "START TRANSACTION" #f #t) #f)])) (define/override (end-transaction* fsym mode savepoint) (case mode ((commit) (cond [savepoint - (query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint) #t)] + (query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint) #f #t)] [else - (query1 fsym "COMMIT" #t)])) + (query1 fsym "COMMIT" #f #t)])) ((rollback) (cond [savepoint - (query1 fsym (format "ROLLBACK TO SAVEPOINT ~a" savepoint) #t) - (query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint) #t)] + (query1 fsym (format "ROLLBACK TO SAVEPOINT ~a" savepoint) #f #t) + (query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint) #f #t)] [else - (query1 fsym "ROLLBACK" #t)]))) + (query1 fsym "ROLLBACK" #f #t)]))) (void)) ;; name-counter : number @@ -476,7 +513,7 @@ (string-append "SELECT table_name FROM information_schema.tables " "WHERE table_schema = schema()")] [rows - (vector-ref (call-with-lock fsym (lambda () (query1 fsym stmt #t))) 2)]) + (vector-ref (call-with-lock fsym (lambda () (query1 fsym stmt #f #t))) 2)]) (for/list ([row (in-list rows)]) (vector-ref row 0)))) diff --git a/collects/db/private/mysql/message.rkt b/collects/db/private/mysql/message.rkt index d02443c20d..ef789f0b16 100644 --- a/collects/db/private/mysql/message.rkt +++ b/collects/db/private/mysql/message.rkt @@ -32,6 +32,7 @@ Based on protocol documentation here: (struct-out parameter-packet) (struct-out long-data-packet) (struct-out execute-packet) + (struct-out fetch-packet) (struct-out unknown-packet) supported-result-typeid? @@ -313,6 +314,11 @@ Based on protocol documentation here: params) #:transparent) +(define-struct (fetch-packet packet) + (statement-id + count) + #:transparent) + (define-struct (change-plugin-packet packet) (plugin data) @@ -379,13 +385,24 @@ Based on protocol documentation here: (for-each (lambda (type param) (unless (sql-null? param) (write-binary-datum out type param))) - param-types params))])) + param-types params))] + [(struct fetch-packet (statement-id count)) + (io:write-byte out (encode-command 'statement-fetch)) + (io:write-le-int32 out statement-id) + (io:write-le-int32 out count)])) (define (parse-packet in expect field-dvecs) (let* ([len (io:read-le-int24 in)] [num (io:read-byte in)] - [inp (subport in len)] - [msg (parse-packet/1 inp expect len field-dvecs)]) + ;; [inp (subport in len)] + [bs (read-bytes len in)] + [inp (open-input-bytes bs)] + [msg + (with-handlers ([exn? + (lambda (e) + (eprintf "packet was: ~s\n" (bytes->list bs)) + (raise e))]) + (parse-packet/1 inp expect len field-dvecs))]) (when (port-has-bytes? inp) (error/internal 'parse-packet "bytes left over after parsing ~s; bytes were: ~s" msg (io:read-bytes-to-eof inp))) @@ -853,7 +870,9 @@ Based on protocol documentation here: (define server-status-flags/decoding '((#x1 . in-transaction) - (#x2 . auto-commit))) + (#x2 . auto-commit) + (64 . cursor-exists) + (128 . last-row-sent))) (define commands/decoding '((#x00 . sleep) diff --git a/collects/db/private/odbc/connection.rkt b/collects/db/private/odbc/connection.rkt index 2af79f8bd1..f32d6eea1e 100644 --- a/collects/db/private/odbc/connection.rkt +++ b/collects/db/private/odbc/connection.rkt @@ -55,33 +55,37 @@ (define/public (get-dbsystem) dbsystem) (define/override (connected?) (and db #t)) - (define/public (query fsym stmt) - (let-values ([(dvecs rows) - (call-with-lock fsym - (lambda () - (check-valid-tx-status fsym) - (query1 fsym stmt #t)))]) - (cond [(pair? dvecs) (rows-result (map field-dvec->field-info dvecs) rows)] - [else (simple-result '())]))) + (define/public (query fsym stmt cursor?) + (call-with-lock fsym + (lambda () + (check-valid-tx-status fsym) + (query1 fsym stmt #t cursor?)))) - (define/private (query1 fsym stmt check-tx?) - (let* ([stmt (cond [(string? stmt) - (let* ([pst (prepare1 fsym stmt #t)]) - (send pst bind fsym null))] - [(statement-binding? stmt) - stmt])] + (define/private (query1 fsym stmt check-tx? cursor?) + (let* ([stmt (check-statement fsym stmt cursor?)] [pst (statement-binding-pst stmt)] [params (statement-binding-params stmt)]) - (send pst check-owner fsym this stmt) (when check-tx? (check-statement/tx fsym (send pst get-stmt-type))) (let ([result-dvecs (send pst get-result-dvecs)]) (for ([dvec (in-list result-dvecs)]) (let ([typeid (field-dvec->typeid dvec)]) (unless (supported-typeid? typeid) (error/unsupported-type fsym typeid))))) - (query1:inner fsym pst params))) + (query1:inner fsym pst params cursor?))) - (define/private (query1:inner fsym pst params) + (define/private (check-statement fsym stmt cursor?) + (cond [(statement-binding? stmt) + (let ([pst (statement-binding-pst stmt)]) + (send pst check-owner fsym this stmt) + (cond [cursor? + (let ([pst* (prepare1 fsym (send pst get-stmt) #f)]) + (statement-binding pst* (statement-binding-params stmt)))] + [else stmt]))] + [(string? stmt) + (let* ([pst (prepare1 fsym stmt (not cursor?))]) + (send pst bind fsym null))])) + + (define/private (query1:inner fsym pst params cursor?) (let* ([db (get-db fsym)] [stmt (send pst get-handle)]) (let* ([param-bufs @@ -94,12 +98,32 @@ (strong-void param-bufs)) (let* ([result-dvecs (send pst get-result-dvecs)] [rows - (and (pair? result-dvecs) - (fetch* fsym stmt (map field-dvec->typeid result-dvecs)))]) - (handle-status fsym (SQLFreeStmt stmt SQL_CLOSE) stmt) - (handle-status fsym (SQLFreeStmt stmt SQL_RESET_PARAMS) stmt) - (send pst after-exec #f) - (values result-dvecs rows)))) + (and (not cursor?) + (pair? result-dvecs) + (fetch* fsym stmt (map field-dvec->typeid result-dvecs) #f +inf.0))]) + (unless cursor? (send pst after-exec #f)) + (cond [(and (pair? result-dvecs) (not cursor?)) + (rows-result (map field-dvec->field-info result-dvecs) rows)] + [(and (pair? result-dvecs) cursor?) + (cursor-result (map field-dvec->field-info result-dvecs) + pst + (list (map field-dvec->typeid result-dvecs) + (box #f)))] + [else (simple-result '())])))) + + (define/public (fetch/cursor fsym cursor fetch-size) + (let ([pst (cursor-result-pst cursor)] + [extra (cursor-result-extra cursor)]) + (send pst check-owner fsym this pst) + (call-with-lock fsym + (lambda () + (let ([typeids (car extra)] + [end-box (cadr extra)]) + (cond [(unbox end-box) #f] + [else + (begin0 (fetch* fsym (send pst get-handle) typeids end-box fetch-size) + (when (unbox end-box) + (send pst after-exec #f)))])))))) (define/private (load-param fsym db stmt i param typeid) ;; NOTE: param buffers must not move between bind and execute @@ -208,16 +232,24 @@ (bind SQL_C_CHAR SQL_VARCHAR #f)] [else (error/internal fsym "cannot convert to typeid ~a: ~e" typeid param)])) - (define/private (fetch* fsym stmt result-typeids) + (define/private (fetch* fsym stmt result-typeids end-box limit) ;; scratchbuf: create a single buffer here to try to reduce garbage ;; Don't make too big; otherwise bad for queries with only small data. ;; Doesn't need to be large, since get-varbuf already smart for long data. ;; MUST be at least as large as any int/float type (see get-num) ;; SHOULD be at least as large as any structures (see uses of get-int-list) (let ([scratchbuf (make-bytes 50)]) - (let loop () - (let ([c (fetch fsym stmt result-typeids scratchbuf)]) - (if c (cons c (loop)) null))))) + (let loop ([fetched 0]) + (cond [(< fetched limit) + (let ([c (fetch fsym stmt result-typeids scratchbuf)]) + (cond [c + (cons c (loop (add1 fetched)))] + [else + (when end-box (set-box! end-box #t)) + (handle-status fsym (SQLFreeStmt stmt SQL_CLOSE) stmt) + (handle-status fsym (SQLFreeStmt stmt SQL_RESET_PARAMS) stmt) + null]))] + [else null])))) (define/private (fetch fsym stmt result-typeids scratchbuf) (let ([s (SQLFetch stmt)]) @@ -413,6 +445,7 @@ (close-on-exec? close-on-exec?) (param-typeids param-typeids) (result-dvecs result-dvecs) + (stmt sql) (stmt-type (classify-odbc-sql sql)) (owner this))]) (hash-set! statement-table pst #t) @@ -556,7 +589,7 @@ "WHERE " schema-cond))] [else (uerror fsym "not supported for this DBMS")])]) - (let* ([result (query fsym stmt)] + (let* ([result (query fsym stmt #f)] [rows (rows-result-rows result)]) (for/list ([row (in-list rows)]) (vector-ref row 0))))) diff --git a/collects/db/private/postgresql/connection.rkt b/collects/db/private/postgresql/connection.rkt index 748bf9a3aa..cedd379b82 100644 --- a/collects/db/private/postgresql/connection.rkt +++ b/collects/db/private/postgresql/connection.rkt @@ -110,7 +110,7 @@ ((transaction) (set! tx-status #t)) ((failed) (set! tx-status 'invalid)))] [(and or-eof? (eof-object? r)) (void)] - [else (error/comm fsym)]))) + [else (error/comm fsym "expected ready")]))) ;; == Asynchronous messages @@ -239,67 +239,71 @@ ;; == Query - ;; query : symbol Statement -> QueryResult - (define/public (query fsym stmt0) + ;; query : symbol Statement boolean -> QueryResult + (define/public (query fsym stmt cursor?) (let ([result (call-with-lock fsym (lambda () (check-valid-tx-status fsym) - (let* ([stmt (check-statement fsym stmt0)] + (let* ([stmt (check-statement fsym stmt cursor?)] [pst (statement-binding-pst stmt)] [stmt-type (send pst get-stmt-type)] - [close-on-exec? (send pst get-close-on-exec?)]) + [close-on-exec? (and (not cursor?) (send pst get-close-on-exec?))]) (check-statement/tx fsym stmt-type) - (query1 fsym stmt #f close-on-exec?))))]) + (when cursor? + (unless (eq? (get-tx-status) #t) + (error fsym "cursor allowed only within transaction"))) + (query1 fsym stmt close-on-exec? cursor?))))]) (query1:process-result fsym result))) - (define/private (query1 fsym stmt simple? [close-on-exec? #f]) - ;; if simple?: stmt must be string, no params, & results must be binary-readable - (query1:enqueue stmt close-on-exec?) - (send-message (make-Sync)) - (begin0 (query1:collect fsym simple? close-on-exec?) - (check-ready-for-query fsym #f) - (when DEBUG? - (fprintf (current-error-port) " ** ~a\n" (tx-state->string))))) + (define/private (query1 fsym stmt close-on-exec? cursor?) + ;; if stmt is string, must take no params & results must be binary-readable + (let ([portal (query1:enqueue stmt close-on-exec? cursor?)]) + (send-message (make-Sync)) + (begin0 (query1:collect fsym stmt portal (string? stmt) close-on-exec? cursor?) + (check-ready-for-query fsym #f) + (when DEBUG? + (fprintf (current-error-port) " ** ~a\n" (tx-state->string)))))) ;; check-statement : symbol statement -> statement-binding ;; Convert to statement-binding; need to prepare to get type information, used to ;; choose result formats. ;; FIXME: if text format eliminated, can skip prepare ;; FIXME: can use classify-pg-sql to avoid preparing stmts with no results - (define/private (check-statement fsym stmt) + (define/private (check-statement fsym stmt cursor?) (cond [(statement-binding? stmt) (let ([pst (statement-binding-pst stmt)]) (send pst check-owner fsym this stmt) stmt)] [(string? stmt) - (let ([pst (prepare1 fsym stmt #t)]) + (let ([pst (prepare1 fsym stmt (not cursor?))]) (send pst bind fsym null))])) - ;; query1:enqueue : Statement -> void - (define/private (query1:enqueue stmt close-on-exec?) - (cond [(statement-binding? stmt) - (let* ([pst (statement-binding-pst stmt)] - [pst-name (send pst get-handle)] - [params (statement-binding-params stmt)]) - (buffer-message (make-Bind "" pst-name - (map typeid->format (send pst get-param-typeids)) - params - (map typeid->format (send pst get-result-typeids)))) - (buffer-message (make-Describe 'portal "")) - (buffer-message (make-Execute "" 0)) - (buffer-message (make-Close 'portal "")) - (when close-on-exec? - (buffer-message (make-Close 'statement pst-name)) - (send pst set-handle #f)))] - [(string? stmt) - (buffer-message (make-Parse "" stmt '())) - (buffer-message (make-Bind "" "" '() '() '(1))) - (buffer-message (make-Describe 'portal "")) - (buffer-message (make-Execute "" 0)) - (buffer-message (make-Close 'portal ""))])) + ;; query1:enqueue : Statement boolean boolean -> string + (define/private (query1:enqueue stmt close-on-exec? cursor?) + (let ([portal (if cursor? (generate-name) "")]) + (cond [(statement-binding? stmt) + (let* ([pst (statement-binding-pst stmt)] + [pst-name (send pst get-handle)] + [params (statement-binding-params stmt)]) + (buffer-message (make-Bind portal pst-name + (map typeid->format (send pst get-param-typeids)) + params + (map typeid->format (send pst get-result-typeids)))))] + [(string? stmt) + (buffer-message (make-Parse "" stmt '())) + (buffer-message (make-Bind portal "" '() '() '(1)))]) + (buffer-message (make-Describe 'portal portal)) + (unless cursor? + (buffer-message (make-Execute portal 0)) + (buffer-message (make-Close 'portal portal)) + (when close-on-exec? + (let ([pst (statement-binding-pst stmt)]) + (buffer-message (make-Close 'statement (send pst get-handle))) + (send pst set-handle #f)))) + portal)) - (define/private (query1:collect fsym simple? close-on-exec?) + (define/private (query1:collect fsym stmt portal simple? close-on-exec? cursor?) (when simple? (match (recv-message fsym) [(struct ParseComplete ()) (void)] @@ -309,20 +313,27 @@ [other-r (query1:error fsym other-r)]) (match (recv-message fsym) [(struct RowDescription (field-dvecs)) - (let* ([rows (query1:data-loop fsym)]) - (query1:expect-close-complete fsym close-on-exec?) - (vector 'rows field-dvecs rows))] + (cond [cursor? + (vector 'cursor field-dvecs stmt portal)] + [else + (let* ([rows (query1:data-loop fsym #f)]) + (query1:expect-close-complete fsym close-on-exec?) + (vector 'rows field-dvecs rows))])] [(struct NoData ()) (let* ([command (query1:expect-completion fsym)]) (query1:expect-close-complete fsym close-on-exec?) (vector 'command command))] [other-r (query1:error fsym other-r)])) - (define/private (query1:data-loop fsym) + (define/private (query1:data-loop fsym end-box) (match (recv-message fsym) [(struct DataRow (row)) - (cons row (query1:data-loop fsym))] - [(struct CommandComplete (command)) null] + (cons row (query1:data-loop fsym end-box))] + [(struct CommandComplete (command)) + (when end-box (set-box! end-box #t)) + null] + [(struct PortalSuspended ()) + null] [other-r (query1:error fsym other-r)])) (define/private (query1:expect-completion fsym) @@ -343,22 +354,30 @@ (uerror fsym (nosupport "COPY IN statements"))] [(struct CopyOutResponse (format column-formats)) (uerror fsym (nosupport "COPY OUT statements"))] - [_ (error/comm fsym)])) + [_ (error/comm fsym (format "got: ~e" r))])) + + (define/private (get-convert-row! fsym field-dvecs) + (let* ([type-reader-v + (list->vector (query1:get-type-readers fsym field-dvecs))]) + (lambda (row) + (vector-map! (lambda (value type-reader) + (cond [(sql-null? value) sql-null] + [else (type-reader value)])) + row + type-reader-v)))) (define/private (query1:process-result fsym result) (match result [(vector 'rows field-dvecs rows) - (let* ([type-reader-v - (list->vector (query1:get-type-readers fsym field-dvecs))] - [convert-row! - (lambda (row) - (vector-map! (lambda (value type-reader) - (cond [(sql-null? value) sql-null] - [else (type-reader value)])) - row - type-reader-v))]) - (for-each convert-row! rows) - (rows-result (map field-dvec->field-info field-dvecs) rows))] + (for-each (get-convert-row! fsym field-dvecs) rows) + (rows-result (map field-dvec->field-info field-dvecs) rows)] + [(vector 'cursor field-dvecs stmt portal) + (let* ([convert-row! (get-convert-row! fsym field-dvecs)] + [pst (statement-binding-pst stmt)]) + ;; FIXME: register finalizer to close portal? + (cursor-result (map field-dvec->field-info field-dvecs) + pst + (list portal convert-row! (box #f))))] [(vector 'command command) (simple-result command)])) @@ -368,6 +387,38 @@ (typeid->type-reader fsym typeid))) field-dvecs)) + ;; == Cursor + + (define/public (fetch/cursor fsym cursor fetch-size) + (let ([pst (cursor-result-pst cursor)] + [extra (cursor-result-extra cursor)]) + (send pst check-owner fsym this pst) + (let ([portal (car extra)] + [convert-row! (cadr extra)] + [end-box (caddr extra)]) + (let ([rows + (call-with-lock fsym + (lambda () + (cond [(unbox end-box) #f] + [else + (buffer-message (make-Execute portal fetch-size)) + (send-message (make-Sync)) + (let ([rows (query1:data-loop fsym end-box)]) + (check-ready-for-query fsym #f) + (when (unbox end-box) + (cursor:close fsym pst portal)) + rows)])))]) + (and rows (begin (for-each convert-row! rows) rows)))))) + + (define/private (cursor:close fsym pst portal) + (let ([close-on-exec? (send pst get-close-on-exec?)]) + (buffer-message (make-Close 'portal portal)) + (when close-on-exec? + (buffer-message (make-Close 'statement (send pst get-handle))) + (send pst set-handle #f)) + (send-message (make-Sync)) + (query1:expect-close-complete fsym close-on-exec?) + (check-ready-for-query fsym #f))) ;; == Prepare @@ -447,12 +498,17 @@ #f) (do-free-statement))) + ;; == Internal query + + (define/private (internal-query1 fsym sql) + (query1 fsym sql #f #f)) + ;; == Transactions (define/override (start-transaction* fsym isolation) (cond [(eq? isolation 'nested) (let ([savepoint (generate-name)]) - (query1 fsym (format "SAVEPOINT ~a" savepoint) #t) + (internal-query1 fsym (format "SAVEPOINT ~a" savepoint)) savepoint)] [else (let* ([isolation-level (isolation-symbol->string isolation)] @@ -462,22 +518,22 @@ ;; FIXME: also support ;; 'read-only => "READ ONLY" ;; 'read-write => "READ WRITE" - (query1 fsym stmt #t) + (internal-query1 fsym stmt) #f)])) (define/override (end-transaction* fsym mode savepoint) (case mode ((commit) (cond [savepoint - (query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint) #t)] + (internal-query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint))] [else - (query1 fsym "COMMIT WORK" #t)])) + (internal-query1 fsym "COMMIT WORK")])) ((rollback) (cond [savepoint - (query1 fsym (format "ROLLBACK TO SAVEPOINT ~a" savepoint) #t) - (query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint) #t)] + (internal-query1 fsym (format "ROLLBACK TO SAVEPOINT ~a" savepoint)) + (internal-query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint))] [else - (query1 fsym "ROLLBACK WORK" #t)]))) + (internal-query1 fsym "ROLLBACK WORK")]))) (void)) ;; == Reflection @@ -491,7 +547,7 @@ "table_schema = SOME (current_schemas(false))") ((current) "table_schema = current_schema")))] - [result (call-with-lock fsym (lambda () (query1 fsym stmt #t)))] + [result (call-with-lock fsym (lambda () (internal-query1 fsym stmt)))] [rows (vector-ref result 2)]) (for/list ([row (in-list rows)]) (bytes->string/utf-8 (vector-ref row 0))))) diff --git a/collects/db/private/postgresql/message.rkt b/collects/db/private/postgresql/message.rkt index ca3c9e5d48..744dc7628a 100644 --- a/collects/db/private/postgresql/message.rkt +++ b/collects/db/private/postgresql/message.rkt @@ -376,7 +376,7 @@ (define-struct PortalSuspended () #:transparent) (define (parse:PortalSuspended p) - (with-length-in p #\p + (with-length-in p #\s (make-PortalSuspended))) (define-struct Query (query) #:transparent) @@ -464,7 +464,7 @@ ((#\t) (parse:ParameterDescription p)) ((#\S) (parse:ParameterStatus p)) ((#\1) (parse:ParseComplete p)) - ((#\p) (parse:PortalSuspended p)) + ((#\s) (parse:PortalSuspended p)) ((#\Z) (parse:ReadyForQuery p)) ((#\T) (parse:RowDescription p)) (else diff --git a/collects/db/private/sqlite3/connection.rkt b/collects/db/private/sqlite3/connection.rkt index 912e2c5157..a084c29040 100644 --- a/collects/db/private/sqlite3/connection.rkt +++ b/collects/db/private/sqlite3/connection.rkt @@ -38,21 +38,16 @@ (define/public (get-dbsystem) dbsystem) (define/override (connected?) (and -db #t)) - (define/public (query fsym stmt) + (define/public (query fsym stmt cursor?) (call-with-lock fsym (lambda () (check-valid-tx-status fsym) - (query1 fsym stmt #t)))) + (query1 fsym stmt #t cursor?)))) - (define/private (query1 fsym stmt check-tx?) - (let* ([stmt (cond [(string? stmt) - (let* ([pst (prepare1 fsym stmt #t)]) - (send pst bind fsym null))] - [(statement-binding? stmt) - stmt])] + (define/private (query1 fsym stmt check-tx? cursor?) + (let* ([stmt (check-statement fsym stmt cursor?)] [pst (statement-binding-pst stmt)] [params (statement-binding-params stmt)]) - (send pst check-owner fsym this stmt) (when check-tx? (check-statement/tx fsym (send pst get-stmt-type))) (let ([db (get-db fsym)] [stmt (send pst get-handle)]) @@ -61,21 +56,47 @@ (for ([i (in-naturals 1)] [param (in-list params)]) (load-param fsym db stmt i param)) - (let* ([info - (for/list ([i (in-range (sqlite3_column_count stmt))]) - `((name . ,(sqlite3_column_name stmt i)) - (decltype . ,(sqlite3_column_decltype stmt i))))] - [rows (step* fsym db stmt)]) - (HANDLE fsym (sqlite3_reset stmt)) - (HANDLE fsym (sqlite3_clear_bindings stmt)) + (let ([info + (for/list ([i (in-range (sqlite3_column_count stmt))]) + `((name . ,(sqlite3_column_name stmt i)) + (decltype . ,(sqlite3_column_decltype stmt i))))] + [result + (or cursor? + (step* fsym db stmt #f +inf.0))]) (unless (eq? tx-status 'invalid) (set! tx-status (get-tx-status))) - (send pst after-exec #f) - (cond [(pair? info) - (rows-result info rows)] + (unless cursor? (send pst after-exec #f)) + (cond [(and (pair? info) (not cursor?)) + (rows-result info result)] + [(and (pair? info) cursor?) + (cursor-result info pst (box #f))] [else (simple-result '())]))))) + (define/public (fetch/cursor fsym cursor fetch-size) + (let ([pst (cursor-result-pst cursor)] + [end-box (cursor-result-extra cursor)]) + (send pst check-owner fsym this pst) + (call-with-lock fsym + (lambda () + (cond [(unbox end-box) #f] + [else + (begin0 (step* fsym (get-db fsym) (send pst get-handle) end-box fetch-size) + (when (unbox end-box) + (send pst after-exec #f)))]))))) + + (define/private (check-statement fsym stmt cursor?) + (cond [(statement-binding? stmt) + (let ([pst (statement-binding-pst stmt)]) + (send pst check-owner fsym this stmt) + (cond [cursor? + (let ([pst* (prepare1 fsym (send pst get-stmt) #f)]) + (statement-binding pst* (statement-binding-params stmt)))] + [else stmt]))] + [(string? stmt) + (let* ([pst (prepare1 fsym stmt (not cursor?))]) + (send pst bind fsym null))])) + (define/private (load-param fsym db stmt i param) (HANDLE fsym (cond [(int64? param) @@ -91,9 +112,17 @@ [else (error/internal fsym "bad parameter: ~e" param)]))) - (define/private (step* fsym db stmt) - (let ([c (step fsym db stmt)]) - (if c (cons c (step* fsym db stmt)) null))) + (define/private (step* fsym db stmt end-box fetch-limit) + (if (zero? fetch-limit) + null + (let ([c (step fsym db stmt)]) + (cond [c + (cons c (step* fsym db stmt end-box (sub1 fetch-limit)))] + [else + (HANDLE fsym (sqlite3_reset stmt)) + (HANDLE fsym (sqlite3_clear_bindings stmt)) + (when end-box (set-box! end-box #t)) + null])))) (define/private (step fsym db stmt) (let ([s (HANDLE fsym (sqlite3_step stmt))]) @@ -132,13 +161,11 @@ (HANDLE fsym (let-values ([(prep-status stmt tail?) (sqlite3_prepare_v2 db sql)]) - (define (free!) (when stmt (sqlite3_finalize stmt))) - (unless stmt - (uerror fsym "SQL syntax error in ~e" sql)) (when tail? - (free!) (uerror fsym "multiple SQL statements given: ~e" sql)) + (when stmt (sqlite3_finalize stmt)) + (uerror fsym "multiple SQL statements given: ~e" sql)) (values prep-status stmt)))]) - (unless stmt (error/internal fsym "prepare failed")) + (unless stmt (uerror fsym "SQL syntax error in ~e" sql)) (let* ([param-typeids (for/list ([i (in-range (sqlite3_bind_parameter_count stmt))]) 'any)] @@ -190,7 +217,7 @@ ;; Internal query (define/private (internal-query1 fsym sql) - (query1 fsym sql #f)) + (query1 fsym sql #f #f)) ;; == Transactions diff --git a/collects/db/scribblings/query.scrbl b/collects/db/scribblings/query.scrbl index a139a0ff63..d79325f454 100644 --- a/collects/db/scribblings/query.scrbl +++ b/collects/db/scribblings/query.scrbl @@ -244,20 +244,32 @@ The types of parameters and returned fields are described in @defproc[(in-query [connection connection?] [stmt statement?] - [arg any/c] ...) + [arg any/c] ... + [#:fetch fetch-size (or/c exact-positive-integer? +inf.0) +inf.0]) sequence?]{ Executes a SQL query, which must produce rows, and returns a sequence. Each step in the sequence produces as many values as the - rows have columns. + rows have columns. + + If @racket[fetch-size] is @racket[+inf.0], all rows are fetched when + the sequence is created. If @racket[fetch-size] is finite, a + @deftech{cursor} is created and @racket[fetch-size] rows are fetched + at a time, allowing processing to be interleaved with retrieval. On + some database systems, ending a transaction implicitly closes all + open cursors; attempting to fetch more rows may fail. On PostgreSQL, + a cursor can be opened only within a transaction. @examples/results[ [(for/list ([n (in-query pgc "select n from the_numbers where n < 2")]) n) '(0 1)] -[(for ([(n d) - (in-query pgc "select * from the_numbers where n < $1" 4)]) - (printf "~a is ~a\n" n d)) +[(call-with-transaction pgc + (lambda () + (for ([(n d) + (in-query pgc "select * from the_numbers where n < $1" 4 + #:fetch 1)]) + (printf "~a is ~a\n" n d)))) (for-each (lambda (n d) (printf "~a: ~a\n" n d)) '(0 1 2 3) '("nothing" "the loneliest number" "company" "a crowd"))] ] diff --git a/collects/tests/db/db/query.rkt b/collects/tests/db/db/query.rkt index 432a46be4d..15137d2e25 100644 --- a/collects/tests/db/db/query.rkt +++ b/collects/tests/db/db/query.rkt @@ -132,7 +132,55 @@ (for ([(x y) (in-query c (bind-prepared-statement (prepare c stmt) (list 2)))]) 0)) ((gen) - (for ([(x y) (in-query c (virtual-statement stmt) 2)]) 0))))))))) + (for ([(x y) (in-query c (virtual-statement stmt) 2)]) 0))))))) + )) + +(define in-query-tests + (test-suite "in-query (cursor)" + ;; call-with-transaction necessary for postresql + (test-case "in-query w/ #:fetch" + (with-connection c + (for ([fs (in-range 1 10)]) + (check equal? + (call-with-transaction c + (lambda () + (for/list ([n (in-query c "select N from the_numbers order by N asc" #:fetch fs)]) n))) + (map car test-data))) + (check equal? + (call-with-transaction c + (lambda () + (for/first ([n (in-query c "select N from the_numbers order by N asc" #:fetch 1)]) n))) + (for/first ([n (map car test-data)]) n)))) + (test-case "in-query multiple different" + (with-connection c + (check equal? + (call-with-transaction c + (lambda () + (for/list ([n (in-query c "select N from the_numbers order by N asc" #:fetch 1)] + [m (in-query c "select N from the_numbers order by N desc" #:fetch 1)]) + (list n m)))) + (let ([nums (map car test-data)]) + (map list nums (reverse nums)))))) + (test-case "in-query multiple same" + (with-connection c + (let ([pst (prepare c "select N from the_numbers order by N asc")]) + (check equal? + (call-with-transaction c + (lambda () + (for/list ([n (in-query c pst #:fetch 1)] + [m (in-query c pst #:fetch 1)]) + (list n m)))) + (let ([nums (map car test-data)]) + (map list nums nums)))))) + (test-case "in-query with interleaved queries" + (with-connection c + (check equal? + (call-with-transaction c + (lambda () + (for/list ([n (in-query c "select N from the_numbers order by N asc" #:fetch 1)]) + (list n (query-value c (sql "select descr from the_numbers where N = $1") n))))) + test-data))) + )) (define low-level-tests (test-suite "low-level" @@ -556,6 +604,7 @@ (simple-tests 'prepare) (simple-tests 'bind) (simple-tests 'gen) + in-query-tests low-level-tests tx-tests misc-tests