Skip to main content

Quick Start: Your First NPipeline

This quick start guide will walk you through creating a basic "Hello World" pipeline using NPipeline. This example demonstrates the core concepts of defining a source, a transform, and a sink.

Prerequisites

  • NPipeline and its core dependencies installed (see Installation).
  • A .NET project set up (e.g., a Console Application).

Step 1: Define Your Nodes

In NPipeline, a pipeline is composed of interconnected nodes. We'll define three types for our "Hello World":

  • Source: To produce our "Hello World" message.
  • Transform: To modify the message (e.g., convert to uppercase).
  • Sink: To consume and display the final message.

First, let's create a simple console application. If you haven't already, create a new console project:

dotnet new console -n NpipelineHelloWorld
cd NpipelineHelloWorld
dotnet add package NPipeline
dotnet add package NPipeline.Extensions.DependencyInjection

Now, replace the content of your Program.cs file with the following:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NPipeline;
using NPipeline.DataFlow;
using NPipeline.DataFlow.DataPipes;
using NPipeline.Execution;
using NPipeline.Extensions.DependencyInjection;
using NPipeline.Nodes;
using NPipeline.Observability;
using NPipeline.Pipeline;

namespace NpipelineHelloWorld;

// 1. Define a Source Node
// This node will produce a single "Hello World!" string.
public sealed class HelloWorldSource : SourceNode<string>
{
public override IDataPipe<string> ExecuteAsync(
PipelineContext context,
CancellationToken cancellationToken = default)
{
Console.WriteLine("Source: Producing 'Hello World!'");
static IAsyncEnumerable<string> Stream()
{
return GenerateMessage();

async IAsyncEnumerable<string> GenerateMessage()
{
yield return "Hello World!";
}
}

return new StreamingDataPipe<string>(Stream());
}
}

// 2. Define a Transform Node
// This node will convert the incoming string to uppercase.
public sealed class UppercaseTransform : TransformNode<string, string>
{
public override Task<string> ExecuteAsync(
string item,
PipelineContext context,
CancellationToken cancellationToken = default)
{
var result = item.ToUpperInvariant();
Console.WriteLine($"Transform: Transforming '{item}' to '{result}'");
return Task.FromResult(result);
}
}

// 3. Define a Sink Node
// This node will consume the final string and print it to the console.
public sealed class ConsoleSink : SinkNode<string>
{
public override async Task ExecuteAsync(
IDataPipe<string> input,
PipelineContext context,
CancellationToken cancellationToken = default)
{
await foreach (var item in input.WithCancellation(cancellationToken))
{
Console.WriteLine($"Sink: Consuming and displaying: {item}");
}
}
}

// 4. Define the Pipeline
public sealed class HelloWorldPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var sourceHandle = builder.AddSource<HelloWorldSource, string>();
var transformHandle = builder.AddTransform<UppercaseTransform, string, string>();
var sinkHandle = builder.AddSink<ConsoleSink, string>();

builder.Connect(sourceHandle, transformHandle);
builder.Connect(transformHandle, sinkHandle);
}
}

public static class Program
{
public static async Task Main(string[] args)
{
// Set up dependency injection
var services = new ServiceCollection();

// Add NPipeline services
services.AddNPipeline(Assembly.GetExecutingAssembly());

var serviceProvider = services.BuildServiceProvider();

Console.WriteLine("Starting pipeline...");

// Use DI to get the pipeline runner
using var scope = serviceProvider.CreateScope();
var runner = scope.ServiceProvider.GetRequiredService<IPipelineRunner>();

await runner.RunAsync<HelloWorldPipeline>();

Console.WriteLine("Pipeline finished.");
}
}

Step 2: Run the Pipeline

Execute your console application:

dotnet run

Expected Output

You should see output similar to this:

Starting pipeline...
Source: Producing 'Hello World!'
Transform: Transforming 'Hello World!' to 'HELLO WORLD!'
Sink: Consuming and displaying: HELLO WORLD!
Pipeline finished.

This simple example illustrates the fundamental flow of data through an NPipeline: from a source, through a transform, and finally to a sink.

Understanding the Design: Synchronous Pipe Creation + Asynchronous Data Flow

You may have noticed something interesting about our source node:

public override IDataPipe<string> ExecuteAsync(...)  // Notice: Not async!
{
// Returns a pipe synchronously - no await here
return new StreamingDataPipe<string>(Stream());
}

The method is called ExecuteAsync, but it returns synchronously. This is by design, not a mistake! Here's why:

Phase 1 (Synchronous): The source creates a pipe immediately

var pipe = source.ExecuteAsync(context, cancellationToken);  // Returns instantly

Phase 2 (Asynchronous): The sink consumes data asynchronously

await foreach (var item in input.WithCancellation(cancellationToken))  // Async here
{
// Process each item as it arrives
}

Why This Design?

  • Simplicity: Pipe creation is fast and synchronous
  • Type Safety: Direct IDataPipe<T> returns enable better type compatibility
  • Performance: No unnecessary Task allocations
  • Clarity: "ExecuteAsync" signals you're in the async pipeline system, but the pipe is ready to use immediately

Think of it like opening a file: File.OpenRead() is synchronous and returns immediately, but stream.ReadAsync() is asynchronous when you actually read data from it.

Next Steps

  • Core Concepts: Deep dive into the IDataPipe, INode, IPipelineDefinition, and PipelineContext
  • Common Patterns: See practical examples of real-world pipeline implementations
  • Installation: Review the installation options and available extensions