.NET Concurrency Mastery
A deep-dive guide to Tasks, Channels & IObservable — building reliable, high-throughput .NET 6+ systems where no event is ever lost.
Part 1 — The Task Abstraction
A Task is not a thread. This is the single most important mental model shift. A Task represents a promise of a future value (or completion). The runtime decides where and when the work executes. Under the hood, a Task is a state machine node that the ThreadPool can schedule, suspend, and resume.
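To make this concrete, here's a minimal illustration: a timer-driven Task that is "in flight" while no thread anywhere is working on it.

```csharp
// A Task need not own a thread: this delay is driven by a timer.
Task pending = Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine(pending.IsCompleted); // False — yet no thread is blocked
await pending; // the timer callback resumes us when it fires
```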
The Three Task Types
| Type | Returns | Allocation | When to Use |
|---|---|---|---|
| Task | Nothing (void-like) | Heap alloc | Async operations without return values |
| Task<T> | A value of type T | Heap alloc | Most async operations |
| ValueTask<T> | A value of type T | Stack (if it completes synchronously) | Hot paths where the result is often cached/sync |
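For illustration, the three shapes side by side (the method names and the _db/_cache fields are hypothetical):

```csharp
public async Task SaveAsync(Order order, CancellationToken ct)          // no result
    => await _db.InsertAsync(order, ct);

public async Task<Order> LoadAsync(Guid id, CancellationToken ct)       // result
    => await _db.FindAsync(id, ct);

public ValueTask<Order> LoadCachedAsync(Guid id, CancellationToken ct)  // hot path
    => _cache.TryGetValue(id, out var order)
        ? new ValueTask<Order>(order)              // sync hit: no allocation
        : new ValueTask<Order>(LoadAsync(id, ct)); // rare slow path wraps a Task
```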
Task Lifecycle
The Async State Machine — What the Compiler Actually Generates
When you write async Task, the C# compiler rewrites your method into a state machine struct. Understanding this is crucial because it tells you exactly where your code runs, what gets captured, and why exceptions behave the way they do.
Your Code

```csharp
public async Task<string> GetDataAsync(string url, CancellationToken ct)
{
    var client = new HttpClient();
    var response = await client.GetAsync(url, ct);
    var body = await response.Content.ReadAsStringAsync(ct);
    return body;
}
```
What the compiler actually generates (simplified)
Compiler Output (Simplified)

```csharp
// The compiler creates a struct that implements IAsyncStateMachine
[CompilerGenerated]
private struct <GetDataAsync>d__0 : IAsyncStateMachine
{
    public int <>1__state; // current position in method
    public AsyncTaskMethodBuilder<string> <>t__builder;

    // Your local variables become fields
    public string url;
    public CancellationToken ct;
    private HttpClient <client>5__1;
    private HttpResponseMessage <response>5__2;

    // Awaiters for each await point
    private TaskAwaiter<HttpResponseMessage> <>u__1;
    private TaskAwaiter<string> <>u__2;

    void IAsyncStateMachine.MoveNext()
    {
        int num = <>1__state;
        string result;
        try
        {
            if (num == 0) goto Label_Await1;
            if (num == 1) goto Label_Await2;

            // State -1: First entry — code before first await
            <client>5__1 = new HttpClient();
            <>u__1 = <client>5__1.GetAsync(url, ct).GetAwaiter();
            if (!<>u__1.IsCompleted)
            {
                <>1__state = 0;
                <>t__builder.AwaitUnsafeOnCompleted(ref <>u__1, ref this);
                return; // ← YIELDS HERE. Thread is free.
            }

        Label_Await1:
            <response>5__2 = <>u__1.GetResult();
            // ... similar pattern for second await ...

        Label_Await2:
            result = <>u__2.GetResult();
        }
        catch (Exception ex)
        {
            <>t__builder.SetException(ex); // ← exception stored in Task
            return;
        }
        <>t__builder.SetResult(result);
    }
}
```
1) Your local variables become fields on a struct — they live on the heap once boxed.
2) Each await is a potential yield point — the thread returns to the pool.
3) Exceptions are captured into the Task, not thrown on the spot.
4) The struct is initially stack-allocated but gets boxed to heap on first real async suspension.
5) MoveNext() is called by the scheduler to resume — your code runs on whatever thread the pool picks.
The first await that actually suspends costs heap allocations: the state machine struct is boxed and a continuation is wired up (on modern runtimes that same box is reused for later suspensions in the call). If the awaited task is already completed (e.g., a cached result), the state machine stays on the stack and MoveNext() just falls through. This is why ValueTask<T> exists.
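A quick sketch of the two paths:

```csharp
// Fast path: the awaited task is already complete, so MoveNext()
// falls straight through — no box, no continuation delegate.
Task<int> cached = Task.FromResult(42);
int value = await cached;

// Slow path: never complete at the await point, so the state machine
// suspends here — this is where the boxing happens.
await Task.Delay(10);
```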
TaskCompletionSource<T> — Manual Task Control
TaskCompletionSource<T> lets you create a Task that you control. You decide when it completes, faults, or cancels. This is the bridge between callback-based APIs and the async world, and it's essential for building reliable event-driven pipelines.
Pattern: Wrapping a callback API

```csharp
public static Task<byte[]> ReadAllBytesAsync(Stream stream, CancellationToken ct)
{
    var tcs = new TaskCompletionSource<byte[]>(
        TaskCreationOptions.RunContinuationsAsynchronously // ← CRITICAL
    );

    // Register cancellation BEFORE starting the operation
    ct.Register(() => tcs.TrySetCanceled(ct));

    var buffer = new byte[4096];
    var ms = new MemoryStream();

    void BeginNextRead()
    {
        stream.BeginRead(buffer, 0, buffer.Length, ar =>
        {
            try
            {
                var bytesRead = stream.EndRead(ar);
                if (bytesRead == 0)
                {
                    tcs.TrySetResult(ms.ToArray()); // end of stream
                    return;
                }
                ms.Write(buffer, 0, bytesRead);
                BeginNextRead(); // keep reading until the stream is drained
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex); // ← never loses the error
            }
        }, null);
    }

    try
    {
        BeginNextRead();
    }
    catch (Exception ex)
    {
        tcs.TrySetException(ex);
    }
    return tcs.Task;
}
```
Always pass TaskCreationOptions.RunContinuationsAsynchronously when creating a TaskCompletionSource. Without it, calling SetResult() synchronously runs all await continuations on the calling thread. In a high-throughput scenario, this means your producer thread gets hijacked to run consumer code, leading to deadlocks and unpredictable behavior. This is the #1 TCS mistake in production systems.
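A minimal repro of the hijack (illustrative):

```csharp
var tcs = new TaskCompletionSource<int>(); // note: no creation option

async Task WaiterAsync()
{
    await tcs.Task;
    // Without RunContinuationsAsynchronously, this line executes
    // synchronously INSIDE the SetResult call — on the producer's thread.
    Console.WriteLine($"Continuation on thread {Environment.CurrentManagedThreadId}");
}

var waiter = WaiterAsync();
Console.WriteLine($"Producer on thread {Environment.CurrentManagedThreadId}");
tcs.SetResult(1); // the waiter's continuation runs here, inline
await waiter;
```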
TCS as a One-Shot Signal
Pattern: Async signal between components

```csharp
public class AsyncSignal
{
    private TaskCompletionSource _tcs = new(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Waiter side — suspends until signaled
    public Task WaitAsync() => _tcs.Task;

    // Signaler side — wakes up all waiters
    public void Signal() => _tcs.TrySetResult();

    // Resettable version (use with caution — race conditions!)
    public void Reset() => Interlocked.Exchange(ref _tcs,
        new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously));
}
```
ValueTask<T> — Zero-Allocation Hot Paths
ValueTask<T> is a discriminated union: it holds either a T value directly or a Task<T>. When the result is available synchronously, there's zero heap allocation — the value stays on the stack.
When ValueTask shines

```csharp
private readonly ConcurrentDictionary<string, byte[]> _cache = new();

public ValueTask<byte[]> GetDataAsync(string key, CancellationToken ct)
{
    // 99% of calls hit the cache — no Task allocated
    if (_cache.TryGetValue(key, out var data))
        return new ValueTask<byte[]>(data); // ← stack only!

    // Rare: go to network
    return new ValueTask<byte[]>(FetchFromNetworkAsync(key, ct));
}
```
1) Await it exactly once. Never store a ValueTask and await it twice.
2) Never use .Result or .GetAwaiter().GetResult() unless IsCompletedSuccessfully is true.
3) Never await concurrently. No WhenAll with ValueTasks — convert to Task first via .AsTask().
4) If you need multiple consumers, call .AsTask() once and share the Task.
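When rule 3 bites, the conversion looks like this (reusing GetDataAsync from above):

```csharp
// ValueTasks can't go straight into WhenAll — convert each one to a
// Task exactly once, then combine the Tasks.
ValueTask<byte[]> v1 = GetDataAsync("key-a", ct);
ValueTask<byte[]> v2 = GetDataAsync("key-b", ct);

byte[][] results = await Task.WhenAll(v1.AsTask(), v2.AsTask());
```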
Exception Handling — Where Errors Actually Go
This is where most developers get burned. An exception in an async method doesn't throw on the calling thread. It gets stored in the Task, and only re-thrown when someone awaits or accesses .Result.
Exception Flow
The Exception Trap — Unawaited Tasks

```csharp
// THIS SILENTLY SWALLOWS THE EXCEPTION!
void StartWork()
{
    DoWorkAsync(); // ← No await! Exception vanishes into the void.
}

// THIS CATCHES IT
async Task StartWorkAsync()
{
    await DoWorkAsync(); // ← Exception re-thrown here
}

// LAST RESORT: Global observer for unobserved exceptions
TaskScheduler.UnobservedTaskException += (sender, e) =>
{
    // Log the error — this fires on GC, not immediately!
    Log.Error(e.Exception, "Unobserved task exception");
    e.SetObserved(); // prevents crash in .NET Framework
};
```
An unawaited Task that throws is a silently lost error. In a reliability-focused system, every Task must be awaited, or routed through an explicit fire-and-forget handler (shown later in the Fire-and-Forget section).
Multiple Exceptions (WhenAll)
WhenAll flattens to first exception on await

```csharp
try
{
    await Task.WhenAll(task1, task2, task3);
}
catch (Exception ex)
{
    // 'ex' is only the FIRST exception!
}

// Reliable pattern: capture the WhenAll task
var allTask = Task.WhenAll(task1, task2, task3);
try
{
    await allTask;
}
catch
{
    // Now inspect ALL exceptions via the AggregateException
    foreach (var ex in allTask.Exception!.InnerExceptions)
        Log.Error(ex, "Task failed");
    throw; // re-throw if needed
}
```
Cancellation Patterns
CancellationToken is cooperative — nothing gets forcefully killed. The code must check the token and decide to stop. This is by design: forced thread aborts corrupt state.
Three ways to respond to cancellation

```csharp
async Task ProcessBatchAsync(IEnumerable<Item> items, CancellationToken ct)
{
    foreach (var item in items)
    {
        // Option 1: ThrowIfCancellationRequested — throws OperationCanceledException
        ct.ThrowIfCancellationRequested();

        // Option 2: Check and exit gracefully
        if (ct.IsCancellationRequested)
        {
            Log.Warning("Batch processing cancelled at item {Id}", item.Id);
            return; // task completes as RanToCompletion, not Canceled!
        }

        // Option 3: Pass token to async operations (they handle it)
        await ProcessItemAsync(item, ct);
    }
}

// Creating linked tokens for timeouts
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
using var linkedCts = CancellationTokenSource
    .CreateLinkedTokenSource(ct, timeoutCts.Token);
await ProcessBatchAsync(items, linkedCts.Token);
```
In a reliable system, distinguish between cancellation (expected shutdown) and failure (unexpected error). Catch OperationCanceledException separately from other exceptions. When shutting down a pipeline, cancel producers first, let channels drain, then cancel consumers.
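In code, the separation is one exception filter (RunPipelineAsync is a hypothetical entry point):

```csharp
try
{
    await RunPipelineAsync(ct);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
    // Expected: shutdown was requested — not an error
    _logger.LogInformation("Pipeline stopped by cancellation");
}
catch (Exception ex)
{
    // Unexpected: a genuine failure
    _logger.LogError(ex, "Pipeline crashed");
    throw;
}
```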
TaskScheduler & ThreadPool Internals
Every await continuation gets scheduled on a TaskScheduler. The default scheduler uses the .NET ThreadPool, which is a work-stealing thread pool. Understanding this tells you why your code performs the way it does.
ThreadPool Work-Stealing Architecture
Each worker thread owns a local work queue: work queued from a pool thread lands there first (LIFO, cache-friendly), idle workers steal from the opposite end of other threads' queues, and the shared global FIFO queue serves as the fallback. This keeps contention low and hot data close to the thread that produced it.
Calling .Result or .Wait() on an async method blocks a ThreadPool thread. The pool injects new threads slowly (roughly one per 500ms once the minimum is exceeded), so in a high-throughput scenario blocked threads cause cascading starvation. Never do sync-over-async on the ThreadPool. If you truly must block, offload first with Task.Run so a captured SynchronizationContext can't deadlock you — it still wastes a thread, as sketched below.
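Side by side, the trap and the escape hatch (FetchDataAsync is hypothetical):

```csharp
// BAD: blocks a pool thread; with a captured SynchronizationContext
// (WPF, WinForms, classic ASP.NET) this can deadlock outright.
var data = FetchDataAsync(ct).Result;

// If you truly cannot await: offload first, then block. No context is
// captured, so no deadlock — but a thread is still wasted while waiting.
var data2 = Task.Run(() => FetchDataAsync(ct)).GetAwaiter().GetResult();
```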
SynchronizationContext
After an await, the continuation tries to run on the captured SynchronizationContext. In WPF, that's the UI thread's dispatcher. In ASP.NET Core, there's no SynchronizationContext — continuations run on any pool thread. For your WPF + service architecture, this is critical:
ConfigureAwait in libraries vs UI code

```csharp
// In library/service code — ALWAYS use ConfigureAwait(false)
public async Task<Data> FetchDataAsync(CancellationToken ct)
{
    var raw = await _http.GetAsync("/api/data", ct)
        .ConfigureAwait(false); // ← Don't capture SyncContext
    return Parse(raw);
}

// In WPF ViewModel — DON'T use ConfigureAwait(false) for UI updates
public async Task LoadCommand()
{
    var data = await _service.FetchDataAsync(_cts.Token);
    // Back on UI thread — safe to update bindings
    Items = new ObservableCollection<Data>(data);
}
```
Fire-and-Forget Done Right
Sometimes you genuinely need to start a Task and not await it. The key: never let exceptions escape silently.
Safe fire-and-forget extension

```csharp
public static class TaskExtensions
{
    /// <summary>
    /// Safely fires a task without awaiting.
    /// Guarantees: exceptions are logged, never lost.
    /// </summary>
    public static void SafeFireAndForget(
        this Task task,
        ILogger logger,
        [CallerMemberName] string caller = "")
    {
        task.ContinueWith(t =>
        {
            if (t.IsFaulted)
                logger.LogError(t.Exception,
                    "Fire-and-forget task faulted in {Caller}", caller);
            else if (t.IsCanceled)
                logger.LogWarning(
                    "Fire-and-forget task canceled in {Caller}", caller);
        }, TaskContinuationOptions.NotOnRanToCompletion);
    }
}

// Usage
PublishEventAsync(evt, ct).SafeFireAndForget(_logger);
```
In your service pack tool and agent architectures, use SafeFireAndForget only for truly optional work (telemetry, cache warming). For business-critical operations, always await or route through a Channel for guaranteed processing.
Part 2 — Channels: The Modern Producer-Consumer
System.Threading.Channels is .NET's answer to Go channels. It replaces BlockingCollection<T> with a fully async, high-performance, bounded/unbounded message passing primitive. Think of it as a pipe: producers write to one end, consumers read from the other, and the pipe handles all the synchronization.
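The whole API in miniature — write, complete, read:

```csharp
var channel = Channel.CreateUnbounded<int>();

// Producer end
await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);
channel.Writer.Complete(); // no more items

// Consumer end — the loop ends when the writer completes
await foreach (var n in channel.Reader.ReadAllAsync())
    Console.WriteLine(n); // 1, 2
```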
Why Channels Over BlockingCollection
| Feature | BlockingCollection<T> | Channel<T> |
|---|---|---|
| Async support | ❌ Blocks threads | ✅ Full async/await |
| Backpressure | Blocking only | Async wait, drop, or error |
| Performance | Lock-based | Lock-free (single reader/writer) |
| Completion | CompleteAdding() | Complete() + error propagation |
| IAsyncEnumerable | ❌ | ✅ ReadAllAsync() |
Channel Anatomy — What's Inside
Creating Channels

```csharp
// Unbounded — grows forever, be careful
var unbounded = Channel.CreateUnbounded<Event>(new UnboundedChannelOptions
{
    SingleReader = true,  // enables lock-free fast path
    SingleWriter = false, // multiple producers
});

// Bounded — backpressure built in
var bounded = Channel.CreateBounded<Event>(new BoundedChannelOptions(1000)
{
    SingleReader = true,
    SingleWriter = false,
    FullMode = BoundedChannelFullMode.Wait, // ← backpressure strategy
});
```
Bounded vs Unbounded — The Reliability Trade-Off
An unbounded channel is an OOM exception waiting to happen. If your producer is faster than your consumer (and in high-throughput systems, it will be during spikes), the buffer grows without limit. Bounded channels force you to make an explicit decision about what happens when the pipe is full.
BoundedChannelFullMode — Your Four Options
| Mode | Behavior | Events Lost? | Use Case |
|---|---|---|---|
| Wait | WriteAsync suspends until space | ❌ Never | Reliable pipelines — your default choice |
| DropOldest | Removes the oldest buffered item, writes the new one | ✅ Old items | Latest-value-wins (sensor data, UI updates) |
| DropNewest | Removes the newest buffered item, writes the new one | ✅ Recent items | Keep the head of the stream, refresh the tail |
| DropWrite | Drops the item being written | ✅ New items | Rate limiting (discard excess) |
Detecting dropped messages (drop modes)

```csharp
// In drop modes, TryWrite and WriteAsync both report success even when
// an item gets dropped — the drop itself is silent. To observe drops,
// pass an itemDropped callback when creating the channel:
var channel = Channel.CreateBounded<Event>(
    new BoundedChannelOptions(1000)
    {
        FullMode = BoundedChannelFullMode.DropOldest
    },
    dropped =>
    {
        Interlocked.Increment(ref _droppedCount);
        _logger.LogWarning("Channel full, dropped event {Id}", dropped.Id);
    });

// Alternatively, keep FullMode.Wait and make the drop decision yourself:
// TryWrite returns false when a Wait-mode channel is full.
if (!channel.Writer.TryWrite(item))
{
    Interlocked.Increment(ref _droppedCount);
    _logger.LogWarning("Channel full, dropped event {Id}", item.Id);
}
```
Backpressure — The Art of Slowing Down Gracefully
Backpressure means: when the consumer can't keep up, the producer slows down rather than the system failing. With BoundedChannelFullMode.Wait, the producer's WriteAsync suspends (yields the thread!) until space is available. No threads are blocked. No events are lost.
Backpressure in action

```csharp
var channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Producer — will slow down when buffer hits 100
async Task ProduceAsync(ChannelWriter<WorkItem> writer, CancellationToken ct)
{
    try
    {
        await foreach (var item in GetItemsAsync(ct))
        {
            // This will asynchronously wait if the channel is full.
            // The thread is NOT blocked — it yields back to the pool.
            await writer.WriteAsync(item, ct);
        }
    }
    finally
    {
        writer.Complete(); // ← Signal: no more items coming
    }
}

// Consumer — reads at its own pace
async Task ConsumeAsync(ChannelReader<WorkItem> reader, CancellationToken ct)
{
    await foreach (var item in reader.ReadAllAsync(ct))
    {
        await ProcessAsync(item, ct);
    }
    // ReadAllAsync completes when writer calls Complete()
}
```
Producer-Consumer Patterns
Pattern: Single Producer → Multiple Consumers (Fan-Out)
Multiple consumers competing for work

```csharp
async Task RunPipelineAsync(CancellationToken ct)
{
    var channel = Channel.CreateBounded<WorkItem>(500);

    // 1 producer
    var producer = ProduceAsync(channel.Writer, ct);

    // N consumers (all reading from the same reader = competing consumers)
    var consumers = Enumerable.Range(0, Environment.ProcessorCount)
        .Select(_ => ConsumeAsync(channel.Reader, ct))
        .ToArray();

    // Wait for everything
    await producer;                // completes the writer
    await Task.WhenAll(consumers); // drain remaining items
}
```
Pattern: Multiple Producers → Single Consumer (Fan-In)
Merging multiple event sources

```csharp
var channel = Channel.CreateBounded<Event>(new BoundedChannelOptions(1000)
{
    SingleReader = true,
    SingleWriter = false, // ← multiple writers
});

// Multiple sources write to the same channel
var producer1 = WatchFileSystemAsync(channel.Writer, ct);
var producer2 = WatchNetworkAsync(channel.Writer, ct);
var producer3 = WatchTimerAsync(channel.Writer, ct);

// Single consumer processes all events in order
var consumer = ProcessEventsAsync(channel.Reader, ct);

// When ALL producers are done, complete the writer
await Task.WhenAll(producer1, producer2, producer3);
channel.Writer.Complete();
await consumer;
```
Pattern: Pipeline (Channel Chaining)
Multi-stage processing pipeline

```csharp
// Stage channels: parse raw data → enrich → store
var rawChannel = Channel.CreateBounded<byte[]>(1000);
var parsedChannel = Channel.CreateBounded<ParsedEvent>(500);
var enrichedChannel = Channel.CreateBounded<EnrichedEvent>(200);

// Each stage reads from one channel and writes to the next
var parse = TransformAsync(rawChannel.Reader, parsedChannel.Writer, ParseEvent, ct);
var enrich = TransformAsync(parsedChannel.Reader, enrichedChannel.Writer, EnrichEvent, ct);
var store = ConsumeAsync(enrichedChannel.Reader, StoreEvent, ct);

// Generic transform stage
async Task TransformAsync<TIn, TOut>(
    ChannelReader<TIn> reader,
    ChannelWriter<TOut> writer,
    Func<TIn, TOut> transform,
    CancellationToken ct)
{
    try
    {
        await foreach (var item in reader.ReadAllAsync(ct))
        {
            var result = transform(item);
            await writer.WriteAsync(result, ct);
        }
    }
    finally
    {
        writer.Complete();
    }
}
```
Completion & Error Propagation
Channel completion is a one-way gate: once you call Complete(), no more writes are allowed. The reader sees completion when the buffer drains. You can also pass an exception to signal a fatal error.
Error propagation through channels

```csharp
// Producer encounters a fatal error
try
{
    await foreach (var item in source)
        await writer.WriteAsync(item, ct);
    writer.Complete();   // normal completion
}
catch (Exception ex)
{
    writer.Complete(ex); // ← error propagation!
}

// Consumer side — the error surfaces here. ReadAllAsync rethrows the
// producer's original exception once the buffer drains; ReadAsync and
// WriteAsync on a completed channel instead throw ChannelClosedException
// with that error as InnerException.
try
{
    await foreach (var item in reader.ReadAllAsync(ct))
        await ProcessAsync(item, ct);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
    _logger.LogError(ex, "Pipeline upstream failed");
}

// You can also observe completion as a Task
await reader.Completion; // faults if Complete(exception) was called
```
Performance Internals
How Channels achieve lock-free performance
When you set SingleReader = true or SingleWriter = true, the Channel implementation switches to a specialized internal class that uses Interlocked operations instead of locks. Here's what happens under the hood:
| Scenario | Throughput (approx) |
|---|---|
| Unbounded, Single R/W | ~50M items/sec |
| Unbounded, Multi R/W | ~15M items/sec |
| Bounded (1000), Single R/W | ~30M items/sec |
| Bounded (1000), Multi R/W | ~8M items/sec |
| BlockingCollection (for comparison) | ~2M items/sec |
At thousands of events per second, you're nowhere near Channel limits. The real bottleneck will be your processing logic. But the lock-free path matters when you have many channels in a system — less contention means more predictable latency.
Part 3 — IObservable<T>: The Observer Contract
IObservable<T> and IObserver<T> are built into the BCL (no NuGet needed). They define a push-based notification pattern with a strict contract. Think of it as the inverse of IEnumerable<T>: instead of you pulling items, items are pushed to you.
The two interfaces — that's it

```csharp
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<in T>
{
    void OnNext(T value);          // here's a new item
    void OnError(Exception error); // something broke
    void OnCompleted();            // no more items
}
```
The Observable Contract (You MUST Follow This)
1) Zero or more OnNext calls, followed by at most one OnError or OnCompleted.
2) After OnError or OnCompleted, no more calls to the observer. Ever.
3) Calls must be serialized — no concurrent OnNext calls from different threads.
4) Subscribe returns an IDisposable — disposing it unsubscribes.
Breaking these rules leads to undefined behavior. There is no runtime enforcement.
Building Observables From Scratch (No Library)
Without System.Reactive, you build everything yourself. This is actually great for understanding what Rx does — and for keeping your deployment footprint small in air-gapped environments.
Base observable implementation

```csharp
public sealed class EventSource<T> : IObservable<T>, IDisposable
{
    private readonly ConcurrentDictionary<Guid, IObserver<T>> _observers = new();
    private volatile bool _completed;
    private Exception? _error;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_completed)
        {
            // Late subscriber sees terminal event immediately
            if (_error != null) observer.OnError(_error);
            else observer.OnCompleted();
            return Disposable.Empty;
        }

        var id = Guid.NewGuid();
        _observers.TryAdd(id, observer);
        return new Unsubscriber(() => _observers.TryRemove(id, out _));
    }

    public void Publish(T value)
    {
        if (_completed) return;
        foreach (var (id, observer) in _observers)
        {
            try
            {
                observer.OnNext(value);
            }
            catch (Exception ex)
            {
                // Observer threw — remove it, don't kill the source
                Log.Error(ex, "Observer faulted, removing");
                _observers.TryRemove(id, out _);
            }
        }
    }

    public void Complete()
    {
        _completed = true;
        foreach (var observer in _observers.Values)
            observer.OnCompleted();
        _observers.Clear();
    }

    public void Error(Exception ex)
    {
        _error = ex;
        _completed = true;
        foreach (var observer in _observers.Values)
            observer.OnError(ex);
        _observers.Clear();
    }

    public void Dispose() => Complete();
}

// Helpers
internal sealed class Unsubscriber : IDisposable
{
    private Action? _unsubscribe;
    public Unsubscriber(Action unsubscribe) => _unsubscribe = unsubscribe;
    public void Dispose() =>
        Interlocked.Exchange(ref _unsubscribe, null)?.Invoke();
}

internal static class Disposable
{
    public static readonly IDisposable Empty = new EmptyDisposable();
    private sealed class EmptyDisposable : IDisposable
    {
        public void Dispose() { }
    }
}
```
Custom Operators — Building a Mini-Rx
Without the System.Reactive library, you build operators as extension methods that return new observables. Each operator subscribes to the source and transforms notifications.
Where (filter) operator

```csharp
public static IObservable<T> Where<T>(
    this IObservable<T> source, Func<T, bool> predicate)
{
    return new FilterObservable<T>(source, predicate);
}

internal sealed class FilterObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Func<T, bool> _predicate;

    public FilterObservable(IObservable<T> source, Func<T, bool> predicate)
    {
        _source = source;
        _predicate = predicate;
    }

    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new FilterObserver(observer, _predicate));

    private sealed class FilterObserver : IObserver<T>
    {
        private readonly IObserver<T> _downstream;
        private readonly Func<T, bool> _predicate;

        public FilterObserver(IObserver<T> downstream, Func<T, bool> predicate)
        {
            _downstream = downstream;
            _predicate = predicate;
        }

        public void OnNext(T value)
        {
            try
            {
                if (_predicate(value))
                    _downstream.OnNext(value);
            }
            catch (Exception ex)
            {
                _downstream.OnError(ex); // predicate threw → error
            }
        }

        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}
```
Select (map) operator

```csharp
public static IObservable<TOut> Select<TIn, TOut>(
    this IObservable<TIn> source, Func<TIn, TOut> selector)
{
    return new SelectObservable<TIn, TOut>(source, selector);
}

// Implementation follows the same pattern as FilterObservable
// — wrap source, intercept OnNext, transform value
```
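For completeness, a sketch of that elided implementation:

```csharp
internal sealed class SelectObservable<TIn, TOut> : IObservable<TOut>
{
    private readonly IObservable<TIn> _source;
    private readonly Func<TIn, TOut> _selector;

    public SelectObservable(IObservable<TIn> source, Func<TIn, TOut> selector)
    {
        _source = source;
        _selector = selector;
    }

    public IDisposable Subscribe(IObserver<TOut> observer) =>
        _source.Subscribe(new SelectObserver(observer, _selector));

    private sealed class SelectObserver : IObserver<TIn>
    {
        private readonly IObserver<TOut> _downstream;
        private readonly Func<TIn, TOut> _selector;

        public SelectObserver(IObserver<TOut> downstream, Func<TIn, TOut> selector)
        {
            _downstream = downstream;
            _selector = selector;
        }

        public void OnNext(TIn value)
        {
            TOut result;
            try { result = _selector(value); }
            catch (Exception ex) { _downstream.OnError(ex); return; } // selector threw → error
            _downstream.OnNext(result);
        }

        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}
```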
Buffer operator — batches items by count

```csharp
public static IObservable<IReadOnlyList<T>> Buffer<T>(
    this IObservable<T> source, int count)
{
    return new BufferObservable<T>(source, count);
}

internal sealed class BufferObservable<T> : IObservable<IReadOnlyList<T>>
{
    private readonly IObservable<T> _source;
    private readonly int _count;

    public BufferObservable(IObservable<T> source, int count)
    {
        _source = source;
        _count = count;
    }

    public IDisposable Subscribe(IObserver<IReadOnlyList<T>> observer) =>
        _source.Subscribe(new BufferObserver(observer, _count));

    private sealed class BufferObserver : IObserver<T>
    {
        private readonly IObserver<IReadOnlyList<T>> _downstream;
        private readonly int _count;
        private List<T> _buffer;

        public BufferObserver(IObserver<IReadOnlyList<T>> downstream, int count)
        {
            _downstream = downstream;
            _count = count;
            _buffer = new List<T>(count);
        }

        public void OnNext(T value)
        {
            _buffer.Add(value);
            if (_buffer.Count >= _count)
            {
                _downstream.OnNext(_buffer);
                _buffer = new List<T>(_count);
            }
        }

        public void OnCompleted()
        {
            // Flush remaining items
            if (_buffer.Count > 0)
                _downstream.OnNext(_buffer);
            _downstream.OnCompleted();
        }

        public void OnError(Exception error) => _downstream.OnError(error);
    }
}
```
Throttle operator — rate limits by time window

```csharp
public static IObservable<T> Throttle<T>(
    this IObservable<T> source, TimeSpan window)
{
    return new ThrottleObservable<T>(source, window);
}

internal sealed class ThrottleObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly TimeSpan _window;

    public ThrottleObservable(IObservable<T> source, TimeSpan window)
    {
        _source = source;
        _window = window;
    }

    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new ThrottleObserver(observer, _window));

    private sealed class ThrottleObserver : IObserver<T>
    {
        private readonly IObserver<T> _downstream;
        private readonly long _windowTicks;
        private long _lastEmitTimestamp;

        public ThrottleObserver(IObserver<T> downstream, TimeSpan window)
        {
            _downstream = downstream;
            _windowTicks = window.Ticks;
        }

        public void OnNext(T value)
        {
            var now = Stopwatch.GetTimestamp();
            var last = Interlocked.Read(ref _lastEmitTimestamp);
            // Convert Stopwatch ticks to TimeSpan ticks by hand
            // (Stopwatch.GetElapsedTime is .NET 7+)
            var elapsedTicks = (long)((now - last) *
                (TimeSpan.TicksPerSecond / (double)Stopwatch.Frequency));
            if (elapsedTicks >= _windowTicks)
            {
                Interlocked.Exchange(ref _lastEmitTimestamp, now);
                _downstream.OnNext(value);
            }
            // else: swallowed (throttled)
        }

        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}
```
Thread Safety — Serializing Observer Calls
The Observable contract requires serialized OnNext calls. If your source fires from multiple threads, you must serialize. Here's a gate that enforces this:
Serializing gate for thread-safe observation

```csharp
public sealed class SerializedObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly object _gate = new();
    private bool _done;

    public SerializedObserver(IObserver<T> inner) => _inner = inner;

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (!_done) _inner.OnNext(value);
        }
    }

    public void OnError(Exception error)
    {
        lock (_gate)
        {
            if (_done) return;
            _done = true;
            _inner.OnError(error);
        }
    }

    public void OnCompleted()
    {
        lock (_gate)
        {
            if (_done) return;
            _done = true;
            _inner.OnCompleted();
        }
    }
}
```
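Usage sketch — wrap once, then publish from any thread (downstream and ReadSensor are placeholders):

```csharp
var observer = new SerializedObserver<Reading>(downstream);
Parallel.For(0, 4, _ => observer.OnNext(ReadSensor())); // calls serialized by the gate
```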
Hot vs Cold Observables
| Property | Cold | Hot |
|---|---|---|
| Data produced | Per subscriber (lazy) | Regardless of subscribers |
| Subscribe timing | Gets all items from start | Gets items from now on |
| Example | File read, HTTP request | Mouse events, sensor data |
| Missed events? | Never | Yes, if subscribed late |
Cold observable — each subscriber gets its own sequence

```csharp
public class FileLineObservable : IObservable<string>
{
    private readonly string _path;
    public FileLineObservable(string path) => _path = path;

    public IDisposable Subscribe(IObserver<string> observer)
    {
        var cts = new CancellationTokenSource();

        // Each subscriber triggers a new read
        Task.Run(async () =>
        {
            try
            {
                // File.ReadLinesAsync is .NET 7+; on .NET 6 loop over
                // StreamReader.ReadLineAsync instead
                await foreach (var line in File.ReadLinesAsync(_path, cts.Token))
                    observer.OnNext(line);
                observer.OnCompleted();
            }
            catch (OperationCanceledException) { /* unsubscribed */ }
            catch (Exception ex) { observer.OnError(ex); }
        }, cts.Token);

        return new Unsubscriber(() => cts.Cancel());
    }
}
```
Error Handling in Observables
Once OnError is called, the sequence is dead. No more OnNext calls. In a high-throughput system where you want to keep processing despite individual item failures, you need an error isolation pattern.
Error isolation — don't let one bad item kill the stream

```csharp
public static IObservable<T> CatchAndContinue<T>(
    this IObservable<T> source, Action<Exception> onError)
{
    return new CatchContinueObservable<T>(source, onError);
    // CatchContinueObservable follows the same wrapper pattern as
    // FilterObservable above, subscribing a SafeObserver per subscriber.
}

// SafeObserver wraps each OnNext in a try-catch so the handler's
// exception doesn't propagate back to the source
internal sealed class SafeObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public SafeObserver(
        Action<T> onNext,
        Action<Exception>? onError = null,
        Action? onCompleted = null)
    {
        _onNext = onNext;
        _onError = onError ?? (ex => { });
        _onCompleted = onCompleted ?? (() => { });
    }

    public void OnNext(T value)
    {
        try { _onNext(value); }
        catch (Exception ex) { _onError(ex); }
    }

    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}
```
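Usage sketch — subscribing with per-item error isolation (source, Process, and _logger are placeholders):

```csharp
// One bad event logs and moves on; the subscription stays alive.
IDisposable subscription = source.Subscribe(new SafeObserver<Event>(
    onNext: evt => Process(evt),
    onError: ex => _logger.LogError(ex, "Handler failed for one item")));
```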
Part 4 — Reliability: Never Lose an Event
This is the core of the whole guide. Let's build up from first principles. There are exactly three places where an event can be lost in a .NET async pipeline: an unawaited Task whose exception is never observed, a full channel that drops a write (or a TryWrite whose false return is ignored), and an observable whose handler throws or whose sequence is terminated by OnError. The patterns below close off each of these holes.
Exception Walls — Isolating Failures
An "exception wall" is a try-catch around each unit of work that never lets the processing loop die. The loop itself is sacred — individual items can fail, but the consumer keeps consuming.
The Indestructible Consumer Loop

```csharp
async Task ConsumeForeverAsync(
    ChannelReader<Event> reader,
    ChannelWriter<Event> deadLetterWriter,
    ILogger logger,
    CancellationToken ct)
{
    await foreach (var evt in reader.ReadAllAsync(ct))
    {
        try
        {
            await ProcessEventAsync(evt, ct);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Shutdown requested — re-queue unprocessed item
            deadLetterWriter.TryWrite(evt);
            break;
        }
        catch (Exception ex)
        {
            // EXCEPTION WALL: log, dead-letter, continue
            logger.LogError(ex, "Failed to process event {Id}", evt.Id);
            evt.FailCount++;
            evt.LastError = ex.Message;
            deadLetterWriter.TryWrite(evt); // ← never lost
        }
    }
}
```
Every event that enters the system has exactly one of three outcomes:
1) Successfully processed
2) Sent to dead letter queue for retry/investigation
3) Explicitly dropped with a log entry explaining why
There is no fourth option. No event silently disappears.
Guaranteed Processing Pattern
Acknowledge-based processing — know when an event is handled

```csharp
public readonly struct Envelope<T>
{
    public T Payload { get; }
    public TaskCompletionSource Acknowledgment { get; }
    public CancellationToken CancellationToken { get; }

    public Envelope(T payload, CancellationToken ct)
    {
        Payload = payload;
        Acknowledgment = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);
        CancellationToken = ct;
    }

    public void Ack() => Acknowledgment.TrySetResult();
    public void Nack(Exception ex) => Acknowledgment.TrySetException(ex);
}

// Producer: sends and WAITS for acknowledgment
async Task SendReliableAsync<T>(
    ChannelWriter<Envelope<T>> writer, T payload, CancellationToken ct)
{
    var envelope = new Envelope<T>(payload, ct);
    await writer.WriteAsync(envelope, ct);

    // Now wait until the consumer acknowledges
    await envelope.Acknowledgment.Task;
    // If we get here, the event was successfully processed!
}

// Consumer: processes and acknowledges
async Task ConsumeReliableAsync<T>(
    ChannelReader<Envelope<T>> reader,
    Func<T, CancellationToken, Task> handler,
    CancellationToken ct)
{
    await foreach (var envelope in reader.ReadAllAsync(ct))
    {
        try
        {
            await handler(envelope.Payload, envelope.CancellationToken);
            envelope.Ack();     // ← producer gets unblocked
        }
        catch (Exception ex)
        {
            envelope.Nack(ex);  // ← producer gets the exception
        }
    }
}
```
Dead Letter Queues
DLQ with retry capability

```csharp
public sealed class DeadLetterQueue<T> : IDisposable
{
    private readonly Channel<FailedEvent<T>> _channel;
    private readonly ILogger _logger;
    private readonly int _maxRetries;

    public DeadLetterQueue(ILogger logger, int maxRetries = 3, int capacity = 10_000)
    {
        _logger = logger;
        _maxRetries = maxRetries;
        _channel = Channel.CreateBounded<FailedEvent<T>>(
            new BoundedChannelOptions(capacity)
            {
                // sacrifice oldest failures when full (drop is silent)
                FullMode = BoundedChannelFullMode.DropOldest
            });
    }

    public void Enqueue(T item, Exception ex, int attempt)
    {
        var failed = new FailedEvent<T>(item, ex, attempt, DateTime.UtcNow);
        // TryWrite only fails once the writer has been completed
        if (!_channel.Writer.TryWrite(failed))
            _logger.LogError("DLQ closed, dropping failed event");
    }

    public async Task RetryAsync(
        Func<T, CancellationToken, Task> handler,
        ChannelWriter<T>? requeue,
        CancellationToken ct)
    {
        await foreach (var failed in _channel.Reader.ReadAllAsync(ct))
        {
            if (failed.Attempt >= _maxRetries)
            {
                _logger.LogError(failed.Error,
                    "Permanently failed after {Attempts} attempts", failed.Attempt);
                continue; // truly dead — log and move on
            }

            // Exponential backoff
            var delay = TimeSpan.FromSeconds(Math.Pow(2, failed.Attempt));
            await Task.Delay(delay, ct);

            if (requeue != null)
            {
                // Re-inject into the main pipeline (callers should pass
                // attempt + 1 when the item fails again)
                await requeue.WriteAsync(failed.Item, ct);
            }
            else
            {
                try
                {
                    await handler(failed.Item, ct); // retry in place
                }
                catch (Exception ex)
                {
                    Enqueue(failed.Item, ex, failed.Attempt + 1); // back on the DLQ
                }
            }
        }
    }

    public void Dispose() => _channel.Writer.TryComplete();
}

public record FailedEvent<T>(T Item, Exception Error, int Attempt, DateTime FailedAt);
```
Retry & Circuit Breaker (No External Libraries)
Simple retry with exponential backoff

```csharp
public static async Task<T> RetryAsync<T>(
    Func<CancellationToken, Task<T>> operation,
    int maxRetries,
    CancellationToken ct,
    ILogger? logger = null)
{
    for (int attempt = 0; ; attempt++)
    {
        try
        {
            return await operation(ct);
        }
        catch (Exception ex) when (attempt < maxRetries && !ct.IsCancellationRequested)
        {
            var delay = TimeSpan.FromMilliseconds(
                Math.Pow(2, attempt) * 100 + Random.Shared.Next(50)); // jitter
            logger?.LogWarning(ex,
                "Attempt {Attempt} failed, retrying in {Delay}ms",
                attempt + 1, delay.TotalMilliseconds);
            await Task.Delay(delay, ct);
        }
    }
}
```
Lightweight circuit breaker

```csharp
public sealed class CircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _resetTimeout;
    private int _failureCount;
    private DateTime _lastFailure;
    private volatile bool _isOpen;

    public CircuitBreaker(int failureThreshold = 5, int resetSeconds = 30)
    {
        _failureThreshold = failureThreshold;
        _resetTimeout = TimeSpan.FromSeconds(resetSeconds);
    }

    public bool IsOpen =>
        _isOpen && (DateTime.UtcNow - _lastFailure) < _resetTimeout;

    public async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation, CancellationToken ct)
    {
        if (IsOpen)
            throw new CircuitBreakerOpenException("Circuit open, try again later");

        try
        {
            var result = await operation(ct);
            Interlocked.Exchange(ref _failureCount, 0); // reset on success
            _isOpen = false;
            return result;
        }
        catch
        {
            _lastFailure = DateTime.UtcNow;
            if (Interlocked.Increment(ref _failureCount) >= _failureThreshold)
                _isOpen = true;
            throw;
        }
    }
}

public class CircuitBreakerOpenException : Exception
{
    public CircuitBreakerOpenException(string msg) : base(msg) { }
}
```
Graceful Shutdown
In a reliable system, shutting down means: stop accepting new events → drain all queues → complete in-flight processing → exit.
Hosted service with graceful shutdown

```csharp
public sealed class EventProcessingService : BackgroundService
{
    private readonly Channel<Event> _channel;
    private readonly ILogger<EventProcessingService> _logger;

    public EventProcessingService(ILogger<EventProcessingService> logger)
    {
        _logger = logger;
        _channel = Channel.CreateBounded<Event>(new BoundedChannelOptions(5000)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
        });
    }

    // Public API for producers
    public ChannelWriter<Event> Writer => _channel.Writer;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Event processor starting");
        try
        {
            await foreach (var evt in _channel.Reader.ReadAllAsync(stoppingToken))
            {
                try
                {
                    await ProcessAsync(evt, stoppingToken);
                }
                catch (OperationCanceledException)
                    when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown requested mid-processing
                    _logger.LogWarning("Shutdown during processing of {Id}", evt.Id);
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed {Id}", evt.Id);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // ReadAllAsync was canceled — drain remaining items
            _logger.LogInformation("Draining remaining events...");
            while (_channel.Reader.TryRead(out var evt))
            {
                try
                {
                    // Process with a short timeout
                    using var drainCts = new CancellationTokenSource(
                        TimeSpan.FromSeconds(5));
                    await ProcessAsync(evt, drainCts.Token);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed during drain: {Id}", evt.Id);
                }
            }
        }
        _logger.LogInformation("Event processor stopped");
    }

    public override async Task StopAsync(CancellationToken ct)
    {
        // Signal no more writes
        _channel.Writer.TryComplete();
        await base.StopAsync(ct);
    }
}
```
Part 5 — The Full Pipeline: Observable → Channel → Task
Here's where everything comes together. The pattern is: Observable produces events → Channel provides backpressure → Task consumers process reliably.
The Bridge: Observable → Channel

```csharp
public sealed class ObservableToChannelBridge<T> : IObserver<T>, IDisposable
{
    private readonly ChannelWriter<T> _writer;
    private readonly ILogger _logger;
    private readonly IDisposable _subscription;
    private long _received;
    private long _dropped;

    public ObservableToChannelBridge(
        IObservable<T> source,
        ChannelWriter<T> writer,
        ILogger logger)
    {
        _writer = writer;
        _logger = logger;
        _subscription = source.Subscribe(this);
    }

    public void OnNext(T value)
    {
        Interlocked.Increment(ref _received);

        // TryWrite is non-blocking — critical for high-frequency sources!
        // If the channel is full, we log and count the drop.
        if (!_writer.TryWrite(value))
        {
            var dropped = Interlocked.Increment(ref _dropped);
            if (dropped % 1000 == 0) // don't spam logs
                _logger.LogWarning(
                    "Channel backpressure: {Dropped} events dropped", dropped);
        }
    }

    public void OnError(Exception error)
    {
        _logger.LogError(error, "Observable source faulted");
        _writer.TryComplete(error);
    }

    public void OnCompleted()
    {
        _logger.LogInformation(
            "Observable completed. Received: {Received}, Dropped: {Dropped}",
            _received, _dropped);
        _writer.TryComplete();
    }

    public void Dispose() => _subscription.Dispose();

    // Diagnostics
    public long Received => Interlocked.Read(ref _received);
    public long Dropped => Interlocked.Read(ref _dropped);
}
```
OnNext is synchronous — you cannot call await inside it. If the channel is full, TryWrite returns false immediately. You have two choices: drop the event (with a log) or use a secondary unbounded buffer as a spillover. For thousands of events per second, dropping with monitoring is the pragmatic choice — trying to buffer everything just delays the OOM.
Fan-Out / Fan-In With Observables and Channels
Observable → Multiple typed channels → Merged consumer

```csharp
public sealed class EventRouter<T> : IObserver<T>
{
    private readonly Dictionary<Type, Action<T>> _routes = new();
    private readonly ILogger _logger;
    private Action<T>? _defaultRoute;

    public EventRouter(ILogger logger) => _logger = logger;

    public EventRouter<T> Route<TEvent>(ChannelWriter<T> writer)
        where TEvent : T
    {
        _routes[typeof(TEvent)] = item =>
        {
            if (!writer.TryWrite(item))
                _logger.LogWarning("Route for {Type} full", typeof(TEvent).Name);
        };
        return this;
    }

    public EventRouter<T> Default(ChannelWriter<T> writer)
    {
        _defaultRoute = item => writer.TryWrite(item);
        return this;
    }

    public void OnNext(T value)
    {
        if (_routes.TryGetValue(value!.GetType(), out var route))
            route(value);
        else
            _defaultRoute?.Invoke(value);
    }

    public void OnError(Exception error) => _logger.LogError(error, "Source faulted");
    public void OnCompleted() => _logger.LogInformation("Source completed");
}
```
High-Frequency Scenarios — Thousands of Events/Second
When your observable fires thousands of times per second, you need batching and throttling between the observable and the channel. Processing items one-by-one at that rate creates too much async overhead.
Time-windowed batch bridge

```csharp
public sealed class BatchingBridge<T> : IObserver<T>, IDisposable
{
    private readonly ChannelWriter<IReadOnlyList<T>> _writer;
    private readonly ILogger _logger;
    private readonly IDisposable _subscription;
    private readonly Timer _flushTimer;
    private readonly int _maxBatchSize;
    private readonly object _lock = new();
    private List<T> _buffer;

    public BatchingBridge(
        IObservable<T> source,
        ChannelWriter<IReadOnlyList<T>> writer,
        ILogger logger,
        int maxBatchSize = 100,
        TimeSpan? flushInterval = null)
    {
        _writer = writer;
        _logger = logger;
        _maxBatchSize = maxBatchSize;
        _buffer = new List<T>(maxBatchSize);

        var interval = flushInterval ?? TimeSpan.FromMilliseconds(100);
        _flushTimer = new Timer(_ => Flush(), null, interval, interval);
        _subscription = source.Subscribe(this);
    }

    public void OnNext(T value)
    {
        List<T>? batchToSend = null;
        lock (_lock)
        {
            _buffer.Add(value);
            if (_buffer.Count >= _maxBatchSize)
            {
                batchToSend = _buffer;
                _buffer = new List<T>(_maxBatchSize);
            }
        }
        if (batchToSend != null) SendBatch(batchToSend);
    }

    private void Flush()
    {
        List<T>? batchToSend = null;
        lock (_lock)
        {
            if (_buffer.Count > 0)
            {
                batchToSend = _buffer;
                _buffer = new List<T>(_maxBatchSize);
            }
        }
        if (batchToSend != null) SendBatch(batchToSend);
    }

    private void SendBatch(List<T> batch)
    {
        if (!_writer.TryWrite(batch))
            _logger.LogWarning("Batch channel full, dropped {Count} items", batch.Count);
    }

    public void OnError(Exception error)
    {
        Flush(); // flush remaining before error
        _writer.TryComplete(error);
    }

    public void OnCompleted()
    {
        Flush(); // flush remaining
        _writer.TryComplete();
    }

    public void Dispose()
    {
        _subscription.Dispose();
        _flushTimer.Dispose();
        Flush();
    }
}
```
Real-World Example: High-Frequency Sensor Processing
Imagine a deployment agent receiving status updates from multiple machines — thousands per second. Each update needs to be validated, enriched, and stored. Here's the complete, production-ready pipeline:
Complete pipeline: Observable → Channel → Processing → DLQ

```csharp
public sealed class SensorPipeline : IDisposable
{
    private readonly Channel<IReadOnlyList<SensorReading>> _ingestChannel;
    private readonly Channel<EnrichedReading> _processChannel;
    private readonly DeadLetterQueue<SensorReading> _dlq;
    private readonly BatchingBridge<SensorReading> _bridge;
    private readonly CancellationTokenSource _cts = new();
    private readonly ILogger _logger;
    private readonly List<Task> _workers = new();

    public SensorPipeline(
        IObservable<SensorReading> sensorSource,
        ILogger<SensorPipeline> logger)
    {
        _logger = logger;

        // Stage 1: Batched ingest (observable → batched channel)
        _ingestChannel = Channel.CreateBounded<IReadOnlyList<SensorReading>>(
            new BoundedChannelOptions(100) // 100 batches × 100 items = 10K buffer
            {
                SingleReader = false, // multiple enrichment workers
                SingleWriter = false, // the bridge writes from source AND timer threads
                FullMode = BoundedChannelFullMode.Wait,
            });

        // Stage 2: Enriched processing channel
        _processChannel = Channel.CreateBounded<EnrichedReading>(
            new BoundedChannelOptions(5000)
            {
                SingleReader = true,
                SingleWriter = false,
                FullMode = BoundedChannelFullMode.Wait,
            });

        // Dead letter queue for failed items
        _dlq = new DeadLetterQueue<SensorReading>(logger);

        // Bridge: Observable → Batched Channel
        _bridge = new BatchingBridge<SensorReading>(
            sensorSource, _ingestChannel.Writer, logger,
            maxBatchSize: 100,
            flushInterval: TimeSpan.FromMilliseconds(50));
    }

    public Task StartAsync()
    {
        var ct = _cts.Token;

        // Enrichment workers (fan-out from ingest channel)
        var enrichers = new List<Task>();
        for (int i = 0; i < Environment.ProcessorCount; i++)
        {
            enrichers.Add(Task.Run(() =>
                EnrichWorkerAsync(_ingestChannel.Reader, _processChannel.Writer, ct), ct));
        }
        _workers.AddRange(enrichers);

        // When ALL enrichment workers finish, complete the process channel
        // so the storage worker can drain and exit
        _workers.Add(Task.WhenAll(enrichers).ContinueWith(
            _ => _processChannel.Writer.TryComplete(),
            TaskScheduler.Default));

        // Storage worker (single writer for DB consistency)
        _workers.Add(Task.Run(() =>
            StoreWorkerAsync(_processChannel.Reader, ct), ct));

        // DLQ retry worker
        _workers.Add(Task.Run(() => _dlq.RetryAsync(ReprocessAsync, null, ct), ct));

        _logger.LogInformation(
            "Pipeline started with {Workers} enrichment workers",
            Environment.ProcessorCount);
        return Task.CompletedTask;
    }

    private async Task EnrichWorkerAsync(
        ChannelReader<IReadOnlyList<SensorReading>> reader,
        ChannelWriter<EnrichedReading> writer,
        CancellationToken ct)
    {
        await foreach (var batch in reader.ReadAllAsync(ct))
        {
            foreach (var reading in batch)
            {
                try
                {
                    var enriched = Enrich(reading);
                    await writer.WriteAsync(enriched, ct);
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    return;
                }
                catch (Exception ex)
                {
                    // Exception wall — item goes to DLQ, loop continues
                    _dlq.Enqueue(reading, ex, 0);
                }
            }
        }
    }

    private async Task StoreWorkerAsync(
        ChannelReader<EnrichedReading> reader,
        CancellationToken ct)
    {
        var batch = new List<EnrichedReading>(200);
        await foreach (var item in reader.ReadAllAsync(ct))
        {
            batch.Add(item);

            // Micro-batch: accumulate up to 200 or until channel is empty
            while (batch.Count < 200 && reader.TryRead(out var more))
                batch.Add(more);

            try
            {
                await BulkStoreAsync(batch, ct);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Bulk store failed for {Count} items", batch.Count);
            }
            batch.Clear();
        }
    }

    public async Task StopAsync()
    {
        _logger.LogInformation("Pipeline shutting down...");

        // 1. Stop the observable bridge (no new events)
        _bridge.Dispose();

        // 2. Complete the ingest channel (let workers drain)
        _ingestChannel.Writer.TryComplete();

        // 3. Wait for workers to finish — the enrichment workers complete
        //    the process channel when done (see StartAsync), unblocking storage
        try
        {
            await Task.WhenAll(_workers).WaitAsync(TimeSpan.FromSeconds(30));
        }
        catch (TimeoutException)
        {
            _logger.LogWarning("Shutdown timed out, canceling workers");
            _cts.Cancel();
        }

        _dlq.Dispose();
        _logger.LogInformation("Pipeline stopped");
    }

    // Placeholder methods
    private EnrichedReading Enrich(SensorReading r) => new(r);
    private Task BulkStoreAsync(List<EnrichedReading> batch, CancellationToken ct) =>
        Task.CompletedTask;
    private Task ReprocessAsync(SensorReading r, CancellationToken ct) =>
        Task.CompletedTask;

    public void Dispose()
    {
        _bridge.Dispose();
        _cts.Dispose();
        _dlq.Dispose();
    }
}

public record SensorReading(string SensorId, double Value, DateTime Timestamp);
public record EnrichedReading(SensorReading Raw);
```
Production Checklist
Tasks
☐ Every Task is awaited or routed through SafeFireAndForget
☐ TaskScheduler.UnobservedTaskException handler is registered
☐ ConfigureAwait(false) in all library/service code
☐ CancellationToken threaded through every async method
☐ TaskCompletionSource uses RunContinuationsAsynchronously
☐ No sync-over-async (.Result, .Wait()) on ThreadPool threads
Channels
☐ All channels are Bounded with explicit FullMode
☐ SingleReader/SingleWriter set correctly (and truthfully!)
☐ Writer completion is guaranteed (in finally blocks)
☐ Consumers observe upstream failure (faulted reader.Completion / ChannelClosedException)
☐ Dropped message counter exposed for monitoring
Observables
☐ Observer contract followed: serialized OnNext, terminal is final
☐ IDisposable subscriptions tracked and disposed on shutdown
☐ Exception in observer doesn't kill the source (exception wall)
☐ Multi-threaded sources wrapped with SerializedObserver
Reliability
☐ Every consumer loop has an exception wall (try-catch per item)
☐ Failed items go to a Dead Letter Queue, not nowhere
☐ Graceful shutdown drains all channels before exit
☐ Circuit breaker on external dependencies
☐ Backpressure strategy documented and monitored
☐ Log dropped/failed event counts as metrics