;;; -*- mode: scheme; coding: utf-8; -*-
;;;
;;; Copyright (C) 2010, 2011, 2012 Free Software Foundation, Inc.
;;;
;;; This library is free software; you can redistribute it and/or
;;; modify it under the terms of the GNU Lesser General Public
;;; License as published by the Free Software Foundation; either
;;; version 3 of the License, or (at your option) any later version.
;;;
;;; This library is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;;; Lesser General Public License for more details.
;;;
;;; You should have received a copy of the GNU Lesser General Public
;;; License along with this library; if not, write to the Free Software
;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

(define-module (ice-9 futures)
  #:use-module (srfi srfi-1)
  #:use-module (srfi srfi-9)
  #:use-module (srfi srfi-9 gnu)
  #:use-module (srfi srfi-11)
  #:use-module (ice-9 q)
  #:use-module (ice-9 match)
  #:use-module (ice-9 control)
  #:use-module (ice-9 threads)
  #:export (future make-future future? touch))

;;; Author: Ludovic Courtès <ludo@gnu.org>
;;;
;;; Commentary:
;;;
;;; This module provides an implementation of futures, a mechanism for
;;; fine-grain parallelism.  Futures were first described by Henry Baker
;;; in ``The Incremental Garbage Collection of Processes'', 1977, and
;;; then implemented in MultiLisp (an implicit variant thereof, i.e.,
;;; without `touch'.)
;;;
;;; This module uses a fixed thread pool, normally one per CPU core.
;;; Futures are off-loaded to these threads, when they are idle.
;;;
;;; Locking discipline used throughout this file: when both
;;; %FUTURES-MUTEX and a future's own mutex are needed, %FUTURES-MUTEX
;;; is taken first.
;;;
;;; Code:


;;;
;;; Futures.
;;;

;; A future record.  RESULT is not a constructor argument; it is set by
;; `process-future!' to a thunk that either returns the computed values
;; or re-throws the exception raised by the computation.
(define-record-type <future>
  (%make-future thunk state mutex completion)
  future?
  (thunk        future-thunk  set-future-thunk!)
  (state        future-state  set-future-state!)  ; done | started | queued
  (result       future-result set-future-result!)
  (mutex        future-mutex)
  (completion   future-completion))               ; completion cond. var.

(set-record-type-printer!
 <future>
 (lambda (future port)
   (simple-format port "#<future ~a ~a ~s>"
                  (number->string (object-address future) 16)
                  (future-state future)
                  (future-thunk future))))

(define (make-future thunk)
  "Return a new future for THUNK.  Execution may start at any point
concurrently, or it can start at the time when the returned future is
touched."
  ;; Make sure the worker pool exists before queueing anything.
  (create-workers!)
  (let ((future (%make-future thunk 'queued
                              (make-mutex) (make-condition-variable))))
    (register-future! future)
    future))


;;;
;;; Future queues.
;;;

;; Global queue of pending futures.
;; TODO: Use per-worker queues to reduce contention.
(define %futures (make-q))

;; Lock for %FUTURES and %FUTURES-WAITING.
(define %futures-mutex (make-mutex))

;; Signalled whenever a future is added to %FUTURES; workers wait on it
;; in `process-futures'.
(define %futures-available (make-condition-variable))

;; A mapping of nested futures to futures waiting for them to complete.
;; Stored as an alist of (WAITEE . WAITER) pairs.
(define %futures-waiting '())

;; Whether currently running within a future.
(define %within-future? (make-parameter #f))

(define-syntax-rule (with-mutex m e0 e1 ...)
  ;; Copied from (ice-9 threads) to avoid circular dependency.
  ;; Evaluate E0 E1 ... with mutex M held; M is released on any exit
  ;; from the body, including non-local exits.
  (let ((x m))
    (dynamic-wind
      (lambda () (lock-mutex x))
      (lambda () (begin e0 e1 ...))
      (lambda () (unlock-mutex x)))))

(define %future-prompt
  ;; The prompt futures abort to when they want to wait for another
  ;; future.
  (make-prompt-tag))


(define (register-future! future)
  ;; Register FUTURE as being processable: append it to %FUTURES and
  ;; signal %FUTURES-AVAILABLE so that a waiting worker can pick it up.
  (lock-mutex %futures-mutex)
  (enq! %futures future)
  (signal-condition-variable %futures-available)
  (unlock-mutex %futures-mutex))

(define (process-future! future)
  "Process FUTURE.  When FUTURE completes, return #t and update its
result; otherwise, when FUTURE touches a nested future that has not
completed yet, then suspend it and return #f.  Suspending a future
consists in capturing its continuation, marking it as `queued', and
adding it to the waiter queue."
  (let/ec return
    (let* ((suspend
            ;; Handler for %FUTURE-PROMPT.  CONT is the continuation of
            ;; the `abort-to-prompt' call made by `touch'.
            (lambda (cont future-to-wait)
              ;; FUTURE wishes to wait for the completion of FUTURE-TO-WAIT.
              ;; At this point, FUTURE is unlocked and in `started' state,
              ;; and FUTURE-TO-WAIT is unlocked.
              (with-mutex %futures-mutex
                (with-mutex (future-mutex future)
                  ;; Resuming FUTURE later means calling CONT.
                  (set-future-thunk! future cont)
                  (set-future-state! future 'queued))

                (with-mutex (future-mutex future-to-wait)
                  ;; If FUTURE-TO-WAIT completed in the meantime, then
                  ;; reschedule FUTURE directly; otherwise, add it to the
                  ;; waiter queue.
                  (if (eq? 'done (future-state future-to-wait))
                      (begin
                        (enq! %futures future)
                        (signal-condition-variable %futures-available))
                      (set! %futures-waiting
                            (alist-cons future-to-wait future
                                        %futures-waiting))))

                ;; Escape without setting a result; `with-mutex' releases
                ;; %FUTURES-MUTEX on the way out.
                (return #f))))
           (thunk (lambda ()
                    (call-with-prompt %future-prompt
                                      (lambda ()
                                        (parameterize ((%within-future? #t))
                                          ((future-thunk future))))
                                      suspend))))
      ;; The result is stored as a thunk: calling it either returns the
      ;; values produced by the computation or re-throws what it threw.
      (set-future-result! future
                          (catch #t
                            (lambda ()
                              (call-with-values thunk
                                (lambda results
                                  (lambda ()
                                    (apply values results)))))
                            (lambda args
                              (lambda ()
                                (apply throw args)))))
      #t)))

(define (process-one-future)
  "Attempt to pick one future from the queue and process it."
  ;; %FUTURES-MUTEX must be locked on entry, and is locked on exit.
  (or (q-empty? %futures)
      (let ((future (deq! %futures)))
        (lock-mutex (future-mutex future))
        (case (future-state future)
          ((done started)
           ;; Nothing to do.
           (unlock-mutex (future-mutex future)))
          (else
           ;; Do the actual work.

           ;; We want to release %FUTURES-MUTEX so that other workers can
           ;; progress.  However, to avoid deadlocks, we have to unlock
           ;; FUTURE as well, to preserve lock ordering.
           (unlock-mutex (future-mutex future))
           (unlock-mutex %futures-mutex)

           (lock-mutex (future-mutex future))
           (if (eq? (future-state future) 'queued) ; lost the race?
               (begin                              ; no, so let's process it
                 (set-future-state! future 'started)
                 (unlock-mutex (future-mutex future))

                 (let ((done? (process-future! future)))
                   ;; DONE? is #f when FUTURE got suspended; in that
                   ;; case its state was already reset by the suspend
                   ;; handler and there is nothing to announce.
                   (when done?
                     (with-mutex %futures-mutex
                       (with-mutex (future-mutex future)
                         (set-future-state! future 'done)
                         (notify-completion future))))))
               (unlock-mutex (future-mutex future))) ; yes

           ;; Re-establish the exit condition documented above.
           (lock-mutex %futures-mutex))))))

(define (process-futures)
  "Continuously process futures from the queue."
  ;; This loop never returns; it is the body of each worker thread.
  (lock-mutex %futures-mutex)
  (let loop ()
    (when (q-empty? %futures)
      (wait-condition-variable %futures-available
                               %futures-mutex))

    ;; `process-one-future' re-checks for emptiness itself, so waking up
    ;; to an empty queue is harmless.
    (process-one-future)
    (loop)))

(define (notify-completion future)
  "Notify futures and callers waiting that FUTURE completed."
  ;; FUTURE and %FUTURES-MUTEX are locked.
  (broadcast-condition-variable (future-completion future))
  (let-values (((waiting remaining)
                (partition (match-lambda          ; TODO: optimize
                            ((waitee . _)
                             (eq? waitee future)))
                           %futures-waiting)))
    (set! %futures-waiting remaining)
    (for-each (match-lambda
               ((_ . waiter)
                (enq! %futures waiter)
                ;; Signal %FUTURES-AVAILABLE like every other place that
                ;; enqueues onto %FUTURES; otherwise workers blocked in
                ;; `process-futures' are not woken up for WAITER.
                (signal-condition-variable %futures-available)))
              waiting)))

(define (touch future)
  "Return the result of FUTURE, computing it if not already done."
  (define (work)
    ;; Do some work while waiting for FUTURE to complete.
    (lock-mutex %futures-mutex)
    (if (q-empty? %futures)
        (begin
          ;; Nothing to help with: block until FUTURE is done.  The
          ;; state is checked with FUTURE's mutex held, which is also
          ;; held by whoever calls `notify-completion'.
          (unlock-mutex %futures-mutex)
          (with-mutex (future-mutex future)
            (unless (eq? 'done (future-state future))
              (wait-condition-variable (future-completion future)
                                       (future-mutex future)))))
        (begin
          (process-one-future)
          (unlock-mutex %futures-mutex))))

  (let loop ()
    (lock-mutex (future-mutex future))
    (case (future-state future)
      ((done)
       (unlock-mutex (future-mutex future)))
      ((started)
       (unlock-mutex (future-mutex future))
       (if (%within-future?)
           ;; Suspend the current future.  No loop afterwards: the
           ;; captured continuation is only re-queued once FUTURE is
           ;; `done' (see `process-future!' and `notify-completion').
           (abort-to-prompt %future-prompt future)
           (begin
             (work)
             (loop))))
      (else
       (unlock-mutex (future-mutex future))
       (work)
       (loop))))
  ;; The stored result is a thunk; call it to get the values or to
  ;; re-throw the computation's exception.
  ((future-result future)))


;;;
;;; Workers.
;;;

;; Number of worker threads to create: one less than the processor
;; count when threads are provided, none otherwise.
(define %worker-count
  (if (provided? 'threads)
      (- (current-processor-count) 1)
      0))

;; A dock of workers that stay here forever.

;; TODO
;; 1. Allow the pool to be shrunk, as in libgomp (though that we'd
;;    need semaphores, which aren't yet in libguile!).
;; 2. Provide a `worker-count' fluid.
(define %workers '())

(define (%create-workers!)
  (with-mutex %futures-mutex
    ;; Setting 'create-workers!' to a no-op is an optimization, but it is
    ;; still possible for '%create-workers!' to be called more than once
    ;; from different threads.  Therefore, to avoid creating %workers more
    ;; than once (and thus creating too many threads), we check to make
    ;; sure %workers is empty within the critical section.
    (when (null? %workers)
      (set! %workers
            (unfold (lambda (i) (>= i %worker-count))
                    (lambda (i) (call-with-new-thread process-futures))
                    1+
                    0))
      (set! create-workers! (lambda () #t)))))

;; Entry point used by `make-future'; replaced by a no-op once
;; `%create-workers!' has populated %WORKERS.
(define create-workers!
  (lambda () (%create-workers!)))


;;;
;;; Syntax.
;;;

(define-syntax-rule (future body)
  "Return a new future for BODY."
  (make-future (lambda () body)))

;;; Local Variables:
;;; eval: (put 'with-mutex 'scheme-indent-function 1)
;;; End:
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
peg | Folder | 0755 |
|
|
and-let-star.scm | File | 2.53 KB | 0644 |
|
arrays.scm | File | 2.63 KB | 0644 |
|
atomic.scm | File | 1.55 KB | 0644 |
|
binary-ports.scm | File | 1.99 KB | 0644 |
|
boot-9.scm | File | 143.94 KB | 0644 |
|
buffered-input.scm | File | 4.82 KB | 0644 |
|
calling.scm | File | 10.54 KB | 0644 |
|
channel.scm | File | 5.19 KB | 0644 |
|
command-line.scm | File | 18.2 KB | 0644 |
|
common-list.scm | File | 8.95 KB | 0644 |
|
control.scm | File | 4.08 KB | 0644 |
|
curried-definitions.scm | File | 1.79 KB | 0644 |
|
debug.scm | File | 1.09 KB | 0644 |
|
deprecated.scm | File | 2.95 KB | 0644 |
|
documentation.scm | File | 7.41 KB | 0644 |
|
eval-string.scm | File | 2.99 KB | 0644 |
|
eval.scm | File | 25.12 KB | 0644 |
|
expect.scm | File | 5.5 KB | 0644 |
|
fdes-finalizers.scm | File | 1.06 KB | 0644 |
|
format.scm | File | 74.37 KB | 0644 |
|
ftw.scm | File | 24.17 KB | 0644 |
|
futures.scm | File | 10.49 KB | 0644 |
|
gap-buffer.scm | File | 10.14 KB | 0644 |
|
getopt-long.scm | File | 16.49 KB | 0644 |
|
hash-table.scm | File | 1.77 KB | 0644 |
|
hcons.scm | File | 2.55 KB | 0644 |
|
history.scm | File | 2.29 KB | 0644 |
|
i18n.scm | File | 20.51 KB | 0644 |
|
iconv.scm | File | 3.65 KB | 0644 |
|
lineio.scm | File | 3.85 KB | 0644 |
|
list.scm | File | 1.29 KB | 0644 |
|
local-eval.scm | File | 9.96 KB | 0644 |
|
ls.scm | File | 3.2 KB | 0644 |
|
mapping.scm | File | 4.84 KB | 0644 |
|
match.scm | File | 2 KB | 0644 |
|
match.upstream.scm | File | 35.92 KB | 0644 |
|
networking.scm | File | 3.33 KB | 0644 |
|
null.scm | File | 1.13 KB | 0644 |
|
occam-channel.scm | File | 7.26 KB | 0644 |
|
optargs.scm | File | 15.75 KB | 0644 |
|
peg.scm | File | 1.64 KB | 0644 |
|
poe.scm | File | 3.3 KB | 0644 |
|
poll.scm | File | 5.79 KB | 0644 |
|
popen.scm | File | 6.82 KB | 0644 |
|
ports.scm | File | 18.89 KB | 0644 |
|
posix.scm | File | 2.73 KB | 0644 |
|
pretty-print.scm | File | 16.88 KB | 0644 |
|
psyntax-pp.scm | File | 180.55 KB | 0644 |
|
psyntax.scm | File | 148.7 KB | 0644 |
|
q.scm | File | 4.2 KB | 0644 |
|
quasisyntax.scm | File | 5.22 KB | 0644 |
|
r5rs.scm | File | 1.56 KB | 0644 |
|
r6rs-libraries.scm | File | 9.43 KB | 0644 |
|
rdelim.scm | File | 7.72 KB | 0644 |
|
readline.scm | File | 9.56 KB | 0644 |
|
receive.scm | File | 1.06 KB | 0644 |
|
regex.scm | File | 8.87 KB | 0644 |
|
runq.scm | File | 8.18 KB | 0644 |
|
rw.scm | File | 1.02 KB | 0644 |
|
safe-r5rs.scm | File | 3.72 KB | 0644 |
|
safe.scm | File | 1.25 KB | 0644 |
|
sandbox.scm | File | 34.23 KB | 0644 |
|
save-stack.scm | File | 2.15 KB | 0644 |
|
scm-style-repl.scm | File | 11.62 KB | 0644 |
|
serialize.scm | File | 3.78 KB | 0644 |
|
session.scm | File | 17.72 KB | 0644 |
|
slib.scm | File | 1.55 KB | 0644 |
|
stack-catch.scm | File | 1.94 KB | 0644 |
|
streams.scm | File | 5.86 KB | 0644 |
|
string-fun.scm | File | 8.59 KB | 0644 |
|
suspendable-ports.scm | File | 29.87 KB | 0644 |
|
syncase.scm | File | 1.52 KB | 0644 |
|
textual-ports.scm | File | 2.29 KB | 0644 |
|
threads.scm | File | 12.54 KB | 0644 |
|
time.scm | File | 2.07 KB | 0644 |
|
top-repl.scm | File | 2.75 KB | 0644 |
|
unicode.scm | File | 1005 B | 0644 |
|
vlist.scm | File | 21.56 KB | 0644 |
|
weak-vector.scm | File | 1.2 KB | 0644 |
|