raco pkg: get catalog info and checksums in parallel

When `raco pkg {install,update}` has a list of packages to check,
first run through the list in a "prefetch" mode to create futures to
fetch the information. Issuing requests to servers in parallel
can greatly speed up `raco pkg update --all`.

Package content is still downloaded sequentially.
This commit is contained in:
Matthew Flatt 2015-09-09 20:53:27 -06:00
parent 9bf68db7f7
commit 5aebb1c539
3 changed files with 555 additions and 270 deletions

View File

@ -31,19 +31,29 @@
"orig-pkg.rkt"
"info-to-desc.rkt"
"git.rkt"
"check-will-exist.rkt")
"check-will-exist.rkt"
"prefetch.rkt")
(provide pkg-install
pkg-update)
;; A [prefetch-shared] annotation means that a hash table is shared
;; with prefetch threads. If a prefetch group is terminated, then all
;; prefetch-shared tables must be abaondoned, because a thread with a
;; lock can be terminated.
(define (checksum-for-pkg-source pkg-source type pkg-name given-checksum download-printf
#:catalog-lookup-cache [catalog-lookup-cache #f]
#:remote-checksum-cache [remote-checksum-cache #f])
#:prefetch? [prefetch? #f]
#:prefetch-group [prefetch-group #f]
#:catalog-lookup-cache [catalog-lookup-cache #f] ; [prefetch-shared]
#:remote-checksum-cache [remote-checksum-cache #f]) ; [prefetch-shared]
(case type
[(file-url dir-url github git clone)
(or given-checksum
(remote-package-checksum `(url ,pkg-source) download-printf pkg-name
#:type type
#:prefetch? prefetch?
#:prefetch-group prefetch-group
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache))]
[(file)
@ -56,6 +66,8 @@
(or given-checksum
(remote-package-checksum `(catalog ,pkg-source) download-printf pkg-name
#:type type
#:prefetch? prefetch?
#:prefetch-group prefetch-group
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache))]
[else given-checksum]))
@ -134,8 +146,9 @@
#:update-deps? update-deps?
#:update-implies? update-implies?
#:update-cache update-cache
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:prefetch-group prefetch-group
#:catalog-lookup-cache catalog-lookup-cache ; [prefetch-shared]
#:remote-checksum-cache remote-checksum-cache ; [prefetch-shared]
#:updating? updating-all?
#:extra-updating extra-updating
#:ignore-checksums? ignore-checksums?
@ -416,20 +429,23 @@
(or do-update-deps?
(set-member? implies name))
(not (hash-ref simultaneous-installs name #f))
((packages-to-update download-printf current-scope-db
#:must-update? #f
#:deps? do-update-deps?
#:implies? update-implies?
#:update-cache update-cache
#:namespace metadata-ns
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:all-platforms? all-platforms?
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
#:from-command-line? from-command-line?
#:link-dirs? link-dirs?)
name))
(let ([updater
(packages-to-update download-printf current-scope-db
#:must-update? #f
#:deps? do-update-deps?
#:implies? update-implies?
#:update-cache update-cache
#:prefetch-group prefetch-group
#:namespace metadata-ns
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:all-platforms? all-platforms?
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
#:from-command-line? from-command-line?
#:link-dirs? link-dirs?)])
(updater #:prefetch? #t name)
(updater name)))
null))
deps))
(and (not (empty? update-pkgs))
@ -527,19 +543,21 @@
(define update-pkgs (map car update-deps))
(define (make-pre-succeed)
(define db current-scope-db)
(let ([to-update (append-map (packages-to-update download-printf db
#:deps? update-deps?
#:implies? update-implies?
#:update-cache update-cache
#:namespace metadata-ns
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:all-platforms? all-platforms?
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
#:from-command-line? from-command-line?
#:link-dirs? link-dirs?)
update-pkgs)])
(let ([to-update (let ([updater (packages-to-update download-printf db
#:deps? update-deps?
#:implies? update-implies?
#:update-cache update-cache
#:prefetch-group prefetch-group
#:namespace metadata-ns
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:all-platforms? all-platforms?
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
#:from-command-line? from-command-line?
#:link-dirs? link-dirs?)])
(for ([pkg (in-list update-pkgs)]) (updater #:prefetch? #t pkg))
(append-map updater update-pkgs))])
(λ () (for-each (compose (remove-package #t quiet? use-trash?) pkg-desc-name) to-update))))
(match this-dep-behavior
['fail
@ -820,8 +838,9 @@
#:update-deps? [update-deps? #f]
#:update-implies? [update-implies? #t]
#:update-cache [update-cache (make-hash)]
#:catalog-lookup-cache [catalog-lookup-cache (make-hash)]
#:remote-checksum-cache [remote-checksum-cache (make-hash)]
#:prefetch-group [prefetch-group (make-prefetch-group)]
#:catalog-lookup-cache [catalog-lookup-cache (make-hash)] ; [prefetch-shared]
#:remote-checksum-cache [remote-checksum-cache (make-hash)] ; [prefetch-shared]
#:check-pkg-early? [check-pkg-early? #t]
#:updating? [updating? #f]
#:quiet? [quiet? #f]
@ -874,89 +893,95 @@
download-printf
from-command-line?
convert-to-non-clone?))
(with-handlers* ([vector?
(match-lambda
[(vector updating? new-infos dep-pkg deps more-pre-succeed conv clone-info)
(pkg-install
#:summary-deps (snoc summary-deps (vector dep-pkg deps))
#:old-infos new-infos
#:old-descs (append done-descs new-descs)
#:all-platforms? all-platforms?
#:force? force
#:ignore-checksums? ignore-checksums?
#:strict-doc-conflicts? strict-doc-conflicts?
#:use-cache? use-cache?
#:dep-behavior dep-behavior
#:update-deps? update-deps?
#:update-implies? update-implies?
#:update-cache update-cache
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:check-pkg-early? #f
#:pre-succeed (lambda () (pre-succeed) (more-pre-succeed))
#:updating? updating?
#:quiet? quiet?
#:use-trash? use-trash?
#:from-command-line? from-command-line?
#:conversation conv
#:strip strip-mode
#:force-strip? force-strip?
#:multi-clone-behavior (vector-ref clone-info 0)
#:repo-descs (vector-ref clone-info 1)
#:pull-behavior pull-behavior
(for/list ([dep (in-list deps)])
(if (pkg-desc? dep)
dep
(pkg-desc dep #f #f #f #t #f))))])])
(begin0
(install-packages
#:old-infos done-infos
#:old-descs done-descs
#:all-platforms? all-platforms?
#:force? force
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
#:skip-installed? skip-installed?
#:dep-behavior dep-behavior
#:update-deps? update-deps?
#:update-implies? update-implies?
#:update-cache update-cache
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:pre-succeed (λ ()
(for ([pkg-name (in-hash-keys extra-updating)])
((remove-package #t quiet? use-trash?) pkg-name))
(pre-succeed))
#:updating? updating?
#:extra-updating extra-updating
#:quiet? quiet?
#:use-trash? use-trash?
#:from-command-line? from-command-line?
#:conversation conversation
#:strip strip-mode
#:force-strip? force-strip?
#:link-dirs? link-dirs?
#:local-docs-ok? (not strict-doc-conflicts?)
#:ai-cache (box #f)
#:clone-info (vector clone-behavior
repo-descs)
#:pull-behavior pull-behavior
new-descs)
(unless (empty? summary-deps)
(unless quiet?
(printf/flush "The following~a packages were listed as dependencies~a:~a\n"
(if updating? " out-of-date" " uninstalled")
(format "\nand they were ~a~a"
(if (eq? dep-behavior 'search-auto) "automatically " "")
(if updating? "updated" "installed"))
(string-append*
(for/list ([p*ds (in-list summary-deps)])
(match-define (vector n ds) p*ds)
(format "\n dependencies of ~a:~a"
n
(if updating?
(format-deps ds)
(format-list ds)))))))))))
(call-with-prefetch-cleanup
prefetch-group
(lambda ()
(with-handlers* ([vector?
(match-lambda
[(vector updating? new-infos dep-pkg deps more-pre-succeed conv clone-info)
(pkg-install
#:summary-deps (snoc summary-deps (vector dep-pkg deps))
#:old-infos new-infos
#:old-descs (append done-descs new-descs)
#:all-platforms? all-platforms?
#:force? force
#:ignore-checksums? ignore-checksums?
#:strict-doc-conflicts? strict-doc-conflicts?
#:use-cache? use-cache?
#:dep-behavior dep-behavior
#:update-deps? update-deps?
#:update-implies? update-implies?
#:update-cache update-cache
#:prefetch-group prefetch-group
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:check-pkg-early? #f
#:pre-succeed (lambda () (pre-succeed) (more-pre-succeed))
#:updating? updating?
#:quiet? quiet?
#:use-trash? use-trash?
#:from-command-line? from-command-line?
#:conversation conv
#:strip strip-mode
#:force-strip? force-strip?
#:multi-clone-behavior (vector-ref clone-info 0)
#:repo-descs (vector-ref clone-info 1)
#:pull-behavior pull-behavior
(for/list ([dep (in-list deps)])
(if (pkg-desc? dep)
dep
(pkg-desc dep #f #f #f #t #f))))])])
(begin0
(install-packages
#:old-infos done-infos
#:old-descs done-descs
#:all-platforms? all-platforms?
#:force? force
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
#:skip-installed? skip-installed?
#:dep-behavior dep-behavior
#:update-deps? update-deps?
#:update-implies? update-implies?
#:update-cache update-cache
#:prefetch-group prefetch-group
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:pre-succeed (λ ()
(for ([pkg-name (in-hash-keys extra-updating)])
((remove-package #t quiet? use-trash?) pkg-name))
(pre-succeed))
#:updating? updating?
#:extra-updating extra-updating
#:quiet? quiet?
#:use-trash? use-trash?
#:from-command-line? from-command-line?
#:conversation conversation
#:strip strip-mode
#:force-strip? force-strip?
#:link-dirs? link-dirs?
#:local-docs-ok? (not strict-doc-conflicts?)
#:ai-cache (box #f)
#:clone-info (vector clone-behavior
repo-descs)
#:pull-behavior pull-behavior
new-descs)
(unless (empty? summary-deps)
(unless quiet?
(printf/flush "The following~a packages were listed as dependencies~a:~a\n"
(if updating? " out-of-date" " uninstalled")
(format "\nand they were ~a~a"
(if (eq? dep-behavior 'search-auto) "automatically " "")
(if updating? "updated" "installed"))
(string-append*
(for/list ([p*ds (in-list summary-deps)])
(match-define (vector n ds) p*ds)
(format "\n dependencies of ~a:~a"
n
(if updating?
(format-deps ds)
(format-list ds)))))))))))))
;; Determine packages to update, starting with `pkg-name'. If `pkg-name'
;; needs to be updated, return it in a list. Otherwise, if `deps?',
@ -974,9 +999,10 @@
#:deps? deps?
#:implies? implies?
#:namespace metadata-ns
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:catalog-lookup-cache catalog-lookup-cache ; [prefetch-shared]
#:remote-checksum-cache remote-checksum-cache ; [prefetch-shared]
#:update-cache update-cache
#:prefetch-group prefetch-group
#:all-platforms? all-platforms?
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
@ -985,11 +1011,17 @@
#:skip-uninstalled? [skip-uninstalled? #f]
#:all-mode? [all-mode? #f]
#:force-update? [force-update? #f])
pkg-name)
pkg-name
;; In prefetch mode, do as much work as possible to generate
;; server requests without waiting for results and without
;; making any other state changes --- but forced errors are
;; ok.
#:prefetch? [prefetch? #f])
(let update-loop ([pkg-name pkg-name]
[must-update? must-update?]
[force-update? force-update?]
[report-skip? #t])
[report-skip? #t]
[prefetch? prefetch?])
(cond
[(pkg-desc? pkg-name)
;; Infer the package-source type and name:
@ -1013,46 +1045,59 @@
name
(pkg-desc-checksum pkg-name)
download-printf
#:prefetch? prefetch?
#:prefetch-group prefetch-group
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache))
(hash-set! update-cache name new-checksum) ; record downloaded checksum
(unless (or ignore-checksums? (not (pkg-desc-checksum pkg-name)))
(unless (equal? (pkg-desc-checksum pkg-name) new-checksum)
(pkg-error (~a "incorrect checksum on package\n"
" package source: ~a\n"
" expected: ~e\n"
" got: ~e")
(pkg-desc-source pkg-name)
(pkg-desc-checksum pkg-name)
new-checksum)))
(if (or force-update?
(not (equal? (pkg-info-checksum info)
new-checksum))
;; No checksum available => always update
(not new-checksum)
;; Different source => always update
(not (same-orig-pkg? (pkg-info-orig-pkg info)
(desc->orig-pkg type
(pkg-desc-source pkg-name)
(pkg-desc-extra-path pkg-name)))))
;; Update:
(begin
(hash-set! update-cache (box name) #t)
(list (pkg-desc (pkg-desc-source pkg-name)
(pkg-desc-type pkg-name)
name
(pkg-desc-checksum pkg-name)
(pkg-desc-auto? pkg-name)
(or (pkg-desc-extra-path pkg-name)
(and (eq? type 'clone)
(current-directory))))))
;; No update needed, but maybe check dependencies:
(if (or deps?
implies?)
(update-loop name #f #f #f)
null))])]
[(hash-ref update-cache (box pkg-name) #f)
(cond
[prefetch?
;; Don't proceed further if we're just issuing prefetches
null]
[else
(hash-set! update-cache name new-checksum) ; record downloaded checksum
(unless (or ignore-checksums? (not (pkg-desc-checksum pkg-name)))
(unless (equal? (pkg-desc-checksum pkg-name) new-checksum)
(pkg-error (~a "incorrect checksum on package\n"
" package source: ~a\n"
" expected: ~e\n"
" got: ~e")
(pkg-desc-source pkg-name)
(pkg-desc-checksum pkg-name)
new-checksum)))
(if (or force-update?
;; Different checksum => update
(not (equal? (pkg-info-checksum info)
new-checksum))
;; No checksum available => always update
(not new-checksum)
;; Different source => always update
(not (same-orig-pkg? (pkg-info-orig-pkg info)
(desc->orig-pkg type
(pkg-desc-source pkg-name)
(pkg-desc-extra-path pkg-name)))))
;; Update:
(begin
(hash-set! update-cache (box name) #t)
(list (pkg-desc (pkg-desc-source pkg-name)
(pkg-desc-type pkg-name)
name
(pkg-desc-checksum pkg-name)
(pkg-desc-auto? pkg-name)
(or (pkg-desc-extra-path pkg-name)
(and (eq? type 'clone)
(current-directory))))))
;; No update needed, but maybe check dependencies:
(if (or deps?
implies?)
(update-loop name #f #f #f prefetch?)
null))])])]
[(and prefetch?
(hash-ref (prefetch-group-in-progress prefetch-group) pkg-name #f))
;; Already covered for prefetch
null]
[(and (not prefetch?)
(hash-ref update-cache (box pkg-name) #f))
;; package is already being updated
null]
;; A string indicates that package source that should be
@ -1083,22 +1128,26 @@
;; needing an update, even if it is installed as a link, so
;; that the user is asked about installing dependencies, etc.
(log-pkg-debug "Missing dependencies of ~s: ~s" pkg-name missing-deps)
(update-loop (pkg-info->desc pkg-name info) #f #t #t)]
(update-loop (pkg-info->desc pkg-name info) #f #t #t prefetch?)]
[else (k)]))
(define (update-dependencies)
(hash-set! update-cache (box pkg-name) #t)
;; Mark in progress:
(if prefetch?
(hash-set! (prefetch-group-in-progress prefetch-group) pkg-name #t)
(hash-set! update-cache (box pkg-name) #t))
;; Dependencies?
(if (or deps? implies?)
;; Check dependencies
(append-map
(lambda (dep) (update-loop dep #f #f #t))
(lambda (dep) (update-loop dep #f #f #t prefetch?))
deps)
null))
(define (skip/update-dependencies kind)
(check-missing-dependencies
(lambda ()
(unless (or all-mode? (not report-skip?))
(unless (or all-mode? (not report-skip?) prefetch?)
(download-printf "Skipping update of ~a: ~a\n"
kind
pkg-name))
@ -1148,20 +1197,27 @@
(hash-ref update-cache pkg-name
(lambda ()
(remote-package-checksum orig-pkg download-printf pkg-name
#:prefetch? prefetch?
#:prefetch-group prefetch-group
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache))))
;; Record downloaded checksum:
(hash-set! update-cache pkg-name new-checksum)
(unless prefetch?
(hash-set! update-cache pkg-name new-checksum))
(or (and new-checksum
(not (equal? checksum new-checksum))
;; Update it:
(begin
(cond
[prefetch?
;; Don't proceed further if we're just issuing prefetches
null]
[else
;; Flush cache of downloaded checksums, in case
;; there was a race between our checkig and updates on
;; the catalog server:
(clear-checksums-in-cache! update-cache)
(list (pkg-desc orig-pkg-source orig-pkg-type pkg-name #f auto?
orig-pkg-dir))))
orig-pkg-dir))]))
;; Continue with dependencies, maybe
(check-missing-dependencies update-dependencies))]))]
[else null])))
@ -1199,103 +1255,113 @@
(early-check-for-installed in-pkgs db #:wanted? #t))
in-pkgs]))
(define update-cache (make-hash))
(define catalog-lookup-cache (make-hash))
(define remote-checksum-cache (make-hash))
(define to-updat* (append-map (packages-to-update download-printf db
#:must-update? (and (not all-mode?)
(not update-deps?))
#:deps? (or update-deps?
all-mode?) ; avoid races
#:implies? update-implies?
#:update-cache update-cache
#:namespace metadata-ns
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:all-platforms? all-platforms?
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
#:from-command-line? from-command-line?
#:skip-uninstalled? skip-uninstalled?
#:link-dirs? link-dirs?
#:all-mode? all-mode?)
(map (compose
(if infer-clone-from-dir?
(convert-directory-to-installed-clone db)
values)
(if lookup-for-clone?
(convert-clone-name-to-clone-repo/install catalog-lookup-cache
download-printf)
(convert-clone-name-to-clone-repo/update db
skip-uninstalled?
from-command-line?)))
pkgs)))
(cond
[(empty? pkgs)
(unless quiet?
(cond
[all?
(printf/flush (~a "No updates available; no packages installed in ~a scope\n")
(current-pkg-scope))]
[else
(printf/flush (~a "No packages given to update"
(if from-command-line?
(~a
";\n use `--all' to update all packages, or run from a package's directory"
"\n to update that package")
"")
"\n"))]))
'skip]
[(empty? to-updat*)
(unless quiet?
(printf/flush "No updates available\n"))
'skip]
[else
(define to-update
(hash-values
(for/fold ([ht #hash()]) ([u (in-list to-updat*)])
(cond
[(hash-ref ht (pkg-desc-name u) #f)
=> (lambda (v)
(cond
[(pkg-desc=? v u) ht]
[else
(pkg-error (~a "cannot update with conflicting update information;\n"
" package name: ~a")
(pkg-desc-name u))]))]
[else
(hash-set ht (pkg-desc-name u) u)]))))
(unless quiet?
(printf "Updating:\n")
(for ([u (in-list to-update)])
(printf " ~a\n" (pkg-desc-name u)))
(flush-output))
(pkg-install
#:updating? #t
#:pre-succeed (λ () (for-each (compose (remove-package #t quiet? use-trash?) pkg-desc-name) to-update))
#:dep-behavior dep-behavior
#:update-deps? update-deps?
#:update-implies? update-implies?
#:update-cache update-cache
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:check-pkg-early? #f
#:quiet? quiet?
#:use-trash? use-trash?
#:from-command-line? from-command-line?
#:strip strip-mode
#:force-strip? force-strip?
#:all-platforms? all-platforms?
#:force? force?
#:ignore-checksums? ignore-checksums?
#:strict-doc-conflicts? strict-doc-conflicts?
#:use-cache? use-cache?
#:link-dirs? link-dirs?
#:multi-clone-behavior clone-behavior
#:convert-to-non-clone? (and lookup-for-clone?
(andmap pkg-desc? in-pkgs)
(not (ormap pkg-desc-extra-path in-pkgs)))
#:pull-behavior pull-behavior
to-update)]))
(define prefetch-group (make-prefetch-group))
(define catalog-lookup-cache (make-hash)) ; [prefetch-shared]
(define remote-checksum-cache (make-hash)) ; [prefetch-shared]
(call-with-prefetch-cleanup
prefetch-group
(lambda ()
(define to-updat* (let ([updater (packages-to-update download-printf db
#:must-update? (and (not all-mode?)
(not update-deps?))
#:deps? (or update-deps?
all-mode?) ; avoid races
#:implies? update-implies?
#:update-cache update-cache
#:prefetch-group prefetch-group
#:namespace metadata-ns
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:all-platforms? all-platforms?
#:ignore-checksums? ignore-checksums?
#:use-cache? use-cache?
#:from-command-line? from-command-line?
#:skip-uninstalled? skip-uninstalled?
#:link-dirs? link-dirs?
#:all-mode? all-mode?)]
[pkgs (map (compose
(if infer-clone-from-dir?
(convert-directory-to-installed-clone db)
values)
(if lookup-for-clone?
(convert-clone-name-to-clone-repo/install catalog-lookup-cache
download-printf)
(convert-clone-name-to-clone-repo/update db
skip-uninstalled?
from-command-line?)))
pkgs)])
;; Prefetch packages info and checksums:
(for ([pkg (in-list pkgs)]) (updater #:prefetch? #t pkg))
;; Build update info:
(append-map updater pkgs)))
(cond
[(empty? pkgs)
(unless quiet?
(cond
[all?
(printf/flush (~a "No updates available; no packages installed in ~a scope\n")
(current-pkg-scope))]
[else
(printf/flush (~a "No packages given to update"
(if from-command-line?
(~a
";\n use `--all' to update all packages, or run from a package's directory"
"\n to update that package")
"")
"\n"))]))
'skip]
[(empty? to-updat*)
(unless quiet?
(printf/flush "No updates available\n"))
'skip]
[else
(define to-update
(hash-values
(for/fold ([ht #hash()]) ([u (in-list to-updat*)])
(cond
[(hash-ref ht (pkg-desc-name u) #f)
=> (lambda (v)
(cond
[(pkg-desc=? v u) ht]
[else
(pkg-error (~a "cannot update with conflicting update information;\n"
" package name: ~a")
(pkg-desc-name u))]))]
[else
(hash-set ht (pkg-desc-name u) u)]))))
(unless quiet?
(printf "Updating:\n")
(for ([u (in-list to-update)])
(printf " ~a\n" (pkg-desc-name u)))
(flush-output))
(pkg-install
#:updating? #t
#:pre-succeed (λ () (for-each (compose (remove-package #t quiet? use-trash?) pkg-desc-name) to-update))
#:dep-behavior dep-behavior
#:update-deps? update-deps?
#:update-implies? update-implies?
#:update-cache update-cache
#:prefetch-group prefetch-group
#:catalog-lookup-cache catalog-lookup-cache
#:remote-checksum-cache remote-checksum-cache
#:check-pkg-early? #f
#:quiet? quiet?
#:use-trash? use-trash?
#:from-command-line? from-command-line?
#:strip strip-mode
#:force-strip? force-strip?
#:all-platforms? all-platforms?
#:force? force?
#:ignore-checksums? ignore-checksums?
#:strict-doc-conflicts? strict-doc-conflicts?
#:use-cache? use-cache?
#:link-dirs? link-dirs?
#:multi-clone-behavior clone-behavior
#:convert-to-non-clone? (and lookup-for-clone?
(andmap pkg-desc? in-pkgs)
(not (ormap pkg-desc-extra-path in-pkgs)))
#:pull-behavior pull-behavior
to-update)]))))
;; ----------------------------------------

View File

@ -0,0 +1,187 @@
#lang racket/base
(require racket/async-channel)
(provide make-prefetch-group
prefetch-group-in-progress
call-with-prefetch-cleanup
make-prefetch-future
prefetch-future?
prefetch-touch)
;; A prefetch is a kind of future for getting package info from
;; a catalog or getting a checksum from a remote source. We
;; run at most `MAX-PARALLEL` of them at once, and we avoid
;; creating any threads if only one needs to run.
;; Output via `download-printf` is pumped back to the original thread
;; so that it's not interleaved. Output is pumped while waiting on a
;; future or when creating one.
(define MAX-PARALLEL 32)
;; Only mutate a group or a future with the group's lock held
(struct prefetch-group (custodian
[in-cleanup? #:mutable]
in-progress
lock
[pending #:mutable]
[pending-rev #:mutable]
[tokens #:mutable]
output))
(struct prefetch-future ([proc #:mutable]
[get-result #:mutable]
[evt #:mutable]))
(struct output-msg (token fmt args))
(define (make-prefetch-group)
(prefetch-group (make-custodian)
#f
(make-hash)
(make-semaphore 1)
null
null
(for/list ([i (in-range MAX-PARALLEL)])
i)
(make-async-channel)))
;; Ensure that `group` is terminated on exit from `thunk`.
;; Terminate only on the outer exit in case of nested calls.
(define (call-with-prefetch-cleanup group thunk)
(if (prefetch-group-in-cleanup? group)
(thunk)
(dynamic-wind
(lambda ()
(set-prefetch-group-in-cleanup?! group #t))
thunk
(lambda ()
(custodian-shutdown-all (prefetch-group-custodian group))))))
;; Put `proc` in a future to be potentially run in a thread.
(define (make-prefetch-future group download-printf proc)
(pump-output group download-printf)
(define f (prefetch-future proc #t #f))
(call-with-semaphore (prefetch-group-lock group)
(lambda ()
(set-prefetch-group-pending-rev! group (cons f (prefetch-group-pending-rev group)))
(maybe-start-future group download-printf)))
(pump-output group download-printf)
f)
;; Wait for a future to be ready
(define (prefetch-touch f group download-printf)
(pump-output group download-printf)
(cond
[(prefetch-future-evt f)
(pump-output group download-printf
#:evt (prefetch-future-evt f)
#:timeout #f)
((prefetch-future-get-result f))]
[else
((call-with-semaphore (prefetch-group-lock group)
(lambda ()
(cond
[(prefetch-future-evt f)
;; Got a thread meanwhile, so recur:
(lambda ()
(prefetch-touch f group download-printf))]
[else
;; Mark this future as no longer pending, and
;; run its procedure directly:
(make-thread-proc f download-printf)]))))]))
;; Wraps the procedure in `f` so that it's ready to run in a thread,
;; and replace the procured in `f` to eventually return the result
;; or propagate an exception.
(define (make-thread-proc f download-printf)
(define s (make-semaphore))
(define proc (prefetch-future-proc f))
(set-prefetch-future-proc! f #f)
(define result #f)
(define result-is-exn? #f)
(define (get-result) (if result-is-exn?
(raise result)
result))
(set-prefetch-future-get-result! f get-result)
(set-prefetch-future-evt! f (semaphore-peek-evt s))
(lambda ()
(with-handlers ([values (lambda (v)
(set! result-is-exn? #t)
(set! result v))])
(set! result (proc download-printf)))
(semaphore-post s)
(get-result)))
;; Call with lock:
(define (maybe-start-future group download-printf)
(cond
[(null? (prefetch-group-pending group))
(cond
[(null? (prefetch-group-pending-rev group))
;; Nothing to do
(void)]
[else
;; Move reversed list to ready list
(set-prefetch-group-pending! group (reverse (prefetch-group-pending-rev group)))
(set-prefetch-group-pending-rev! group null)
(maybe-start-future group download-printf)])]
[(not (prefetch-future-proc (car (prefetch-group-pending group))))
;; Discard no-longer-needed future
(set-prefetch-group-pending! group (cdr (prefetch-group-pending group)))
(maybe-start-future group download-printf)]
[(and (null? (cdr (prefetch-group-pending group)))
(null? (prefetch-group-pending-rev group)))
;; Only one prefetch available, so don't start a thread
(void)]
[(null? (prefetch-group-tokens group))
;; Too many running already
(void)]
[else
;; Start a thread:
(define token (car (prefetch-group-tokens group)))
(set-prefetch-group-tokens! group (cdr (prefetch-group-tokens group)))
(define proc
(make-thread-proc (car (prefetch-group-pending group))
(lambda (fmt . args)
(async-channel-put (prefetch-group-output group)
(output-msg token fmt args)))))
(set-prefetch-group-pending! group (cdr (prefetch-group-pending group)))
(parameterize ([current-custodian (prefetch-group-custodian group)])
(thread
(lambda ()
(proc)
;; The future computation is done, so release the token and
;; maybe start a new future:
(call-with-semaphore (prefetch-group-lock group)
(lambda ()
(set-prefetch-group-tokens! group (cons token (prefetch-group-tokens group)))
(maybe-start-future group download-printf))))))
(void)]))
;; Check for output from a future thread:
(define (pump-output group
download-printf
#:evt [evt never-evt]
#:timeout [timeout 0])
(sync/timeout
timeout
evt
(handle-evt (prefetch-group-output group)
(lambda (msg)
(define token (output-msg-token msg))
(define fmt (output-msg-fmt msg))
(define args (output-msg-args msg))
(apply download-printf
(string-append
(format "~a~a: "
(if (token . < . 10) "0" "")
token)
fmt)
args)
(pump-output group
download-printf
#:evt evt
#:timeout timeout)))))

View File

@ -28,7 +28,8 @@
"addl-installs.rkt"
"repo-path.rkt"
"orig-pkg.rkt"
"git.rkt")
"git.rkt"
"prefetch.rkt")
(provide (struct-out install-info)
remote-package-checksum
@ -48,16 +49,16 @@
(define (remote-package-checksum pkg download-printf pkg-name
#:type [type #f]
#:prefetch? [prefetch? #f]
#:prefetch-group [prefetch-group #f]
#:catalog-lookup-cache [catalog-lookup-cache #f]
#:remote-checksum-cache [remote-checksum-cache #f])
(cond
[(and remote-checksum-cache
(hash-ref remote-checksum-cache pkg #f))
=> (lambda (checksum) checksum)]
[else
(define (lookup-normally download-printf)
(define checksum
(match pkg
[`(catalog ,pkg-name . ,_)
;; If we're in a prefetch thread, we expect no other prefetchs in
;; progress for `pkg-name`:
(hash-ref (package-catalog-lookup pkg-name #f catalog-lookup-cache
download-printf)
'checksum)]
@ -73,7 +74,38 @@
#:pkg-name pkg-name)]))
(when remote-checksum-cache
(hash-set! remote-checksum-cache pkg checksum))
checksum]))
checksum)
(when (and prefetch? (not (and catalog-lookup-cache
remote-checksum-cache
prefetch-group)))
(error "internal error: insufficient caches or group for prefetch of package checksum"))
;; Loop to combine cache lookup and prefetch dispatch:
(let loop ([prefetch? prefetch?] [download-printf download-printf])
(cond
[(and remote-checksum-cache
(hash-ref remote-checksum-cache pkg #f))
=> (lambda (checksum)
(if (and (prefetch-future? checksum)
(not prefetch?))
(prefetch-touch checksum prefetch-group download-printf)
checksum))]
[prefetch?
(define s (make-semaphore))
(define f (make-prefetch-future
prefetch-group
download-printf
(lambda (download-printf)
;; Don't start until hash table has future:
(semaphore-wait s)
;; Adjusts cache when it has a result:
(lookup-normally download-printf))))
(hash-set! remote-checksum-cache pkg f)
(semaphore-post s)
f]
[else
(lookup-normally download-printf)])))
;; Downloads a package (if needed) and unpacks it (if needed) into a
;; temporary directory.