From e0de33a00563504396b7c095e04bc4736cf4b71d Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Thu, 13 Dec 2012 07:45:26 -0700 Subject: [PATCH] net/ftp: make progress proc keyword, change progress protocol The revised protocol for a progress procedure doesn't create the thread automatically, and it provides an event to indicate when the progress count changes. --- collects/net/ftp.rkt | 66 ++++++++++++------------------ collects/net/scribblings/ftp.scrbl | 60 ++++++++++++++++----------- collects/tests/net/ftp.rkt | 13 +++++- 3 files changed, 74 insertions(+), 65 deletions(-) diff --git a/collects/net/ftp.rkt b/collects/net/ftp.rkt index c9f7ebf6f3..431f469545 100644 --- a/collects/net/ftp.rkt +++ b/collects/net/ftp.rkt @@ -132,9 +132,9 @@ (tcp-abandon-port tcp-data-in) tcp-data-out)))))) -;; 230? is var. It always keep last line's action result. The lambda in this +;; 230? is var. It always keep last line's action result. The lambda in this ;; ftp-check-response means: -;; "if one line's head is 230, then this ftp server do not +;; "if one line's head is 230, then this ftp server do not ;; need PASS command. "or 230? (rege..." means if 230? is true already ;; , then do not check the line anymore, it's just true. (define (ftp-establish-connection* in out username password) @@ -197,7 +197,8 @@ (define r `(,(car m) ,@(cddr m))) (if size `(,@r ,size) r))) -(define (ftp-download-file ftp-ports folder filename [progress-proc #f]) +(define (ftp-download-file ftp-ports folder filename + #:progress [progress-proc #f]) ;; Save the file under the name tmp.file, rename it once download is ;; complete this assures we don't over write any existing file without ;; having a good file down @@ -209,7 +210,8 @@ (rename-file-or-directory tmpfile (build-path folder filename) #t))) -(define (ftp-upload-file ftp-ports filepath [progress-proc #f]) +(define (ftp-upload-file ftp-ports filepath + #:progress [progress-proc #f]) (let ([upload-file (open-input-file filepath)] [tcp-data (establish-data-connection ftp-ports 'out)]) @@ -219,17 +221,19 @@ (set! splitter "/") (set! splitter "\\\\")) - (transfer-data ftp-ports 'upload upload-file tcp-data (last (regexp-split (regexp splitter) filepath)) progress-proc)))) + (transfer-data ftp-ports 'upload upload-file tcp-data + (last (regexp-split (regexp splitter) filepath)) + progress-proc)))) ;; download and upload's share part -(define (transfer-data ftp-ports command from to filename [progress-proc #f]) +(define (transfer-data ftp-ports command from to filename progress-proc) (let ([inner-command ""]) (cond [(eq? command 'upload) (set! inner-command "STOR")] [(eq? command 'download) (set! inner-command "RETR")]) - + (fprintf (ftp-connection-out ftp-ports) "~a ~a\r\n" inner-command filename)) (ftp-check-response (ftp-connection-in ftp-ports) @@ -238,40 +242,24 @@ (let ([rcv-ch (make-channel)] [ctrl-ch (make-channel)] - [bytes-tranferred 0]) + [transfer-pair (cons 0 (make-semaphore))]) - (when (procedure? progress-proc) - (thread - (lambda () - (progress-proc rcv-ch ctrl-ch))) + (when progress-proc + (progress-proc (lambda () + (define p transfer-pair) + (values (car p) (semaphore-peek-evt (cdr p)))))) - (thread - (lambda () - (letrec ([loop - (lambda () - (channel-put rcv-ch bytes-tranferred) - (when (= 0 (channel-get ctrl-ch)) - (loop)))]) - (loop))))) - - (letrec ([loop - (lambda (read-from write-to) - (let ([bstr (read-bytes 40960 read-from)]) - (unless (eof-object? bstr) - (set! bytes-tranferred (+ bytes-tranferred (write-bytes bstr write-to))) - (loop read-from write-to))))]) - - (loop from to)) - - (when (procedure? progress-proc) - ; stop receiver thread - (channel-put rcv-ch bytes-tranferred) - (channel-get ctrl-ch) - (channel-put rcv-ch -1) - - ; stop sender thread - (channel-get rcv-ch) - (channel-put ctrl-ch -1))) + (define bstr (make-bytes 40960)) + (let loop () + (let ([n (read-bytes! bstr from)]) + (unless (eof-object? n) + (define sent (write-bytes bstr to 0 n)) + (when progress-proc + (define old-pair transfer-pair) + (set! transfer-pair (cons (+ sent (car old-pair)) + (make-semaphore))) + (semaphore-post (cdr old-pair))) + (loop))))) (close-input-port from) (close-output-port to) diff --git a/collects/net/scribblings/ftp.scrbl b/collects/net/scribblings/ftp.scrbl index da3fcd6f36..8a4c66930a 100644 --- a/collects/net/scribblings/ftp.scrbl +++ b/collects/net/scribblings/ftp.scrbl @@ -79,7 +79,13 @@ this information can be unreliable.} @defproc[(ftp-download-file [ftp-conn ftp-connection?] [local-dir path-string?] [file string?] - [progress-proc procedure? #f]) void?]{ + [#:progress progress-proc + (or/c #f + (-> (-> (values exact-nonnegative-integer? + evt?)) + any)) + #f]) + void?]{ Downloads @racket[file] from the server's current directory and puts it in @racket[local-dir] using the same name. If the file already @@ -87,37 +93,43 @@ exists in the local directory, it is replaced, but only after the transfer succeeds (i.e., the file is first downloaded to a temporary file, then moved into place on success). -@racket[progress-proc] is a @racket[(-> channel? channel? any?)], means @racket[(progress-proc receive-channel control-channel)]. -Inside the @racket[progress-proc], use @racket[(channel-get receive-channel)] to get bytes count has downloaded(uploaded). -After @racket[(channel-get receive-channel)], use @racket[(channel-put control-channel 0)] to launch sender to get a new bytes count. - -@racket[-1] means "transfer completed" @racket[0] means "normal" - -Warning: Do something between get and put, not "refresh too fast", this will slow down the transfer speed. - -@racket[Example:] +If @racket[progress-proc] is not @racket[#f], then it is called with a +function @racket[_get-count] that returns two values: the number of bytes +transferred so far, and an event that becomes ready when the +transferred-bye count changes. The @racket[_get-count] function can be +called in any thread and any number of times. The @racket[progress-proc] function should +return immediately, perhaps starting a thread that periodically polls +@racket[_get-count]. Do not poll too frequently, or else polling +will slow the transfer; the second argument from @racket[_get-count] +is intended to limit polling. @racketblock[ -(ftp-download-file - ftp-conn "." "testfile" - (lambda (rcv-ch ctrl-ch) - (letrec ([loop - (lambda () - (let ([data (channel-get rcv-ch)]) - (unless (= data -1) - (channel-put ctrl-ch 0) - (printf "[~a] bytes has downloaded~%" data) - (loop))))]) - (loop))))] -} +(ftp-download-file + ftp-conn "." "testfile" + (lambda (get-count) + (thread + (lambda () + (let loop () + (define-values (count changed-evt) (get-count)) + (printf "~a bytes downloaded\n" count) + (sync changed-evt) + (loop)))))) +]} @defproc[(ftp-upload-file [ftp-conn ftp-connection?] [file-path path-string?] - [progress-proc procedure? #f]) void?]{ + [#:progress progress-proc + (or/c #f + (-> (-> (values exact-nonnegative-integer? + evt?)) + any)) + #f]) + void?]{ Upload @racket[file-path] to the server's current directory using the same name. If the file already exists in the local directory, it is replaced. -@racket[progress-proc] usage is same as @racket[ftp-download-file].} +The @racket[progress-proc] argument is used in the same way as in @racket[ftp-download-file], +but to report uploaded bytes instead of downloaded bytes.} @defproc[(ftp-delete-file [ftp-conn ftp-connection?] [file-path path-string?]) void?]{ diff --git a/collects/tests/net/ftp.rkt b/collects/tests/net/ftp.rkt index 06b7d273c5..fea640bb53 100644 --- a/collects/tests/net/ftp.rkt +++ b/collects/tests/net/ftp.rkt @@ -22,6 +22,15 @@ (define-values [thd port] (tcp-serve dest text)) (values thd (port->splitstr port))) +(define ((progress-proc dir) get-count) + (thread + (lambda () + (let loop () + (define-values (count changed-evt) (get-count)) + (printf "~a bytes ~aloaded\n" count dir) + (sync changed-evt) + (loop))))) + (provide tests) (module+ main (tests)) (define (tests) @@ -46,8 +55,8 @@ (for ([f (in-list (ftp-directory-list conn))]) (match-define (list* type ftp-date name ?size) f) (test (ftp-make-file-seconds ftp-date))) - (ftp-download-file conn tmp-dir pth) - (ftp-upload-file conn (path->string (build-path tmp-dir pth))) + (ftp-download-file conn tmp-dir pth #:progress (progress-proc "down")) + (ftp-upload-file conn (path->string (build-path tmp-dir pth)) #:progress (progress-proc "up")) (ftp-delete-file conn "3dldf/test.file") (ftp-make-directory conn "test") (ftp-delete-directory conn "test")