Concurrency

Some languages support multiple threads, thread-safe data structures and cross-thread communication. Some languages also support asynchronous invocation of functions that may be waiting for an external asynchronous operation (e.g. IO, external process).

  • Async/Await
  • External Async Operations
  • Multi-Threading
  • Atomic Operations and Mutexes
  • Semaphores and inter-thread communication
  • OS signal handlers
  • Shared Memory

Python Language

Async/Await

Async Functions: The Equivalent of Goroutines

Python provides the async def syntax for defining asynchronous functions (coroutines). Calling one returns a coroutine object, which runs only when it is awaited with the await keyword or scheduled as a task (for example with asyncio.create_task).

import asyncio

async def async_task():
    print("Task started")
    await asyncio.sleep(2)  # Simulate work
    print("Task finished")

async def main():
    asyncio.create_task(async_task())  # Runs concurrently
    print("Main function continues...")
    await asyncio.sleep(3)  # Wait for the task to complete

asyncio.run(main())

Await Equivalent - Async Queues for Communication

Python's asyncio.Queue provides a way for coroutines to communicate safely.

import asyncio

async def worker(queue):
    await queue.put("Task completed")

async def main():
    queue = asyncio.Queue()
    asyncio.create_task(worker(queue))
    msg = await queue.get()  # Wait to receive a message
    print(msg)  # Output: Task completed

asyncio.run(main())

Bounded vs Unbounded Queues

By default, asyncio.Queue is unbounded: put() never waits, no matter how many items are already queued. Passing maxsize creates a bounded (buffered) queue; once it holds maxsize items, further put() calls wait until a consumer removes one.

import asyncio

async def main():
    queue = asyncio.Queue(maxsize=2)  # Bounded queue with capacity 2
    await queue.put(1)  # Non-blocking put: the queue has room
    await queue.put(2)  # Non-blocking put: the queue has room
    print(await queue.get())  # 1
    print(await queue.get())  # 2

asyncio.run(main())
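
To see the back-pressure on a full bounded queue, here is a small sketch (the producer/consumer names are illustrative): the third put() waits until the consumer has removed an item.

import asyncio

async def producer(queue):
    for i in range(3):
        await queue.put(i)  # The third put() waits until the consumer frees a slot
        print(f"Put {i}")

async def consumer(queue):
    await asyncio.sleep(1)  # Let the producer fill the queue first
    while True:
        item = await queue.get()
        print(f"Got {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=2)
    consumer_task = asyncio.create_task(consumer(queue))
    await producer(queue)  # Returns only once all three items have been accepted
    await queue.join()     # Wait until every item has been processed
    consumer_task.cancel()

asyncio.run(main())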

External Async Operations

Asynchronous file processing uses the third-party aiofiles library, since asyncio itself does not provide an async file API.

import asyncio
import aiofiles  # Third-party package: pip install aiofiles

async def read_lines(file_path):
    async with aiofiles.open(file_path, mode='r') as file:
        async for line in file:  # Iterates without blocking the event loop
            print(line.strip())

asyncio.run(read_lines("example.txt"))
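
Beyond files, external processes are another async source, as the intro notes. The sketch below uses asyncio's subprocess API; the echo command is just a stand-in for real external work.

import asyncio

async def run_command():
    # Spawn the process without blocking the event loop
    proc = await asyncio.create_subprocess_exec(
        "echo", "hello from a subprocess",
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()  # Wait for the process to exit and collect its output
    print(stdout.decode().strip())
    print("Exit code:", proc.returncode)

asyncio.run(run_command())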

Multi-Threading Support

Python provides multi-threading via the threading module and concurrent.futures; asyncio, by contrast, runs coroutines on a single thread, with the event loop scheduling them cooperatively rather than using OS threads directly.

Example: Running Multiple Coroutines

import asyncio

async def worker(id):
    print(f"Worker {id} started")
    await asyncio.sleep(1)
    print(f"Worker {id} finished")

async def main():
    tasks = [asyncio.create_task(worker(i)) for i in range(5)]
    await asyncio.gather(*tasks)
    print("All workers finished")

asyncio.run(main())
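
The coroutines above still share a single thread, so a blocking call would stall them all. For genuinely blocking work, one option is asyncio.to_thread() (Python 3.9+), which runs the call in a worker thread from the default ThreadPoolExecutor; blocking_io here is a made-up placeholder for any blocking function.

import asyncio
import time

def blocking_io(n):
    time.sleep(1)  # Stands in for blocking I/O or a slow library call
    return f"Blocking task {n} done"

async def main():
    # Each call runs in its own worker thread, so the three sleeps overlap
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io, i) for i in range(3))
    )
    print(results)

asyncio.run(main())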

Handling Multiple Async Tasks

Python provides asyncio.wait() and asyncio.as_completed() to handle multiple coroutines concurrently.

import asyncio

async def task1():
    await asyncio.sleep(2)
    return "Task 1 completed"

async def task2():
    await asyncio.sleep(1)
    return "Task 2 completed"

async def main():
    # asyncio.wait() requires tasks (bare coroutines are no longer accepted since Python 3.11)
    tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
    done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for d in done:
        print(d.result())  # Prints whichever task finishes first

asyncio.run(main())
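
asyncio.as_completed(), mentioned above, yields results in the order tasks finish rather than the order they were started; a short sketch reusing the same two tasks:

import asyncio

async def task1():
    await asyncio.sleep(2)
    return "Task 1 completed"

async def task2():
    await asyncio.sleep(1)
    return "Task 2 completed"

async def main():
    tasks = [asyncio.create_task(task1()), asyncio.create_task(task2())]
    for next_done in asyncio.as_completed(tasks):
        print(await next_done)  # "Task 2 completed" prints first, then "Task 1 completed"

asyncio.run(main())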

Timeouts: Avoiding Blocking

To prevent indefinite blocking, use timeouts with asyncio.wait_for().

import asyncio

async def slow_task():
    await asyncio.sleep(3)
    return "Slow task finished"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)  # Time out after 2 seconds
        print(result)
    except asyncio.TimeoutError:
        print("Timeout! Task took too long")

asyncio.run(main())
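
On Python 3.11 and newer, the asyncio.timeout() context manager is an alternative to wait_for(); a minimal sketch, assuming Python 3.11+:

import asyncio

async def slow_task():
    await asyncio.sleep(3)
    return "Slow task finished"

async def main():
    try:
        async with asyncio.timeout(2):  # Cancel whatever is awaited inside after 2 seconds
            print(await slow_task())
    except TimeoutError:  # Since 3.11, asyncio.TimeoutError is an alias of the built-in TimeoutError
        print("Timeout! Task took too long")

asyncio.run(main())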

Thread Safety and Locks

Python’s asyncio.Lock provides mutual exclusion to prevent race conditions in async code.

import asyncio

lock = asyncio.Lock()
counter = 0

async def increment():
    global counter
    async with lock:  # Only one task may hold the lock at a time
        temp = counter
        await asyncio.sleep(0.1)  # Yield point: without the lock, tasks could interleave here
        counter = temp + 1

async def main():
    tasks = [asyncio.create_task(increment()) for _ in range(5)]
    await asyncio.gather(*tasks)
    print("Final Counter:", counter)  # Always 5

asyncio.run(main())

Async Semaphores

asyncio.Semaphore can be used to limit concurrent tasks.

import asyncio

async def worker(id, sem):
    async with sem:
        print(f"Worker {id} started")
        await asyncio.sleep(1)
        print(f"Worker {id} finished")

async def main():
    sem = asyncio.Semaphore(3)  # Limit to 3 concurrent workers
    tasks = [asyncio.create_task(worker(i, sem)) for i in range(5)]
    await asyncio.gather(*tasks)

asyncio.run(main())

OS Signal Handling

Python can handle OS signals for graceful shutdown using the signal module together with asyncio; note that loop.add_signal_handler() is supported only on Unix event loops.

import asyncio
import signal

stop_event = asyncio.Event()

def shutdown():
    print("Signal received. Exiting...")
    stop_event.set()

async def main():
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, shutdown)
    print("Waiting for signal...")
    await stop_event.wait()

asyncio.run(main())