Parallelism

NPipeline is designed for high performance, and a key aspect of this is its ability to execute parts of your pipeline in parallel. The NPipeline.Extensions.Parallelism package provides tools and extensions to easily introduce parallel processing into your data flows, allowing you to scale out your computations and maximize throughput.

Understanding Parallelism in NPipeline

Parallelism in NPipeline means processing multiple data items simultaneously, either within a single node or across multiple independent branches of a pipeline. This is distinct from concurrency, which is about structuring multiple tasks so they can make progress independently, whether or not they actually run at the same time.

NPipeline.Extensions.Parallelism

This extension package provides the ParallelExecutionStrategy class and builder extensions to enable and manage parallel execution.

Example: Parallel Transform

Imagine you have a computationally intensive transformation that can be applied independently to each item. You can use parallel execution to process multiple items simultaneously.

using System;
using System.Threading;
using System.Threading.Tasks;
using NPipeline.Extensions.Parallelism;
using NPipeline.Extensions.Testing;
using NPipeline.Execution;
using NPipeline.Nodes;
using NPipeline.Pipeline;

public sealed class IntensiveTransform : TransformNode<int, int>
{
    public override async Task<int> ExecuteAsync(
        int item,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        var logger = context.LoggerFactory.CreateLogger("IntensiveTransform");
        logger.Log(NPipeline.Observability.Logging.LogLevel.Information,
            $"Processing item {item} on Thread {Environment.CurrentManagedThreadId}");
        await Task.Delay(100, cancellationToken); // Simulate intensive work
        return item * 2;
    }
}

public sealed class ParallelPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddInMemorySource<int>();
        var transform = builder.AddTransform<IntensiveTransform, int, int>();
        var sink = builder.AddInMemorySink<int>();

        builder.Connect(source, transform);
        builder.Connect(transform, sink);

        // Configure parallel execution for the transform
        builder.WithParallelOptions(transform,
            new ParallelOptions { MaxDegreeOfParallelism = 4 });

        // Set the execution strategy to ParallelExecutionStrategy
        transform.ExecutionStrategy = new ParallelExecutionStrategy();
    }
}

public static class Program
{
    public static async Task Main(string[] args)
    {
        // Set up test data
        var testData = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        var context = new PipelineContext();
        context.SetSourceData(testData);

        Console.WriteLine("Starting parallel pipeline...");
        var runner = new PipelineRunner();
        await runner.RunAsync<ParallelPipeline>(context);
        Console.WriteLine("Parallel pipeline finished.");
    }
}

In this example, we:

  1. Use WithParallelOptions(transform, new ParallelOptions { MaxDegreeOfParallelism = 4 }) to configure parallel execution options for the transform
  2. Set the ExecutionStrategy property to ParallelExecutionStrategy to enable parallel processing

Non-Ordered Parallel Execution for Maximum Throughput

By default, NPipeline preserves the order of items even when processing them in parallel. While this ensures predictable output, it can introduce overhead that reduces throughput. When the order of output items is not important for your use case, you can configure parallel execution to not preserve order, which can significantly increase throughput.

Example: Non-Ordered Parallel Processing

using System;
using System.Threading;
using System.Threading.Tasks;
using NPipeline.Extensions.Parallelism;
using NPipeline.Extensions.Testing;
using NPipeline.Execution;
using NPipeline.Nodes;
using NPipeline.Pipeline;

public sealed class IntensiveTransform : TransformNode<int, int>
{
    public override async Task<int> ExecuteAsync(
        int item,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        var logger = context.LoggerFactory.CreateLogger("IntensiveTransform");
        logger.Log(NPipeline.Observability.Logging.LogLevel.Information,
            $"Processing item {item} on Thread {Environment.CurrentManagedThreadId}");
        // Simulate variable work; Random.Shared is thread-safe, unlike a new Random() per call
        await Task.Delay(Random.Shared.Next(50, 150), cancellationToken);
        return item * 2;
    }
}

public sealed class NonOrderedParallelPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddInMemorySource<int>();
        var transform = builder.AddTransform<IntensiveTransform, int, int>();
        var sink = builder.AddInMemorySink<int>();

        builder.Connect(source, transform);
        builder.Connect(transform, sink);

        // Configure parallel execution with non-ordered output for maximum throughput
        builder.WithParallelOptions(transform,
            new ParallelOptions
            {
                MaxDegreeOfParallelism = 4,
                PreserveOrdering = false // Disable ordering to maximize throughput
            });

        // Set the execution strategy to ParallelExecutionStrategy
        transform.ExecutionStrategy = new ParallelExecutionStrategy();
    }
}

public static class Program
{
    public static async Task Main(string[] args)
    {
        // Set up test data
        var testData = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        var context = new PipelineContext();
        context.SetSourceData(testData);

        Console.WriteLine("Starting non-ordered parallel pipeline...");
        var runner = new PipelineRunner();
        await runner.RunAsync<NonOrderedParallelPipeline>(context);
        Console.WriteLine("Non-ordered parallel pipeline finished.");
    }
}

In this example, we explicitly set PreserveOrdering = false in the ParallelOptions. This configuration:

  1. Allows items to be emitted as soon as they are processed, without waiting for slower items
  2. Eliminates the overhead of tracking and reordering items
  3. Can significantly increase throughput, especially when processing times vary widely
  4. Results in output that may not match the input order (illustrated in the sketch below)
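
To see why unordered emission helps, here is a minimal, NPipeline-free sketch in plain .NET (all names and timings are illustrative): with ordering preserved, a finished item cannot be emitted until every earlier item has also finished, whereas unordered emission releases each result the moment it is ready.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Four items with uneven processing times; item 0 is the slowest.
var delays = new[] { 300, 50, 200, 10 }; // simulated per-item work (ms)
var tasks = delays.Select(async (ms, i) =>
{
    await Task.Delay(ms);
    return i; // the "result" is just the item's index
}).ToList();

// Unordered emission: each result is printed the moment its task completes,
// so the output follows completion order (3, 1, 2, 0 here).
var pending = new List<Task<int>>(tasks);
while (pending.Count > 0)
{
    var done = await Task.WhenAny(pending);
    pending.Remove(done);
    Console.WriteLine($"unordered: item {done.Result}");
}

// Order-preserving emission: item i cannot be printed before items 0..i-1,
// so the slow item 0 gates everything behind it (0, 1, 2, 3). By this point
// all tasks have finished; the loop only demonstrates the emission order.
foreach (var t in tasks)
    Console.WriteLine($"ordered: item {await t}");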

When to Use Non-Ordered Execution

Consider using PreserveOrdering = false when:

  • Order is irrelevant: Your downstream processing doesn't depend on the input order
  • Maximum throughput is critical: You need to process as many items as possible per unit of time
  • Processing times vary significantly: Some items take much longer to process than others
  • You're aggregating results: You're collecting statistics or aggregating data where order doesn't matter

Trade-offs

Aspect       | PreserveOrdering: true (Default)  | PreserveOrdering: false
Throughput   | Good                              | Excellent
Output Order | Matches input order               | May be out of order
Memory Usage | Higher (needs to buffer)          | Lower
Latency      | Higher (waits for slow items)     | Lower (emits immediately)
Use Case     | Order-sensitive processing        | Maximum throughput scenarios

Advanced Parallel Options

The ParallelOptions class provides additional configuration options for fine-tuning parallel execution:

var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 8,             // Maximum concurrent operations
    MaxQueueLength = 1000,                  // Bounded input queue for backpressure
    QueuePolicy = BoundedQueuePolicy.Block, // Behavior when queue is full
    OutputBufferCapacity = 500,             // Bounded output buffer
    PreserveOrdering = true                 // Whether to preserve input order
};

Queue Policies

When MaxQueueLength is specified, QueuePolicy controls what happens once the queue reaches capacity:

  • BoundedQueuePolicy.Block: Wait until space becomes available (default)
  • BoundedQueuePolicy.DropNewest: Drop the incoming item
  • BoundedQueuePolicy.DropOldest: Remove the oldest item to make space
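
For example, a pipeline ingesting a live telemetry feed might prefer to shed stale readings rather than stall the producer. A minimal sketch reusing the builder API shown earlier (the specific capacity and values are illustrative):

// Keep at most 100 items queued ahead of the transform; when the queue is
// full, discard the oldest item so the producer never blocks and the
// freshest readings are the ones processed.
builder.WithParallelOptions(transform,
    new ParallelOptions
    {
        MaxDegreeOfParallelism = 4,
        MaxQueueLength = 100,
        QueuePolicy = BoundedQueuePolicy.DropOldest
    });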

Considerations for Parallelism

  • Degree of Parallelism: Carefully choose the MaxDegreeOfParallelism. Too high a value can lead to excessive resource consumption (CPU, memory, threads) and diminishing returns due to context-switching overhead. Too low a value might underutilize available resources.

  • Thread Safety: Ensure that any shared state or external resources accessed by your parallel nodes are thread-safe; see the sketch after this list. If your nodes are pure functions (operating only on their input and producing output without side effects), this is less of a concern.

  • Order Preservation: By default, NPipeline maintains the order of items even when processing them in parallel. If order is not critical and you need maximum throughput, you can configure nodes to not preserve order by setting PreserveOrdering = false.

  • Resource Contention: Be aware of potential bottlenecks when multiple parallel tasks try to access the same limited resource (e.g., a single database connection, a slow API).

  • Debugging: Debugging parallel code can be more complex. Ensure you have good logging and monitoring in place.
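
For example, a node that tallies items per category must switch to thread-safe primitives once it runs in parallel. A sketch, assuming the same TransformNode shape and namespaces as the examples above (CategorizingTransform and its categorization rule are hypothetical):

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using NPipeline.Nodes;
using NPipeline.Pipeline;

public sealed class CategorizingTransform : TransformNode<string, string>
{
    // Shared across all parallel invocations of ExecuteAsync, so it must be
    // a thread-safe collection.
    private readonly ConcurrentDictionary<string, int> _counts = new();

    public override Task<string> ExecuteAsync(
        string item,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        var category = item.Length > 10 ? "long" : "short"; // hypothetical rule
        // AddOrUpdate is atomic; a plain Dictionary with _counts[category]++
        // would race under parallel execution.
        _counts.AddOrUpdate(category, 1, (_, n) => n + 1);
        return Task.FromResult(category);
    }
}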

Best Practices

  • Identify Parallelizable Work: Apply parallelism to parts of your pipeline where operations are independent and computationally intensive.

  • Start Small: Begin with a low degree of parallelism and incrementally increase it while monitoring performance metrics (CPU, memory, throughput) to find the optimal balance, as in the sketch after this list.

  • Profile: Use profiling tools to identify bottlenecks and ensure that parallelism is indeed improving performance.
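
One way to follow the last two practices is to derive the initial degree of parallelism from the machine and make it overridable, then tune based on measurements. A sketch reusing the builder API from the examples above (the PIPELINE_MAX_DOP variable name is illustrative):

// Start from a conservative, machine-derived default and allow an override
// via an environment variable so the value can be tuned without a rebuild.
var dop = int.TryParse(
        Environment.GetEnvironmentVariable("PIPELINE_MAX_DOP"), out var configured)
    ? configured
    : Math.Max(1, Environment.ProcessorCount / 2);

builder.WithParallelOptions(transform,
    new ParallelOptions { MaxDegreeOfParallelism = dop });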

By strategically applying parallelism, you can significantly boost the processing capabilities of your NPipelines for demanding workloads.

Next Steps