Before learning about race conditions, deadlocks, synchronisation, pools, etc., check out these articles for a better understanding of how multiprocessing works in Python:
- Python Multiprocessing: Start Methods, Pools, and Communication.
- Inter Process Communication in Python Multiprocessing (With Examples).
Race Conditions in Multiprocessing
Even though processes do not share memory by default, they can still share objects explicitly (`Value`, `Array`, `Manager` objects, `shared_memory`). When multiple processes update the same shared object at the same time, race conditions can still occur.

Example (race condition with `Value`):
```python
from multiprocessing import Process, Value

def increment(counter):
    for _ in range(100_000):
        counter.value += 1  # NOT protected: the read and the write are separate steps

if __name__ == "__main__":
    counter = Value('i', 0)
    p1 = Process(target=increment, args=(counter,))
    p2 = Process(target=increment, args=(counter,))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print("Final counter:", counter.value)
```

Output (varies from run to run; usually less than 200000):

```
Final counter: 100200
```
So even in multiprocessing, updates to shared state need a lock.
Example:

```python
from multiprocessing import Process, Value, Lock

def increment(counter, lock):
    for _ in range(100_000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    lock = Lock()
    counter = Value('i', 0)
    p1 = Process(target=increment, args=(counter, lock))
    p2 = Process(target=increment, args=(counter, lock))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print("Final counter:", counter.value)  # Always 200000
```

Output:

```
Final counter: 200000
```
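A `Value` created with the default `lock=True` already carries its own lock, returned by `get_lock()`, so a separate `Lock` is not strictly required for a single counter. A minimal sketch of the same counter using that built-in lock (the `run` helper is only for illustration):

```python
from multiprocessing import Process, Value

def increment(counter, n):
    for _ in range(n):
        # get_lock() returns the lock that Value('i', 0) creates by default
        with counter.get_lock():
            counter.value += 1

def run(n_procs=2, n=100_000):
    counter = Value('i', 0)
    procs = [Process(target=increment, args=(counter, n)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    print("Final counter:", run())  # 200000
```

Using the built-in lock keeps the lock and the data together; a separate `Lock` is still useful when one critical section must cover several shared objects.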
Where race conditions can occur in multiprocessing
Race conditions happen only when multiple processes directly share mutable state and update it concurrently without synchronisation. That means race conditions can occur with:
- `Value`: a shared single variable (mutable; updates such as `+=` are not atomic).
- `Array`: a shared array (multiple processes can update elements).
- `shared_memory.SharedMemory`: a raw block of memory where you fully control reads and writes (easy to corrupt without locks).
- Manager objects (`manager.list`, `manager.dict`, etc.): proxied objects whose updates go through a server process. They are safer, but a read-modify-write such as `d["count"] += 1` is still two separate calls (a read, then a write), so two processes can overwrite each other's updates if not careful.
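A minimal sketch of the `Manager` case: a `manager.dict` shared by several processes, with an ordinary `multiprocessing.Lock` turning the read-modify-write into a single critical section. The `bump` and `run` names are illustrative.

```python
from multiprocessing import Lock, Manager, Process

def bump(shared, lock, n):
    for _ in range(n):
        # Without the lock, the read and the write below could interleave
        # with another process and lose updates.
        with lock:
            shared["count"] += 1

def run(n_procs=4, n=1_000):
    with Manager() as manager:
        shared = manager.dict(count=0)
        lock = Lock()
        procs = [Process(target=bump, args=(shared, lock, n)) for _ in range(n_procs)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return shared["count"]  # read before the manager shuts down

if __name__ == "__main__":
    print("count:", run())  # 4000
```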
Where race conditions do not occur (by default) in multiprocessing
Race conditions do not occur when processes do not share memory, i.e., they only exchange messages:
- `Queue`: built-in locking makes `put()` and `get()` thread- and process-safe.
- `Pipe`: message passing, no shared state (as long as each end is used by only one process at a time).
- Normal data types (`int`, `float`, `list`, `dict` inside a process): each process has its own copy (after `fork`/`spawn`). No races, because nothing is shared.
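To avoid the race entirely, each process can count in its own local variable and send only its final result as a message. A minimal sketch of the earlier counter rewritten that way (`count_locally` and `run` are illustrative names):

```python
from multiprocessing import Process, Queue

def count_locally(results, n):
    total = 0               # private to this process: nothing is shared
    for _ in range(n):
        total += 1
    results.put(total)      # one message instead of n shared updates

def run(n_procs=2, n=100_000):
    results = Queue()
    procs = [Process(target=count_locally, args=(results, n)) for _ in range(n_procs)]
    for p in procs:
        p.start()
    # Read the results before join(), so no child is left waiting to flush its data
    total = sum(results.get() for _ in procs)
    for p in procs:
        p.join()
    return total

if __name__ == "__main__":
    print("Final counter:", run())  # 200000
```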
Deadlocks in Multiprocessing
Deadlocks occur when processes wait forever for a resource or message that never becomes available. Some common causes:
- Forgetting to release a lock: a process acquires a lock but never releases it, so others are blocked forever.
- Improper use of `Queue`/`Pipe`:
  - If a process calls `.get()` on an empty `Queue` and the producer never sends data, the consumer is blocked forever.
  - If the parent calls `join()` on a child that is itself waiting on the parent (for example, for an item the parent has not sent yet), each waits on the other (circular wait), so the program deadlocks.
Example (deadlock with `Queue`):

```python
from multiprocessing import Process, Queue

def consumer(q):
    print("Waiting for item...")
    item = q.get()  # blocks forever if no item
    print("Got:", item)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=consumer, args=(q,))
    p.start()
    p.join()  # Deadlock: child is waiting, parent is waiting
```
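To fix this, make sure the consumer always receives something it can stop on (for example, the producer sends a sentinel value such as `None` when it is done), or use a timeout so that `get()` cannot block forever.

Example:

```python
item = q.get(timeout=5)  # raises queue.Empty (standard `queue` module) if no item arrives within 5 s
```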
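Both fixes together, as a minimal sketch: the producer ends the stream with a `None` sentinel, and the consumer also gives up if nothing arrives within a timeout. The sentinel choice, the function names and the 5-second timeout are illustrative assumptions.

```python
import queue
from multiprocessing import Process, Queue

SENTINEL = None  # assumption: None is never a real work item

def consumer(q, results):
    while True:
        try:
            item = q.get(timeout=5)       # never wait more than 5 s
        except queue.Empty:
            results.put("timed out")
            return
        if item is SENTINEL:              # producer says "no more items"
            results.put("done")
            return
        results.put(item * 2)

def run(items):
    q, results = Queue(), Queue()
    p = Process(target=consumer, args=(q, results))
    p.start()
    for item in items:
        q.put(item)
    q.put(SENTINEL)                       # always send the sentinel, even for zero items
    out = []
    while True:
        msg = results.get(timeout=10)
        out.append(msg)
        if msg in ("done", "timed out"):
            break
    p.join()                              # safe: the child has already finished its loop
    return out

if __name__ == "__main__":
    print(run([1, 2, 3]))  # [2, 4, 6, 'done']
```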
Where deadlocks can happen
- `Lock`/`RLock`
  - If a process acquires a lock and never releases it, other processes block forever.
  - Circular wait: process A holds lock Y and waits on lock X, while process B holds lock X and waits on lock Y.
- `Queue`
  - If a consumer calls `.get()` but no producer ever sends, it is blocked forever.
  - If the parent calls `.join()` while the child is waiting on `.get()` with no item, the result is a deadlock.
- `Pipe`
  - If one process is blocked on `recv()` but the other side never sends.
  - Or both ends are waiting on each other's `recv()`.
- `Value`/`Array` (when combined with locks)
  - The shared memory itself does not deadlock, but if access is wrapped in a lock, forgetting to release it or circular lock dependencies create a deadlock.
- `Manager` objects (`list`, `dict`, `Namespace`, etc.)
  - These use a server process and proxies internally. If a process blocks waiting for a response that never arrives (for example, during a slow operation or circular access), it hangs just as in a deadlock.
- `shared_memory` (Python 3.8+)
  - The raw shared block itself will not deadlock, but if multiple processes coordinate via locks/semaphores on top of it, the same risks as `Value`/`Array` apply.
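A common way to rule out the circular wait described above is to make every process acquire the locks in the same fixed order. A minimal sketch, assuming the parent always passes the two locks in the same order (`lock_x` first, then `lock_y`); the function names are illustrative:

```python
from multiprocessing import Lock, Process, Queue

def worker(name, lock_x, lock_y, results):
    # Every process takes lock_x before lock_y. No process can hold lock_y
    # while waiting for lock_x, so the circular wait cannot form.
    with lock_x:
        with lock_y:
            results.put(name)

def run(n_procs=4):
    lock_x, lock_y = Lock(), Lock()
    results = Queue()
    procs = [Process(target=worker, args=(f"p{i}", lock_x, lock_y, results))
             for i in range(n_procs)]
    for p in procs:
        p.start()
    names = sorted(results.get(timeout=10) for _ in procs)
    for p in procs:
        p.join(timeout=10)   # a timeout keeps a bug from hanging the parent forever
    return names

if __name__ == "__main__":
    print(run())  # ['p0', 'p1', 'p2', 'p3']
```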
Where deadlocks generally do not occur
- Normal process-local variables (for example, `int`, `list`, `dict` in separate processes): nothing is shared, so there is nothing to deadlock on.
- Message passing with `Queue`/`Pipe` when used correctly (for example, always sending sentinel values and using timeouts) avoids deadlock.
Debugging multiprocessing
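Debugging multiprocessing is harder than debugging threads because:
- Each process has its own output buffers, so `print` output from a child may appear late, interleaved with other processes, or not at all if the process ends abruptly before flushing.
- An exception inside a child process does not crash the parent; the child prints its traceback and exits, and the parent only sees a non-zero exit code.
- Pickling errors often show up only at runtime, when the process is started or a task is sent to a pool.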
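A minimal sketch of the second point: the child's exception never reaches the parent as an exception, but the exit code reveals it. For a `Process` whose target raises, `exitcode` is `1`. The `fails`, `ok` and `exit_code_of` names are illustrative.

```python
from multiprocessing import Process

def fails():
    raise ValueError("boom")  # traceback is printed to the child's stderr

def ok():
    return None

def exit_code_of(target):
    p = Process(target=target)
    p.start()
    p.join()
    return p.exitcode  # 0 = success, 1 = unhandled exception, negative = killed by a signal

if __name__ == "__main__":
    print("ok    ->", exit_code_of(ok))     # 0
    print("fails ->", exit_code_of(fails))  # 1; the parent itself keeps running
```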
Tips
- Always use the `if __name__ == "__main__":` guard. Without it, child processes re-import the main module and try to start new processes themselves (a problem with `spawn`, the default start method on Windows and macOS); Python usually stops this with a `RuntimeError`.
- Flush logs/prints: `print("Message", flush=True)`
- Use the `logging` module instead of `print`. Each process can log to a file or queue for centralised debugging.
- Check exit codes: `p.exitcode` is `None` while the process is still running, `0` on success, greater than `0` on error, and negative (`-N`) if the process was killed by signal `N`.
- Use timeouts on `join()`, `Queue.get()`, etc. to prevent infinite blocking.
- Prefer `daemon=False` while debugging. Daemon processes are terminated abruptly when the parent exits, without cleanup, so they are hard to debug.
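One way to centralise logs, sketched under the assumption that the standard `logging.handlers.QueueHandler` and `QueueListener` fit your setup: each worker sends its records to a shared queue, and a single listener thread in the parent writes them out. Logger names and the format string are illustrative.

```python
import logging
import logging.handlers
from multiprocessing import Process, Queue

def worker(log_queue, i):
    logger = logging.getLogger(f"worker.{i}")
    logger.setLevel(logging.INFO)
    # Send records to the shared queue instead of writing them directly
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.info("worker %d started", i)

if __name__ == "__main__":
    log_queue = Queue()
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter("%(processName)s %(name)s: %(message)s"))
    # The listener thread in the parent pulls records off the queue and
    # hands them to the console handler one at a time.
    listener = logging.handlers.QueueListener(log_queue, console)
    listener.start()
    procs = [Process(target=worker, args=(log_queue, i)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    listener.stop()  # handles what is left in the queue, then stops the thread
```

Because only the listener writes to the console (or a file), lines from different processes do not interleave mid-message.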
Synchronisation in Multiprocessing
When multiple processes access shared resources (like `Value`, `Array`, `Manager` objects, or `shared_memory`), synchronisation tools are needed to avoid race conditions and to coordinate the processes without deadlocking.

Python's `multiprocessing` module provides several primitives:
1. Lock
- Simplest synchronisation primitive.
- Only one process can acquire the lock at a time.
- Used to protect critical sections.
Example:

```python
from multiprocessing import Process, Lock, Value

def increment(counter, lock):
    for _ in range(100_000):
        with lock:  # critical section
            counter.value += 1

if __name__ == "__main__":
    lock = Lock()
    counter = Value('i', 0)
    p1 = Process(target=increment, args=(counter, lock))
    p2 = Process(target=increment, args=(counter, lock))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print("Final counter:", counter.value)
```

Output:

```
Final counter: 200000
```
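Important functions
- `lock.acquire(block=True, timeout=None)`: block until the lock is acquired; with a `timeout`, give up after that many seconds and return `False` (it returns `True` on success).
- `lock.release()`: release the lock.
- `with lock:`: context manager form (preferred).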
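A minimal sketch of `acquire()` with a timeout, which returns `False` instead of blocking forever. The `hold` and `try_lock` helpers and the timings are illustrative assumptions.

```python
from multiprocessing import Lock, Process
import time

def hold(lock, seconds):
    with lock:
        time.sleep(seconds)  # keep the lock busy for a while

def try_lock(lock, timeout=0.5):
    # acquire() returns True if the lock was obtained, False on timeout
    if lock.acquire(timeout=timeout):
        try:
            return "acquired"
        finally:
            lock.release()
    return "timed out"

if __name__ == "__main__":
    lock = Lock()
    holder = Process(target=hold, args=(lock, 2))
    holder.start()
    time.sleep(0.5)         # give the child time to take the lock (timing assumption)
    print(try_lock(lock))   # most likely "timed out": the child still holds the lock
    holder.join()
    print(try_lock(lock))   # "acquired": the child has released it
```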
2. RLock (Reentrant Lock)
- Like `Lock`, but the same process can acquire it multiple times (and must release it the same number of times).
- Useful when a function that already holds the lock calls another function that acquires the same lock.
Example:

```python
from multiprocessing import RLock, Process

def nested(lock):
    with lock:
        print("First acquire")
        with lock:  # same process can acquire again
            print("Second acquire")

if __name__ == "__main__":
    lock = RLock()
    p = Process(target=nested, args=(lock,))
    p.start(); p.join()
```

Output:

```
First acquire
Second acquire
```
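The nested-function case from the list above, as a minimal sketch run in a single process for brevity: `add_many` holds the lock while calling `add_item`, which takes the same lock again. With an `RLock` this works; with a plain `Lock` the inner acquire would wait for a lock its own process already holds and never return, which the sketch shows with a short timeout. Function names are illustrative.

```python
from multiprocessing import Lock, RLock

def add_item(lock, items, x):
    with lock:                  # inner acquire
        items.append(x)

def add_many(lock, items, xs):
    with lock:                  # outer acquire, still held during the inner calls
        for x in xs:
            add_item(lock, items, x)

if __name__ == "__main__":
    items = []
    add_many(RLock(), items, [1, 2, 3])
    print(items)                       # [1, 2, 3]

    plain = Lock()
    plain.acquire()
    # A second acquire by the same process would block forever;
    # the timeout makes the problem visible instead.
    print(plain.acquire(timeout=0.1))  # False
    plain.release()
```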
3. Semaphore
- Allows a limited number of processes to access a resource at once (not just one).
- Has a counter that decrements on acquire and increments on release.
- For example, to limit database connections to 5 at a time.
Example:

```python
from multiprocessing import Semaphore, Process
import time, random

def worker(sem, i):
    with sem:  # only 2 workers at a time
        print(f"Worker {i} starting")
        time.sleep(random.uniform(1, 2))
        print(f"Worker {i} done")

if __name__ == "__main__":
    sem = Semaphore(2)  # allow 2 concurrent
    processes = [Process(target=worker, args=(sem, i)) for i in range(5)]
    for p in processes: p.start()
    for p in processes: p.join()
```

Output (the order varies between runs, but never more than 2 workers are active at once):

```
Worker 0 starting
Worker 4 starting
Worker 4 done
Worker 1 starting
Worker 0 done
Worker 2 starting
Worker 1 done
Worker 3 starting
Worker 2 done
Worker 3 done
```
Important functions
- `sem.acquire(block=True, timeout=None)`: decrement the counter (block while it is 0; returns `False` if the timeout expires).
- `sem.release()`: increment the counter (free one slot).
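To check that a semaphore really caps concurrency, each worker can record how many workers are inside the protected section at once. A minimal sketch, where the shared `active`/`peak` counters and the 0.1 s sleep are illustrative assumptions:

```python
from multiprocessing import Process, Semaphore, Value
import time

def worker(sem, active, peak):
    with sem:                          # at most `limit` workers get past this line at once
        with active.get_lock():
            active.value += 1
            peak.value = max(peak.value, active.value)  # only written under active's lock
        time.sleep(0.1)                # simulate using the limited resource
        with active.get_lock():
            active.value -= 1

def run(limit=2, n_workers=6):
    sem = Semaphore(limit)
    active, peak = Value('i', 0), Value('i', 0)
    procs = [Process(target=worker, args=(sem, active, peak)) for _ in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return peak.value

if __name__ == "__main__":
    print("peak concurrency:", run())  # never more than 2
```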
4. BoundedSemaphore
Same as `Semaphore`, but raises `ValueError` if `release()` is called more times than `acquire()`, so the counter cannot rise above its initial value.
Example:

```python
from multiprocessing import BoundedSemaphore, Process
import time

def worker(sem, i):
    sem.acquire()
    print(f"Worker {i} entered")
    time.sleep(1)
    sem.release()

if __name__ == "__main__":
    sem = BoundedSemaphore(2)
    processes = [Process(target=worker, args=(sem, i)) for i in range(4)]
    for p in processes: p.start()
    for p in processes: p.join()
```

Output (order varies):

```
Worker 1 entered
Worker 3 entered
Worker 2 entered
Worker 0 entered
```
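A minimal sketch of the extra check, run in a single process for brevity: one unmatched `release()` on a `BoundedSemaphore` raises `ValueError`, whereas a plain `Semaphore` silently lets its counter grow. The `over_release` helper is illustrative, and the sketch uses a limit of 1 to keep it short.

```python
from multiprocessing import BoundedSemaphore, Semaphore

def over_release(sem):
    sem.acquire()
    sem.release()          # balanced: the counter is back at its initial value
    try:
        sem.release()      # one release too many
    except ValueError as exc:
        return f"ValueError: {exc}"
    return "no error"

if __name__ == "__main__":
    print(over_release(BoundedSemaphore(1)))  # reports the ValueError
    print(over_release(Semaphore(1)))         # no error: the counter silently becomes 2
```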
5. Event
- A simple flag for communication between processes.
- One process sets the event; others can wait until it is set.
Example:

```python
from multiprocessing import Event, Process
import time

def waiter(e):
    print("Waiting for event...")
    e.wait()  # blocks until event.set()
    print("Event received!")

if __name__ == "__main__":
    e = Event()
    p = Process(target=waiter, args=(e,))
    p.start()
    time.sleep(2)
    print("Main: setting event")
    e.set()
    p.join()
```

Output:

```
Waiting for event...
Main: setting event
Event received!
```
Important functions
- `event.set()`: set the flag (wakes up waiting processes).
- `event.clear()`: clear the flag.
- `event.is_set()`: check whether the flag is set.
- `event.wait(timeout=None)`: block until the flag is set; returns `True`, or `False` if the timeout expires first.
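A common use of `Event` is a stop signal for a worker loop. Because `wait(timeout)` returns `False` on timeout and `True` once the flag is set, it can double as the loop's sleep. A minimal sketch; the `ticker` function and the 0.1 s tick are illustrative assumptions:

```python
from multiprocessing import Event, Process, Queue
import time

def ticker(stop, out):
    ticks = 0
    # wait() sleeps up to 0.1 s and returns True as soon as stop.set() is called
    while not stop.wait(timeout=0.1):
        ticks += 1              # periodic work would go here
    out.put(ticks)

if __name__ == "__main__":
    stop, out = Event(), Queue()
    p = Process(target=ticker, args=(stop, out))
    p.start()
    time.sleep(0.5)
    stop.set()                  # ask the worker to finish
    print("ticks before stop:", out.get(timeout=5))
    p.join()
```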
6. Condition (optional, but good to know)
- A more advanced synchronisation primitive.
- Lets processes wait until some condition is met.
- Wraps a lock (an `RLock` by default), which must be held when calling `wait()` or `notify()`.
Example:

```python
from multiprocessing import Condition, Process
import time

def consumer(cond):
    with cond:
        print("Consumer waiting...")
        cond.wait()  # wait until notified
        print("Consumer proceeding")

def producer(cond):
    time.sleep(2)  # give the consumer time to start waiting first
    with cond:
        print("Producer notifies")
        cond.notify()

if __name__ == "__main__":
    cond = Condition()
    c = Process(target=consumer, args=(cond,))
    p = Process(target=producer, args=(cond,))
    c.start(); p.start()
    c.join(); p.join()
```

Output:

```
Consumer waiting...
Producer notifies
Consumer proceeding
```
Important functions
- `cond.wait(timeout=None)`: release the lock and wait until notified (the lock is re-acquired before returning).
- `cond.notify()`: wake up one waiting process.
- `cond.notify_all()`: wake up all waiting processes.
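The example above relies on `time.sleep(2)` so that the consumer is already waiting when the producer calls `notify()`; a notification sent earlier would simply be lost. A minimal sketch of a more robust pattern, assuming a shared `Value` flag as the condition: the producer changes the flag under the lock, and the consumer uses `wait_for()` with a predicate, so it never waits for a signal that has already happened. Names are illustrative.

```python
from multiprocessing import Condition, Process, Value

def consumer(cond, ready, seen):
    with cond:
        # wait_for() checks the predicate before waiting, so a notification
        # sent before this process started waiting is not lost.
        ok = cond.wait_for(lambda: ready.value == 1, timeout=5)
        seen.value = 1 if ok else -1

def producer(cond, ready):
    with cond:
        ready.value = 1       # change the shared state while holding the lock
        cond.notify_all()     # then wake any waiting consumers

def run():
    cond = Condition()
    ready, seen = Value('i', 0), Value('i', 0)
    p = Process(target=producer, args=(cond, ready))
    c = Process(target=consumer, args=(cond, ready, seen))
    p.start(); p.join()       # the producer finishes before the consumer even starts
    c.start(); c.join()
    return seen.value

if __name__ == "__main__":
    print("consumer saw the signal:", run() == 1)  # True
```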
Most commonly used in multiprocessing
- `Lock`: for critical sections.
- `Semaphore`: for limiting resources.
- `Event`: for signalling.
Pool of Workers
When there are many tasks to run in parallel across processes, creating `Process` objects by hand is usually not the preferred approach. Instead, a pool of worker processes executes the tasks.

Two main options in Python:
- `multiprocessing.Pool`
- `concurrent.futures.ProcessPoolExecutor`
1. multiprocessing.Pool
- Part of the `multiprocessing` module.
- Provides methods such as `map`, `imap`, `apply`, and `apply_async`.
- You interact with the pool directly.
Example:

```python
from multiprocessing import Pool
import os

def square(x):
    print(f"Worker {os.getpid()} processing {x}")
    return x * x

if __name__ == "__main__":
    with Pool(4) as pool:  # 4 worker processes
        results = pool.map(square, [1, 2, 3, 4, 5])
    print("Results:", results)
```

Output (PIDs, and which worker handles which item, will differ between runs):

```
Worker 12345 processing 1
Worker 12346 processing 2
Worker 12347 processing 3
Worker 12345 processing 4
Worker 12346 processing 5
Results: [1, 4, 9, 16, 25]
```
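The pool methods listed above differ mainly in blocking and ordering. A minimal sketch of `apply_async` (one non-blocking call that returns an `AsyncResult`) and `imap` (a lazy iterator that yields results in input order); the `run` helper is illustrative:

```python
from multiprocessing import Pool

def square(x):
    return x * x

def run():
    with Pool(processes=2) as pool:
        # apply_async: submit one call and get an AsyncResult back immediately
        async_result = pool.apply_async(square, (7,))
        # imap: lazy iterator; results come back in input order
        lazy = pool.imap(square, range(5))
        # Collect everything before leaving the with-block, which terminates the pool
        return async_result.get(timeout=10), list(lazy)

if __name__ == "__main__":
    print(run())  # (49, [0, 1, 4, 9, 16])
```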
2. concurrent.futures.ProcessPoolExecutor
It is part of `concurrent.futures` (introduced in Python 3.2), a higher-level, more modern API than `Pool`. It uses `submit()` (which returns a `Future`) and `map()` (similar to the built-in `map`). It is often considered easier to use.
Example:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import os

def cube(x):
    print(f"Worker {os.getpid()} processing {x}")
    return x ** 3

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(cube, i) for i in range(1, 6)]
        for f in as_completed(futures):  # results as they finish
            print("Got result:", f.result())
```

Output (PIDs and completion order will differ between runs):

```
Worker 22341 processing 1
Worker 22342 processing 2
Worker 22343 processing 3
Worker 22341 processing 4
Worker 22342 processing 5
Got result: 1
Got result: 27
Got result: 8
Got result: 64
Got result: 125
```
Comparison

| Feature | Pool | ProcessPoolExecutor |
|---|---|---|
| API style | Function-based (`map`, `apply`) | Future-based (`submit`, `map`, `as_completed`) |
| Futures support | No `Future` objects (`apply_async`/`map_async` return an `AsyncResult`, with optional callbacks) | Yes (`.result()`, `.done()`, `.add_done_callback()`) |
| Ease of use | Older, lower-level | Modern, more Pythonic |
| Cancellation | Not supported | `.cancel()` possible |
| Preferred for | Compatibility with older code | New projects (recommended) |
Why is using `concurrent.futures` better than using `threading` and `multiprocessing` directly?
The `concurrent.futures` module is like a modern wrapper around both `threading` and `multiprocessing`. It does not replace them; it simplifies and unifies their usage.
Why is it better?
1. Unified API for threads and processes
With `concurrent.futures`, the same API (`Executor`, `submit`, `map`, `Future`) can be used whether you run threads (`ThreadPoolExecutor`) or processes (`ProcessPoolExecutor`).

Example (same API, different executors):
```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def work(x):
    return x * x

if __name__ == "__main__":  # required for the process pool under spawn
    # Thread pool
    with ThreadPoolExecutor(max_workers=3) as executor:
        print(list(executor.map(work, range(5))))

    # Process pool: same calls, different executor
    with ProcessPoolExecutor(max_workers=3) as executor:
        print(list(executor.map(work, range(5))))
```
2. Future-based API
With plain `Thread` and `Process` objects, you call `join()` to wait for completion and need a `Queue`, `Pipe`, or shared memory to get results back. `concurrent.futures` instead gives you a `Future` object, which:
- Represents the running (or pending) task.
- Allows checking `.done()`, `.result()`, `.cancel()`, `.add_done_callback()`.
- Is much more flexible.
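A minimal sketch of those `Future` methods on a single task: `done()` polls without blocking, `add_done_callback()` registers a function to run in the parent when the task finishes, and `result()` blocks until the value is ready. The `slow_square` and `run` names and the 0.2 s delay are illustrative.

```python
from concurrent.futures import ProcessPoolExecutor
import time

def slow_square(x):
    time.sleep(0.2)            # pretend this is expensive
    return x * x

def run():
    finished = []
    with ProcessPoolExecutor(max_workers=2) as executor:
        fut = executor.submit(slow_square, 6)
        # The callback runs in the parent process once the result is set
        fut.add_done_callback(lambda f: finished.append(f.result()))
        print("done yet?", fut.done())   # usually False: the worker is still sleeping
        value = fut.result()             # blocks until the worker returns
    # Leaving the with-block waits for the executor, so the callback has run by now
    return value, finished

if __name__ == "__main__":
    print(run())  # (36, [36])
```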
3. Easier error handling
- In raw `multiprocessing`, if a child process raises, the traceback is printed in the child and the parent only sees a non-zero exit code, so the error is easy to miss.
- With futures, calling `.result()` re-raises the worker's exception in the main process, which makes debugging easier.
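A minimal sketch of that behaviour: a worker raises `ValueError`, and the parent catches it around `.result()` like an ordinary exception. The `parse_int` and `run` names are illustrative.

```python
from concurrent.futures import ProcessPoolExecutor

def parse_int(text):
    return int(text)  # raises ValueError for text like "x"

def run(values):
    results, errors = [], []
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(parse_int, v) for v in values]
        for v, fut in zip(values, futures):
            try:
                results.append(fut.result())   # re-raises the worker's exception here
            except ValueError as exc:
                errors.append((v, str(exc)))
    return results, errors

if __name__ == "__main__":
    print(run(["1", "x", "3"]))
    # ([1, 3], [('x', "invalid literal for int() with base 10: 'x'")])
```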
4. Cleaner code, less boilerplate
- Without it: you manually start processes, pass arguments, handle `Queue`/`Pipe`, join them, etc.
- With it: just call `executor.submit(func, arg)` and wait for the result. It feels much closer to normal function calls.
5. Cancel and timeout support
- You can `.cancel()` pending tasks (only if they have not started yet).
- You can call `.result(timeout=5)` to wait at most 5 seconds; otherwise a `TimeoutError` is raised.
- Raw `Process` objects do not offer this out of the box.
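A minimal sketch of both features with a single worker: waiting on a long task with `result(timeout=...)` raises `TimeoutError` without stopping the task, and `cancel()` succeeds only for tasks still waiting in line. Which queued tasks can still be cancelled depends on how far the executor has already handed them to the worker, so the sketch only prints that list. Names and timings are illustrative.

```python
import concurrent.futures as cf
import time

def slow(seconds):
    time.sleep(seconds)
    return seconds

def run():
    with cf.ProcessPoolExecutor(max_workers=1) as executor:
        first = executor.submit(slow, 1.0)                      # occupies the only worker
        later = [executor.submit(slow, 0.1) for _ in range(5)]  # queued behind it
        try:
            first.result(timeout=0.2)   # stop waiting after 0.2 s
            timed_out = False
        except cf.TimeoutError:
            timed_out = True            # only the wait is abandoned; the task keeps running
        # cancel() returns True only for tasks that have not been handed to a worker yet
        cancelled = [f.cancel() for f in later]
        return timed_out, cancelled, first.result()

if __name__ == "__main__":
    print(run())
```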
Comparison

| Aspect | threading / multiprocessing | concurrent.futures |
|---|---|---|
| API | Low-level, different for threads vs processes | Unified & high-level |
| Result handling | Need queues/pipes or shared memory | Futures (`.result()`, `.done()`) |
| Exceptions | Harder to propagate | Propagate to the main process via `.result()` |
| Task management | Manual `start()`, `join()` | Automatic via `Executor` |
| Cancellation | Not built-in | `.cancel()` supported |
| Best for | Fine control, special cases | Clean, modern, most everyday use |