Every application eventually needs to move data in bulk — importing a CSV into a database, transforming records between systems, or running nightly data clean-ups. Writing this from scratch means dealing with chunking, error handling, progress tracking, and restartability. That’s a lot of infrastructure code before you even touch your business logic.
NBatch is a lightweight batch processing framework for .NET, inspired by Spring Batch, that handles all of this for you. You define what to read, how to transform, and where to write — NBatch takes care of the rest.
In this post, we’ll build a real ETL pipeline that imports a CSV file of products into a SQL Server database, complete with error handling, chunking, and restart-from-failure support.
What We’re Building
Imagine you receive a daily CSV file from a supplier containing product data:
Name,Category,Price
Widget A,Electronics,29.99
Widget B,Electronics,49.99
Gadget C,Home,15.00
bad-record,,not-a-number
Gadget D,Home,22.50
Notice that one row is malformed. We want our pipeline to:
- Read rows from the CSV file
- Transform each row (e.g., normalize the product name to uppercase)
- Write the transformed products into a database
- Skip bad records instead of crashing the entire job
- Resume from where it left off if the process crashes mid-import
Let’s do it step by step.
Step 1: Install NBatch
dotnet add package NBatch
dotnet add package NBatch.EntityFrameworkCore # for restart-from-failure support
NBatch targets .NET 8, .NET 9, and .NET 10, so it works with any modern .NET project.
Step 2: Define Your Model
Create a simple Product class that represents a row in your CSV and your database table:
public class Product
{
public int Id { get; set; }
public string Name { get; set; } = string.Empty;
public string Category { get; set; } = string.Empty;
public decimal Price { get; set; }
}
Step 3: Set Up Your DbContext
If you’re using Entity Framework Core, set up a standard DbContext:
public class AppDbContext : DbContext
{
public DbSet Products => Set();
public AppDbContext(DbContextOptions options)
: base(options) { }
}
Step 4: Build the Pipeline
Here’s where NBatch shines. The entire pipeline is configured with a fluent builder API:
var dbContext = serviceProvider.GetRequiredService();
var job = Job.CreateBuilder("product-import")
.UseJobStore(connectionString, DatabaseProvider.SqlServer)
.AddStep("import-products", step => step
.ReadFrom(new CsvReader("products.csv", row => new Product
{
Name = row["Name"],
Category = row["Category"],
Price = decimal.Parse(row["Price"])
}))
.ProcessWith(p => new Product
{
Name = p.Name.ToUpperInvariant(),
Category = p.Category,
Price = p.Price
})
.WriteTo(new DbWriter(dbContext))
.WithSkipPolicy(SkipPolicy.For(maxSkips: 5))
.WithChunkSize(100))
.Build();
var result = await job.RunAsync();
Let’s break down what each piece does.
Understanding the Pipeline
Job.CreateBuilder("product-import")
Creates a named job. The name is used for logging and for the job store to track progress across runs.
.UseJobStore(connectionString, DatabaseProvider.SqlServer)
This is optional but powerful. It tells NBatch to persist progress to a SQL Server database. If your application crashes after importing 5,000 of 10,000 records, the next run will pick up at record 5,001 — not start over from scratch. NBatch also supports PostgreSQL, SQLite, and MySQL for the job store.
.ReadFrom(new CsvReader(...))
CsvReader reads delimited text files with automatic header detection. The lambda maps each CsvRow to your domain object. You access columns by header name (row["Name"]), keeping the mapping clean and readable.
.ProcessWith(p => new Product { ... })
The processor transforms each item. Here we’re normalizing the product name to uppercase. Processors can be simple lambdas, async lambdas, or full IProcessor implementations for complex transformations. If you don’t need a transformation, you can skip this step entirely and go straight from reader to writer.
.WriteTo(new DbWriter(dbContext))
DbWriter uses EF Core to batch-insert records. It’s provider-agnostic — whatever database your DbContext targets, DbWriter works with it.
.WithSkipPolicy(SkipPolicy.For(maxSkips: 5))
This is the error-handling strategy. Instead of aborting the entire job when a FormatException occurs (like that malformed CSV row), NBatch skips the bad record and continues. The maxSkips parameter is your safety net — if more than 5 records fail, something is probably wrong with the file, and the job will stop.
.WithChunkSize(100)
NBatch processes data in chunks. Instead of reading the entire file into memory, it reads 100 records at a time, processes them, writes them, and then moves on to the next 100. This keeps memory usage predictable regardless of file size.
Step 5: Add a Notification Step
Real-world pipelines rarely stop at just importing data. You might want to send an email or call an API when the job finishes. NBatch supports tasklet steps for exactly this:
var job = Job.CreateBuilder("product-import")
.UseJobStore(connectionString, DatabaseProvider.SqlServer)
.AddStep("import-products", step => step
.ReadFrom(new CsvReader("products.csv", row => new Product
{
Name = row["Name"],
Category = row["Category"],
Price = decimal.Parse(row["Price"])
}))
.ProcessWith(p => new Product
{
Name = p.Name.ToUpperInvariant(),
Category = p.Category,
Price = p.Price
})
.WriteTo(new DbWriter(dbContext))
.WithSkipPolicy(SkipPolicy.For(maxSkips: 5))
.WithChunkSize(100))
.AddStep("send-notification", step => step
.Execute(() => Console.WriteLine("Import complete!")))
.Build();
Tasklet steps are fire-and-forget units of work — no reader or writer needed. Steps execute sequentially, so the notification only fires after the import succeeds.
Step 6: Run It as a Background Service
For production workloads, you’ll typically want your import job running on a schedule inside a hosted service. NBatch integrates directly with Microsoft’s dependency injection and IHostedService:
builder.Services.AddNBatch(nbatch =>
{
nbatch.AddJob("product-import", (sp, job) => job
.AddStep("import-products", step => step
.ReadFrom(new CsvReader("products.csv", row => new Product
{
Name = row["Name"],
Category = row["Category"],
Price = decimal.Parse(row["Price"])
}))
.WriteTo(new DbWriter(
sp.GetRequiredService()))
.WithChunkSize(100)))
.RunEvery(TimeSpan.FromHours(1));
});
AddNBatch registers the job with the DI container. RunEvery(TimeSpan.FromHours(1)) schedules it to run every hour as a background service. You can also use RunOnce() for one-time startup jobs like database seeding.
You can even trigger jobs on-demand from an API endpoint:
app.MapPost("/jobs/product-import", async (IJobRunner runner, CancellationToken ct) =>
Results.Ok(await runner.RunAsync("product-import", ct)));
Inspecting Job Results
RunAsync returns a JobResult that gives you visibility into what happened:
var result = await job.RunAsync();
Console.WriteLine($"Job succeeded: {result.IsSuccessful}");
foreach (var stepResult in result.StepResults)
{
Console.WriteLine($" Step: {stepResult.StepName}");
Console.WriteLine($" Items read: {stepResult.ItemsRead}");
Console.WriteLine($" Items written: {stepResult.ItemsWritten}");
Console.WriteLine($" Errors skipped: {stepResult.ErrorsSkipped}");
}
This makes it straightforward to log metrics, send alerts, or build dashboards around your batch jobs.
Final Thoughts
What I like about NBatch is that it gives you the structure of a batch processing framework without the weight. You’re not configuring XML files or learning a new DSL — it’s just C# with a fluent API. The reader → processor → writer pattern keeps your pipeline stages cleanly separated, and features like skip policies and restart-from-failure mean you’re building production-grade resilience from the start.
If you’re currently writing ad-hoc while loops to process CSV files or foreach loops to bulk-insert records, NBatch gives you a more structured approach with very little ceremony.
Resources:
