Split DrDr2 from the main repository.

The `drdr2` pkg is now at
  https://github.com/racket/drdr2
This commit is contained in:
Sam Tobin-Hochstadt 2014-11-28 15:20:48 -05:00
parent 10629db83a
commit 58896d8d23
15 changed files with 0 additions and 666 deletions

View File

@ -1,25 +0,0 @@
DrDr2 is a network of message passing servers.
- Monitor finds pushes and sends them to the scheduler
- The scheduler receives pushes, prepares a tar ball, and sends
jobs to the masters
- Each master sends commands to its slave and confirms to the scheduler
(so that it will receive more commands)
- The slave receives commands and executes them on a test machine
and sends the answer to the master
- The master receives the answer and sends it to the analyzer
- The analyzer records data for the renderer and informs the notifier
- The renderer queries the database
----
DrDr2 uses the MongoDB database for storage
DrDr2 is monitored using Monit

View File

@ -1 +0,0 @@
#lang racket

View File

@ -1,6 +0,0 @@
#lang info
(define name "DrDr2")
(define compile-omit-paths 'all)
(define test-responsibles '((all jay)))

View File

@ -1,72 +0,0 @@
#lang racket/base
(require racket/match
racket/list
racket/serialize
racket/contract
racket/local
unstable/bytes
(planet jaymccarthy/mongodb))
(struct db (m d heap-mc tree-mc))
(define (ensure-mongo-collection d c #:init? init?)
(if init?
(local [(define mc (mongo-db-create-collection! d c #:capped? #f #:size 10000))]
(mongo-collection-index! mc (hasheq 'key 1) #:name "key")
mc)
(mongo-collection d c)))
(define (db-connect spec #:init? [init? #f])
(match-define (regexp #rx"^([a-zA-Z]+):([0-9]+):([a-zA-Z]+)$"
(list _ host (app string->number port) db-name))
spec)
(define m (create-mongo #:host host #:port port))
(define d (mongo-db m db-name))
(define h (ensure-mongo-collection d "heap" #:init? init?))
(define t (ensure-mongo-collection d "tree" #:init? init?))
(db m d h t))
(define (db-ref the-db . path)
(define e
(seqn-first
(mongo-collection-find
(db-heap-mc the-db)
(hasheq 'key path)
#:selector (hasheq 'value 1)
#:limit 1)))
(read/bytes (hash-ref e 'value)))
(define (db-set! the-db value . path)
(mongo-collection-repsert!
(db-heap-mc the-db)
(hasheq 'key path)
(hasheq 'key path
'value (write/bytes value)))
(define-values (dir entry-l) (split-at path (sub1 (length path))))
(define entry (first entry-l))
(mongo-collection-repsert!
(db-tree-mc the-db)
(hasheq 'key dir)
(hasheq '$addToSet (hasheq 'entries entry))))
(define (db-list the-db . path)
(vector->list
(hash-ref
(seqn-first
(mongo-collection-find
(db-tree-mc the-db)
(hasheq 'key path)
#:selector (hasheq 'entries 1)
#:limit 1))
'entries)))
(define (db-close! db)
(close-mongo! (db-m db)))
(provide/contract
[db? (any/c . -> . boolean?)]
[db-connect ((string?) (#:init? boolean?) . ->* . db?)]
[db-ref ((db?) () #:rest (non-empty-listof string?) . ->* . serializable?)]
[db-list ((db?) () #:rest (listof string?) . ->* . (listof string?))]
[db-set! ((db? serializable?) () #:rest (non-empty-listof string?) . ->* . void)]
[db-close! (db? . -> . void)])

View File

@ -1,64 +0,0 @@
#lang racket/base
(require mzlib/os
racket/contract
racket/file
racket/match)
(struct pqueue (dir))
(define (pqueue-tmp dir) (build-path dir "tmp"))
(define (pqueue-dest dir) (build-path dir "queue"))
(define pqueue-init!
(match-lambda
[(pqueue dir)
(make-directory* (pqueue-tmp dir))
(make-directory* (pqueue-dest dir))]))
(define (pqueue-enqueue! pq v)
(match-define (pqueue dir) pq)
(define uniq
(format "~a.~a"
(current-inexact-milliseconds)
(getpid)))
(define tmp (build-path (pqueue-tmp dir) uniq))
(define dest (build-path (pqueue-dest dir) uniq))
(with-output-to-file tmp
(λ () (write v)))
(rename-file-or-directory tmp dest))
(define current-pqueue-wait-seconds (make-parameter 10))
(define (pqueue-dequeue! pq)
(match-define (pqueue dir) pq)
(match (directory-list (pqueue-dest dir))
[(list-rest choice _)
(define dest
(build-path (pqueue-dest dir) choice))
(define tmp
(build-path (pqueue-tmp dir) choice))
(define succeeded?
(with-handlers ([exn? (λ (x) #f)])
(rename-file-or-directory dest tmp)
#t))
(if (not succeeded?)
(pqueue-dequeue! pq)
(dynamic-wind
void
(λ ()
(with-input-from-file tmp read))
(λ ()
(delete-file tmp))))]
[_
(sleep (current-pqueue-wait-seconds))
(pqueue-dequeue! pq)]))
(provide/contract
[current-pqueue-wait-seconds (parameter/c exact-nonnegative-integer?)]
[struct pqueue ([dir path-string?])]
[pqueue-init! (pqueue? . -> . void)]
[pqueue-enqueue! (pqueue? any/c . -> . void)]
[pqueue-dequeue! (pqueue? . -> . any/c)])

View File

@ -1,41 +0,0 @@
#lang racket/base
(require racket/list
racket/contract)
(struct git-push (num author previous-commit commits) #:prefab)
(struct git-commit (hash author date msg) #:prefab)
(struct git-diff git-commit (mfiles) #:prefab)
(struct git-merge git-commit (from to) #:prefab)
(provide/contract
[struct git-push
([num exact-nonnegative-integer?]
[author string?]
[previous-commit string?]
[commits (listof git-commit?)])]
[struct git-commit
([hash string?]
[author string?]
[date string?]
[msg (listof string?)])]
[struct git-diff
([hash string?]
[author string?]
[date string?]
[msg (listof string?)]
[mfiles (listof string?)])]
[struct git-merge
([hash string?]
[author string?]
[date string?]
[msg (listof string?)]
[from string?]
[to string?])])
(define (git-push-start-commit gp)
(git-commit-hash (last (git-push-commits gp))))
(define (git-push-end-commit gp)
(git-commit-hash (first (git-push-commits gp))))
(provide/contract
[git-push-start-commit (git-push? . -> . string?)]
[git-push-end-commit (git-push? . -> . string?)])

View File

@ -1 +0,0 @@
#lang racket

View File

@ -1,49 +0,0 @@
#lang racket
(require "../lib/pqueue.rkt"
"../lib/db.rkt"
"scm.rkt")
(define-syntax-rule (atomic e ...)
(begin e ...))
(define (main . argv)
(define push-queue (make-parameter #f))
(define the-db (make-parameter #f))
(define repo (make-parameter #f))
(define monitoring-interval (make-parameter 60))
(command-line
#:program "monitor"
#:argv argv
#:once-each
[("--interval") num "Monitoring interval" (monitoring-interval (string->number num))]
[("--repo") dir "Local Git repository" (repo (string->path dir))]
[("--pushes") dir "Persistent queue of pushes" (push-queue dir)]
[("--db") spec "Specification of database" (the-db spec)])
; Setup the queue to receive push information
(define pushes (pqueue (push-queue)))
(pqueue-init! pushes)
(define db (db-connect (the-db)))
; While true
(let loop ()
; Read the short term database to find out what push we're at
(define current (db-ref db "monitor" "last-push"))
; Update the git repository
(git-update (repo))
; Check the online push counter
(for ([new (in-list (git-pushes-after current))])
; Get the information about a push
(define push-info (get-git-push (repo) new))
(atomic
; Add it to the queue
(pqueue-enqueue! pushes push-info)
; Add it to the long term database
(db-set! db push-info "push-info" new)
; Update the latest push in the short term database
(db-set! db new "monitor" "last-push")))
; Wait
(sleep (monitoring-interval))
(loop))
(db-close! db))
(provide main)

View File

@ -1,136 +0,0 @@
#lang racket/base
(require net/url
racket/system
racket/function
racket/list
racket/match
racket/port
racket/contract
"../lib/scm.rkt")
(define git-path (find-executable-path "git"))
(define git-url-base "http://git.racket-lang.org/plt.git")
(define (get-newest-push)
(string->number (port->string (get-pure-port (string->url (format "~a/push-counter" git-url-base))))))
(define (pad2zeros n)
(format "~a~a"
(if (n . < . 10)
"0" "")
(number->string n)))
(struct push-data (who end-commit branches) #:prefab)
(define (get-push-data push-n)
(define push-n100s (quotient push-n 100))
(define push-nrem (pad2zeros (modulo push-n 100)))
(define ls
(port->lines
(get-pure-port
(string->url
(format "~a/pushes/~a/~a" git-url-base push-n100s push-nrem)))))
(match ls
[(list (regexp #rx"^([^ ]+) +([0-9abcdef]+)$" (list _ who end-commit))
(regexp #rx"^([0-9abcdef]+) +([0-9abcdef]+) +(.+)$" (list _ bstart bend branch))
...)
(push-data who end-commit
(make-immutable-hash
(map (lambda (b bs be) (cons b (vector bs be)))
branch bstart bend)))]
[_
#f]))
(define (close-input-port* p)
(when p (close-input-port p)))
(define (close-output-port* p)
(when p (close-output-port p)))
(define (system/output-port #:k k #:stdout [init-stdout #f] . as)
(define-values (sp stdout stdin stderr)
(apply subprocess init-stdout #f #f as))
(begin0 (k stdout)
(subprocess-wait sp)
(subprocess-kill sp #t)
(close-input-port* stdout)
(close-output-port* stdin)
(close-input-port* stderr)))
(define (read-until-empty-line in-p)
(let loop ()
(let ([l (read-line in-p)])
(cond
[(eof-object? l)
(close-input-port in-p)
empty]
[(string=? l "")
empty]
[else
(list* (regexp-replace #rx"^ +" l "") (loop))]))))
(define (read-commit in-p)
(match (read-line in-p)
[(? eof-object?)
#f]
[(regexp #rx"^commit +(.+)$" (list _ hash))
(match (read-line in-p)
[(regexp #rx"^Merge: +(.+) +(.+)$" (list _ from to))
(match-define (regexp #rx"^Author: +(.+)$" (list _ author)) (read-line in-p))
(match-define (regexp #rx"^Date: +(.+)$" (list _ date)) (read-line in-p))
(define _1 (read-line in-p))
(define msg (read-until-empty-line in-p))
(git-merge hash author date msg from to)]
[(regexp #rx"^Author: +(.+)$" (list _ author))
(match-define (regexp #rx"^Date: +(.+)$" (list _ date)) (read-line in-p))
(define _1 (read-line in-p))
(define msg (read-until-empty-line in-p))
(define mfiles (read-until-empty-line in-p))
(git-diff hash author date msg mfiles)])]))
(define master-branch "refs/heads/master")
(define (git-pushes-after cur-rev)
(define newest-rev (get-newest-push))
(for/list ([rev (in-range (add1 cur-rev) (add1 newest-rev))]
#:when
(let ([info (get-push-data rev)])
(and info (hash-has-key? (push-data-branches info) master-branch))))
rev))
(define (git-update repo)
(parameterize ([current-directory repo])
(system* git-path "fetch" git-url-base))
(void))
(define (read-commits in-p)
(cond
[(port-closed? in-p)
empty]
[(read-commit in-p)
=> (lambda (c)
(printf "~S\n" c)
(list* c (read-commits in-p)))]
[else
empty]))
(define (parse-push repo num author in-p)
(define commits (read-commits in-p))
(define start (git-commit-hash (last commits)))
(define previous-commit
(parameterize ([current-directory repo])
(system/output-port
#:k (λ (port) (read-line port))
git-path "--no-pager" "log" "--format=format:%P" start "-1")))
(git-push num author previous-commit commits))
(define (get-git-push repo rev)
(match-define (push-data who _ branches) (get-push-data rev))
(match-define (vector start-commit end-commit) (hash-ref branches master-branch))
(parameterize ([current-directory repo])
(system/output-port
#:k (curry parse-push repo rev who)
git-path
"--no-pager" "log" "--date=iso" "--name-only" "--no-merges"
(format "~a..~a" start-commit end-commit))))
(provide/contract
[git-pushes-after (exact-nonnegative-integer? . -> . (listof exact-nonnegative-integer?))]
[git-update (path? . -> . void?)]
[get-git-push (path? exact-nonnegative-integer? . -> . git-push?)])

View File

@ -1 +0,0 @@
#lang racket

View File

@ -1 +0,0 @@
#lang racket

View File

@ -1,111 +0,0 @@
#lang racket/base
(require mzlib/thread
unstable/match
racket/match
racket/port
racket/sandbox)
(provide (all-defined-out))
(define (write-output-bytes obs op)
(define bs (get-output-bytes obs))
(write (bytes-length bs) op)
(write-bytes bs op))
(define (handle-one-msg password log! ip op authenticated?)
(define (is-authenticated? x) authenticated?)
(match (with-handlers ([exn? (λ (x) x)]) (read ip))
[(? is-authenticated? (list 'run (? number? timeout) (? path-string? command) (? string? arg) ...))
(call-with-custodian-shutdown
(λ ()
(define stdout-obs (open-output-bytes 'stdout))
(define stderr-obs (open-output-bytes 'stderr))
(define info (list* command arg))
(log! "Running with timeout (~a) ~S" timeout info)
(define start-time (current-inexact-milliseconds))
(define-values (sp stdout stdin stderr) (apply subprocess #f #f #f command arg))
(close-output-port stdin)
(define stdout-t
(thread (λ () (copy-port stdout stdout-obs))))
(define stderr-t
(thread (λ () (copy-port stderr stderr-obs))))
(define exit-status
(sync
(handle-evt sp
(λ _
(subprocess-status sp)))
(handle-evt (alarm-evt (+ start-time (* 1000 timeout)))
(λ _
(subprocess-kill sp #f)
(subprocess-kill sp #t)
#f))))
(define end-time (current-inexact-milliseconds))
(log! "Finished running ~S, status was ~a" info exit-status)
(thread-wait stdout-t)
(thread-wait stderr-t)
(close-input-port stdout)
(close-input-port stderr)
(write (vector start-time end-time exit-status) op)
(write-output-bytes stdout-obs op)
(write-output-bytes stderr-obs op)))
authenticated?]
[(list 'auth (== password string=?))
(log! "Authenticated")
(write #t op)
#t]
[(? eof-object?)
(log! "Master disconnect")
(void)]
[x
(log! "Illegal message: ~e" x)
(write #f op)
authenticated?]))
(define (call-with-safe-read t)
(parameterize
([read-case-sensitive #t]
[read-square-bracket-as-paren #t]
[read-curly-brace-as-paren #t]
[read-accept-box #f]
[read-accept-compiled #f]
[read-accept-bar-quote #f]
[read-accept-graph #f]
[read-decimal-as-inexact #t]
[read-accept-dot #f]
[read-accept-infix-dot #f]
[read-accept-quasiquote #f]
[read-accept-reader #f])
(t)))
(define (handle ip op password log!)
(call-with-safe-read
(λ ()
(let loop ([authenticated? #f])
(match (handle-one-msg password log! ip op authenticated?)
[(? void?) (void)]
[authenticated? (loop authenticated?)])))))
(define (port-closing-curry f . args)
(λ (ip op)
(dynamic-wind
void
(λ () (apply f ip op args))
(λ ()
(close-input-port ip)
(close-output-port op)))))
(define (main)
; XXX commandline
(define port 4532)
(define *password* "foo")
; XXX make web server to view recent things
(define (log! fmt . vals)
(apply printf fmt vals))
; XXX use ssl
(run-server
port
(port-closing-curry handle *password* log!)
#f))

View File

@ -1,32 +0,0 @@
#lang racket
(require tests/eli-tester
"../lib/db.rkt")
(test
(local [(define db (db-connect "localhost:27017:test" #:init? #t))
(struct num (n) #:prefab)]
(test
(for ([i (in-range 10)])
(test
(for ([j (in-range 10)])
(test
(db-set! db (num (+ i j)) (number->string i) (number->string j))))
(sort (db-list db (number->string i)) string<=?) =>
(for/list ([j (in-range 10)]) (number->string j))
(for ([j (in-range 10)])
(test
(db-ref db (number->string i) (number->string j)) =>
(num (+ i j))))
(for ([j (in-range 10)])
(test
(db-set! db (num (* i j)) (number->string i) (number->string j))))
(for ([j (in-range 10)])
(test
(db-ref db (number->string i) (number->string j)) =>
(num (* i j))))))
(db-close! db))))

View File

@ -1,18 +0,0 @@
#lang racket
(require tests/eli-tester
"../lib/pqueue.rkt")
(define N 10)
(test
(local [(define pq
(pqueue (make-temporary-file "tmp~a" 'directory)))]
(test (pqueue-init! pq)
(for ([i (in-range N)])
(pqueue-enqueue! pq i))
(for/list ([i (in-range N)])
(pqueue-dequeue! pq))
=>
(for/list ([i (in-range N)]) i))))

View File

@ -1,108 +0,0 @@
#lang racket
(require "../slave/slave.rkt"
tests/eli-tester)
(define (test-handle-one-msg
password m authenticated?
expected-authenticated? expected-log expected-bs-rx)
(define-values (ip-read ip-write) (make-pipe))
(define op (open-output-bytes))
(define log empty)
(define (log! fmt . args)
(set! log (cons (apply format fmt args) log)))
(when m
(write m ip-write))
(close-output-port ip-write)
(define new-authenticated?
(handle-one-msg password log! ip-read op authenticated?))
(define new-log
(reverse log))
(define new-bs
(get-output-bytes op))
(test #:failure-prefix (format "~S" (list password m authenticated?))
(test new-authenticated? => expected-authenticated?
new-log => expected-log
(regexp-match expected-bs-rx new-bs))))
(test
; write-output-bytes
(local [(define obs1 (open-output-bytes))
(define obs2 (open-output-bytes))]
(test
(display "123" obs1)
(write-output-bytes obs1 obs2)
(close-output-port obs1)
(close-output-port obs2)
(get-output-bytes obs2) => #"3123"))
; handle-one-msg
(test-handle-one-msg "foo" '(auth "foo") #t
#t '("Authenticated") #"#t")
(test-handle-one-msg "foo" '(auth "foo") #f
#t '("Authenticated") #"#t")
(test-handle-one-msg "foo" '(auth "bar") #t
#t '("Illegal message: '(auth \"bar\")") #"#f")
(test-handle-one-msg "foo" '(auth "bar") #f
#f '("Illegal message: '(auth \"bar\")") #"#f")
(test-handle-one-msg "foo" #f #f
(void) '("Master disconnect") #"")
(test-handle-one-msg "foo" '(run 10 "/bin/echo" "foo") #f
#f '("Illegal message: '(run 10 \"/bin/echo\" \"foo\")") #"#f")
(test-handle-one-msg "foo" '(run 10 "/bin/echo" "foo") #t
#t
'("Running with timeout (10) (\"/bin/echo\" \"foo\")" "Finished running (\"/bin/echo\" \"foo\"), status was 0")
#rx"#\\([0-9]+\\.[0-9]+ [0-9]+\\.[0-9]+ 0\\)4foo\n0")
(test-handle-one-msg "foo" '(run 0 "/bin/echo" "foo") #t
#t
'("Running with timeout (0) (\"/bin/echo\" \"foo\")" "Finished running (\"/bin/echo\" \"foo\"), status was #f")
#rx"#\\([0-9]+\\.[0-9]+ [0-9]+\\.[0-9]+ #f\\)00")
; call-with-safe-read
(call-with-safe-read (λ () (read (open-input-string "(run 10 \"/bin/echo\" \"foo\")"))))
=>
'(run 10 "/bin/echo" "foo")
(call-with-safe-read (λ () (read (open-input-string "(auth \"foo\")"))))
=>
'(auth "foo")
(call-with-safe-read (λ () (read (open-input-string ""))))
=>
eof
(call-with-safe-read (λ () (read (open-input-string "(auth #&\"foo\")"))))
=>
(error 'read "#& expressions not currently enabled")
(call-with-safe-read (λ () (read (open-input-string "(auth #~\"foo\")"))))
=>
(error 'read "#~~ compiled expressions not currently enabled")
(call-with-safe-read (λ () (read (open-input-string "#0='(3 #0#)"))))
=>
(error 'read "#..= expressions not currently enabled")
; call-with-safe-read + handle-one-msg
(call-with-safe-read
(λ ()
(test-handle-one-msg "foo" `(auth ,(box "bar")) #f
#f
'("Illegal message: (exn:fail:read \"read: #& expressions not currently enabled\" #<continuation-mark-set> (list (srcloc #f #f #f 7 2)))")
#"#f")))
; XXX handle
; XXX port-closing-curry
; XXX main
)