net/ftp: make progress proc keyword, change progress protocol

The revised protocol for a progress procedure doesn't create
the thread automatically, and it provides an event to indicate
when the progress count changes.
This commit is contained in:
Matthew Flatt 2012-12-13 07:45:26 -07:00
parent 4cc287f7e5
commit e0de33a005
3 changed files with 74 additions and 65 deletions

View File

@ -132,9 +132,9 @@
(tcp-abandon-port tcp-data-in) (tcp-abandon-port tcp-data-in)
tcp-data-out)))))) tcp-data-out))))))
;; 230? is var. It always keep last line's action result. The lambda in this ;; 230? is var. It always keep last line's action result. The lambda in this
;; ftp-check-response means: ;; ftp-check-response means:
;; "if one line's head is 230, then this ftp server do not ;; "if one line's head is 230, then this ftp server do not
;; need PASS command. "or 230? (rege..." means if 230? is true already ;; need PASS command. "or 230? (rege..." means if 230? is true already
;; , then do not check the line anymore, it's just true. ;; , then do not check the line anymore, it's just true.
(define (ftp-establish-connection* in out username password) (define (ftp-establish-connection* in out username password)
@ -197,7 +197,8 @@
(define r `(,(car m) ,@(cddr m))) (define r `(,(car m) ,@(cddr m)))
(if size `(,@r ,size) r))) (if size `(,@r ,size) r)))
(define (ftp-download-file ftp-ports folder filename [progress-proc #f]) (define (ftp-download-file ftp-ports folder filename
#:progress [progress-proc #f])
;; Save the file under the name tmp.file, rename it once download is ;; Save the file under the name tmp.file, rename it once download is
;; complete this assures we don't over write any existing file without ;; complete this assures we don't over write any existing file without
;; having a good file down ;; having a good file down
@ -209,7 +210,8 @@
(rename-file-or-directory tmpfile (build-path folder filename) #t))) (rename-file-or-directory tmpfile (build-path folder filename) #t)))
(define (ftp-upload-file ftp-ports filepath [progress-proc #f]) (define (ftp-upload-file ftp-ports filepath
#:progress [progress-proc #f])
(let ([upload-file (open-input-file filepath)] (let ([upload-file (open-input-file filepath)]
[tcp-data (establish-data-connection ftp-ports 'out)]) [tcp-data (establish-data-connection ftp-ports 'out)])
@ -219,17 +221,19 @@
(set! splitter "/") (set! splitter "/")
(set! splitter "\\\\")) (set! splitter "\\\\"))
(transfer-data ftp-ports 'upload upload-file tcp-data (last (regexp-split (regexp splitter) filepath)) progress-proc)))) (transfer-data ftp-ports 'upload upload-file tcp-data
(last (regexp-split (regexp splitter) filepath))
progress-proc))))
;; download and upload's share part ;; download and upload's share part
(define (transfer-data ftp-ports command from to filename [progress-proc #f]) (define (transfer-data ftp-ports command from to filename progress-proc)
(let ([inner-command ""]) (let ([inner-command ""])
(cond (cond
[(eq? command 'upload) [(eq? command 'upload)
(set! inner-command "STOR")] (set! inner-command "STOR")]
[(eq? command 'download) [(eq? command 'download)
(set! inner-command "RETR")]) (set! inner-command "RETR")])
(fprintf (ftp-connection-out ftp-ports) "~a ~a\r\n" inner-command filename)) (fprintf (ftp-connection-out ftp-ports) "~a ~a\r\n" inner-command filename))
(ftp-check-response (ftp-connection-in ftp-ports) (ftp-check-response (ftp-connection-in ftp-ports)
@ -238,40 +242,24 @@
(let ([rcv-ch (make-channel)] (let ([rcv-ch (make-channel)]
[ctrl-ch (make-channel)] [ctrl-ch (make-channel)]
[bytes-tranferred 0]) [transfer-pair (cons 0 (make-semaphore))])
(when (procedure? progress-proc) (when progress-proc
(thread (progress-proc (lambda ()
(lambda () (define p transfer-pair)
(progress-proc rcv-ch ctrl-ch))) (values (car p) (semaphore-peek-evt (cdr p))))))
(thread (define bstr (make-bytes 40960))
(lambda () (let loop ()
(letrec ([loop (let ([n (read-bytes! bstr from)])
(lambda () (unless (eof-object? n)
(channel-put rcv-ch bytes-tranferred) (define sent (write-bytes bstr to 0 n))
(when (= 0 (channel-get ctrl-ch)) (when progress-proc
(loop)))]) (define old-pair transfer-pair)
(loop))))) (set! transfer-pair (cons (+ sent (car old-pair))
(make-semaphore)))
(letrec ([loop (semaphore-post (cdr old-pair)))
(lambda (read-from write-to) (loop)))))
(let ([bstr (read-bytes 40960 read-from)])
(unless (eof-object? bstr)
(set! bytes-tranferred (+ bytes-tranferred (write-bytes bstr write-to)))
(loop read-from write-to))))])
(loop from to))
(when (procedure? progress-proc)
; stop receiver thread
(channel-put rcv-ch bytes-tranferred)
(channel-get ctrl-ch)
(channel-put rcv-ch -1)
; stop sender thread
(channel-get rcv-ch)
(channel-put ctrl-ch -1)))
(close-input-port from) (close-input-port from)
(close-output-port to) (close-output-port to)

View File

@ -79,7 +79,13 @@ this information can be unreliable.}
@defproc[(ftp-download-file [ftp-conn ftp-connection?] @defproc[(ftp-download-file [ftp-conn ftp-connection?]
[local-dir path-string?] [local-dir path-string?]
[file string?] [file string?]
[progress-proc procedure? #f]) void?]{ [#:progress progress-proc
(or/c #f
(-> (-> (values exact-nonnegative-integer?
evt?))
any))
#f])
void?]{
Downloads @racket[file] from the server's current directory and puts Downloads @racket[file] from the server's current directory and puts
it in @racket[local-dir] using the same name. If the file already it in @racket[local-dir] using the same name. If the file already
@ -87,37 +93,43 @@ exists in the local directory, it is replaced, but only after the
transfer succeeds (i.e., the file is first downloaded to a temporary transfer succeeds (i.e., the file is first downloaded to a temporary
file, then moved into place on success). file, then moved into place on success).
@racket[progress-proc] is a @racket[(-> channel? channel? any?)], means @racket[(progress-proc receive-channel control-channel)]. If @racket[progress-proc] is not @racket[#f], then it is called with a
Inside the @racket[progress-proc], use @racket[(channel-get receive-channel)] to get bytes count has downloaded(uploaded). function @racket[_get-count] that returns two values: the number of bytes
After @racket[(channel-get receive-channel)], use @racket[(channel-put control-channel 0)] to launch sender to get a new bytes count. transferred so far, and an event that becomes ready when the
transferred-bye count changes. The @racket[_get-count] function can be
@racket[-1] means "transfer completed" @racket[0] means "normal" called in any thread and any number of times. The @racket[progress-proc] function should
return immediately, perhaps starting a thread that periodically polls
Warning: Do something between get and put, not "refresh too fast", this will slow down the transfer speed. @racket[_get-count]. Do not poll too frequently, or else polling
will slow the transfer; the second argument from @racket[_get-count]
@racket[Example:] is intended to limit polling.
@racketblock[ @racketblock[
(ftp-download-file (ftp-download-file
ftp-conn "." "testfile" ftp-conn "." "testfile"
(lambda (rcv-ch ctrl-ch) (lambda (get-count)
(letrec ([loop (thread
(lambda () (lambda ()
(let ([data (channel-get rcv-ch)]) (let loop ()
(unless (= data -1) (define-values (count changed-evt) (get-count))
(channel-put ctrl-ch 0) (printf "~a bytes downloaded\n" count)
(printf "[~a] bytes has downloaded~%" data) (sync changed-evt)
(loop))))]) (loop))))))
(loop))))] ]}
}
@defproc[(ftp-upload-file [ftp-conn ftp-connection?] @defproc[(ftp-upload-file [ftp-conn ftp-connection?]
[file-path path-string?] [file-path path-string?]
[progress-proc procedure? #f]) void?]{ [#:progress progress-proc
(or/c #f
(-> (-> (values exact-nonnegative-integer?
evt?))
any))
#f])
void?]{
Upload @racket[file-path] to the server's current directory using the same name. Upload @racket[file-path] to the server's current directory using the same name.
If the file already exists in the local directory, it is replaced. If the file already exists in the local directory, it is replaced.
@racket[progress-proc] usage is same as @racket[ftp-download-file].} The @racket[progress-proc] argument is used in the same way as in @racket[ftp-download-file],
but to report uploaded bytes instead of downloaded bytes.}
@defproc[(ftp-delete-file [ftp-conn ftp-connection?] @defproc[(ftp-delete-file [ftp-conn ftp-connection?]
[file-path path-string?]) void?]{ [file-path path-string?]) void?]{

View File

@ -22,6 +22,15 @@
(define-values [thd port] (tcp-serve dest text)) (define-values [thd port] (tcp-serve dest text))
(values thd (port->splitstr port))) (values thd (port->splitstr port)))
(define ((progress-proc dir) get-count)
(thread
(lambda ()
(let loop ()
(define-values (count changed-evt) (get-count))
(printf "~a bytes ~aloaded\n" count dir)
(sync changed-evt)
(loop)))))
(provide tests) (provide tests)
(module+ main (tests)) (module+ main (tests))
(define (tests) (define (tests)
@ -46,8 +55,8 @@
(for ([f (in-list (ftp-directory-list conn))]) (for ([f (in-list (ftp-directory-list conn))])
(match-define (list* type ftp-date name ?size) f) (match-define (list* type ftp-date name ?size) f)
(test (ftp-make-file-seconds ftp-date))) (test (ftp-make-file-seconds ftp-date)))
(ftp-download-file conn tmp-dir pth) (ftp-download-file conn tmp-dir pth #:progress (progress-proc "down"))
(ftp-upload-file conn (path->string (build-path tmp-dir pth))) (ftp-upload-file conn (path->string (build-path tmp-dir pth)) #:progress (progress-proc "up"))
(ftp-delete-file conn "3dldf/test.file") (ftp-delete-file conn "3dldf/test.file")
(ftp-make-directory conn "test") (ftp-make-directory conn "test")
(ftp-delete-directory conn "test") (ftp-delete-directory conn "test")