Data processing in .NET shouldn't feel like this
Memory nightmares
Your pipeline loads everything into memory, then crashes at 2 AM when someone uploads a file that's slightly larger than usual. You've added more RAM twice this year.
Spaghetti transforms
Your data processing logic started simple. Now it's 2,000 lines of nested loops and conditional statements that nobody wants to touch - or test.
Silent failures
One malformed record takes down your entire batch. You've wrapped everything in try-catch blocks, but errors still slip through to production.
A better way to process data
NPipeline gives you a graph-based architecture where data flows through discrete, testable nodes. Each piece does one thing well. Complexity emerges from composition, not accumulation.
Sources
Where data enters your pipeline. Read from files, databases, APIs, or message queues. Sources produce streams of typed items that flow downstream.
Transforms
Where data gets processed. Validate, enrich, filter, aggregate, or reshape your data. Each transform is a focused, single-responsibility component.
Sinks
Where data lands. Write to databases, send to APIs, or stream to files. Sinks consume the processed data and handle final delivery.
The Graph
Connect nodes to form a directed acyclic graph. See exactly how data flows through your system. Debug by tracing the path, not by hunting through nested loops.
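To make "each piece does one thing well" concrete, here is a minimal sketch of a validation node like the ValidateOrder used in the pipeline definition further down. The ITransformNode interface and ProcessAsync signature are illustrative assumptions, not NPipeline's actual node contract (which isn't shown on this page); the point is the shape: one class, one typed input, one typed output, one responsibility.

using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical node contract, defined here only so the sketch stands alone.
public interface ITransformNode<TIn, TOut>
{
    ValueTask<TOut> ProcessAsync(TIn item, CancellationToken cancellationToken);
}

public sealed record Order(string Id, int Quantity, decimal Total);

// One node, one job: decide whether an order is well-formed and pass it on.
public sealed class ValidateOrder : ITransformNode<Order, Order>
{
    public ValueTask<Order> ProcessAsync(Order item, CancellationToken cancellationToken)
    {
        if (item.Quantity <= 0)
            throw new ArgumentOutOfRangeException(nameof(item), "Order quantity must be positive.");

        return ValueTask.FromResult(item); // completes synchronously, no Task allocated
    }
}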
Process more data than fits in memory
NPipeline is streaming-first. Data flows through your pipeline item by item, so memory usage stays constant regardless of dataset size. Process a million records or a billion - your memory footprint stays the same.
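The underlying idea is the same one IAsyncEnumerable gives you in plain .NET: produce items lazily instead of materializing a collection. The sketch below is not NPipeline's source API (which isn't shown on this page); it only contrasts a streaming reader, whose peak memory is roughly one line plus buffers, with the eager load that causes the 2 AM crash described above.

using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class OrderFile
{
    // Lazy: one line in memory at a time, regardless of file size.
    public static async IAsyncEnumerable<string> StreamLinesAsync(string path)
    {
        using var reader = new StreamReader(path);
        while (await reader.ReadLineAsync() is { } line)
            yield return line; // downstream work starts on the first item immediately
    }

    // Eager: nothing can start until the whole file is resident in memory.
    public static Task<string[]> LoadAllAsync(string path)
        => File.ReadAllLinesAsync(path);
}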
Real numbers, real impact
1M records, 500 bytes each
Eager loading: ~500 MB peak memory
NPipeline streaming: ~1-2 MB peak memory
Sub-millisecond first item
Start processing immediately. Don't wait for your entire dataset to load before seeing results.
Predictable GC behavior
No surprise pauses. Memory usage scales with your pipeline's complexity, not your data volume.
Zero-allocation fast paths for high-throughput scenarios
NPipeline uses ValueTask<T> to eliminate heap allocations for synchronous operations. Cache hits, validation checks, simple calculations - they all run without touching the heap.
100,000 items/second, 90% cache hits
That's 90,000 Task allocations eliminated per second. Your GC pressure drops by up to 90%. Your P99 latency becomes predictable.
Plan-based execution
NPipeline compiles your pipeline structure once. During execution, there's no reflection, no per-item routing decisions - just direct method dispatch.
Fast path
Synchronous result available? Stack allocation, zero GC pressure.
Slow path
I/O required? Seamlessly transitions to true async.
Same code, both paths
Write it once. NPipeline handles the optimization.
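The "same code, both paths" pattern is standard ValueTask usage in .NET. The sketch below is not NPipeline's internal dispatch, just an illustration using a cache-backed lookup; the ICustomerClient dependency is a hypothetical stand-in for whatever I/O an enrichment step would call. A cache hit completes synchronously and allocates nothing on the heap; a miss transitions to a genuine async call.

using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed record Customer(string Id, string Name);

public interface ICustomerClient // hypothetical I/O dependency
{
    Task<Customer> GetCustomerAsync(string customerId);
}

public sealed class CustomerLookup
{
    private readonly ConcurrentDictionary<string, Customer> _cache = new();
    private readonly ICustomerClient _client;

    public CustomerLookup(ICustomerClient client) => _client = client;

    public ValueTask<Customer> GetAsync(string customerId)
    {
        // Fast path (~90% of calls in the scenario above): synchronous result,
        // returned in a struct, no Task allocation, no GC pressure.
        if (_cache.TryGetValue(customerId, out var cached))
            return new ValueTask<Customer>(cached);

        // Slow path: real I/O, seamlessly switches to true async.
        return new ValueTask<Customer>(FetchAndCacheAsync(customerId));
    }

    private async Task<Customer> FetchAndCacheAsync(string customerId)
    {
        var customer = await _client.GetCustomerAsync(customerId);
        _cache[customerId] = customer;
        return customer;
    }
}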
Built for the real world, where things fail
Production pipelines encounter bad data, network blips, and overwhelmed dependencies. NPipeline gives you the tools to handle failure gracefully—without bringing down your entire system.
Retry policies
Transient failures get automatic retries with configurable backoff. Persistent failures trigger node restarts or route items to dead-letter queues.
Circuit breakers
Protect downstream systems from cascading failures. When a dependency is struggling, stop hammering it and give it time to recover.
Granular error handling
Handle errors at the item level or the stream level. One bad record doesn't have to poison your entire batch.
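The retry portion of this is visible in the pipeline definition further down; it is repeated here with comments mapping each parameter to the behavior described above. Backoff, dead-letter routing, and circuit breakers are configured through options not shown on this page, so they are deliberately omitted from the sketch rather than guessed at.

// Inside Define(): per-item retries for transient failures, node restarts for persistent ones.
builder.WithRetryOptions(new PipelineRetryOptions(
    MaxItemRetries: 3,         // retry a failing item up to 3 times before giving up on it
    MaxNodeRestartAttempts: 2  // restart a persistently failing node up to twice
));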
Built for these problems
ETL workflows
Extract from databases, APIs, and files. Transform with validation and enrichment. Load to your destination. All with clear, testable code.
Real-time streaming
Process data as it arrives from message queues, webhooks, or IoT devices. Sub-millisecond latency to first item processed.
Data validation
Implement complex validation rules as discrete, testable transforms. Route invalid items to review queues without stopping the pipeline.
Batch processing
Process millions of historical records without running out of memory. Streaming architecture means predictable resource usage.
Event-driven systems
React to events with complex processing logic. Fan out to multiple sinks. Handle backpressure gracefully.
Microservice integration
Transform data between services with different schemas. Enrich with data from multiple sources. Maintain type safety across boundaries.
Modular by design
Start with the core library. Add extensions as you need them.
Composition
Create hierarchical, modular pipelines by treating entire pipelines as reusable transform nodes. Break complex workflows into smaller, well-tested building blocks.
Connectors
Pre-built sources and sinks for common targets. CSV files, storage providers, and more. Unified abstraction layer.
Dependency Injection
Seamlessly integrate with your favorite DI container. Constructor injection in nodes. Proper service lifetimes with the RunPipelineAsync extension method (sketched after this list).
Lineage
Comprehensive data lineage tracking and provenance capabilities. Track the complete journey of each data item from source to destination for governance and debugging.
Nodes
Pre-built, production-ready nodes for common data processing operations. String, numeric, and datetime cleansing, validation, filtering, and type conversion.
Observability
Comprehensive metrics collection and monitoring capabilities. Track node and pipeline performance, throughput, memory usage, retries, and errors.
Parallelism
Execute pipeline nodes in parallel for improved performance. Configurable concurrency limits, queue policies, and ordering behavior for CPU-bound transforms.
Testing
Comprehensive utilities and helpers for writing efficient tests. In-memory source and sink nodes, pipeline builder extensions, and assertion helpers.
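As a rough sketch of the Dependency Injection extension described above: nodes are ordinary classes, so the container supplies their dependencies through constructor injection, and the pipeline runs via the RunPipelineAsync extension method. Only that method name comes from this page; the stub types and registration shape below are assumptions, and the RunPipelineAsync call is left as a comment rather than a guessed signature.

using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical dependency and node, defined as stubs so the sketch stands alone.
public interface ICustomerClient { Task<string> GetCustomerNameAsync(string id); }
public sealed class HttpCustomerClient : ICustomerClient
{
    public Task<string> GetCustomerNameAsync(string id) => Task.FromResult("Ada");
}
public sealed class EnrichWithCustomer
{
    public EnrichWithCustomer(ICustomerClient client) { } // constructor injection
}

public static class Program
{
    public static async Task Main()
    {
        var services = new ServiceCollection();
        services.AddSingleton<ICustomerClient, HttpCustomerClient>();
        services.AddTransient<EnrichWithCustomer>();

        await using var provider = services.BuildServiceProvider();

        // The container builds nodes with their dependencies and correct lifetimes.
        var node = provider.GetRequiredService<EnrichWithCustomer>();

        // The DI extension's RunPipelineAsync method (named above) would run the
        // pipeline here; its exact signature isn't shown on this page, so it is
        // left as a comment instead of an invented call:
        // await provider.RunPipelineAsync<OrderPipeline>();
    }
}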
Code that reads like a diagram
NPipeline's fluent API makes your pipeline structure visible in your code. The compiler enforces type safety between nodes—if it compiles, it connects.
public void Define(PipelineBuilder builder, PipelineContext context)
{
    // Define your nodes
    var source = builder.AddSource<OrderSource, Order>();
    var validate = builder.AddTransform<ValidateOrder, Order, Order>();
    var enrich = builder.AddTransform<EnrichWithCustomer, Order, EnrichedOrder>();
    var sink = builder.AddSink<DatabaseSink, EnrichedOrder>();

    // Connect the graph — types must match
    builder.Connect(source, validate);
    builder.Connect(validate, enrich);
    builder.Connect(enrich, sink);

    // Add resilience
    builder.WithRetryOptions(new PipelineRetryOptions(
        MaxItemRetries: 3,
        MaxNodeRestartAttempts: 2
    ));
}

Each node is a single class with a single responsibility. Test them in isolation. Compose them into complex workflows.
Designed for testing from day one
Every node is a standalone class. Test your transforms with simple unit tests — no mocking of pipeline infrastructure required.
Isolated nodes
Test each node independently. Pass in test data, assert on outputs. No pipeline ceremony required.
In-memory testing
Use the testing extensions to run entire pipelines in memory. Verify end-to-end behavior without external dependencies.
Assertion libraries
First-class support for FluentAssertions and AwesomeAssertions. Write expressive tests that read like specifications.
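A test for the hypothetical ValidateOrder sketch shown earlier might look like the following, assuming xUnit as the test runner (this page only names the assertion libraries) and FluentAssertions for the assertions: no pipeline, no infrastructure mocks, just the node and an item.

using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;

public class ValidateOrderTests
{
    [Fact]
    public async Task Passes_well_formed_orders_through_unchanged()
    {
        var node = new ValidateOrder();
        var order = new Order("A-42", Quantity: 3, Total: 19.99m);

        var result = await node.ProcessAsync(order, CancellationToken.None);

        result.Should().Be(order);
    }

    [Fact]
    public async Task Rejects_orders_with_a_non_positive_quantity()
    {
        var node = new ValidateOrder();
        var badOrder = new Order("A-43", Quantity: 0, Total: 0m);

        var act = async () => await node.ProcessAsync(badOrder, CancellationToken.None);

        await act.Should().ThrowAsync<ArgumentOutOfRangeException>();
    }
}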
Ready to build better pipelines?
Get started in minutes. Build your first pipeline in 15 minutes.
dotnet add package NPipeline