Connectors Overview
Connectors are pre-built nodes that make it easy to read data from and write data to external systems. They are specialized ISourceNode and ISinkNode implementations that handle the specifics of communicating with systems like databases, file formats, message queues, and cloud services.
Using connectors, you can quickly assemble pipelines that integrate with your existing infrastructure without having to write boilerplate code for file I/O or network communication.
Core Concepts
Storage Abstraction
All connectors work through the IStorageProvider abstraction from the NPipeline.StorageProviders project, which enables them to work with multiple backend systems:
- Storage Provider Interface - Learn about the abstraction layer that powers connectors
- Works with filesystems, cloud storage (S3, Azure), databases, and custom backends
- Unified API for read, write, delete, list, and metadata operations
- Built-in support for filesystem with resilient directory traversal
Note: Connectors depend on `NPipeline.StorageProviders` for storage abstractions. The storage provider interfaces and implementations have been extracted from `NPipeline.Connectors` into a separate `NPipeline.StorageProviders` project.
Common Attributes
All connectors support common attributes from NPipeline.Connectors.Attributes that provide a unified way to map properties across different data sources:
- `[Column]`: Specify column names and control property mapping
- `[IgnoreColumn]`: Exclude properties from mapping
These common attributes work across all connectors (CSV, Excel, PostgreSQL, SQL Server, etc.) and are recommended for new code. Each connector also provides connector-specific attributes for backward compatibility and advanced features. See individual connector documentation for details and examples.
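As an illustrative sketch (the `Customer` type and its column names are hypothetical, not part of the library), a model class annotated with the common mapping attributes might look like this:

```csharp
using NPipeline.Connectors.Attributes;

// Hypothetical model type; the column names are illustrative.
public class Customer
{
    [Column("customer_id")]   // map the property to a specific column name
    public int Id { get; set; }

    [Column("full_name")]
    public string Name { get; set; } = string.Empty;

    [IgnoreColumn]            // excluded from both reads and writes
    public string DisplayLabel { get; set; } = string.Empty;
}
```

Because these attributes are shared, the same class can be reused unchanged across the CSV, Excel, PostgreSQL, and SQL Server connectors.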
Available Connectors
The following connectors are available:
- CSV: Read from and write to Comma-Separated Values (CSV) files.
- Works with any storage backend via the `IStorageProvider` abstraction from `NPipeline.StorageProviders`
- Excel: Read from and write to Excel files (XLS and XLSX formats).
- Supports both legacy XLS (binary) and modern XLSX (Open XML) formats
- Configurable sheet selection, header handling, and type detection
- Works with any storage backend via the `IStorageProvider` abstraction from `NPipeline.StorageProviders`
- JSON: Read from and write to JSON files (Array and NDJSON formats).
- Supports both JSON array and newline-delimited JSON (NDJSON) formats
- Configurable property naming policies, indentation, and error handling
- Uses System.Text.Json for efficient streaming with minimal dependencies
- Works with any storage backend via the `IStorageProvider` abstraction from `NPipeline.StorageProviders`
- PostgreSQL: Read from and write to PostgreSQL databases.
- Supports streaming reads, per-row and batched writes, and in-memory checkpointing
- Uses Npgsql library for reliable database operations
- SQL Server: Read from and write to Microsoft SQL Server databases.
- Supports streaming reads, per-row and batched writes, and in-memory checkpointing
- Uses Microsoft.Data.SqlClient for reliable database operations
- Supports Windows Authentication and SQL Server Authentication
- Snowflake: Read from and write to Snowflake cloud data warehouses.
- Supports streaming reads, per-row writes, batched writes, and bulk loading via PUT + COPY INTO
- Three write strategies: PerRow, Batch, and StagedCopy for optimal throughput
- MERGE-based upsert support with configurable merge actions
- Uses Snowflake.Data official ADO.NET driver for reliable operations
- Supports password and key-pair (JWT) authentication
- Azure Cosmos DB: Read from and write to Azure Cosmos DB databases.
- Supports SQL API with parameterized queries and change feed streaming
- Multi-API support for the MongoDB and Cassandra APIs
- Multiple write strategies (PerRow, Batch, TransactionalBatch, Bulk)
- Azure AD authentication and connection pooling
- Uses Azure.Cosmos SDK for reliable operations
- AWS SQS: Read from and write to Amazon Simple Queue Service (SQS).
- Supports multiple acknowledgment strategies (AutoOnSinkSuccess, Manual, Delayed, None)
- Includes batch acknowledgment for performance optimization
- Configurable long polling, parallel processing, and retry logic
- Uses AWSSDK.SQS for reliable SQS operations
- Kafka: Read from and write to Apache Kafka topics.
- Supports multiple delivery semantics (at-least-once, exactly-once)
- Configurable batching, retry strategies, and error handling
- Multiple serialization formats (JSON, Avro, Protobuf)
- Transaction support with proper offset management
- Uses Confluent.Kafka for reliable Kafka operations
- RabbitMQ: Read from and write to RabbitMQ message queues.
- Push-based consumers with backpressure and configurable prefetch
- Publisher confirms and automatic topology declaration
- Dead-letter handling at both broker and pipeline levels
- Support for Classic, Quorum, and Stream queue types
- Uses RabbitMQ.Client 7.x for fully asynchronous operations
- Parquet: Read from and write to Apache Parquet files.
- Columnar storage optimised for analytical workloads
- Row-group streaming with bounded memory usage
- Configurable compression (Snappy, Gzip, None), column projection, and parallel reads
- Attribute-based or explicit row mapping
- Schema evolution via `SchemaCompatibilityMode` (Strict, Additive, NameOnly)
- Atomic writes and observability hooks via `IParquetConnectorObserver`
- Works with any storage backend via `IStorageProvider`
- Data Lake: Write and read partitioned Parquet tables with snapshot tracking.
- Hive-style partitioning (`column=value/` directories) compatible with Spark, Athena, Trino, DuckDB
- NDJSON manifest with per-snapshot file inventory for auditability
- Time travel: query table state as of any timestamp or snapshot ID
- Small-file compaction to optimise query engine performance
- Format adapter interface for Iceberg, Delta Lake, or custom table formats
- Built on `NPipeline.Connectors.Parquet`
- DuckDB: Read from and write to DuckDB databases and query files directly.
- In-process analytical database — zero config, no separate server
- Direct queries on Parquet, CSV, and JSON files (local or S3)
- High-performance Appender API for bulk inserts
- Auto-create tables from CLR types with `[DuckDBColumn]` attributes
- Export pipeline data to Parquet/CSV via COPY TO
- Dependency injection support with named databases
General Usage Pattern
Most source connectors are added to a pipeline using `AddSource()`, and sink connectors using `AddSink()`.
When you need to pass configuration (file path, resolver, etc.), instantiate the connector and register it with the builder using the overloads that accept a preconfigured node instance. These helpers automatically call AddPreconfiguredNodeInstance() and track disposal for you.
```csharp
// Example of using a source and sink connector
var pipeline = new PipelineBuilder()
    // Read data from a source connector
    .AddSource(new CsvSourceNode<User>(
        StorageUri.FromFilePath("users.csv"),
        row => new User(
            row.Get<int>("Id") ?? 0,
            row.Get<string>("Name") ?? string.Empty,
            row.Get<string>("Email") ?? string.Empty)), "user_source")
    // ... add transforms ...
    // Write data to a sink connector
    .AddSink(new CsvSinkNode<UserSummary>(StorageUri.FromFilePath("summaries.csv")), "summary_sink")
    .Build();
```
Note: NPipeline uses a storage abstraction layer from `NPipeline.StorageProviders` that requires `StorageUri` objects instead of plain file paths. Use `StorageUri.FromFilePath()` for local files or `StorageUri.Parse()` for absolute URIs (e.g., "s3://bucket/key"). For local files, the resolver is optional. For custom providers or cloud storage, create a resolver via `StorageProviderFactory.CreateResolver()` and pass it explicitly.
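The note above can be sketched as follows; the bucket and key names are placeholders, and the exact resolver wiring should be checked against the storage provider documentation:

```csharp
using NPipeline.StorageProviders;

// Local file: a resolver is optional.
var localUri = StorageUri.FromFilePath("users.csv");

// Cloud storage: parse an absolute URI (bucket and key are placeholders).
var s3Uri = StorageUri.Parse("s3://my-bucket/imports/users.csv");

// For custom providers or cloud storage, create a resolver explicitly
// and pass it to the connector alongside the StorageUri.
var resolver = StorageProviderFactory.CreateResolver();
```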
Explore the documentation for each specific connector to learn about its installation, configuration options, and usage examples.
Next Steps
- CSV Connector: Learn how to read from and write to CSV files
- Excel Connector: Learn how to read from and write to Excel files (XLS and XLSX)
- JSON Connector: Learn how to read from and write to JSON files (Array and NDJSON)
- PostgreSQL Connector: Learn how to read from and write to PostgreSQL databases
- SQL Server Connector: Learn how to read from and write to Microsoft SQL Server databases
- Snowflake Connector: Learn how to read from and write to Snowflake cloud data warehouses
- Azure Cosmos DB Connector: Learn how to read from and write to Azure Cosmos DB
- AWS SQS Connector: Learn how to read from and write to Amazon SQS queues
- Kafka Connector: Learn how to read from and write to Apache Kafka topics
- RabbitMQ Connector: Learn how to read from and write to RabbitMQ message queues
- Parquet Connector: Learn how to read from and write to Apache Parquet files
- Data Lake Connector: Learn how to write partitioned tables, use time travel, and compact small files
- DuckDB Connector: Learn how to read from and write to DuckDB databases and query files directly
- Common Patterns: See connectors in practical examples
- Installation: Review installation options for connector packages