db: eliminate some copying from postgresql message reading
This commit is contained in:
parent
8363db9258
commit
c5472fbf3e
|
@ -370,28 +370,19 @@
|
|||
[_ (error/internal* fsym "unexpected message from back end"
|
||||
'("message" value) r)]))
|
||||
|
||||
(define/private (get-convert-row! fsym field-dvecs)
|
||||
(let* ([type-reader-v
|
||||
(list->vector (query1:get-type-readers fsym field-dvecs))])
|
||||
(lambda (row)
|
||||
(vector-map! (lambda (value type-reader)
|
||||
(cond [(sql-null? value) sql-null]
|
||||
[else (type-reader value)]))
|
||||
row
|
||||
type-reader-v))))
|
||||
|
||||
(define/private (query1:process-result fsym result)
|
||||
(match result
|
||||
[(vector 'rows field-dvecs rows)
|
||||
(for-each (get-convert-row! fsym field-dvecs) rows)
|
||||
(rows-result (map field-dvec->field-info field-dvecs) rows)]
|
||||
(let ([type-readers (query1:get-type-readers fsym field-dvecs)])
|
||||
(rows-result (map field-dvec->field-info field-dvecs)
|
||||
(map (lambda (data) (bytes->row data type-readers))
|
||||
rows)))]
|
||||
[(vector 'cursor field-dvecs stmt portal)
|
||||
(let* ([convert-row! (get-convert-row! fsym field-dvecs)]
|
||||
[pst (statement-binding-pst stmt)])
|
||||
;; FIXME: register finalizer to close portal?
|
||||
(let ([pst (statement-binding-pst stmt)]
|
||||
[type-readers (query1:get-type-readers fsym field-dvecs)])
|
||||
(cursor-result (map field-dvec->field-info field-dvecs)
|
||||
pst
|
||||
(list portal convert-row! (box #f))))]
|
||||
(list portal type-readers (box #f))))]
|
||||
[(vector 'command command)
|
||||
(simple-result command)]))
|
||||
|
||||
|
@ -408,7 +399,7 @@
|
|||
[extra (cursor-result-extra cursor)])
|
||||
(send pst check-owner fsym this pst)
|
||||
(let ([portal (car extra)]
|
||||
[convert-row! (cadr extra)]
|
||||
[type-readers (cadr extra)]
|
||||
[end-box (caddr extra)])
|
||||
(let ([rows
|
||||
(call-with-lock fsym
|
||||
|
@ -423,7 +414,7 @@
|
|||
(when (unbox end-box)
|
||||
(cursor:close fsym pst portal))
|
||||
rows)])))])
|
||||
(and rows (begin (for-each convert-row! rows) rows))))))
|
||||
(and rows (map (lambda (data) (bytes->row data type-readers)) rows))))))
|
||||
|
||||
(define/private (cursor:close fsym pst portal)
|
||||
(let ([close-on-exec? (send pst get-close-on-exec?)])
|
||||
|
@ -561,9 +552,11 @@
|
|||
((current)
|
||||
"table_schema = current_schema")))]
|
||||
[result (call-with-lock fsym (lambda () (internal-query1 fsym stmt)))]
|
||||
[rows (vector-ref result 2)])
|
||||
[rows (vector-ref result 2)]
|
||||
[type-readers (list subbytes)])
|
||||
(for/list ([row (in-list rows)])
|
||||
(bytes->string/utf-8 (vector-ref row 0)))))
|
||||
(let ([row (bytes->row row type-readers)])
|
||||
(bytes->string/utf-8 (vector-ref row 0))))))
|
||||
))
|
||||
|
||||
;; ========================================
|
||||
|
|
|
@ -292,9 +292,9 @@ record = cols:int4 (typeoid:int4 len/-1:int4 data:byte^len)^cols
|
|||
0)])
|
||||
(sql-interval years months days hours mins secs nsecs))]))
|
||||
|
||||
(define ((c-parse-array parse-elt) x)
|
||||
(define ((c-parse-array parse-elt) buf start end)
|
||||
;; NOTE: we assume that array enclosed with "{" and "}", and separator is ","
|
||||
(let* ([s (bytes->string/utf-8 x)]
|
||||
(let* ([s (bytes->string/utf-8 buf #f start end)]
|
||||
[vals
|
||||
(let loop ([s s])
|
||||
(cond [(equal? s "{}") '#()]
|
||||
|
@ -378,53 +378,55 @@ record = cols:int4 (typeoid:int4 len/-1:int4 data:byte^len)^cols
|
|||
"}")]))])]))
|
||||
|
||||
;; Binary readers
|
||||
;; Take bytes, start offset, end offset (but most ignore end)
|
||||
|
||||
(define (recv-bits x)
|
||||
(let* ([len (integer-bytes->integer x #t #t 0 4)])
|
||||
(make-sql-bits/bytes len (subbytes x 4) 0)))
|
||||
(define (recv-bits buf start end)
|
||||
(let* ([bitslen (integer-bytes->integer buf #t #t start (+ start 4))])
|
||||
(make-sql-bits/bytes bitslen (subbytes buf (+ start 4) end) 0)))
|
||||
|
||||
(define (recv-boolean x)
|
||||
(case (bytes-ref x 0)
|
||||
(define (recv-boolean buf start end)
|
||||
(case (bytes-ref buf start)
|
||||
((0) #f)
|
||||
((1) #t)
|
||||
(else (error/internal* 'recv-boolean "bad value" '("value" value) x))))
|
||||
(else (error/internal* 'recv-boolean "bad value"
|
||||
'("value" value) (bytes-ref buf start)))))
|
||||
|
||||
(define (recv-char1 x)
|
||||
(integer->char (bytes-ref x 0)))
|
||||
(define (recv-char1 buf start end)
|
||||
(integer->char (bytes-ref buf start)))
|
||||
|
||||
(define (recv-bytea x)
|
||||
x)
|
||||
(define (recv-bytea buf start end)
|
||||
(subbytes buf start end))
|
||||
|
||||
(define (recv-string x)
|
||||
(bytes->string/utf-8 x))
|
||||
(define (recv-string buf start end)
|
||||
(bytes->string/utf-8 buf #f start end))
|
||||
|
||||
(define (recv-integer x)
|
||||
(integer-bytes->integer x #t #t))
|
||||
(define (recv-integer buf start end)
|
||||
(integer-bytes->integer buf #t #t start end))
|
||||
|
||||
(define (recv-unsigned-integer x)
|
||||
(integer-bytes->integer x #f #t))
|
||||
(define (recv-unsigned-integer buf start end)
|
||||
(integer-bytes->integer buf #f #t start end))
|
||||
|
||||
(define (recv-float x)
|
||||
(floating-point-bytes->real x #t))
|
||||
(define (recv-float buf start end)
|
||||
(floating-point-bytes->real buf #t start end))
|
||||
|
||||
(define (get-double bs offset)
|
||||
(floating-point-bytes->real bs #t offset (+ 8 offset)))
|
||||
(define (recv-point x [offset 0])
|
||||
(point (get-double x (+ offset 0)) (get-double x (+ offset 8))))
|
||||
(define (recv-box x)
|
||||
(pg-box (recv-point x 0) (recv-point x 16)))
|
||||
(define (recv-circle x)
|
||||
(pg-circle (recv-point x 0) (get-double x 16)))
|
||||
(define (recv-lseg x)
|
||||
(line-string (list (recv-point x 0) (recv-point x 16))))
|
||||
(define (recv-path x)
|
||||
(pg-path (not (zero? (bytes-ref x 0)))
|
||||
(for/list ([i (integer-bytes->integer x #t #t 1 5)])
|
||||
(recv-point x (+ 5 (* 16 i))))))
|
||||
(define (recv-polygon x)
|
||||
(define (recv-point buf start end)
|
||||
(point (get-double buf start) (get-double buf (+ start 8))))
|
||||
(define (recv-box buf start end)
|
||||
(pg-box (recv-point buf start #f) (recv-point buf (+ start 16) #f)))
|
||||
(define (recv-circle buf start end)
|
||||
(pg-circle (recv-point buf start #f) (get-double buf (+ start 16))))
|
||||
(define (recv-lseg buf start end)
|
||||
(line-string (list (recv-point buf start #f) (recv-point buf (+ start 16) #f))))
|
||||
(define (recv-path buf start end)
|
||||
(pg-path (not (zero? (bytes-ref buf start)))
|
||||
(for/list ([i (integer-bytes->integer buf #t #t (+ start 1) (+ start 5))])
|
||||
(recv-point buf (+ start 5 (* 16 i)) #f))))
|
||||
(define (recv-polygon buf start end)
|
||||
(let* ([points0
|
||||
(for/list ([i (in-range (integer-bytes->integer x #t #t 0 4))])
|
||||
(recv-point x (+ 4 (* 16 i))))]
|
||||
(for/list ([i (in-range (integer-bytes->integer buf #t #t start (+ start 4)))])
|
||||
(recv-point buf (+ start 4 (* 16 i)) #f))]
|
||||
[points (append points0 (list (car points0)))])
|
||||
(polygon (line-string points)
|
||||
null)))
|
||||
|
@ -453,56 +455,53 @@ record = cols:int4 (typeoid:int4 len/-1:int4 data:byte^len)^cols
|
|||
(values recv-time recv-timetz)))
|
||||
|#
|
||||
|
||||
(define (recv-record x)
|
||||
(let ([start 0])
|
||||
(define (get-int signed?)
|
||||
(begin0 (integer-bytes->integer x signed? #t start (+ start 4))
|
||||
(set! start (+ start 4))))
|
||||
(define (get-bytes len)
|
||||
(begin0 (subbytes x start (+ start len))
|
||||
(set! start (+ start len))))
|
||||
(define (recv-col)
|
||||
(let* ([typeid (get-int #t)]
|
||||
[len (get-int #t)])
|
||||
(if (= len -1)
|
||||
sql-null
|
||||
(let* ([bin? (= (typeid->format typeid) 1)] ;; binary reader available
|
||||
[reader (and bin? (typeid->type-reader 'recv-record typeid))])
|
||||
(if reader
|
||||
(reader (get-bytes len))
|
||||
'unreadable)))))
|
||||
(let ([columns (get-int #t)])
|
||||
(build-vector columns (lambda (i) (recv-col))))))
|
||||
(define (recv-record buf start end)
|
||||
(define (get-int signed?)
|
||||
(begin0 (integer-bytes->integer buf signed? #t start (+ start 4))
|
||||
(set! start (+ start 4))))
|
||||
(define (recv-col)
|
||||
(let* ([typeid (get-int #t)]
|
||||
[len (get-int #t)])
|
||||
(if (= len -1)
|
||||
sql-null
|
||||
(let* ([bin? (= (typeid->format typeid) 1)] ;; binary reader available
|
||||
[reader (and bin? (typeid->type-reader 'recv-record typeid))])
|
||||
(if reader
|
||||
(reader buf start (+ start (max 0 len)))
|
||||
'unreadable)))))
|
||||
(let* ([columns (get-int #t)]
|
||||
[result (make-vector columns #f)])
|
||||
(for ([i (in-range columns)])
|
||||
(vector-set! result i (recv-col)))
|
||||
result))
|
||||
|
||||
(define (recv-array x)
|
||||
(let ([start 0])
|
||||
(define (get-int signed?)
|
||||
(begin0 (integer-bytes->integer x signed? #t start (+ start 4))
|
||||
(set! start (+ start 4))))
|
||||
(define (get-bytes len)
|
||||
(begin0 (subbytes x start (+ start len))
|
||||
(set! start (+ start len))))
|
||||
(let* ([ndim (get-int #t)]
|
||||
[flags (get-int #f)]
|
||||
[elttype (get-int #f)]
|
||||
[reader (typeid->type-reader 'recv-array elttype)]
|
||||
[bounds
|
||||
(for/list ([i (in-range ndim)])
|
||||
(let* ([dim (get-int #t)]
|
||||
[lbound (get-int #t)])
|
||||
(cons dim lbound)))]
|
||||
[vals ;; (vector^ndim X)
|
||||
(cond [(zero? ndim) '#()]
|
||||
[else
|
||||
(let loop ([bounds bounds])
|
||||
(cond [(pair? bounds)
|
||||
(for/vector ([i (in-range (car (car bounds)))])
|
||||
(loop (cdr bounds)))]
|
||||
[else
|
||||
(let* ([len (get-int #t)])
|
||||
(cond [(= len -1) sql-null]
|
||||
[else (reader (get-bytes len))]))]))])])
|
||||
(pg-array ndim (map car bounds) (map cdr bounds) vals))))
|
||||
(define (recv-array buf start end)
|
||||
(define (get-int signed?)
|
||||
(begin0 (integer-bytes->integer buf signed? #t start (+ start 4))
|
||||
(set! start (+ start 4))))
|
||||
(let* ([ndim (get-int #t)]
|
||||
[flags (get-int #f)]
|
||||
[elttype (get-int #f)]
|
||||
[reader (typeid->type-reader 'recv-array elttype)]
|
||||
[bounds
|
||||
(for/list ([i (in-range ndim)])
|
||||
(let* ([dim (get-int #t)]
|
||||
[lbound (get-int #t)])
|
||||
(cons dim lbound)))]
|
||||
[vals ;; (vector^ndim X)
|
||||
(cond [(zero? ndim) '#()]
|
||||
[else
|
||||
(let loop ([bounds bounds])
|
||||
(cond [(pair? bounds)
|
||||
(for/vector ([i (in-range (car (car bounds)))])
|
||||
(loop (cdr bounds)))]
|
||||
[else
|
||||
(let* ([len (get-int #t)])
|
||||
(cond [(= len -1) sql-null]
|
||||
[else
|
||||
(begin0 (reader buf start (+ start len))
|
||||
(set! start (+ start len)))]))]))])])
|
||||
(pg-array ndim (map car bounds) (map cdr bounds) vals)))
|
||||
|
||||
#|
|
||||
(define (recv-numeric x)
|
||||
|
@ -538,7 +537,7 @@ record = cols:int4 (typeoid:int4 len/-1:int4 data:byte^len)^cols
|
|||
c-parse-timestamp-tz
|
||||
c-parse-interval
|
||||
c-parse-decimal)
|
||||
(let ([c (lambda (f) (lambda (x) (f (bytes->string/utf-8 x))))])
|
||||
(let ([c (lambda (f) (lambda (buf start end) (f (bytes->string/utf-8 buf #f start end))))])
|
||||
(values (c parse-date)
|
||||
(c parse-time)
|
||||
(c parse-time-tz)
|
||||
|
|
|
@ -44,7 +44,7 @@
|
|||
(struct-out RowDescription)
|
||||
(struct-out Sync)
|
||||
(struct-out Terminate)
|
||||
|
||||
bytes->row
|
||||
field-dvec->typeid
|
||||
field-dvec->field-info)
|
||||
|
||||
|
@ -288,17 +288,25 @@
|
|||
(io:read-int16 p))])
|
||||
(make-CopyOutResponse format column-formats))))
|
||||
|
||||
(define-struct DataRow (values) #:transparent)
|
||||
(define-struct DataRow (data) #:transparent)
|
||||
(define (parse:DataRow p)
|
||||
(with-length-in p #\D
|
||||
(let* ([values
|
||||
(build-vector (io:read-int16 p)
|
||||
(lambda (i)
|
||||
(let ([len (io:read-int32 p)])
|
||||
(if (= len -1)
|
||||
sql-null
|
||||
(io:read-bytes-as-bytes p len)))))])
|
||||
(make-DataRow values))))
|
||||
(let ([len (io:read-int32 p)])
|
||||
(make-DataRow (read-bytes (- len 4) p))))
|
||||
|
||||
(define (bytes->row buf type-readers)
|
||||
(let* ([columns (integer-bytes->integer buf #t #t 0 2)]
|
||||
[row (make-vector columns)])
|
||||
(let loop ([i 0] [type-readers type-readers] [start 2])
|
||||
(when (< i columns)
|
||||
(let ([len (integer-bytes->integer buf #t #t start (+ start 4))]
|
||||
[start* (+ start 4)])
|
||||
(vector-set! row i
|
||||
(if (= len -1)
|
||||
sql-null
|
||||
((car type-readers) buf start* (+ start* len))))
|
||||
(let ([next (+ start* (max 0 len))])
|
||||
(loop (add1 i) (cdr type-readers) next)))))
|
||||
row))
|
||||
|
||||
(define-struct Describe (type name) #:transparent)
|
||||
(define (write:Describe p v)
|
||||
|
|
Loading…
Reference in New Issue
Block a user