Thread Safety in Parallel Execution
Understanding and managing thread safety is one of the most important aspects of parallel processing. NPipeline's parallel execution model is designed to be safe by default, but it requires careful attention whenever nodes access shared state.
🚨 CRITICAL THREAD SAFETY TRAP
DO NOT access context.Items or context.Parameters during parallel item processing. These dictionaries are NOT thread-safe. If multiple worker threads access them simultaneously, you will get data races that cause:
- Silent data corruption
- Non-deterministic crashes
- Impossible-to-reproduce bugs that only show up under production load
The cost of learning this the hard way in production is enormous.
The Unsafe Pattern (DO NOT USE)
// ❌ WRONG - This is a data race
public class UnsafeMetricsTransform : TransformNode<int, int>
{
    public override ValueTask<int> TransformAsync(
        int input,
        PipelineContext context,
        CancellationToken ct)
    {
        // PROBLEM: read-modify-write on a shared dictionary without synchronization
        var count = (int)context.Items.GetValueOrDefault("processed", 0);
        context.Items["processed"] = count + 1; // ← DATA RACE (lost update)!
        // PROBLEM: check-then-act; another thread can change "sum"
        // between ContainsKey and the assignment
        if (context.Items.ContainsKey("sum"))
            context.Items["sum"] = (int)context.Items["sum"] + input;
        return ValueTask.FromResult(input);
    }
}
Why this breaks:
- Thread A reads count = 5
- Thread B reads count = 5 (before A writes)
- Thread A writes count = 6
- Thread B writes count = 6 (overwrites A's write!)
- Expected: count = 7; Actual: count = 6 (lost update)
- The interleaving depends on thread timing, so it appears intermittently under load and rarely reproduces in development
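The interleaving above can be reproduced deterministically with plain .NET threads. The sketch below is a hypothetical, standalone demo (LostUpdateDemo is an illustrative name, not part of NPipeline): a System.Threading.Barrier forces both threads to finish their read before either one writes, so one increment is always lost.

```csharp
using System.Threading;

public static class LostUpdateDemo
{
    // Two threads each perform a non-atomic "read, then write read + 1"
    // on a shared int. The Barrier guarantees both reads happen before
    // either write, which is exactly the Thread A / Thread B sequence above.
    public static int Run(int initial)
    {
        int count = initial;
        using var bothHaveRead = new Barrier(participantCount: 2);

        void Worker()
        {
            int seen = Volatile.Read(ref count); // both threads read `initial`
            bothHaveRead.SignalAndWait();        // no write until both have read
            Volatile.Write(ref count, seen + 1); // both write initial + 1
        }

        var threadA = new Thread(Worker);
        var threadB = new Thread(Worker);
        threadA.Start();
        threadB.Start();
        threadA.Join();
        threadB.Join();
        return count;
    }
}
```

LostUpdateDemo.Run(5) returns 6, the "Actual" value from the walkthrough, even though two increments were performed.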
The Safe Pattern (RECOMMENDED)
// ✅ CORRECT - Use IPipelineStateManager for shared state
public class SafeMetricsTransform : TransformNode<int, int>
{
    public override async ValueTask<int> TransformAsync(
        int input,
        PipelineContext context,
        CancellationToken ct)
    {
        // SAFE: IPipelineStateManager handles all synchronization
        var stateManager = context.StateManager;
        if (stateManager != null)
        {
            // These operations are internally synchronized
            await stateManager.IncrementCounterAsync("processed", ct);
            await stateManager.IncrementCounterAsync("sum", input, ct);
        }
        return input;
    }
}
Why this is safe:
- State manager uses internal locking for all operations
- All updates are atomic and isolated
- No data races - thread-safe by design
- Performance is optimized with fine-grained locking
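To make "internal locking" concrete, here is a minimal sketch of a lock-guarded counter store. It is hypothetical: LockedCounterStore and its methods are illustrative names and do not show how NPipeline's IPipelineStateManager is actually implemented. For simplicity it uses one lock for all keys rather than fine-grained locking; the property that matters is that the whole read-modify-write for a key happens inside the lock.

```csharp
using System.Collections.Generic;

// Hypothetical illustration only; not NPipeline's state manager.
public sealed class LockedCounterStore
{
    private readonly object _gate = new();
    private readonly Dictionary<string, long> _counters = new();

    // Read, add, and write all happen under the lock, so two callers
    // can never both start from the same old value.
    public long Increment(string key, long delta = 1)
    {
        lock (_gate)
        {
            _counters.TryGetValue(key, out long current); // 0 if the key is missing
            long updated = current + delta;
            _counters[key] = updated;
            return updated;
        }
    }

    // Reads take the same lock, so they never race with a writer.
    public long Get(string key)
    {
        lock (_gate)
        {
            return _counters.TryGetValue(key, out long value) ? value : 0;
        }
    }
}
```

Calling Increment("processed") and Increment("sum", i) from Parallel.For(1, 1001, ...) always ends with 1000 and 500500, however the calls interleave.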
Key Principles
Independent Item Processing
Each worker thread processes a different data item. The core processing is inherently thread-safe because workers operate on independent data.
// SAFE: Each thread processes different items independently
public override async ValueTask<TOut> TransformAsync(
    TIn input, // Each thread gets a different item
    PipelineContext context,
    CancellationToken ct)
{
    // Safe to process input without synchronization
    return await ProcessItemAsync(input, ct);
}
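The same idea can be shown outside NPipeline with Parallel.ForEachAsync (.NET 6+). This is an illustrative sketch, not how NPipeline schedules its workers; IndependentItemsDemo and ProcessItemAsync are hypothetical names, and ProcessItemAsync depends only on its own input. Each worker reads one input and writes one distinct result slot, so no synchronization is needed.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class IndependentItemsDemo
{
    // Hypothetical per-item work: touches nothing but its own argument.
    private static async ValueTask<int> ProcessItemAsync(int input, CancellationToken ct)
    {
        await Task.Delay(1, ct); // stand-in for real asynchronous work
        return input * 2;
    }

    public static async Task<int[]> RunAsync(int[] inputs, int maxDegreeOfParallelism)
    {
        var results = new int[inputs.Length];
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        // Iteration i reads only inputs[i] and writes only results[i];
        // no two workers share a slot, so no lock is required.
        await Parallel.ForEachAsync(Enumerable.Range(0, inputs.Length), options, async (i, ct) =>
        {
            results[i] = await ProcessItemAsync(inputs[i], ct);
        });

        return results;
    }
}
```

RunAsync(new[] { 1, 2, 3, 4, 5 }, maxDegreeOfParallelism: 4) completes with { 2, 4, 6, 8, 10 } in input order, because results are written by index rather than appended to a shared list.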
Shared State is NOT Thread-Safe
The PipelineContext dictionaries (Items, Parameters, Properties) are NOT thread-safe. If multiple worker threads need to access or modify shared state, you must use explicit synchronization.
// UNSAFE: Multiple threads accessing context.Items without synchronization
context.Items["counter"] = (int)context.Items.GetValueOrDefault("counter", 0) + 1;
// SAFE: Use IPipelineStateManager for thread-safe shared state
var stateManager = context.StateManager;
if (stateManager != null)
{
    await stateManager.IncrementCounterAsync("counter", ct);
}
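When a node owns keyed counters of its own (rather than pipeline-wide state), the .NET type System.Collections.Concurrent.ConcurrentDictionary<TKey,TValue> is another option. This is a plain .NET sketch, not an NPipeline API, and KeyedCounters is a hypothetical name. Swapping Dictionary for ConcurrentDictionary is not enough by itself: the separate read and write must become a single AddOrUpdate call.

```csharp
using System.Collections.Concurrent;

// Hypothetical node-owned counters; plain .NET, not an NPipeline type.
public sealed class KeyedCounters
{
    private readonly ConcurrentDictionary<string, long> _counts = new();

    // Still broken, even with ConcurrentDictionary (do not use):
    //   _counts[key] = _counts.GetValueOrDefault(key) + delta;
    // Each call is thread-safe on its own, but the read and the write are
    // two separate operations, so updates can still be lost.

    // AddOrUpdate stores `delta` for a new key; otherwise it applies the
    // update delegate and retries if another thread changed the value first.
    public long Add(string key, long delta) =>
        _counts.AddOrUpdate(key, delta, (_, current) => current + delta);

    public long Get(string key) => _counts.TryGetValue(key, out long value) ? value : 0;
}
```

AddOrUpdate may invoke the update delegate more than once under contention, so keep it free of side effects. Here it is a pure addition, and 10,000 concurrent Add("hits", 1) calls always total 10,000.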
Three Approaches to Shared State
See Thread Safety Guidelines for comprehensive guidance, but here's a quick summary for parallel scenarios:
1. IPipelineStateManager (Recommended)
For complex shared state that needs coordination across parallel workers:
public override async ValueTask<TOut> TransformAsync(
    TIn input,
    PipelineContext context,
    CancellationToken ct)
{
    var result = ProcessItem(input);
    // Thread-safe state update via state manager
    var stateManager = context.StateManager;
    if (stateManager != null)
    {
        await stateManager.RecordMetricAsync("items_processed", 1, ct);
    }
    return result;
}
2. Node-Level Synchronization
For simple synchronization within a single node:
public class SynchronizedTransform : TransformNode<int, int>
{
    private readonly object _syncLock = new();
    private int _total = 0;
    public override ValueTask<int> TransformAsync(
        int input,
        PipelineContext context,
        CancellationToken ct)
    {
        // Only the shared update sits inside the lock
        lock (_syncLock)
        {
            _total += input;
        }
        return ValueTask.FromResult(input);
    }
}
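SynchronizedTransform above accumulates _total but never reads it. The standalone variant below, which drops the NPipeline base class so it compiles on its own, also exposes the total. SynchronizedTotal and its members are hypothetical names; the locking pattern is the same. Reads take the same lock as writes, so they always see the latest committed total.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical standalone counterpart of SynchronizedTransform.
public sealed class SynchronizedTotal
{
    private readonly object _syncLock = new();
    private int _total;

    public ValueTask<int> TransformAsync(int input, CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();
        lock (_syncLock)
        {
            _total += input; // the only shared write; keep the lock body this small
        }
        return ValueTask.FromResult(input);
    }

    // Same lock as the writer, so a read never races with an update.
    public int Total
    {
        get { lock (_syncLock) { return _total; } }
    }
}
```

Feeding the values 1 through 1000 through TransformAsync from Parallel.For leaves Total at 500500.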
3. Atomic Operations for Simple Counters
For single-value counters without additional logic:
public class CountingTransform : TransformNode<int, int>
{
    private long _processedCount = 0;
    public override ValueTask<int> TransformAsync(
        int input,
        PipelineContext context,
        CancellationToken ct)
    {
        // Atomic increment; a single counter needs no lock
        Interlocked.Increment(ref _processedCount);
        return ValueTask.FromResult(input);
    }
}
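Here is a standalone version of the Interlocked pattern, extended with a running sum via Interlocked.Add. ItemCounter is a hypothetical name; every call is a standard System.Threading.Interlocked method. Reads go through Interlocked.Read because a plain read of a 64-bit field is not guaranteed to be atomic on 32-bit platforms.

```csharp
using System.Threading;

// Hypothetical standalone counterpart of CountingTransform.
public sealed class ItemCounter
{
    private long _processedCount;
    private long _sum;

    public void Record(int input)
    {
        Interlocked.Increment(ref _processedCount); // atomic ++
        Interlocked.Add(ref _sum, input);           // atomic +=
    }

    // Interlocked.Read gives an atomic 64-bit read even on 32-bit platforms,
    // where a plain read of a long field can be torn.
    public long ProcessedCount => Interlocked.Read(ref _processedCount);
    public long Sum => Interlocked.Read(ref _sum);
}
```

Each field is updated atomically, but the two fields are not updated together: a reader can observe a new count paired with an older sum. When values must stay consistent with each other, a lock is the simpler tool.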
Thread Safety DO's
- Process independent data items in parallel (inherently safe)
- Use IPipelineStateManager for shared state
- Use lock for simple critical sections
- Use Interlocked for atomic counter operations
- Keep synchronization scopes small and fast
Thread Safety DON'Ts
- Directly access or modify context.Items from multiple threads
- Share mutable state between nodes without explicit synchronization
- Assume dictionaries in PipelineContext are thread-safe
- Hold locks across I/O operations (causes contention)
- Create complex multi-step interlocked sequences (use locks instead)
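The last two points can be illustrated with a hypothetical RunningStats type (not part of NPipeline) that keeps a count, a minimum, and a maximum that must always agree. Coordinating three fields with separate Interlocked or CompareExchange calls would let a reader see a mixed state; one short lock updates them as a unit, and any expensive per-item work stays outside it.

```csharp
// Hypothetical example: three fields that must change together.
public sealed class RunningStats
{
    private readonly object _gate = new();
    private long _count;
    private int _min = int.MaxValue;
    private int _max = int.MinValue;

    public void Add(int value)
    {
        // Do any expensive per-item work here, BEFORE taking the lock.
        lock (_gate)
        {
            // All three updates become visible together to Snapshot().
            _count++;
            if (value < _min) _min = value;
            if (value > _max) _max = value;
        }
    }

    // Returns a consistent view: count, min and max from the same moment.
    public (long Count, int Min, int Max) Snapshot()
    {
        lock (_gate)
        {
            return (_count, _min, _max);
        }
    }
}
```

C# also rejects await inside a lock body (compiler error CS1996), so asynchronous I/O cannot end up inside this critical section by accident.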
For comprehensive guidance, see Thread Safety Guidelines.