Filtering Nodes
Filtering nodes selectively pass through or reject items based on predicates. When an item fails a filter, a FilteringException is thrown with details about why the item was filtered.
Basic Filtering
Filter items using a predicate:
// Single filter
builder.AddFiltering<Order>(x => x.Status == OrderStatus.Pending);
// Multiple filters
builder.AddFiltering<Product>()
.Where(x => x.Price > 0)
.Where(x => x.IsActive);
Constructor with Predicate
Specify the filter predicate in the constructor:
var activeOrders = builder.AddFiltering<Order>(x => x.Status == OrderStatus.Active);
var expensiveProducts = builder.AddFiltering<Product>(
x => x.Price > 100,
reason: "Price must exceed $100");
Fluent Where Syntax
Chain multiple filter conditions:
builder.AddFiltering<Order>()
.Where(x => x.Amount > 100, "Order amount must exceed $100")
.Where(x => x.Status != OrderStatus.Cancelled, "Order must not be cancelled")
.Where(x => x.CreatedDate >= DateTime.Today, "Order must be from today");
Fixed Message Where
Use a string instead of a factory for simple messages:
// When you have a fixed message, this is cleaner than a factory
builder.AddFiltering<Order>()
.Where(x => x.Amount > 100, "Order amount must exceed $100")
.Where(x => x.Status != OrderStatus.Cancelled, "Order must not be cancelled");
Complex Predicates
Use complex conditional logic:
// Complex condition
builder.AddFiltering<Person>(x =>
(x.Age >= 18 && x.Status == "Active") ||
x.IsVip == true);
// With custom message
builder.AddFiltering<Order>(
x => x.Amount > 1000 && x.Customer.CreditScore > 700,
reason: "Order amount must exceed $1000 and customer credit score must be above 700");
// Multiple conditions
builder.AddFiltering<Document>()
.Where(x => x.IsPublished, "Document must be published")
.Where(x => x.Status == "Approved", "Document must be approved")
.Where(x => !x.IsArchived, "Document must not be archived");
Filtering with Collections
Filter based on collection properties:
// Item exists in collection
builder.AddFiltering<Order>(x => x.Items.Count > 0, "Order must contain at least one item");
// Collection contains specific value
builder.AddFiltering<Category>(x => x.Tags.Contains("Featured"), "Category must be tagged as Featured");
// All items satisfy condition
builder.AddFiltering<Order>(x => x.Items.All(i => i.Quantity > 0), "All items must have positive quantity");
// Any item satisfies condition
builder.AddFiltering<Product>(x => x.Variants.Any(v => v.IsAvailable), "Product must have at least one available variant");
Custom Error Messages
Provide descriptive messages when items are filtered:
// Constructor
var filter = builder.AddFiltering<Order>(
x => x.Amount > 0,
reason: "Order amount must be greater than zero");
// With Where method
builder.AddFiltering<Product>()
.Where(x => x.Price > 0, "Price must be positive")
.Where(x => x.StockLevel > 0, "Stock level must be positive");
Default Messages
If no custom message is provided, a default message is used:
// Default: "Item did not meet filtering criteria"
builder.AddFiltering<Order>(x => x.Amount > 0);
Error Handling
Filtered items raise FilteringException:
try
{
await pipeline.ExecuteAsync();
}
catch (FilteringException ex)
{
Console.WriteLine($"Reason: {ex.Reason}");
Console.WriteLine($"Message: {ex.Message}");
}
Node Error Decisions
Control what happens when an item is filtered:
// Skip filtered items (default - no exception)
builder.WithErrorDecision(filterHandle, NodeErrorDecision.Skip);
// Throw exception on first filtered item
builder.WithErrorDecision(filterHandle, NodeErrorDecision.Fail);
// Retry with different criteria
builder.WithErrorDecision(filterHandle, NodeErrorDecision.Retry);
Filtering Pipeline Example
var builder = new PipelineBuilder();
// Define source
var source = builder.AddInMemorySource<Order>();
// Filter 1: Only pending orders
var filterPending = builder.AddFiltering<Order>(
x => x.Status == OrderStatus.Pending,
reason: "Order must be pending");
// Filter 2: Amount constraints
var filterAmount = builder.AddFiltering<Order>()
.Where(x => x.Amount > 50, "Order amount must exceed $50")
.Where(x => x.Amount <= 10000, "Order amount cannot exceed $10,000");
// Filter 3: Customer validation
var filterCustomer = builder.AddFiltering<Order>(
x => x.Customer != null && !x.Customer.IsBlocked,
reason: "Customer must exist and not be blocked");
// Define sink
var sink = builder.AddInMemorySink<Order>();
// Connect filters in sequence
builder.Connect(source, filterPending);
builder.Connect(filterPending, filterAmount);
builder.Connect(filterAmount, filterCustomer);
builder.Connect(filterCustomer, sink);
// Build and execute
var pipeline = builder.Build();
var result = await pipeline.ExecuteAsync();
Performance Characteristics
- Predicate Evaluation: Performed inline on each item (no compilation)
- Exception Creation:
FilteringExceptionis created only when item fails - Memory: No additional allocations for filter state
- Thread Safety: Stateless and thread-safe for parallel execution
Common Patterns
Pre-filter Before Expensive Operations
var builder = new PipelineBuilder();
// Fast filter first
var quickFilter = builder.AddFiltering<Product>(x => x.Price > 0);
// Expensive operation after filtering
var enrichment = builder.AddEnrichment<Product>(x => x.Details = FetchExpensiveDetails());
builder.Connect(quickFilter, enrichment);
Multi-stage Filtering
// Filter stage 1
var stage1 = builder.AddFiltering<Item>(x => x.Status == "Active");
// Filter stage 2
var stage2 = builder.AddFiltering<Item>(x => x.Score >= 5)
.Where(x => x.IsApproved);
// Filter stage 3
var stage3 = builder.AddFiltering<Item>(x => !x.IsExpired);
builder.Connect(stage1, stage2);
builder.Connect(stage2, stage3);
Conditional Filtering
// Only filter if condition met
bool applyStrictFiltering = context.GetSetting("StrictMode");
if (applyStrictFiltering)
{
builder.AddFiltering<Product>(x => x.Quality >= 8);
}
else
{
builder.AddFiltering<Product>(x => x.Quality >= 5);
}
Combining with Other Nodes
var builder = new PipelineBuilder();
// Cleanse first
var cleanse = builder.AddStringCleansing<Product>(x => x.Name)
.Trim()
.ToLower();
// Validate
var validate = builder.AddStringValidation<Product>(x => x.Name)
.HasMinLength(3);
// Filter
var filter = builder.AddFiltering<Product>(x => x.IsActive);
// Connect in order
builder.Connect(cleanse, validate);
builder.Connect(validate, filter);
Testing Filtered Pipelines
[Fact]
public async Task FilterPipeline_ShouldFilterInactiveItems()
{
// Arrange
var items = new[] {
new Order { Id = 1, IsActive = true },
new Order { Id = 2, IsActive = false },
new Order { Id = 3, IsActive = true }
};
var source = new InMemorySourceNode<Order>(items);
var filter = new FilteringNode<Order>(x => x.IsActive);
var sink = new InMemorySinkNode<Order>();
// Act
await filter.ExecuteAsync(source, context);
await sink.ExecuteAsync(filter, context);
// Assert
sink.Items.Should().HaveCount(2);
sink.Items.All(x => x.IsActive).Should().BeTrue();
}