Following system colour scheme Selected dark colour scheme Selected light colour scheme

Python Enhancement Proposals

PEP 3148 – futures - Berechnungen asynchron ausführen

Autor:
Brian Quinlan <brian at sweetapp.com>
Status:
Final
Typ:
Standards Track
Erstellt:
16. Okt 2009
Python-Version:
3.2
Post-History:


Inhaltsverzeichnis

Zusammenfassung

Diese PEP schlägt ein Design für ein Paket vor, das die Auswertung von Aufrufbaren mithilfe von Threads und Prozessen erleichtert.

Motivation

Python verfügt derzeit über leistungsstarke Primitiven zum Erstellen von Multithreading- und Multi-Prozess-Anwendungen, aber die Parallelisierung einfacher Operationen erfordert viel Aufwand, z. B. das explizite Starten von Prozessen/Threads, das Erstellen einer Arbeits-/Ergebniswarteschlange und das Warten auf den Abschluss oder eine andere Abbruchbedingung (z. B. Fehler, Zeitüberschreitung). Es ist auch schwierig, eine Anwendung mit einer globalen Prozess-/Thread-Grenze zu entwerfen, wenn jede Komponente ihre eigene parallele Ausführungsstrategie entwickelt.

Spezifikation

Benennung

Das vorgeschlagene Paket würde „futures“ heißen und in einem neuen „concurrent“-Top-Level-Paket leben. Die Begründung für die Verschiebung der Futures-Bibliothek in einen „concurrent“-Namensraum hat mehrere Komponenten. Die erste, einfachste ist, jegliche Verwirrung mit dem bestehenden Idiom „from __future__ import x“ zu vermeiden, das seit langem in Python verwendet wird. Darüber hinaus wird davon ausgegangen, dass die Hinzufügung des „concurrent“-Präfixes zum Namen vollständig angibt, worauf sich die Bibliothek bezieht – nämlich auf Nebenläufigkeit. Dies sollte jede zusätzliche Mehrdeutigkeit beseitigen, da festgestellt wurde, dass nicht jeder in der Community mit Java Futures oder dem Begriff Futures außer im Zusammenhang mit dem US-Aktienmarkt vertraut ist.

Schließlich schaffen wir einen neuen Namensraum für die Standardbibliothek – natürlich „concurrent“ genannt. Wir hoffen, in Zukunft weitere nebenläufigkeitsbezogene Bibliotheken hinzuzufügen oder bestehende dorthin zu verschieben. Ein Hauptbeispiel hierfür ist die Arbeit von multiprocessing.Pool sowie andere „Add-ons“, die in diesem Modul enthalten sind und sowohl Thread- als auch Prozessgrenzen überschreiten.

Interface

Das vorgeschlagene Paket bietet zwei Kernklassen: Executor und Future. Ein Executor empfängt asynchrone Arbeitsanfragen (in Form eines Aufrufbaren und seiner Argumente) und gibt ein Future zurück, um die Ausführung dieser Arbeitsanfrage darzustellen.

Executor

Executor ist eine abstrakte Klasse, die Methoden zur asynchronen Ausführung von Aufrufen bereitstellt.

submit(fn, *args, **kwargs)

Plant den Aufrufbaren zur Ausführung als fn(*args, **kwargs) und gibt eine Future-Instanz zurück, die die Ausführung des Aufrufbaren darstellt.

Dies ist eine abstrakte Methode und muss von Executor-Unterklassen implementiert werden.

map(func, *iterables, timeout=None)

Entspricht map(func, *iterables), aber func wird asynchron ausgeführt und mehrere Aufrufe von func können gleichzeitig erfolgen. Der zurückgegebene Iterator löst einen TimeoutError aus, wenn __next__() aufgerufen wird und das Ergebnis nicht innerhalb von timeout Sekunden nach dem ursprünglichen Aufruf von map() verfügbar ist. Wenn timeout nicht angegeben oder None ist, gibt es keine Wartezeitbegrenzung. Wenn ein Aufruf eine Ausnahme auslöst, wird diese Ausnahme ausgelöst, wenn ihr Wert aus dem Iterator abgerufen wird.

shutdown(wait=True)

Signalisiert dem Executor, dass er alle verwendeten Ressourcen freigeben soll, sobald die aktuell ausstehenden Futures ausgeführt sind. Aufrufe von Executor.submit und Executor.map, die nach shutdown erfolgen, lösen einen RuntimeError aus.

Wenn wait True ist, gibt diese Methode erst zurück, wenn alle ausstehenden Futures ausgeführt sind und die dem Executor zugeordneten Ressourcen freigegeben wurden. Wenn wait False ist, gibt diese Methode sofort zurück und die dem Executor zugeordneten Ressourcen werden freigegeben, sobald alle ausstehenden Futures ausgeführt sind. Unabhängig vom Wert von wait wird das gesamte Python-Programm erst beendet, wenn alle ausstehenden Futures ausgeführt sind.

__enter__()
__exit__(exc_type, exc_val, exc_tb)
Wenn ein Executor als Kontextmanager verwendet wird, ruft __exit__ Executor.shutdown(wait=True) auf.

ProcessPoolExecutor

Die Klasse ProcessPoolExecutor ist eine Executor-Unterklasse, die einen Pool von Prozessen zur asynchronen Ausführung von Aufrufen verwendet. Die an ProcessPoolExecutor.submit übergebenen Aufrufbaren und Argumente müssen gemäß den gleichen Einschränkungen wie das Modul `multiprocessing` picklebar sein.

Aufrufe von Executor- oder Future-Methoden innerhalb eines an einen ProcessPoolExecutor übergebenen Aufrufbaren führen zu einem Deadlock.

__init__(max_workers)

Führt Aufrufe asynchron mithilfe eines Pools von höchstens max_workers Prozessen aus. Wenn max_workers None ist oder nicht angegeben wird, werden so viele Worker-Prozesse erstellt, wie der Rechner Prozessoren hat.

ThreadPoolExecutor

Die Klasse ThreadPoolExecutor ist eine Executor-Unterklasse, die einen Pool von Threads zur asynchronen Ausführung von Aufrufen verwendet.

Es kann zu Deadlocks kommen, wenn der Aufrufbare, der mit einem Future verbunden ist, auf die Ergebnisse eines anderen Future wartet. Zum Beispiel

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

Und

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

__init__(max_workers)

Führt Aufrufe asynchron mithilfe eines Pools von höchstens max_workers Threads aus.

Future-Objekte

Die Klasse Future kapselt die asynchrone Ausführung eines Aufrufbaren. Future-Instanzen werden von Executor.submit zurückgegeben.

cancel()

Versucht, den Aufruf abzubrechen. Wenn der Aufruf gerade ausgeführt wird, kann er nicht abgebrochen werden und die Methode gibt False zurück, andernfalls wird der Aufruf abgebrochen und die Methode gibt True zurück.

cancelled()

Gibt True zurück, wenn der Aufruf erfolgreich abgebrochen wurde.

running()

Gibt True zurück, wenn der Aufruf gerade ausgeführt wird und nicht abgebrochen werden kann.

done()

Gibt True zurück, wenn der Aufruf erfolgreich abgebrochen oder die Ausführung beendet wurde.

result(timeout=None)

Gibt den vom Aufruf zurückgegebenen Wert zurück. Wenn der Aufruf noch nicht abgeschlossen ist, wartet diese Methode bis zu timeout Sekunden. Wenn der Aufruf in timeout Sekunden nicht abgeschlossen ist, wird ein TimeoutError ausgelöst. Wenn timeout nicht angegeben oder None ist, gibt es keine Wartezeitbegrenzung.

Wenn das Future vor dem Abschluss abgebrochen wird, wird CancelledError ausgelöst.

Wenn der Aufruf eine Ausnahme ausgelöst hat, löst diese Methode dieselbe Ausnahme aus.

exception(timeout=None)

Gibt die vom Aufruf ausgelöste Ausnahme zurück. Wenn der Aufruf noch nicht abgeschlossen ist, wartet diese Methode bis zu timeout Sekunden. Wenn der Aufruf in timeout Sekunden nicht abgeschlossen ist, wird ein TimeoutError ausgelöst. Wenn timeout nicht angegeben oder None ist, gibt es keine Wartezeitbegrenzung.

Wenn das Future vor dem Abschluss abgebrochen wird, wird CancelledError ausgelöst.

Wenn der Aufruf ohne Ausnahme abgeschlossen wurde, wird None zurückgegeben.

add_done_callback(fn)

Hängt eine aufrufbare Funktion fn an das Future an, die aufgerufen wird, wenn das Future abgebrochen wird oder seine Ausführung beendet hat. fn wird mit dem Future als einzigem Argument aufgerufen.

Hinzugefügte Aufrufbare werden in der Reihenfolge aufgerufen, in der sie hinzugefügt wurden, und immer in einem Thread aufgerufen, der zum Prozess gehört, der sie hinzugefügt hat. Wenn der Aufrufbare eine Exception auslöst, wird diese protokolliert und ignoriert. Wenn der Aufrufbare eine andere BaseException auslöst, ist das Verhalten undefiniert.

Wenn das Future bereits abgeschlossen oder abgebrochen wurde, wird fn sofort aufgerufen.

Interne Future-Methoden

Die folgenden Future-Methoden sind für die Verwendung in Unit-Tests und Executor-Implementierungen gedacht.

set_running_or_notify_cancel()

Sollte von Executor-Implementierungen aufgerufen werden, bevor die dem Future zugeordnete Arbeit ausgeführt wird.

Wenn die Methode False zurückgibt, wurde das Future abgebrochen, d. h. Future.cancel wurde aufgerufen und gab True zurück. Alle Threads, die auf den Abschluss des Future warten (d. h. über as_completed() oder wait()), werden geweckt.

Wenn die Methode True zurückgibt, wurde das Future nicht abgebrochen und wurde in den Zustand „running“ versetzt, d. h. Aufrufe von Future.running() geben True zurück.

Diese Methode kann nur einmal aufgerufen werden und kann nicht aufgerufen werden, nachdem Future.set_result() oder Future.set_exception() aufgerufen wurden.

set_result(result)

Setzt das Ergebnis der dem Future zugeordneten Arbeit.

set_exception(exception)

Setzt das Ergebnis der dem Future zugeordneten Arbeit auf die gegebene Exception.

Modulfunktionen

wait(fs, timeout=None, return_when=ALL_COMPLETED)

Wartet auf den Abschluss der Future-Instanzen (möglicherweise erstellt von verschiedenen Executor-Instanzen), die durch fs gegeben sind. Gibt ein benanntes 2-Tupel von Mengen zurück. Die erste Menge, genannt „done“, enthält die Futures, die vor Abschluss des Wartens abgeschlossen wurden (fertig oder abgebrochen). Die zweite Menge, genannt „not_done“, enthält nicht abgeschlossene Futures.

timeout kann verwendet werden, um die maximale Wartezeit in Sekunden zu steuern, bevor zurückgekehrt wird. Wenn timeout nicht angegeben oder None ist, gibt es keine Wartezeitbegrenzung.

return_when gibt an, wann die Methode zurückkehren soll. Es muss eine der folgenden Konstanten sein:

Konstante Description
FIRST_COMPLETED Die Methode kehrt zurück, wenn irgendein Future abgeschlossen oder abgebrochen wird.
FIRST_EXCEPTION Die Methode kehrt zurück, wenn irgendein Future eine Ausnahme auslöst. Wenn kein Future eine Ausnahme auslöst, ist sie äquivalent zu ALL_COMPLETED.
ALL_COMPLETED Die Methode kehrt zurück, wenn alle Aufrufe abgeschlossen sind.

as_completed(fs, timeout=None)

Gibt einen Iterator über die Future-Instanzen zurück, die durch fs gegeben sind, und liefert Futures, sobald sie abgeschlossen sind (fertig oder abgebrochen). Alle Futures, die vor dem Aufruf von as_completed() abgeschlossen wurden, werden zuerst geliefert. Der zurückgegebene Iterator löst einen TimeoutError aus, wenn __next__() aufgerufen wird und das Ergebnis nicht innerhalb von timeout Sekunden nach dem ursprünglichen Aufruf von as_completed() verfügbar ist. Wenn timeout nicht angegeben oder None ist, gibt es keine Wartezeitbegrenzung.

Die Future-Instanzen können von verschiedenen Executor-Instanzen erstellt worden sein.

Primzahlprüfung Beispiel

from concurrent import futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime,
                                                      PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Web-Crawler Beispiel

from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url)
             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % (
                          url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % (
                          url, e))

if __name__ == '__main__':
    main()

Begründung

Das vorgeschlagene Design dieses Moduls wurde stark vom Java java.util.concurrent Paket [1] beeinflusst. Die konzeptionelle Grundlage des Moduls ist, wie in Java, die Future-Klasse, die den Fortschritt und das Ergebnis einer asynchronen Berechnung darstellt. Die Future-Klasse macht wenige Vorgaben bezüglich des verwendeten Ausführungsmodus, d. h. sie kann verwendet werden, um eine lazy oder eager Auswertung darzustellen, für die Auswertung mittels Threads, Prozesse oder Remote Procedure Call.

Futures werden von konkreten Implementierungen der Executor-Klasse (in Java ExecutorService genannt) erstellt. Die Referenzimplementierung bietet Klassen, die entweder einen Prozess- oder einen Thread-Pool verwenden, um Berechnungen eager auszuwerten.

Futures wurden bereits in Python als Teil eines beliebten Python-Cookbook-Rezepts [2] gesehen und auf der Python-3000-Mailingliste diskutiert [3].

Das vorgeschlagene Design ist explizit, d. h. es verlangt, dass Clients sich bewusst sind, dass sie Futures konsumieren. Es wäre möglich, ein Modul zu entwerfen, das Proxy-Objekte (im Stil von weakref) zurückgibt, die transparent verwendet werden könnten. Es ist möglich, eine Proxy-Implementierung auf Basis des vorgeschlagenen expliziten Mechanismus aufzubauen.

Das vorgeschlagene Design führt keine Änderungen an der Python-Sprachsyntax oder -Semantik ein. Eine spezielle Syntax könnte eingeführt werden [4], um Funktions- und Methodenaufrufe als asynchron zu markieren. Ein Proxy-Ergebnis würde zurückgegeben, während die Operation eager asynchron ausgewertet wird, und die Ausführung würde nur blockieren, wenn das Proxy-Objekt verwendet würde, bevor die Operation abgeschlossen ist.

Anh Hai Trinh schlug ein einfacheres, aber stärker eingeschränktes API-Konzept vor [5] und die API wurde im stdlib-sig-Forum im Detail diskutiert [6].

Das vorgeschlagene Design wurde auf der Python-Dev-Mailingliste diskutiert [7]. Nach diesen Diskussionen wurden folgende Änderungen vorgenommen:

  • Die Klasse Executor wurde zu einer abstrakten Basisklasse gemacht.
  • Die Methode Future.remove_done_callback wurde aufgrund fehlender überzeugender Anwendungsfälle entfernt.
  • Die Methode Future.add_done_callback wurde modifiziert, um den mehrfachen Hinzufügung desselben Aufrufbaren zu ermöglichen.
  • Die Mutationsmethoden der Future-Klasse wurden besser dokumentiert, um darauf hinzuweisen, dass sie privat für den Executor sind, der sie erstellt hat.

Referenzimplementierung

Die Referenzimplementierung [8] enthält eine vollständige Implementierung des vorgeschlagenen Designs. Sie wurde unter Linux und Mac OS X getestet.

Referenzen


Quelle: https://github.com/python/peps/blob/main/peps/pep-3148.rst

Zuletzt geändert: 2025-02-01 08:59:27 GMT