From 5aebb1c53931b10480a0d400a7354f14a03b2fa8 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Wed, 9 Sep 2015 20:53:27 -0600 Subject: [PATCH] raco pkg: get catalog info and checksums in parallel When `raco pkg {install,update}` has a list of packages to check, first run through the list in a "prefetch" mode to create futures to fetch the information. Issuing requests to servers in parallel can greatly speed up `raco pkg update --all`. Package content is still downloaded sequentially. --- racket/collects/pkg/private/install.rkt | 592 +++++++++++++---------- racket/collects/pkg/private/prefetch.rkt | 187 +++++++ racket/collects/pkg/private/stage.rkt | 46 +- 3 files changed, 555 insertions(+), 270 deletions(-) create mode 100644 racket/collects/pkg/private/prefetch.rkt diff --git a/racket/collects/pkg/private/install.rkt b/racket/collects/pkg/private/install.rkt index 9eb7aaf1f8..4b6ff339a0 100644 --- a/racket/collects/pkg/private/install.rkt +++ b/racket/collects/pkg/private/install.rkt @@ -31,19 +31,29 @@ "orig-pkg.rkt" "info-to-desc.rkt" "git.rkt" - "check-will-exist.rkt") + "check-will-exist.rkt" + "prefetch.rkt") (provide pkg-install pkg-update) +;; A [prefetch-shared] annotation means that a hash table is shared +;; with prefetch threads. If a prefetch group is terminated, then all +;; prefetch-shared tables must be abaondoned, because a thread with a +;; lock can be terminated. + (define (checksum-for-pkg-source pkg-source type pkg-name given-checksum download-printf - #:catalog-lookup-cache [catalog-lookup-cache #f] - #:remote-checksum-cache [remote-checksum-cache #f]) + #:prefetch? [prefetch? #f] + #:prefetch-group [prefetch-group #f] + #:catalog-lookup-cache [catalog-lookup-cache #f] ; [prefetch-shared] + #:remote-checksum-cache [remote-checksum-cache #f]) ; [prefetch-shared] (case type [(file-url dir-url github git clone) (or given-checksum (remote-package-checksum `(url ,pkg-source) download-printf pkg-name #:type type + #:prefetch? prefetch? + #:prefetch-group prefetch-group #:catalog-lookup-cache catalog-lookup-cache #:remote-checksum-cache remote-checksum-cache))] [(file) @@ -56,6 +66,8 @@ (or given-checksum (remote-package-checksum `(catalog ,pkg-source) download-printf pkg-name #:type type + #:prefetch? prefetch? + #:prefetch-group prefetch-group #:catalog-lookup-cache catalog-lookup-cache #:remote-checksum-cache remote-checksum-cache))] [else given-checksum])) @@ -134,8 +146,9 @@ #:update-deps? update-deps? #:update-implies? update-implies? #:update-cache update-cache - #:catalog-lookup-cache catalog-lookup-cache - #:remote-checksum-cache remote-checksum-cache + #:prefetch-group prefetch-group + #:catalog-lookup-cache catalog-lookup-cache ; [prefetch-shared] + #:remote-checksum-cache remote-checksum-cache ; [prefetch-shared] #:updating? updating-all? #:extra-updating extra-updating #:ignore-checksums? ignore-checksums? @@ -416,20 +429,23 @@ (or do-update-deps? (set-member? implies name)) (not (hash-ref simultaneous-installs name #f)) - ((packages-to-update download-printf current-scope-db - #:must-update? #f - #:deps? do-update-deps? - #:implies? update-implies? - #:update-cache update-cache - #:namespace metadata-ns - #:catalog-lookup-cache catalog-lookup-cache - #:remote-checksum-cache remote-checksum-cache - #:all-platforms? all-platforms? - #:ignore-checksums? ignore-checksums? - #:use-cache? use-cache? - #:from-command-line? from-command-line? - #:link-dirs? link-dirs?) - name)) + (let ([updater + (packages-to-update download-printf current-scope-db + #:must-update? #f + #:deps? do-update-deps? + #:implies? update-implies? + #:update-cache update-cache + #:prefetch-group prefetch-group + #:namespace metadata-ns + #:catalog-lookup-cache catalog-lookup-cache + #:remote-checksum-cache remote-checksum-cache + #:all-platforms? all-platforms? + #:ignore-checksums? ignore-checksums? + #:use-cache? use-cache? + #:from-command-line? from-command-line? + #:link-dirs? link-dirs?)]) + (updater #:prefetch? #t name) + (updater name))) null)) deps)) (and (not (empty? update-pkgs)) @@ -527,19 +543,21 @@ (define update-pkgs (map car update-deps)) (define (make-pre-succeed) (define db current-scope-db) - (let ([to-update (append-map (packages-to-update download-printf db - #:deps? update-deps? - #:implies? update-implies? - #:update-cache update-cache - #:namespace metadata-ns - #:catalog-lookup-cache catalog-lookup-cache - #:remote-checksum-cache remote-checksum-cache - #:all-platforms? all-platforms? - #:ignore-checksums? ignore-checksums? - #:use-cache? use-cache? - #:from-command-line? from-command-line? - #:link-dirs? link-dirs?) - update-pkgs)]) + (let ([to-update (let ([updater (packages-to-update download-printf db + #:deps? update-deps? + #:implies? update-implies? + #:update-cache update-cache + #:prefetch-group prefetch-group + #:namespace metadata-ns + #:catalog-lookup-cache catalog-lookup-cache + #:remote-checksum-cache remote-checksum-cache + #:all-platforms? all-platforms? + #:ignore-checksums? ignore-checksums? + #:use-cache? use-cache? + #:from-command-line? from-command-line? + #:link-dirs? link-dirs?)]) + (for ([pkg (in-list update-pkgs)]) (updater #:prefetch? #t pkg)) + (append-map updater update-pkgs))]) (λ () (for-each (compose (remove-package #t quiet? use-trash?) pkg-desc-name) to-update)))) (match this-dep-behavior ['fail @@ -820,8 +838,9 @@ #:update-deps? [update-deps? #f] #:update-implies? [update-implies? #t] #:update-cache [update-cache (make-hash)] - #:catalog-lookup-cache [catalog-lookup-cache (make-hash)] - #:remote-checksum-cache [remote-checksum-cache (make-hash)] + #:prefetch-group [prefetch-group (make-prefetch-group)] + #:catalog-lookup-cache [catalog-lookup-cache (make-hash)] ; [prefetch-shared] + #:remote-checksum-cache [remote-checksum-cache (make-hash)] ; [prefetch-shared] #:check-pkg-early? [check-pkg-early? #t] #:updating? [updating? #f] #:quiet? [quiet? #f] @@ -874,89 +893,95 @@ download-printf from-command-line? convert-to-non-clone?)) - (with-handlers* ([vector? - (match-lambda - [(vector updating? new-infos dep-pkg deps more-pre-succeed conv clone-info) - (pkg-install - #:summary-deps (snoc summary-deps (vector dep-pkg deps)) - #:old-infos new-infos - #:old-descs (append done-descs new-descs) - #:all-platforms? all-platforms? - #:force? force - #:ignore-checksums? ignore-checksums? - #:strict-doc-conflicts? strict-doc-conflicts? - #:use-cache? use-cache? - #:dep-behavior dep-behavior - #:update-deps? update-deps? - #:update-implies? update-implies? - #:update-cache update-cache - #:catalog-lookup-cache catalog-lookup-cache - #:remote-checksum-cache remote-checksum-cache - #:check-pkg-early? #f - #:pre-succeed (lambda () (pre-succeed) (more-pre-succeed)) - #:updating? updating? - #:quiet? quiet? - #:use-trash? use-trash? - #:from-command-line? from-command-line? - #:conversation conv - #:strip strip-mode - #:force-strip? force-strip? - #:multi-clone-behavior (vector-ref clone-info 0) - #:repo-descs (vector-ref clone-info 1) - #:pull-behavior pull-behavior - (for/list ([dep (in-list deps)]) - (if (pkg-desc? dep) - dep - (pkg-desc dep #f #f #f #t #f))))])]) - (begin0 - (install-packages - #:old-infos done-infos - #:old-descs done-descs - #:all-platforms? all-platforms? - #:force? force - #:ignore-checksums? ignore-checksums? - #:use-cache? use-cache? - #:skip-installed? skip-installed? - #:dep-behavior dep-behavior - #:update-deps? update-deps? - #:update-implies? update-implies? - #:update-cache update-cache - #:catalog-lookup-cache catalog-lookup-cache - #:remote-checksum-cache remote-checksum-cache - #:pre-succeed (λ () - (for ([pkg-name (in-hash-keys extra-updating)]) - ((remove-package #t quiet? use-trash?) pkg-name)) - (pre-succeed)) - #:updating? updating? - #:extra-updating extra-updating - #:quiet? quiet? - #:use-trash? use-trash? - #:from-command-line? from-command-line? - #:conversation conversation - #:strip strip-mode - #:force-strip? force-strip? - #:link-dirs? link-dirs? - #:local-docs-ok? (not strict-doc-conflicts?) - #:ai-cache (box #f) - #:clone-info (vector clone-behavior - repo-descs) - #:pull-behavior pull-behavior - new-descs) - (unless (empty? summary-deps) - (unless quiet? - (printf/flush "The following~a packages were listed as dependencies~a:~a\n" - (if updating? " out-of-date" " uninstalled") - (format "\nand they were ~a~a" - (if (eq? dep-behavior 'search-auto) "automatically " "") - (if updating? "updated" "installed")) - (string-append* - (for/list ([p*ds (in-list summary-deps)]) - (match-define (vector n ds) p*ds) - (format "\n dependencies of ~a:~a" - n - (if updating? - (format-deps ds) - (format-list ds))))))))))) + + (call-with-prefetch-cleanup + prefetch-group + (lambda () + (with-handlers* ([vector? + (match-lambda + [(vector updating? new-infos dep-pkg deps more-pre-succeed conv clone-info) + (pkg-install + #:summary-deps (snoc summary-deps (vector dep-pkg deps)) + #:old-infos new-infos + #:old-descs (append done-descs new-descs) + #:all-platforms? all-platforms? + #:force? force + #:ignore-checksums? ignore-checksums? + #:strict-doc-conflicts? strict-doc-conflicts? + #:use-cache? use-cache? + #:dep-behavior dep-behavior + #:update-deps? update-deps? + #:update-implies? update-implies? + #:update-cache update-cache + #:prefetch-group prefetch-group + #:catalog-lookup-cache catalog-lookup-cache + #:remote-checksum-cache remote-checksum-cache + #:check-pkg-early? #f + #:pre-succeed (lambda () (pre-succeed) (more-pre-succeed)) + #:updating? updating? + #:quiet? quiet? + #:use-trash? use-trash? + #:from-command-line? from-command-line? + #:conversation conv + #:strip strip-mode + #:force-strip? force-strip? + #:multi-clone-behavior (vector-ref clone-info 0) + #:repo-descs (vector-ref clone-info 1) + #:pull-behavior pull-behavior + (for/list ([dep (in-list deps)]) + (if (pkg-desc? dep) + dep + (pkg-desc dep #f #f #f #t #f))))])]) + (begin0 + (install-packages + #:old-infos done-infos + #:old-descs done-descs + #:all-platforms? all-platforms? + #:force? force + #:ignore-checksums? ignore-checksums? + #:use-cache? use-cache? + #:skip-installed? skip-installed? + #:dep-behavior dep-behavior + #:update-deps? update-deps? + #:update-implies? update-implies? + #:update-cache update-cache + #:prefetch-group prefetch-group + #:catalog-lookup-cache catalog-lookup-cache + #:remote-checksum-cache remote-checksum-cache + #:pre-succeed (λ () + (for ([pkg-name (in-hash-keys extra-updating)]) + ((remove-package #t quiet? use-trash?) pkg-name)) + (pre-succeed)) + #:updating? updating? + #:extra-updating extra-updating + #:quiet? quiet? + #:use-trash? use-trash? + #:from-command-line? from-command-line? + #:conversation conversation + #:strip strip-mode + #:force-strip? force-strip? + #:link-dirs? link-dirs? + #:local-docs-ok? (not strict-doc-conflicts?) + #:ai-cache (box #f) + #:clone-info (vector clone-behavior + repo-descs) + #:pull-behavior pull-behavior + new-descs) + (unless (empty? summary-deps) + (unless quiet? + (printf/flush "The following~a packages were listed as dependencies~a:~a\n" + (if updating? " out-of-date" " uninstalled") + (format "\nand they were ~a~a" + (if (eq? dep-behavior 'search-auto) "automatically " "") + (if updating? "updated" "installed")) + (string-append* + (for/list ([p*ds (in-list summary-deps)]) + (match-define (vector n ds) p*ds) + (format "\n dependencies of ~a:~a" + n + (if updating? + (format-deps ds) + (format-list ds))))))))))))) ;; Determine packages to update, starting with `pkg-name'. If `pkg-name' ;; needs to be updated, return it in a list. Otherwise, if `deps?', @@ -974,9 +999,10 @@ #:deps? deps? #:implies? implies? #:namespace metadata-ns - #:catalog-lookup-cache catalog-lookup-cache - #:remote-checksum-cache remote-checksum-cache + #:catalog-lookup-cache catalog-lookup-cache ; [prefetch-shared] + #:remote-checksum-cache remote-checksum-cache ; [prefetch-shared] #:update-cache update-cache + #:prefetch-group prefetch-group #:all-platforms? all-platforms? #:ignore-checksums? ignore-checksums? #:use-cache? use-cache? @@ -985,11 +1011,17 @@ #:skip-uninstalled? [skip-uninstalled? #f] #:all-mode? [all-mode? #f] #:force-update? [force-update? #f]) - pkg-name) + pkg-name + ;; In prefetch mode, do as much work as possible to generate + ;; server requests without waiting for results and without + ;; making any other state changes --- but forced errors are + ;; ok. + #:prefetch? [prefetch? #f]) (let update-loop ([pkg-name pkg-name] [must-update? must-update?] [force-update? force-update?] - [report-skip? #t]) + [report-skip? #t] + [prefetch? prefetch?]) (cond [(pkg-desc? pkg-name) ;; Infer the package-source type and name: @@ -1013,46 +1045,59 @@ name (pkg-desc-checksum pkg-name) download-printf + #:prefetch? prefetch? + #:prefetch-group prefetch-group #:catalog-lookup-cache catalog-lookup-cache #:remote-checksum-cache remote-checksum-cache)) - (hash-set! update-cache name new-checksum) ; record downloaded checksum - (unless (or ignore-checksums? (not (pkg-desc-checksum pkg-name))) - (unless (equal? (pkg-desc-checksum pkg-name) new-checksum) - (pkg-error (~a "incorrect checksum on package\n" - " package source: ~a\n" - " expected: ~e\n" - " got: ~e") - (pkg-desc-source pkg-name) - (pkg-desc-checksum pkg-name) - new-checksum))) - - (if (or force-update? - (not (equal? (pkg-info-checksum info) - new-checksum)) - ;; No checksum available => always update - (not new-checksum) - ;; Different source => always update - (not (same-orig-pkg? (pkg-info-orig-pkg info) - (desc->orig-pkg type - (pkg-desc-source pkg-name) - (pkg-desc-extra-path pkg-name))))) - ;; Update: - (begin - (hash-set! update-cache (box name) #t) - (list (pkg-desc (pkg-desc-source pkg-name) - (pkg-desc-type pkg-name) - name - (pkg-desc-checksum pkg-name) - (pkg-desc-auto? pkg-name) - (or (pkg-desc-extra-path pkg-name) - (and (eq? type 'clone) - (current-directory)))))) - ;; No update needed, but maybe check dependencies: - (if (or deps? - implies?) - (update-loop name #f #f #f) - null))])] - [(hash-ref update-cache (box pkg-name) #f) + (cond + [prefetch? + ;; Don't proceed further if we're just issuing prefetches + null] + [else + (hash-set! update-cache name new-checksum) ; record downloaded checksum + (unless (or ignore-checksums? (not (pkg-desc-checksum pkg-name))) + (unless (equal? (pkg-desc-checksum pkg-name) new-checksum) + (pkg-error (~a "incorrect checksum on package\n" + " package source: ~a\n" + " expected: ~e\n" + " got: ~e") + (pkg-desc-source pkg-name) + (pkg-desc-checksum pkg-name) + new-checksum))) + + (if (or force-update? + ;; Different checksum => update + (not (equal? (pkg-info-checksum info) + new-checksum)) + ;; No checksum available => always update + (not new-checksum) + ;; Different source => always update + (not (same-orig-pkg? (pkg-info-orig-pkg info) + (desc->orig-pkg type + (pkg-desc-source pkg-name) + (pkg-desc-extra-path pkg-name))))) + ;; Update: + (begin + (hash-set! update-cache (box name) #t) + (list (pkg-desc (pkg-desc-source pkg-name) + (pkg-desc-type pkg-name) + name + (pkg-desc-checksum pkg-name) + (pkg-desc-auto? pkg-name) + (or (pkg-desc-extra-path pkg-name) + (and (eq? type 'clone) + (current-directory)))))) + ;; No update needed, but maybe check dependencies: + (if (or deps? + implies?) + (update-loop name #f #f #f prefetch?) + null))])])] + [(and prefetch? + (hash-ref (prefetch-group-in-progress prefetch-group) pkg-name #f)) + ;; Already covered for prefetch + null] + [(and (not prefetch?) + (hash-ref update-cache (box pkg-name) #f)) ;; package is already being updated null] ;; A string indicates that package source that should be @@ -1083,22 +1128,26 @@ ;; needing an update, even if it is installed as a link, so ;; that the user is asked about installing dependencies, etc. (log-pkg-debug "Missing dependencies of ~s: ~s" pkg-name missing-deps) - (update-loop (pkg-info->desc pkg-name info) #f #t #t)] + (update-loop (pkg-info->desc pkg-name info) #f #t #t prefetch?)] [else (k)])) (define (update-dependencies) - (hash-set! update-cache (box pkg-name) #t) + ;; Mark in progress: + (if prefetch? + (hash-set! (prefetch-group-in-progress prefetch-group) pkg-name #t) + (hash-set! update-cache (box pkg-name) #t)) + ;; Dependencies? (if (or deps? implies?) ;; Check dependencies (append-map - (lambda (dep) (update-loop dep #f #f #t)) + (lambda (dep) (update-loop dep #f #f #t prefetch?)) deps) null)) (define (skip/update-dependencies kind) (check-missing-dependencies (lambda () - (unless (or all-mode? (not report-skip?)) + (unless (or all-mode? (not report-skip?) prefetch?) (download-printf "Skipping update of ~a: ~a\n" kind pkg-name)) @@ -1148,20 +1197,27 @@ (hash-ref update-cache pkg-name (lambda () (remote-package-checksum orig-pkg download-printf pkg-name + #:prefetch? prefetch? + #:prefetch-group prefetch-group #:catalog-lookup-cache catalog-lookup-cache #:remote-checksum-cache remote-checksum-cache)))) ;; Record downloaded checksum: - (hash-set! update-cache pkg-name new-checksum) + (unless prefetch? + (hash-set! update-cache pkg-name new-checksum)) (or (and new-checksum (not (equal? checksum new-checksum)) ;; Update it: - (begin + (cond + [prefetch? + ;; Don't proceed further if we're just issuing prefetches + null] + [else ;; Flush cache of downloaded checksums, in case ;; there was a race between our checkig and updates on ;; the catalog server: (clear-checksums-in-cache! update-cache) (list (pkg-desc orig-pkg-source orig-pkg-type pkg-name #f auto? - orig-pkg-dir)))) + orig-pkg-dir))])) ;; Continue with dependencies, maybe (check-missing-dependencies update-dependencies))]))] [else null]))) @@ -1199,103 +1255,113 @@ (early-check-for-installed in-pkgs db #:wanted? #t)) in-pkgs])) (define update-cache (make-hash)) - (define catalog-lookup-cache (make-hash)) - (define remote-checksum-cache (make-hash)) - (define to-updat* (append-map (packages-to-update download-printf db - #:must-update? (and (not all-mode?) - (not update-deps?)) - #:deps? (or update-deps? - all-mode?) ; avoid races - #:implies? update-implies? - #:update-cache update-cache - #:namespace metadata-ns - #:catalog-lookup-cache catalog-lookup-cache - #:remote-checksum-cache remote-checksum-cache - #:all-platforms? all-platforms? - #:ignore-checksums? ignore-checksums? - #:use-cache? use-cache? - #:from-command-line? from-command-line? - #:skip-uninstalled? skip-uninstalled? - #:link-dirs? link-dirs? - #:all-mode? all-mode?) - (map (compose - (if infer-clone-from-dir? - (convert-directory-to-installed-clone db) - values) - (if lookup-for-clone? - (convert-clone-name-to-clone-repo/install catalog-lookup-cache - download-printf) - (convert-clone-name-to-clone-repo/update db - skip-uninstalled? - from-command-line?))) - pkgs))) - (cond - [(empty? pkgs) - (unless quiet? - (cond - [all? - (printf/flush (~a "No updates available; no packages installed in ~a scope\n") - (current-pkg-scope))] - [else - (printf/flush (~a "No packages given to update" - (if from-command-line? - (~a - ";\n use `--all' to update all packages, or run from a package's directory" - "\n to update that package") - "") - "\n"))])) - 'skip] - [(empty? to-updat*) - (unless quiet? - (printf/flush "No updates available\n")) - 'skip] - [else - (define to-update - (hash-values - (for/fold ([ht #hash()]) ([u (in-list to-updat*)]) - (cond - [(hash-ref ht (pkg-desc-name u) #f) - => (lambda (v) - (cond - [(pkg-desc=? v u) ht] - [else - (pkg-error (~a "cannot update with conflicting update information;\n" - " package name: ~a") - (pkg-desc-name u))]))] - [else - (hash-set ht (pkg-desc-name u) u)])))) - (unless quiet? - (printf "Updating:\n") - (for ([u (in-list to-update)]) - (printf " ~a\n" (pkg-desc-name u))) - (flush-output)) - (pkg-install - #:updating? #t - #:pre-succeed (λ () (for-each (compose (remove-package #t quiet? use-trash?) pkg-desc-name) to-update)) - #:dep-behavior dep-behavior - #:update-deps? update-deps? - #:update-implies? update-implies? - #:update-cache update-cache - #:catalog-lookup-cache catalog-lookup-cache - #:remote-checksum-cache remote-checksum-cache - #:check-pkg-early? #f - #:quiet? quiet? - #:use-trash? use-trash? - #:from-command-line? from-command-line? - #:strip strip-mode - #:force-strip? force-strip? - #:all-platforms? all-platforms? - #:force? force? - #:ignore-checksums? ignore-checksums? - #:strict-doc-conflicts? strict-doc-conflicts? - #:use-cache? use-cache? - #:link-dirs? link-dirs? - #:multi-clone-behavior clone-behavior - #:convert-to-non-clone? (and lookup-for-clone? - (andmap pkg-desc? in-pkgs) - (not (ormap pkg-desc-extra-path in-pkgs))) - #:pull-behavior pull-behavior - to-update)])) + (define prefetch-group (make-prefetch-group)) + (define catalog-lookup-cache (make-hash)) ; [prefetch-shared] + (define remote-checksum-cache (make-hash)) ; [prefetch-shared] + (call-with-prefetch-cleanup + prefetch-group + (lambda () + (define to-updat* (let ([updater (packages-to-update download-printf db + #:must-update? (and (not all-mode?) + (not update-deps?)) + #:deps? (or update-deps? + all-mode?) ; avoid races + #:implies? update-implies? + #:update-cache update-cache + #:prefetch-group prefetch-group + #:namespace metadata-ns + #:catalog-lookup-cache catalog-lookup-cache + #:remote-checksum-cache remote-checksum-cache + #:all-platforms? all-platforms? + #:ignore-checksums? ignore-checksums? + #:use-cache? use-cache? + #:from-command-line? from-command-line? + #:skip-uninstalled? skip-uninstalled? + #:link-dirs? link-dirs? + #:all-mode? all-mode?)] + [pkgs (map (compose + (if infer-clone-from-dir? + (convert-directory-to-installed-clone db) + values) + (if lookup-for-clone? + (convert-clone-name-to-clone-repo/install catalog-lookup-cache + download-printf) + (convert-clone-name-to-clone-repo/update db + skip-uninstalled? + from-command-line?))) + pkgs)]) + ;; Prefetch packages info and checksums: + (for ([pkg (in-list pkgs)]) (updater #:prefetch? #t pkg)) + ;; Build update info: + (append-map updater pkgs))) + (cond + [(empty? pkgs) + (unless quiet? + (cond + [all? + (printf/flush (~a "No updates available; no packages installed in ~a scope\n") + (current-pkg-scope))] + [else + (printf/flush (~a "No packages given to update" + (if from-command-line? + (~a + ";\n use `--all' to update all packages, or run from a package's directory" + "\n to update that package") + "") + "\n"))])) + 'skip] + [(empty? to-updat*) + (unless quiet? + (printf/flush "No updates available\n")) + 'skip] + [else + (define to-update + (hash-values + (for/fold ([ht #hash()]) ([u (in-list to-updat*)]) + (cond + [(hash-ref ht (pkg-desc-name u) #f) + => (lambda (v) + (cond + [(pkg-desc=? v u) ht] + [else + (pkg-error (~a "cannot update with conflicting update information;\n" + " package name: ~a") + (pkg-desc-name u))]))] + [else + (hash-set ht (pkg-desc-name u) u)])))) + (unless quiet? + (printf "Updating:\n") + (for ([u (in-list to-update)]) + (printf " ~a\n" (pkg-desc-name u))) + (flush-output)) + (pkg-install + #:updating? #t + #:pre-succeed (λ () (for-each (compose (remove-package #t quiet? use-trash?) pkg-desc-name) to-update)) + #:dep-behavior dep-behavior + #:update-deps? update-deps? + #:update-implies? update-implies? + #:update-cache update-cache + #:prefetch-group prefetch-group + #:catalog-lookup-cache catalog-lookup-cache + #:remote-checksum-cache remote-checksum-cache + #:check-pkg-early? #f + #:quiet? quiet? + #:use-trash? use-trash? + #:from-command-line? from-command-line? + #:strip strip-mode + #:force-strip? force-strip? + #:all-platforms? all-platforms? + #:force? force? + #:ignore-checksums? ignore-checksums? + #:strict-doc-conflicts? strict-doc-conflicts? + #:use-cache? use-cache? + #:link-dirs? link-dirs? + #:multi-clone-behavior clone-behavior + #:convert-to-non-clone? (and lookup-for-clone? + (andmap pkg-desc? in-pkgs) + (not (ormap pkg-desc-extra-path in-pkgs))) + #:pull-behavior pull-behavior + to-update)])))) ;; ---------------------------------------- diff --git a/racket/collects/pkg/private/prefetch.rkt b/racket/collects/pkg/private/prefetch.rkt new file mode 100644 index 0000000000..4400d20e4e --- /dev/null +++ b/racket/collects/pkg/private/prefetch.rkt @@ -0,0 +1,187 @@ +#lang racket/base +(require racket/async-channel) + +(provide make-prefetch-group + prefetch-group-in-progress + call-with-prefetch-cleanup + make-prefetch-future + prefetch-future? + prefetch-touch) + +;; A prefetch is a kind of future for getting package info from +;; a catalog or getting a checksum from a remote source. We +;; run at most `MAX-PARALLEL` of them at once, and we avoid +;; creating any threads if only one needs to run. + +;; Output via `download-printf` is pumped back to the original thread +;; so that it's not interleaved. Output is pumped while waiting on a +;; future or when creating one. + +(define MAX-PARALLEL 32) + +;; Only mutate a group or a future with the group's lock held + +(struct prefetch-group (custodian + [in-cleanup? #:mutable] + in-progress + lock + [pending #:mutable] + [pending-rev #:mutable] + [tokens #:mutable] + output)) + +(struct prefetch-future ([proc #:mutable] + [get-result #:mutable] + [evt #:mutable])) + +(struct output-msg (token fmt args)) + +(define (make-prefetch-group) + (prefetch-group (make-custodian) + #f + (make-hash) + (make-semaphore 1) + null + null + (for/list ([i (in-range MAX-PARALLEL)]) + i) + (make-async-channel))) + + +;; Ensure that `group` is terminated on exit from `thunk`. +;; Terminate only on the outer exit in case of nested calls. +(define (call-with-prefetch-cleanup group thunk) + (if (prefetch-group-in-cleanup? group) + (thunk) + (dynamic-wind + (lambda () + (set-prefetch-group-in-cleanup?! group #t)) + thunk + (lambda () + (custodian-shutdown-all (prefetch-group-custodian group)))))) + +;; Put `proc` in a future to be potentially run in a thread. +(define (make-prefetch-future group download-printf proc) + (pump-output group download-printf) + (define f (prefetch-future proc #t #f)) + (call-with-semaphore (prefetch-group-lock group) + (lambda () + (set-prefetch-group-pending-rev! group (cons f (prefetch-group-pending-rev group))) + (maybe-start-future group download-printf))) + (pump-output group download-printf) + f) + +;; Wait for a future to be ready +(define (prefetch-touch f group download-printf) + (pump-output group download-printf) + (cond + [(prefetch-future-evt f) + (pump-output group download-printf + #:evt (prefetch-future-evt f) + #:timeout #f) + ((prefetch-future-get-result f))] + [else + ((call-with-semaphore (prefetch-group-lock group) + (lambda () + (cond + [(prefetch-future-evt f) + ;; Got a thread meanwhile, so recur: + (lambda () + (prefetch-touch f group download-printf))] + [else + ;; Mark this future as no longer pending, and + ;; run its procedure directly: + (make-thread-proc f download-printf)]))))])) + +;; Wraps the procedure in `f` so that it's ready to run in a thread, +;; and replace the procured in `f` to eventually return the result +;; or propagate an exception. +(define (make-thread-proc f download-printf) + (define s (make-semaphore)) + (define proc (prefetch-future-proc f)) + (set-prefetch-future-proc! f #f) + (define result #f) + (define result-is-exn? #f) + (define (get-result) (if result-is-exn? + (raise result) + result)) + (set-prefetch-future-get-result! f get-result) + (set-prefetch-future-evt! f (semaphore-peek-evt s)) + (lambda () + (with-handlers ([values (lambda (v) + (set! result-is-exn? #t) + (set! result v))]) + (set! result (proc download-printf))) + (semaphore-post s) + (get-result))) + +;; Call with lock: +(define (maybe-start-future group download-printf) + (cond + [(null? (prefetch-group-pending group)) + (cond + [(null? (prefetch-group-pending-rev group)) + ;; Nothing to do + (void)] + [else + ;; Move reversed list to ready list + (set-prefetch-group-pending! group (reverse (prefetch-group-pending-rev group))) + (set-prefetch-group-pending-rev! group null) + (maybe-start-future group download-printf)])] + [(not (prefetch-future-proc (car (prefetch-group-pending group)))) + ;; Discard no-longer-needed future + (set-prefetch-group-pending! group (cdr (prefetch-group-pending group))) + (maybe-start-future group download-printf)] + [(and (null? (cdr (prefetch-group-pending group))) + (null? (prefetch-group-pending-rev group))) + ;; Only one prefetch available, so don't start a thread + (void)] + [(null? (prefetch-group-tokens group)) + ;; Too many running already + (void)] + [else + ;; Start a thread: + (define token (car (prefetch-group-tokens group))) + (set-prefetch-group-tokens! group (cdr (prefetch-group-tokens group))) + (define proc + (make-thread-proc (car (prefetch-group-pending group)) + (lambda (fmt . args) + (async-channel-put (prefetch-group-output group) + (output-msg token fmt args))))) + (set-prefetch-group-pending! group (cdr (prefetch-group-pending group))) + (parameterize ([current-custodian (prefetch-group-custodian group)]) + (thread + (lambda () + (proc) + ;; The future computation is done, so release the token and + ;; maybe start a new future: + (call-with-semaphore (prefetch-group-lock group) + (lambda () + (set-prefetch-group-tokens! group (cons token (prefetch-group-tokens group))) + (maybe-start-future group download-printf)))))) + (void)])) + +;; Check for output from a future thread: +(define (pump-output group + download-printf + #:evt [evt never-evt] + #:timeout [timeout 0]) + (sync/timeout + timeout + evt + (handle-evt (prefetch-group-output group) + (lambda (msg) + (define token (output-msg-token msg)) + (define fmt (output-msg-fmt msg)) + (define args (output-msg-args msg)) + (apply download-printf + (string-append + (format "~a~a: " + (if (token . < . 10) "0" "") + token) + fmt) + args) + (pump-output group + download-printf + #:evt evt + #:timeout timeout))))) diff --git a/racket/collects/pkg/private/stage.rkt b/racket/collects/pkg/private/stage.rkt index f62a5a578a..eceef6021b 100644 --- a/racket/collects/pkg/private/stage.rkt +++ b/racket/collects/pkg/private/stage.rkt @@ -28,7 +28,8 @@ "addl-installs.rkt" "repo-path.rkt" "orig-pkg.rkt" - "git.rkt") + "git.rkt" + "prefetch.rkt") (provide (struct-out install-info) remote-package-checksum @@ -48,16 +49,16 @@ (define (remote-package-checksum pkg download-printf pkg-name #:type [type #f] + #:prefetch? [prefetch? #f] + #:prefetch-group [prefetch-group #f] #:catalog-lookup-cache [catalog-lookup-cache #f] #:remote-checksum-cache [remote-checksum-cache #f]) - (cond - [(and remote-checksum-cache - (hash-ref remote-checksum-cache pkg #f)) - => (lambda (checksum) checksum)] - [else + (define (lookup-normally download-printf) (define checksum (match pkg [`(catalog ,pkg-name . ,_) + ;; If we're in a prefetch thread, we expect no other prefetchs in + ;; progress for `pkg-name`: (hash-ref (package-catalog-lookup pkg-name #f catalog-lookup-cache download-printf) 'checksum)] @@ -73,7 +74,38 @@ #:pkg-name pkg-name)])) (when remote-checksum-cache (hash-set! remote-checksum-cache pkg checksum)) - checksum])) + checksum) + + (when (and prefetch? (not (and catalog-lookup-cache + remote-checksum-cache + prefetch-group))) + (error "internal error: insufficient caches or group for prefetch of package checksum")) + + ;; Loop to combine cache lookup and prefetch dispatch: + (let loop ([prefetch? prefetch?] [download-printf download-printf]) + (cond + [(and remote-checksum-cache + (hash-ref remote-checksum-cache pkg #f)) + => (lambda (checksum) + (if (and (prefetch-future? checksum) + (not prefetch?)) + (prefetch-touch checksum prefetch-group download-printf) + checksum))] + [prefetch? + (define s (make-semaphore)) + (define f (make-prefetch-future + prefetch-group + download-printf + (lambda (download-printf) + ;; Don't start until hash table has future: + (semaphore-wait s) + ;; Adjusts cache when it has a result: + (lookup-normally download-printf)))) + (hash-set! remote-checksum-cache pkg f) + (semaphore-post s) + f] + [else + (lookup-normally download-printf)]))) ;; Downloads a package (if needed) and unpacks it (if needed) into a ;; temporary directory.