Du kannst mit
asyncio.all_tasks()
auf alle aktuell laufenden Tasks im Event-Loop zugreifen. Hier ist ein Beispiel bei dem alle Tasks mit ungeraden Parametern nach der ersten Ausgabe abgebrochen werden.
import asyncio
import time
async def say_hello(n):
print(f"Hello {n}...")
try:
await asyncio.sleep(2)
print(f"...World {n}!")
except asyncio.CancelledError:
print(f"Task {n} wurde abgebrochen.")
raise
async def main():
tasks = []
for i in range(1, 6):
task = asyncio.create_task(say_hello(i))
tasks.append((i, task))
await asyncio.sleep(0.1) # kurz warten, damit Tasks gestartet sind
# Ungerade Tasks abbrechen
for n, task in tasks:
if n % 2 == 1:
task.cancel()
# Alle Tasks abwarten (auch abgebrochene)
for _, task in tasks:
try:
await task
except asyncio.CancelledError:
pass # ignorieren, da erwartet
start = time.time()
asyncio.run(main())
print(f"Duration: {time.time() - start:.2f} Sekunden")