.NET Concurrency Mastery

A deep-dive guide to Tasks, Channels & IObservable — building reliable, high-throughput .NET 6+ systems where no event is ever lost.

Part 1 — The Task Abstraction

A Task is not a thread. This is the single most important mental model shift. A Task represents a promise of a future value (or completion). The runtime decides where and when the work executes. Under the hood, a Task is an object that records completion state, a result or exception, and the continuations to run afterwards; the ThreadPool executes the delegates and continuations attached to it, not the Task itself.

The Three Task Types

| Type | Returns | Allocation | When to Use |
|---|---|---|---|
| Task | Nothing (void-like) | Heap alloc | Async operations without return values |
| Task<T> | A value of type T | Heap alloc | Most async operations |
| ValueTask<T> | A value of type T | None when the result is available synchronously | Hot paths where result is often cached/sync |

Task Lifecycle

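Created ──→ WaitingForActivation ──→ Running ──┬──→ RanToCompletion
                                               ├──→ Faulted
                                               └──→ Canceled

Key insight: an async method returns a task in the WaitingForActivation state, and that task stays in that state until it completes. It never reports Running, because its progress is driven by the operations it awaits rather than by a scheduled delegate. Only delegate-based tasks (Task.Run, Task.Factory.StartNew) pass through Running. Either way, most tasks you interact with are already in flight when you get them.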
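The status of a task returned by an async method can be checked directly. The following is a minimal sketch; TaskStatusDemo, WaitForGateAsync and ObserveAsync are illustrative names, not part of any library.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskStatusDemo
{
    // Hypothetical helper: an async method that cannot finish until 'gate' completes.
    private static async Task WaitForGateAsync(Task gate) => await gate;

    // Returns the status observed while the async method is suspended,
    // and the status observed after it has finished.
    public static async Task<(TaskStatus WhileSuspended, TaskStatus AfterCompletion)> ObserveAsync()
    {
        var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        // The method runs synchronously up to its first incomplete await, then returns.
        Task pending = WaitForGateAsync(gate.Task);
        TaskStatus whileSuspended = pending.Status;

        gate.SetResult();   // let the suspended method resume
        await pending;

        return (whileSuspended, pending.Status);
    }
}
```

Calling ObserveAsync() should report WaitingForActivation for the suspended method and RanToCompletion once it has finished.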

The Async State Machine — What the Compiler Actually Generates

When you write async Task, the C# compiler rewrites your method into a state machine (a struct in Release builds, a class in Debug builds). Understanding this is crucial because it tells you exactly where your code runs, what gets captured, and why exceptions behave the way they do.

Your Code
public async Task<string> GetDataAsync(string url, CancellationToken ct)
{
    var client = new HttpClient();
    var response = await client.GetAsync(url, ct);
    var body = await response.Content.ReadAsStringAsync(ct);
    return body;
}
What the compiler actually generates (simplified)
Compiler Output (Simplified)
// The compiler creates a struct that implements IAsyncStateMachine
[CompilerGenerated]
private struct <GetDataAsync>d__0 : IAsyncStateMachine
{
    public int <>1__state;           // current position in method
    public AsyncTaskMethodBuilder<string> <>t__builder;
    
    // Your local variables become fields
    public string url;
    public CancellationToken ct;
    private HttpClient <client>5__1;
    private HttpResponseMessage <response>5__2;
    
    // Awaiters for each await point
    private TaskAwaiter<HttpResponseMessage> <>u__1;
    private TaskAwaiter<string> <>u__2;

    void IAsyncStateMachine.MoveNext()
    {
        int num = <>1__state;
        string result;
        try
        {
            if (num == 0) goto Label_Await1;
            if (num == 1) goto Label_Await2;
            
            // State -1: First entry — code before first await
            <client>5__1 = new HttpClient();
            <>u__1 = <client>5__1.GetAsync(url, ct).GetAwaiter();
            
            if (!<>u__1.IsCompleted)
            {
                <>1__state = 0;
                <>t__builder.AwaitUnsafeOnCompleted(ref <>u__1, ref this);
                return; // ← YIELDS HERE. Thread is free.
            }
            
            Label_Await1:
            <response>5__2 = <>u__1.GetResult();
            // ... similar pattern for second await ...
            
            Label_Await2:
            result = <>u__2.GetResult();
        }
        catch (Exception ex)
        {
            <>t__builder.SetException(ex); // ← exception stored in Task
            return;
        }
        <>t__builder.SetResult(result);
    }
}
Key Takeaways

1) Your local variables become fields on a struct — they live on the heap once boxed.
2) Each await is a potential yield point — the thread returns to the pool.
3) Exceptions are captured into the Task, not thrown on the spot.
4) The struct is initially stack-allocated but gets boxed to heap on first real async suspension.
5) MoveNext() is called by the scheduler to resume — your code runs on whatever thread the pool picks.

Performance Implication

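The first await that actually suspends causes a heap allocation: the state machine is boxed, and in .NET Core and later that box also serves as the returned Task. Later suspensions in the same call reuse the box. If every awaited task is already completed (e.g., cached result), the state machine stays on the stack and MoveNext() just falls through; an async Task<T> method still has to hand back a Task<T> for the result, though, which is allocated unless the runtime has a cached instance for that value. Avoiding that remaining allocation is why ValueTask<T> exists.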

TaskCompletionSource<T> — Manual Task Control

TaskCompletionSource<T> lets you create a Task that you control. You decide when it completes, faults, or cancels. This is the bridge between callback-based APIs and the async world, and it's essential for building reliable event-driven pipelines.

Pattern: Wrapping a callback API
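public static Task<byte[]> ReadAllBytesAsync(Stream stream, CancellationToken ct)
{
    var tcs = new TaskCompletionSource<byte[]>(
        TaskCreationOptions.RunContinuationsAsynchronously // ← CRITICAL
    );

    // Register cancellation BEFORE starting the operation,
    // and release the registration once the task completes in any way
    var registration = ct.Register(() => tcs.TrySetCanceled(ct));
    tcs.Task.ContinueWith(_ => registration.Dispose(), TaskScheduler.Default);

    var ms = new MemoryStream();
    var buffer = new byte[81920];

    // Starts one read; OnRead chains the next one until the stream is exhausted.
    // Note: a stream that completes reads synchronously invokes OnRead inline,
    // so very large inputs on such streams would deepen the call stack.
    void ReadNext()
    {
        try
        {
            stream.BeginRead(buffer, 0, buffer.Length, OnRead, null);
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);
        }
    }

    void OnRead(IAsyncResult ar)
    {
        try
        {
            var bytesRead = stream.EndRead(ar);
            if (bytesRead == 0)
            {
                tcs.TrySetResult(ms.ToArray()); // end of stream
                return;
            }
            ms.Write(buffer, 0, bytesRead);
            if (!tcs.Task.IsCompleted) ReadNext(); // stop reading once canceled
        }
        catch (Exception ex)
        {
            tcs.TrySetException(ex);  // ← never loses the error
        }
    }

    ReadNext();
    return tcs.Task;
}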
Critical: RunContinuationsAsynchronously

Always pass TaskCreationOptions.RunContinuationsAsynchronously when creating a TaskCompletionSource. Without it, calling SetResult() may run the await continuations synchronously on the calling thread, before SetResult() returns. In a high-throughput scenario, this means your producer thread gets hijacked to run consumer code, which can lead to deadlocks and unpredictable behavior. This is the #1 TCS mistake in production systems.
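The effect of the flag can be observed by comparing thread ids. The sketch below assumes a console-style environment with no SynchronizationContext; ContinuationInliningDemo and its method are illustrative names. Without the flag the continuation typically runs inside SetResult() on the signaling thread; with the flag it is queued to the ThreadPool.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationInliningDemo
{
    // Returns true when the code after the await ran on the thread that called SetResult().
    public static bool ContinuationRanOnSignalingThread(TaskCreationOptions options)
    {
        var tcs = new TaskCompletionSource<int>(options);
        int continuationThreadId = -1;

        async Task WaiterAsync()
        {
            await tcs.Task.ConfigureAwait(false);
            // Everything from here on is the continuation.
            continuationThreadId = Environment.CurrentManagedThreadId;
        }

        Task waiter = WaiterAsync();                 // suspends at the await
        int signalingThreadId = Environment.CurrentManagedThreadId;

        tcs.SetResult(42);                           // may run the continuation inline
        waiter.GetAwaiter().GetResult();             // blocking is acceptable only in a demo

        return continuationThreadId == signalingThreadId;
    }
}
```

In a pipeline, the inline case means the producer that calls SetResult() pays for whatever the consumer does after its await.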

TCS as a One-Shot Signal

Pattern: Async signal between components
public class AsyncSignal
{
    private TaskCompletionSource _tcs = new(
        TaskCreationOptions.RunContinuationsAsynchronously);

    // Waiter side — suspends until signaled
    public Task WaitAsync() => _tcs.Task;

    // Signaler side — wakes up all waiters
    public void Signal() => _tcs.TrySetResult();

    // Resettable version (use with caution — race conditions!)
    public void Reset() =>
        Interlocked.Exchange(ref _tcs,
            new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously));
}

ValueTask<T> — Zero-Allocation Hot Paths

ValueTask<T> is a discriminated union: it holds either a T value directly, a Task<T>, or an IValueTaskSource<T>. When the result is available synchronously, there's zero heap allocation: the result travels inside the struct itself.

When ValueTask shines
private readonly ConcurrentDictionary<string, byte[]> _cache = new();

public ValueTask<byte[]> GetDataAsync(string key, CancellationToken ct)
{
    // 99% of calls hit the cache — no Task allocated
    if (_cache.TryGetValue(key, out var data))
        return new ValueTask<byte[]>(data);  // ← result carried in the struct, no heap allocation

    // Rare: go to network
    return new ValueTask<byte[]>(FetchFromNetworkAsync(key, ct));
}
ValueTask Rules — Break These and You Get Corruption

1) Await it exactly once. Never store a ValueTask and await it twice.
2) Never use .Result or .GetAwaiter().GetResult() unless IsCompletedSuccessfully is true.
3) Never await concurrently. No WhenAll with ValueTasks — convert to Task first via .AsTask().
4) If you need multiple consumers, call .AsTask() once and share the Task.
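Rule 4 can be sketched as follows. ValueTaskSharing and ConsumeTwiceAsync are illustrative names; the only library call involved is ValueTask<T>.AsTask().

```csharp
using System;
using System.Threading.Tasks;

public static class ValueTaskSharing
{
    // Consumes the same result twice without ever awaiting the ValueTask itself twice.
    public static async Task<int> ConsumeTwiceAsync(Func<ValueTask<int>> source)
    {
        // Convert exactly once; a Task<T> may be awaited any number of times.
        Task<int> shared = source().AsTask();

        int first = await shared;
        int second = await shared;   // safe: this is a Task, not the original ValueTask
        return first + second;
    }
}
```

The conversion gives up the allocation benefit for that one call, which is usually an acceptable price when several consumers need the result.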

Exception Handling — Where Errors Actually Go

This is where most developers get burned. An exception in an async method doesn't throw on the calling thread. It gets stored in the Task, and only re-thrown when someone awaits or accesses .Result.

Exception Flow

throw inside async method
  ↓
Caught by compiler-generated try/catch
  ↓
Stored in Task.Exception (as AggregateException)
  ↓
Task status → Faulted
  ↓
On await: first inner exception is unwrapped and re-thrown
On .Result: AggregateException is thrown (with ALL inner exceptions)
On .Wait(): same as .Result (AggregateException)

⚠ If nobody awaits the task, the exception is SILENTLY SWALLOWED (since .NET 4.5 it no longer crashes the process by default)

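The difference between awaiting and blocking can be sketched in a few lines. ExceptionFlowDemo and its members are illustrative names, and the blocking variant assumes there is no SynchronizationContext (for example a console app).

```csharp
using System;
using System.Threading.Tasks;

public static class ExceptionFlowDemo
{
    private static async Task FailAsync()
    {
        await Task.Yield();                              // force a real suspension
        throw new InvalidOperationException("boom");     // captured into the Task
    }

    // await: the original exception type is re-thrown.
    public static async Task<string> ObserveWithAwaitAsync()
    {
        Task task = FailAsync();                         // nothing is thrown here
        try
        {
            await task;
            return "no exception";
        }
        catch (InvalidOperationException ex)
        {
            return $"{task.Status}: {ex.Message}";
        }
    }

    // Wait(): the exception arrives wrapped in an AggregateException.
    public static string ObserveWithWait()
    {
        try
        {
            FailAsync().Wait();
            return "no exception";
        }
        catch (AggregateException ex)
        {
            return ex.InnerExceptions[0].GetType().Name;
        }
    }
}
```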
The Exception Trap — Unawaited Tasks
// THIS SILENTLY SWALLOWS THE EXCEPTION!
void StartWork()
{
    DoWorkAsync(); // ← No await! Exception vanishes into the void.
}

// THIS CATCHES IT
async Task StartWorkAsync()
{
    await DoWorkAsync(); // ← Exception re-thrown here
}

// LAST RESORT: Global observer for unobserved exceptions
TaskScheduler.UnobservedTaskException += (sender, e) =>
{
    // Log the error — this fires on GC, not immediately!
    Log.Error(e.Exception, "Unobserved task exception");
    e.SetObserved(); // marks it handled; prevents a crash on .NET Framework 4.0 or when ThrowUnobservedTaskExceptions is enabled
};
Reliability Rule #1: Never Fire-and-Forget Without a Safety Net

An unawaited Task that throws is a silently lost error. In a reliability-focused system, every Task must be awaited, or routed through an explicit fire-and-forget handler (shown later in the Fire-and-Forget section).

Multiple Exceptions (WhenAll)

WhenAll flattens to first exception on await
try
{
    await Task.WhenAll(task1, task2, task3);
}
catch (Exception ex)
{
    // 'ex' is only the FIRST exception!
    // To get ALL exceptions, keep a reference to the WhenAll task (see the reliable pattern below).
}

// Reliable pattern: capture the WhenAll task
var allTask = Task.WhenAll(task1, task2, task3);
try
{
    await allTask;
}
catch
{
    // Now inspect ALL exceptions (Exception is null if the tasks were canceled rather than faulted)
    if (allTask.Exception is { } aggregate)
    {
        foreach (var ex in aggregate.InnerExceptions)
            Log.Error(ex, "Task failed");
    }
    throw; // re-throw if needed
}

Cancellation Patterns

CancellationToken is cooperative — nothing gets forcefully killed. The code must check the token and decide to stop. This is by design: forced thread aborts corrupt state.

Three ways to respond to cancellation
async Task ProcessBatchAsync(IEnumerable<Item> items, CancellationToken ct)
{
    foreach (var item in items)
    {
        // Option 1: ThrowIfCancellationRequested — throws OperationCanceledException
        ct.ThrowIfCancellationRequested();

        // Option 2: Check and exit gracefully
        if (ct.IsCancellationRequested)
        {
            Log.Warning("Batch processing cancelled at item {Id}", item.Id);
            return; // task completes as RanToCompletion, not Canceled!
        }

        // Option 3: Pass token to async operations (they handle it)
        await ProcessItemAsync(item, ct);
    }
}

// Creating linked tokens for timeouts
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
using var linkedCts = CancellationTokenSource
    .CreateLinkedTokenSource(ct, timeoutCts.Token);

await ProcessBatchAsync(items, linkedCts.Token);
Cancellation + Reliability

In a reliable system, distinguish between cancellation (expected shutdown) and failure (unexpected error). Catch OperationCanceledException separately from other exceptions. When shutting down a pipeline, cancel producers first, let channels drain, then cancel consumers.
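One way to keep cancellation and failure apart is an exception filter on the token. This is a sketch under the assumption that "expected shutdown" means the caller's own token was canceled; PipelineOutcome, CancellationOutcomes and RunGuardedAsync are illustrative names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum PipelineOutcome { Completed, Canceled, Failed }

public static class CancellationOutcomes
{
    public static async Task<PipelineOutcome> RunGuardedAsync(
        Func<CancellationToken, Task> work, CancellationToken ct)
    {
        try
        {
            await work(ct);
            return PipelineOutcome.Completed;
        }
        // Only treat it as a shutdown if OUR token was canceled.
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            return PipelineOutcome.Canceled;
        }
        // Everything else, including a cancellation we did not ask for, is a failure.
        catch (Exception)
        {
            return PipelineOutcome.Failed;
        }
    }
}
```

The filter matters because an OperationCanceledException can also come from an inner timeout that the caller never requested; that case is reported as Failed here.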

TaskScheduler & ThreadPool Internals

Unless a SynchronizationContext was captured, every await continuation gets scheduled on a TaskScheduler. The default scheduler uses the .NET ThreadPool, which is a work-stealing thread pool. Understanding this tells you why your code performs the way it does.

ThreadPool Work-Stealing Architecture
ThreadPool Architecture (.NET 6+)

┌──────────────────────────────────────────────────────┐
│ Global Queue (ConcurrentQueue<WorkItem>)             │
│ └─ Used for Task.Run(), ThreadPool.QueueUserWorkItem │
│    when queued from a non-pool thread                │
└───────────────────────┬──────────────────────────────┘
                        │ dequeue
       ┌────────────────┼────────────────┐
       ▼                ▼                ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Thread 1     │ │ Thread 2     │ │ Thread N     │
│ Local Queue  │ │ Local Queue  │ │ Local Queue  │
│ (LIFO push/  │ │ (LIFO push/  │ │ (LIFO push/  │
│  pop by self)│ │  pop by self)│ │  pop by self)│
│ (FIFO steal  │ │ (FIFO steal  │ │ (FIFO steal  │
│  by others)  │ │  by others)  │ │  by others)  │
└──────────────┘ └──────────────┘ └──────────────┘

Work-Stealing:
1. Thread checks its own local queue (LIFO, cache-friendly)
2. If empty, dequeues from the global queue
3. If that is empty too, steals from another thread's queue (FIFO, fair)
4. If still empty, thread sleeps

Thread Injection:
.NET monitors throughput and adds threads when work is queuing up.
Hill-climbing algorithm adjusts thread count to maximize throughput.
Too many blocking calls = thread starvation!
The #1 Performance Killer: Sync-over-Async

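Calling .Result or .Wait() on an async method blocks a ThreadPool thread. Once the pool is past its minimum thread count it adds new threads slowly (roughly 1 per 500ms). In a high-throughput scenario, this causes cascading starvation. Never do sync-over-async on the ThreadPool. If you must block from a thread that owns a SynchronizationContext (a UI thread, for example), Task.Run(() => method()).GetAwaiter().GetResult() at least avoids the classic context deadlock, but it still ties up threads and does nothing about starvation.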

SynchronizationContext

After an await, the continuation tries to run on the captured SynchronizationContext. In WPF, that's the UI thread's dispatcher. In ASP.NET Core, there's no SynchronizationContext — continuations run on any pool thread. For your WPF + service architecture, this is critical:

ConfigureAwait in libraries vs UI code
// In library/service code — ALWAYS use ConfigureAwait(false)
public async Task<Data> FetchDataAsync(CancellationToken ct)
{
    var raw = await _http.GetAsync("/api/data", ct)
        .ConfigureAwait(false); // ← Don't capture SyncContext
    return Parse(raw);
}

// In WPF ViewModel — DON'T use ConfigureAwait(false) for UI updates
public async Task LoadCommand()
{
    var data = await _service.FetchDataAsync(_cts.Token);
    // Back on UI thread — safe to update bindings
    Items = new ObservableCollection<Data>(data);
}

Fire-and-Forget Done Right

Sometimes you genuinely need to start a Task and not await it. The key: never let exceptions escape silently.

Safe fire-and-forget extension
public static class TaskExtensions
{
    /// <summary>
    /// Safely fires a task without awaiting.
    /// Guarantees: exceptions are logged, never lost.
    /// </summary>
    public static void SafeFireAndForget(
        this Task task,
        ILogger logger,
        [CallerMemberName] string caller = "")
    {
        task.ContinueWith(t =>
        {
            if (t.IsFaulted)
                logger.LogError(t.Exception, 
                    "Fire-and-forget task faulted in {Caller}", caller);
            else if (t.IsCanceled)
                logger.LogWarning(
                    "Fire-and-forget task canceled in {Caller}", caller);
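        },
        CancellationToken.None,
        TaskContinuationOptions.NotOnRanToCompletion,
        TaskScheduler.Default); // explicit scheduler: don't inherit TaskScheduler.Current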
    }
}

// Usage
PublishEventAsync(evt, ct).SafeFireAndForget(_logger);
Reliability Pattern

In your service pack tool and agent architectures, use SafeFireAndForget only for truly optional work (telemetry, cache warming). For business-critical operations, always await or route through a Channel for guaranteed processing.


Part 2 — Channels: The Modern Producer-Consumer

System.Threading.Channels is .NET's answer to Go channels. It replaces BlockingCollection<T> with a fully async, high-performance, bounded/unbounded message passing primitive. Think of it as a pipe: producers write to one end, consumers read from the other, and the pipe handles all the synchronization.

Why Channels Over BlockingCollection

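| Feature | BlockingCollection<T> | Channel<T> |
|---|---|---|
| Async support | ❌ Blocks threads | ✅ Full async/await |
| Backpressure | Blocking only | Async wait, drop, or error |
| Performance | Lock-based, blocks waiting threads | Lightweight synchronization, optimized single reader/writer paths |
| Completion | CompleteAdding() | Complete() + error propagation |
| IAsyncEnumerable | ❌ (GetConsumingEnumerable() is synchronous) | ✅ ReadAllAsync() |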

Channel Anatomy — What's Inside

Channel<T> Internal Structure

ChannelWriter<T>                       ChannelReader<T>
┌──────────────────┐                   ┌───────────────────┐
│ TryWrite(item)   │                   │ TryRead(out item) │
│ WriteAsync(item) │──→ ┌────────┐ ──→ │ ReadAsync()       │
│ WaitToWriteAsync │    │ Buffer │     │ WaitToReadAsync() │
│ TryComplete()    │    └────────┘     │ ReadAllAsync()    │
│ Complete(ex?)    │                   │ Completion (Task) │
└──────────────────┘                   └───────────────────┘

Key design: Writer and Reader are separate objects. You can hand out only the writer to producers and only the reader to consumers. This enforces the pattern at the type system level.
Creating Channels
// Unbounded — grows forever, be careful
var unbounded = Channel.CreateUnbounded<Event>(new UnboundedChannelOptions
{
    SingleReader = true,   // lets the runtime pick a cheaper single-consumer implementation
    SingleWriter = false,  // multiple producers
});

// Bounded — backpressure built in
var bounded = Channel.CreateBounded<Event>(new BoundedChannelOptions(1000)
{
    SingleReader = true,
    SingleWriter = false,
    FullMode = BoundedChannelFullMode.Wait,  // ← backpressure strategy
});

Bounded vs Unbounded — The Reliability Trade-Off

Rule: In reliability-focused systems, ALWAYS use Bounded channels

An unbounded channel is an OOM exception waiting to happen. If your producer is faster than your consumer (and in high-throughput systems, it will be during spikes), the buffer grows without limit. Bounded channels force you to make an explicit decision about what happens when the pipe is full.

BoundedChannelFullMode — Your Four Options

| Mode | Behavior | Events Lost? | Use Case |
|---|---|---|---|
| Wait | WriteAsync suspends until space | ❌ Never | Reliable pipelines (your default choice) |
| DropOldest | Removes the oldest buffered item, writes new | ✅ Old items | Latest-value-wins (sensor data, UI updates) |
| DropNewest | Removes the newest buffered item, writes new | ✅ The most recently buffered item | Keep the existing backlog but always admit the latest value |
| DropWrite | Drops the item being written | ✅ New items | Rate limiting (discard excess) |
Detecting dropped messages (DropOldest/DropNewest/DropWrite)
// In the drop modes, TryWrite and WriteAsync both report success even when
// an item is discarded, so a false return value cannot be used to count drops.
// On .NET 6+, pass an itemDropped callback to CreateBounded instead:

var channel = Channel.CreateBounded<Event>(
    new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.DropOldest },
    dropped =>
    {
        Interlocked.Increment(ref _droppedCount);
        _logger.LogWarning("Channel full, dropped event {Id}", dropped.Id);
    });
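The differences between the full modes are easiest to see on a tiny channel. The sketch below writes 1, 2, 3 into a channel of capacity 2 and reports whether the third TryWrite was accepted and which items a reader would then see; FullModeDemo and Run are illustrative names.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

public static class FullModeDemo
{
    public static (bool ThirdWriteAccepted, List<int> Survivors) Run(BoundedChannelFullMode mode)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2) { FullMode = mode });

        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);                               // buffer is now full
        bool thirdWriteAccepted = channel.Writer.TryWrite(3);     // behavior depends on the mode
        channel.Writer.Complete();

        // Buffered items stay readable after completion.
        var survivors = new List<int>();
        while (channel.Reader.TryRead(out int item))
            survivors.Add(item);

        return (thirdWriteAccepted, survivors);
    }
}
```

With Wait the third TryWrite is refused and 1, 2 remain. DropOldest leaves 2, 3; DropNewest leaves 1, 3; DropWrite leaves 1, 2. All three drop modes report the write as accepted.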

Backpressure — The Art of Slowing Down Gracefully

Backpressure means: when the consumer can't keep up, the producer slows down rather than the system failing. With BoundedChannelFullMode.Wait, the producer's WriteAsync suspends (yields the thread!) until space is available. No threads are blocked. No events are lost.

Backpressure in action
var channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Producer — will slow down when buffer hits 100
async Task ProduceAsync(ChannelWriter<WorkItem> writer, CancellationToken ct)
{
    try
    {
        await foreach (var item in GetItemsAsync(ct))
        {
            // This will asynchronously wait if the channel is full
            // The thread is NOT blocked — it yields back to the pool
            await writer.WriteAsync(item, ct);
        }
    }
    finally
    {
        writer.Complete(); // ← Signal: no more items coming
    }
}

// Consumer — reads at its own pace
async Task ConsumeAsync(ChannelReader<WorkItem> reader, CancellationToken ct)
{
    await foreach (var item in reader.ReadAllAsync(ct))
    {
        await ProcessAsync(item, ct);
    }
    // ReadAllAsync completes when writer calls Complete()
}
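The suspension itself can be made visible with a capacity of 2. This is a small sketch, not a benchmark; BackpressureDemo and RunAsync are illustrative names.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    public static async Task<(bool TryWriteAccepted, bool WriterWasWaiting, int BufferedAfterRead)> RunAsync()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);                           // buffer is now full

        // TryWrite never waits: it simply reports that there is no room.
        bool tryWriteAccepted = channel.Writer.TryWrite(3);

        // WriteAsync waits asynchronously; no thread is parked while it is pending.
        Task pendingWrite = channel.Writer.WriteAsync(3).AsTask();
        bool writerWasWaiting = !pendingWrite.IsCompleted;

        await channel.Reader.ReadAsync();                     // consumer frees one slot
        await pendingWrite;                                   // the waiting write goes through

        return (tryWriteAccepted, writerWasWaiting, channel.Reader.Count);
    }
}
```

Nothing is lost along the way: once item 1 has been read, items 2 and 3 are both in the buffer.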

Producer-Consumer Patterns

Pattern: Single Producer → Multiple Consumers (Fan-Out)

Multiple consumers competing for work
async Task RunPipelineAsync(CancellationToken ct)
{
    var channel = Channel.CreateBounded<WorkItem>(500);

    // 1 producer
    var producer = ProduceAsync(channel.Writer, ct);

    // N consumers (all reading from same reader = competing consumers)
    var consumers = Enumerable.Range(0, Environment.ProcessorCount)
        .Select(_ => ConsumeAsync(channel.Reader, ct))
        .ToArray();

    // Wait for everything
    await producer;  // completes the writer
    await Task.WhenAll(consumers); // drain remaining items
}
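A self-contained version of the competing-consumers pattern, reduced to integers so it can be run as-is. FanOutDemo and SumAsync are illustrative names; the capacity of 16 is arbitrary.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class FanOutDemo
{
    // Produces 1..itemCount and lets several consumers add the items up.
    public static async Task<long> SumAsync(int itemCount, int consumerCount)
    {
        var channel = Channel.CreateBounded<int>(16);
        long total = 0;

        Task producer = Task.Run(async () =>
        {
            try
            {
                for (int i = 1; i <= itemCount; i++)
                    await channel.Writer.WriteAsync(i);
            }
            finally
            {
                channel.Writer.Complete();   // lets every consumer's loop end
            }
        });

        // Each item is delivered to exactly one of the competing consumers.
        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (int item in channel.Reader.ReadAllAsync())
                    Interlocked.Add(ref total, item);
            }))
            .ToArray();

        await producer;
        await Task.WhenAll(consumers);
        return Interlocked.Read(ref total);
    }
}
```

If the sum matches n(n+1)/2, every item was processed exactly once, regardless of how the work was split between consumers.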

Pattern: Multiple Producers → Single Consumer (Fan-In)

Merging multiple event sources
var channel = Channel.CreateBounded<Event>(new BoundedChannelOptions(1000)
{
    SingleReader = true,
    SingleWriter = false,  // ← multiple writers
});

// Multiple sources write to the same channel
var producer1 = WatchFileSystemAsync(channel.Writer, ct);
var producer2 = WatchNetworkAsync(channel.Writer, ct);
var producer3 = WatchTimerAsync(channel.Writer, ct);

// Single consumer processes all events in order
var consumer = ProcessEventsAsync(channel.Reader, ct);

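// When ALL producers are done (or one of them has failed), complete the writer
try
{
    await Task.WhenAll(producer1, producer2, producer3);
}
finally
{
    // Without this, a failing producer would leave the consumer waiting forever
    channel.Writer.Complete();
}
await consumer;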

Pattern: Pipeline (Channel Chaining)

Multi-stage processing pipeline
// Stage 1: Parse raw data
var rawChannel = Channel.CreateBounded<byte[]>(1000);
var parsedChannel = Channel.CreateBounded<ParsedEvent>(500);
var enrichedChannel = Channel.CreateBounded<EnrichedEvent>(200);

// Each stage reads from one channel and writes to the next
var parse = TransformAsync(rawChannel.Reader, parsedChannel.Writer,
    ParseEvent, ct);
var enrich = TransformAsync(parsedChannel.Reader, enrichedChannel.Writer,
    EnrichEvent, ct);
var store = ConsumeAsync(enrichedChannel.Reader,
    StoreEvent, ct);

// Generic transform stage
async Task TransformAsync<TIn, TOut>(
    ChannelReader<TIn> reader,
    ChannelWriter<TOut> writer,
    Func<TIn, TOut> transform,
    CancellationToken ct)
{
    try
    {
        await foreach (var item in reader.ReadAllAsync(ct))
        {
            var result = transform(item);
            await writer.WriteAsync(result, ct);
        }
    }
    finally
    {
        writer.Complete();
    }
}
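A runnable miniature of the chained pipeline, using integers and strings in place of the event types. This sketch deliberately differs from the stage above in one respect: it passes a caught exception to Complete(ex) so a failure travels downstream instead of looking like a normal end of stream. PipelineDemo and its members are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PipelineDemo
{
    private static async Task TransformAsync<TIn, TOut>(
        ChannelReader<TIn> reader, ChannelWriter<TOut> writer, Func<TIn, TOut> transform)
    {
        Exception? failure = null;
        try
        {
            await foreach (var item in reader.ReadAllAsync())
                await writer.WriteAsync(transform(item));
        }
        catch (Exception ex)
        {
            failure = ex;                  // remember why this stage stopped
        }
        finally
        {
            writer.Complete(failure);      // null means normal completion
        }
    }

    public static async Task<List<string>> RunAsync(IEnumerable<int> input)
    {
        var numbers = Channel.CreateBounded<int>(4);
        var squares = Channel.CreateBounded<int>(4);
        var labels = Channel.CreateBounded<string>(4);

        Task stage1 = TransformAsync(numbers.Reader, squares.Writer, n => n * n);
        Task stage2 = TransformAsync(squares.Reader, labels.Writer, n => $"sq={n}");

        Task feed = Task.Run(async () =>
        {
            try
            {
                foreach (int n in input)
                    await numbers.Writer.WriteAsync(n);
            }
            finally
            {
                numbers.Writer.Complete();
            }
        });

        var results = new List<string>();
        await foreach (string label in labels.Reader.ReadAllAsync())
            results.Add(label);

        await Task.WhenAll(feed, stage1, stage2);
        return results;
    }
}
```

Because each stage has a single reader, items leave the pipeline in the order they entered it.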

Completion & Error Propagation

Channel completion is a one-way gate: once you call Complete(), no more writes are allowed. The reader sees completion when the buffer drains. You can also pass an exception to signal a fatal error.

Error propagation through channels
// Producer encounters a fatal error
try
{
    await foreach (var item in source)
        await writer.WriteAsync(item, ct);
    
    writer.Complete(); // normal completion
}
catch (Exception ex)
{
    writer.Complete(ex); // ← error propagation!
}

// Consumer side — the error surfaces here
try
{
    await foreach (var item in reader.ReadAllAsync(ct))
        await ProcessAsync(item, ct);
}
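catch (Exception ex) when (ex is not OperationCanceledException)
{
    // ReadAllAsync rethrows the exception the producer passed to Complete() as-is.
    // ReadAsync would instead throw ChannelClosedException with it as InnerException.
    _logger.LogError(ex, "Pipeline upstream failed");
}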

// You can also observe completion as a Task
await reader.Completion; // throws if Complete(exception) was called
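How a faulted completion surfaces can be checked with a short sketch. It uses ReadAsync and the Completion task on an unbounded channel; ChannelFaultDemo and RunAsync are illustrative names.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelFaultDemo
{
    public static async Task<(int Buffered, string ReadAsyncFailure, string CompletionFailure)> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(7);
        channel.Writer.Complete(new InvalidOperationException("upstream failed"));

        // Items written before Complete(ex) are still delivered first.
        int buffered = await channel.Reader.ReadAsync();

        // Once the buffer is drained, ReadAsync fails with ChannelClosedException;
        // the producer's exception is available as InnerException.
        string readAsyncFailure = "none";
        try
        {
            await channel.Reader.ReadAsync();
        }
        catch (ChannelClosedException ex)
        {
            readAsyncFailure = ex.InnerException?.Message ?? "no inner exception";
        }

        // The Completion task faults with the producer's exception itself.
        string completionFailure = "none";
        try
        {
            await channel.Reader.Completion;
        }
        catch (InvalidOperationException ex)
        {
            completionFailure = ex.Message;
        }

        return (buffered, readAsyncFailure, completionFailure);
    }
}
```

The error never jumps the queue: consumers see it only after they have drained everything that was written before it.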

Performance Internals

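How Channels keep synchronization cheap

SingleReader and SingleWriter are hints that let the Channel factory methods pick a more specialized implementation. In current runtimes the clearest effect is on unbounded channels, where SingleReader = true selects a dedicated single-consumer implementation with less synchronization on the read path. Bounded channels take a short lock around each operation regardless of the flags. All of this is implementation detail that may change between releases, so read the outline below as a snapshot, not a contract:

Channel Internals (implementation detail)

UnboundedChannel<T> (default):
 └─ ConcurrentQueue<T> holds the items
 └─ Short lock for waiter bookkeeping and completion

UnboundedChannel<T>, SingleReader=true:
 └─ Dedicated single-consumer implementation
 └─ Less synchronization and fewer allocations on the read path

BoundedChannel<T>:
 └─ Uses lock on every read/write
 └─ Deque<T> as ring buffer
 └─ Deque<AsyncOperation<T>> for blocked readers
 └─ Deque<AsyncOperation<VoidResult>> for blocked writers

The SingleReader/SingleWriter flags are TRUST-BASED. .NET does NOT enforce them at runtime. If you lie, the behavior is undefined: expect lost or corrupted data, not a helpful exception.
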
| Scenario | Throughput (approx, order of magnitude only; measure on your own hardware) |
|---|---|
| Unbounded, Single R/W | ~50M items/sec |
| Unbounded, Multi R/W | ~15M items/sec |
| Bounded (1000), Single R/W | ~30M items/sec |
| Bounded (1000), Multi R/W | ~8M items/sec |
| BlockingCollection (for comparison) | ~2M items/sec |

Practical Impact

At thousands of events per second, you're nowhere near Channel limits. The real bottleneck will be your processing logic. Cheap synchronization still matters when you have many channels in a system: less contention means more predictable latency.


Part 3 — IObservable<T>: The Observer Contract

IObservable<T> and IObserver<T> are built into the BCL (no NuGet needed). They define a push-based notification pattern with a strict contract. Think of it as the inverse of IEnumerable<T>: instead of you pulling items, items are pushed to you.

The two interfaces — that's it
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<in T>
{
    void OnNext(T value);       // here's a new item
    void OnError(Exception error); // something broke
    void OnCompleted();         // no more items
}

The Observable Contract (You MUST Follow This)

The Grammar: OnNext* (OnError | OnCompleted)?

1) Zero or more OnNext calls, followed by at most one OnError or OnCompleted.
2) After OnError or OnCompleted, no more calls to the observer. Ever.
3) Calls must be serialized — no concurrent OnNext calls from different threads.
4) Subscribe returns an IDisposable — disposing it unsubscribes.
Breaking these rules leads to undefined behavior. There is no runtime enforcement.

Observable Sequence Grammar

OnNext(1) OnNext(2) OnNext(3) OnCompleted()               ✅ Valid
OnNext(1) OnError(ex)                                     ✅ Valid
OnCompleted()                                             ✅ Valid (empty)
OnNext(1) OnCompleted() OnNext(2)                         ❌ VIOLATION
OnNext(1) from Thread A ║ OnNext(2) from Thread B         ❌ VIOLATION

Building Observables From Scratch (No Library)

Without System.Reactive, you build everything yourself. This is actually great for understanding what Rx does — and for keeping your deployment footprint small in air-gapped environments.

Base observable implementation
public sealed class EventSource<T> : IObservable<T>, IDisposable
{
    private readonly ConcurrentDictionary<Guid, IObserver<T>> _observers = new();
    private volatile bool _completed;
    private Exception? _error;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_completed)
        {
            // Late subscriber sees terminal event immediately
            if (_error != null)
                observer.OnError(_error);
            else
                observer.OnCompleted();
            return Disposable.Empty;
        }

        var id = Guid.NewGuid();
        _observers.TryAdd(id, observer);
        return new Unsubscriber(() => _observers.TryRemove(id, out _));
    }

    public void Publish(T value)
    {
        if (_completed) return;

        foreach (var (id, observer) in _observers)
        {
            try
            {
                observer.OnNext(value);
            }
            catch (Exception ex)
            {
                // Observer threw: remove it, don't kill the source
                Log.Error(ex, "Observer faulted, removing");
                _observers.TryRemove(id, out _);
            }
        }
    }

    public void Complete()
    {
        _completed = true;
        foreach (var observer in _observers.Values)
            observer.OnCompleted();
        _observers.Clear();
    }

    public void Error(Exception ex)
    {
        _error = ex;
        _completed = true;
        foreach (var observer in _observers.Values)
            observer.OnError(ex);
        _observers.Clear();
    }

    public void Dispose() => Complete();
}

// Helpers
internal sealed class Unsubscriber : IDisposable
{
    private Action? _unsubscribe;
    public Unsubscriber(Action unsubscribe) => _unsubscribe = unsubscribe;
    public void Dispose() => Interlocked.Exchange(ref _unsubscribe, null)?.Invoke();
}

internal static class Disposable
{
    public static readonly IDisposable Empty = new EmptyDisposable();
    private sealed class EmptyDisposable : IDisposable { public void Dispose() { } }
}

Custom Operators — Building a Mini-Rx

Without the System.Reactive library, you build operators as extension methods that return new observables. Each operator subscribes to the source and transforms notifications.

Where (filter) operator
public static IObservable<T> Where<T>(
    this IObservable<T> source,
    Func<T, bool> predicate)
{
    return new FilterObservable<T>(source, predicate);
}

internal sealed class FilterObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Func<T, bool> _predicate;

    public FilterObservable(IObservable<T> source, Func<T, bool> predicate)
    {
        _source = source;
        _predicate = predicate;
    }

    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new FilterObserver(observer, _predicate));

    private sealed class FilterObserver : IObserver<T>
    {
        private readonly IObserver<T> _downstream;
        private readonly Func<T, bool> _predicate;

        public FilterObserver(IObserver<T> downstream, Func<T, bool> predicate)
        {
            _downstream = downstream;
            _predicate = predicate;
        }

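        private bool _stopped; // true once a terminal notification went downstream

        public void OnNext(T value)
        {
            if (_stopped) return;

            bool pass;
            try
            {
                pass = _predicate(value);
            }
            catch (Exception ex)
            {
                OnError(ex); // predicate threw → error, and nothing after it
                return;
            }

            // Outside the try: an exception thrown by the downstream observer
            // must not be fed back to it as OnError.
            if (pass) _downstream.OnNext(value);
        }

        public void OnError(Exception error)
        {
            if (_stopped) return;
            _stopped = true;
            _downstream.OnError(error);
        }

        public void OnCompleted()
        {
            if (_stopped) return;
            _stopped = true;
            _downstream.OnCompleted();
        }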
    }
}
Select (map) operator
public static IObservable<TOut> Select<TIn, TOut>(
    this IObservable<TIn> source,
    Func<TIn, TOut> selector)
{
    return new SelectObservable<TIn, TOut>(source, selector);
}

// Implementation follows same pattern as FilterObservable
// — wrap source, intercept OnNext, transform value
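One possible shape for the elided Select implementation, following the wrap-and-intercept pattern described above. This is a sketch, not the guide's own code: the _stopped guard, ArraySource<T> and RecordingObserver<T> are assumptions added so the example can run on its own.

```csharp
using System;
using System.Collections.Generic;

public static class ObservableSelectExtensions
{
    public static IObservable<TOut> Select<TIn, TOut>(
        this IObservable<TIn> source, Func<TIn, TOut> selector)
        => new SelectObservable<TIn, TOut>(source, selector);
}

internal sealed class SelectObservable<TIn, TOut> : IObservable<TOut>
{
    private readonly IObservable<TIn> _source;
    private readonly Func<TIn, TOut> _selector;

    public SelectObservable(IObservable<TIn> source, Func<TIn, TOut> selector)
    {
        _source = source;
        _selector = selector;
    }

    public IDisposable Subscribe(IObserver<TOut> observer) =>
        _source.Subscribe(new SelectObserver(observer, _selector));

    private sealed class SelectObserver : IObserver<TIn>
    {
        private readonly IObserver<TOut> _downstream;
        private readonly Func<TIn, TOut> _selector;
        private bool _stopped; // set once a terminal notification was forwarded

        public SelectObserver(IObserver<TOut> downstream, Func<TIn, TOut> selector)
        {
            _downstream = downstream;
            _selector = selector;
        }

        public void OnNext(TIn value)
        {
            if (_stopped) return;
            TOut result;
            try { result = _selector(value); }
            catch (Exception ex) { OnError(ex); return; } // selector threw: terminate
            _downstream.OnNext(result);
        }

        public void OnError(Exception error)
        {
            if (_stopped) return;
            _stopped = true;
            _downstream.OnError(error);
        }

        public void OnCompleted()
        {
            if (_stopped) return;
            _stopped = true;
            _downstream.OnCompleted();
        }
    }
}

// Demo helper (hypothetical): a cold source that replays a fixed set of items.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;
    public ArraySource(params T[] items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new Noop();
    }

    private sealed class Noop : IDisposable { public void Dispose() { } }
}

// Demo helper (hypothetical): records every notification as text.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Events { get; } = new();
    public void OnNext(T value) => Events.Add($"{value}");
    public void OnError(Exception error) => Events.Add("error:" + error.Message);
    public void OnCompleted() => Events.Add("completed");
}
```

Usage: new ArraySource<int>(1, 2, 3).Select(n => $"#{n}").Subscribe(recorder) records the three mapped values followed by the completion. If the selector throws, the subscriber sees a single OnError and nothing after it.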
Buffer operator — batches items by count
public static IObservable<IReadOnlyList<T>> Buffer<T>(
    this IObservable<T> source, int count)
{
    return new BufferObservable<T>(source, count);
}

internal sealed class BufferObservable<T> : IObservable<IReadOnlyList<T>>
{
    private readonly IObservable<T> _source;
    private readonly int _count;

    public BufferObservable(IObservable<T> source, int count)
    {
        _source = source;
        _count = count;
    }

    public IDisposable Subscribe(IObserver<IReadOnlyList<T>> observer) =>
        _source.Subscribe(new BufferObserver(observer, _count));

    private sealed class BufferObserver : IObserver<T>
    {
        private readonly IObserver<IReadOnlyList<T>> _downstream;
        private readonly int _count;
        private List<T> _buffer;

        public BufferObserver(IObserver<IReadOnlyList<T>> downstream, int count)
        {
            _downstream = downstream;
            _count = count;
            _buffer = new List<T>(count);
        }

        public void OnNext(T value)
        {
            _buffer.Add(value);
            if (_buffer.Count >= _count)
            {
                _downstream.OnNext(_buffer);
                _buffer = new List<T>(_count);
            }
        }

        public void OnCompleted()
        {
            // Flush remaining items
            if (_buffer.Count > 0)
                _downstream.OnNext(_buffer);
            _downstream.OnCompleted();
        }

        public void OnError(Exception error) => _downstream.OnError(error);
    }
}
Throttle operator — rate limits by time window
public static IObservable<T> Throttle<T>(
    this IObservable<T> source, TimeSpan window)
{
    return new ThrottleObservable<T>(source, window);
}

internal sealed class ThrottleObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly TimeSpan _window;

    public ThrottleObservable(IObservable<T> source, TimeSpan window)
    {
        _source = source;
        _window = window;
    }

    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new ThrottleObserver(observer, _window));

    private sealed class ThrottleObserver : IObserver<T>
    {
        private readonly IObserver<T> _downstream;
        private readonly long _windowStopwatchTicks;
        private long _lastEmitTimestamp;
        private bool _hasEmitted;

        public ThrottleObserver(IObserver<T> downstream, TimeSpan window)
        {
            _downstream = downstream;
            // Stopwatch ticks are not TimeSpan ticks: convert once via Stopwatch.Frequency (ticks per second)
            _windowStopwatchTicks = (long)(window.TotalSeconds * Stopwatch.Frequency);
        }

        public void OnNext(T value)
        {
            // OnNext calls are serialized by the observable contract, so plain reads and writes suffice.
            // Uses only Stopwatch.GetTimestamp(), which is available on .NET 6.
            var now = Stopwatch.GetTimestamp();

            if (!_hasEmitted || now - _lastEmitTimestamp >= _windowStopwatchTicks)
            {
                _hasEmitted = true;
                _lastEmitTimestamp = now;
                _downstream.OnNext(value);
            }
            // else: swallowed (throttled)
        }

        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}

Thread Safety — Serializing Observer Calls

The Observable contract requires serialized OnNext calls. If your source fires from multiple threads, you must serialize. Here's a gate that enforces this:

Serializing gate for thread-safe observation
public sealed class SerializedObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private readonly object _gate = new();
    private bool _done;

    public SerializedObserver(IObserver<T> inner) => _inner = inner;

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (!_done) _inner.OnNext(value);
        }
    }

    public void OnError(Exception error)
    {
        lock (_gate)
        {
            if (_done) return;
            _done = true;
            _inner.OnError(error);
        }
    }

    public void OnCompleted()
    {
        lock (_gate)
        {
            if (_done) return;
            _done = true;
            _inner.OnCompleted();
        }
    }
}

Hot vs Cold Observables

Property | Cold | Hot
Data produced | Per subscriber (lazy) | Regardless of subscribers
Subscribe timing | Gets all items from start | Gets items from now on
Example | File read, HTTP request | Mouse events, sensor data
Missed events? | Never | Yes, if subscribed late
Cold observable — each subscriber gets its own sequence
public class FileLineObservable : IObservable<string>
{
    private readonly string _path;

    public FileLineObservable(string path) => _path = path;

    public IDisposable Subscribe(IObserver<string> observer)
    {
        var cts = new CancellationTokenSource();

        // Each subscriber triggers a new read
        Task.Run(async () =>
        {
            try
            {
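                // StreamReader keeps this .NET 6 compatible; File.ReadLinesAsync needs .NET 7+
                using var reader = new StreamReader(_path);
                while (await reader.ReadLineAsync() is { } line)
                {
                    cts.Token.ThrowIfCancellationRequested();
                    observer.OnNext(line);
                }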
                observer.OnCompleted();
            }
            catch (OperationCanceledException) { /* unsubscribed */ }
            catch (Exception ex) { observer.OnError(ex); }
        }, cts.Token);

        return new Unsubscriber(() => cts.Cancel());
    }
}
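
For contrast, a hot source can be sketched in a few lines. `HotSource<T>`, its `Publish` method, and the `Recorder<T>` observer are hypothetical names used only for illustration; they are not part of the guide's code or of any library. The sketch only shows the property from the table: values published before `Subscribe` are never replayed to a late subscriber.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hot source: it publishes whether or not anyone is listening.
public sealed class HotSource<T> : IObservable<T>
{
    private readonly object _gate = new();
    private readonly List<IObserver<T>> _observers = new();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate) _observers.Add(observer);
        return new Subscription(this, observer);
    }

    // Pushes a value to whoever is subscribed right now.
    // Nothing is stored, so earlier values cannot be replayed.
    public void Publish(T value)
    {
        IObserver<T>[] snapshot;
        lock (_gate) snapshot = _observers.ToArray();
        foreach (var observer in snapshot) observer.OnNext(value);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly HotSource<T> _parent;
        private readonly IObserver<T> _observer;

        public Subscription(HotSource<T> parent, IObserver<T> observer)
        {
            _parent = parent;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_parent._gate) _parent._observers.Remove(_observer);
        }
    }
}

// Demo-only observer that records what it sees.
public sealed class Recorder<T> : IObserver<T>
{
    public List<T> Seen { get; } = new();
    public void OnNext(T value) => Seen.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

If a `Recorder<int>` subscribes after `Publish(1)` and before `Publish(2)`, its `Seen` list contains only 2. That gap is the "missed events" row of the table, and it is why hot sources need a buffer (a channel, later in this guide) between them and slow consumers.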

Error Handling in Observables

The Big Problem: OnError Terminates the Subscription

Once OnError is called, the sequence is dead. No more OnNext calls. In a high-throughput system where you want to keep processing despite individual item failures, you need an error isolation pattern.

Error isolation — don't let one bad item kill the stream
public static IObservable<T> CatchAndContinue<T>(
    this IObservable<T> source,
    Action<Exception> onError)
{
    return new CatchContinueObservable<T>(source, onError);
}

// This wraps each OnNext in a try-catch so the observer's
// exception doesn't propagate back to the source
internal sealed class SafeObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public SafeObserver(
        Action<T> onNext,
        Action<Exception>? onError = null,
        Action? onCompleted = null)
    {
        _onNext = onNext;
        _onError = onError ?? (ex => { });
        _onCompleted = onCompleted ?? (() => { });
    }

    public void OnNext(T value)
    {
        try { _onNext(value); }
        catch (Exception ex) { _onError(ex); }
    }

    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}
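
`CatchAndContinue` returns a `CatchContinueObservable<T>` that is not shown. One way it could look, as a sketch under stated assumptions: it wraps the downstream observer so an exception thrown inside the downstream `OnNext` goes to the `onError` callback instead of unwinding into the source. The nested `Wall` class and the demo helpers `ArraySource<T>` and `DelegateObserver<T>` are hypothetical; the extension method is repeated so the block compiles on its own. It cannot revive a source that has itself called `OnError`.

```csharp
using System;
using System.Collections.Generic;

public static class ObservableErrorExtensions
{
    public static IObservable<T> CatchAndContinue<T>(
        this IObservable<T> source, Action<Exception> onError)
        => new CatchContinueObservable<T>(source, onError);
}

internal sealed class CatchContinueObservable<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly Action<Exception> _onError;

    public CatchContinueObservable(IObservable<T> source, Action<Exception> onError)
    {
        _source = source;
        _onError = onError;
    }

    public IDisposable Subscribe(IObserver<T> observer) =>
        _source.Subscribe(new Wall(observer, _onError));

    // Assumed shape: a per-item try/catch between source and downstream.
    private sealed class Wall : IObserver<T>
    {
        private readonly IObserver<T> _downstream;
        private readonly Action<Exception> _onError;

        public Wall(IObserver<T> downstream, Action<Exception> onError)
        {
            _downstream = downstream;
            _onError = onError;
        }

        public void OnNext(T value)
        {
            try { _downstream.OnNext(value); }
            catch (Exception ex) { _onError(ex); } // report, then keep the stream alive
        }

        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}

// Demo-only helpers: a synchronous source and a lambda-backed observer.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;
    public ArraySource(params T[] items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
}

public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public DelegateObserver(Action<T> onNext) => _onNext = onNext;
    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With a source of 1, 2, 3 and an observer that throws on 2, the observer still receives 3 and the callback receives exactly one exception.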

Part 4 — Reliability: Never Lose an Event

This is the core of the guide. Let's build up from first principles. There are five places where an event can be lost in a .NET async pipeline:

Where events get lost:
1. Unawaited Task throws → exception swallowed, event processing abandoned
2. Unbounded queue overflow → OOM crash, all in-flight events lost
3. Observer OnError terminates subscription → all future events lost
4. Unhandled exception in consumer → consumer dies, channel backs up, producers block
5. Application crash with items in memory → all in-flight events lost

The defense layers:
Layer 1: Exception Walls - isolate failures, never kill the loop
Layer 2: Bounded Channels - backpressure, not overflow
Layer 3: Dead Letter Queues - failed items go somewhere, not nowhere
Layer 4: Graceful Shutdown - drain queues before exit
Layer 5: Persistence - for true durability (out of scope for in-memory)

Exception Walls — Isolating Failures

An "exception wall" is a try-catch around each unit of work that never lets the processing loop die. The loop itself is sacred — individual items can fail, but the consumer keeps consuming.

The Indestructible Consumer Loop
async Task ConsumeForeverAsync(
    ChannelReader<Event> reader,
    ChannelWriter<Event> deadLetterWriter,
    ILogger logger,
    CancellationToken ct)
{
    await foreach (var evt in reader.ReadAllAsync(ct))
    {
        try
        {
            await ProcessEventAsync(evt, ct);
        }
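        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Shutdown requested: park the unprocessed item in the DLQ
            if (!deadLetterWriter.TryWrite(evt))
                logger.LogError("DLQ unavailable, dropping event {Id} during shutdown", evt.Id);
            break;
        }
        catch (Exception ex)
        {
            // EXCEPTION WALL: log, dead-letter, continue
            logger.LogError(ex, "Failed to process event {Id}", evt.Id);

            evt.FailCount++;
            evt.LastError = ex.Message;

            // TryWrite can return false (DLQ full or completed); never drop silently
            if (!deadLetterWriter.TryWrite(evt))
                logger.LogError("DLQ unavailable, dropping event {Id}", evt.Id);
        }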
    }
}
Pattern: The Processing Contract

Every event that enters the system has exactly one of three outcomes:
1) Successfully processed
2) Sent to dead letter queue for retry/investigation
3) Explicitly dropped with a log entry explaining why
There is no fourth option. No event silently disappears.

Guaranteed Processing Pattern

Acknowledge-based processing — know when an event is handled
public readonly struct Envelope<T>
{
    public T Payload { get; }
    public TaskCompletionSource Acknowledgment { get; }
    public CancellationToken CancellationToken { get; }

    public Envelope(T payload, CancellationToken ct)
    {
        Payload = payload;
        Acknowledgment = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);
        CancellationToken = ct;
    }

    public void Ack() => Acknowledgment.TrySetResult();
    public void Nack(Exception ex) => Acknowledgment.TrySetException(ex);
}

// Producer: sends and WAITS for acknowledgment
async Task SendReliableAsync<T>(
    ChannelWriter<Envelope<T>> writer,
    T payload,
    CancellationToken ct)
{
    var envelope = new Envelope<T>(payload, ct);
    await writer.WriteAsync(envelope, ct);

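    // Now wait until the consumer acknowledges. WaitAsync(ct) keeps the
    // producer from hanging forever if the consumer stops before acking.
    await envelope.Acknowledgment.Task.WaitAsync(ct);
    // If we get here, the event was successfully processed!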
}

// Consumer: processes and acknowledges
async Task ConsumeReliableAsync<T>(
    ChannelReader<Envelope<T>> reader,
    Func<T, CancellationToken, Task> handler,
    CancellationToken ct)
{
    await foreach (var envelope in reader.ReadAllAsync(ct))
    {
        try
        {
            await handler(envelope.Payload, envelope.CancellationToken);
            envelope.Ack(); // ← producer gets unblocked
        }
        catch (Exception ex)
        {
            envelope.Nack(ex); // ← producer gets the exception
        }
    }
}

Dead Letter Queues

DLQ with retry capability
public sealed class DeadLetterQueue<T> : IDisposable
{
    private readonly Channel<FailedEvent<T>> _channel;
    private readonly ILogger _logger;
    private readonly int _maxRetries;

    public DeadLetterQueue(ILogger logger, int maxRetries = 3, int capacity = 10_000)
    {
        _logger = logger;
        _maxRetries = maxRetries;
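        _channel = Channel.CreateBounded<FailedEvent<T>>(
            new BoundedChannelOptions(capacity)
            {
                FullMode = BoundedChannelFullMode.DropOldest // sacrifice oldest failures
            },
            // With DropOldest, TryWrite never reports "full". The itemDropped
            // callback (.NET 6+) is the only place an eviction can be observed.
            dropped => _logger.LogError(dropped.Error,
                "DLQ full, evicted oldest failed event (attempt {Attempt})",
                dropped.Attempt));
    }

    public void Enqueue(T item, Exception ex, int attempt)
    {
        var failed = new FailedEvent<T>(item, ex, attempt, DateTime.UtcNow);
        // TryWrite only returns false here once the writer has been completed
        if (!_channel.Writer.TryWrite(failed))
            _logger.LogError(ex, "DLQ closed, dropping failed event");
    }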

    public async Task RetryAsync(
        Func<T, CancellationToken, Task> handler,
        ChannelWriter<T>? requeue,
        CancellationToken ct)
    {
        await foreach (var failed in _channel.Reader.ReadAllAsync(ct))
        {
            if (failed.Attempt >= _maxRetries)
            {
                _logger.LogError(failed.Error,
                    "Permanently failed after {Attempts} attempts",
                    failed.Attempt);
                continue; // truly dead — log and move on
            }

            // Exponential backoff
            var delay = TimeSpan.FromSeconds(Math.Pow(2, failed.Attempt));
            await Task.Delay(delay, ct);

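            if (requeue != null)
            {
                // Re-inject into the main pipeline
                await requeue.WriteAsync(failed.Item, ct);
                continue;
            }

            try
            {
                // No requeue target: retry in place with the supplied handler
                await handler(failed.Item, ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Still failing: back into the DLQ with the attempt count bumped
                Enqueue(failed.Item, ex, failed.Attempt + 1);
            }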
        }
    }

    public void Dispose() => _channel.Writer.TryComplete();
}

public record FailedEvent<T>(T Item, Exception Error, int Attempt, DateTime FailedAt);

Retry & Circuit Breaker (No External Libraries)

Simple retry with exponential backoff
public static async Task<T> RetryAsync<T>(
    Func<CancellationToken, Task<T>> operation,
    int maxRetries,
    CancellationToken ct,
    ILogger? logger = null)
{
    for (int attempt = 0; ; attempt++)
    {
        try
        {
            return await operation(ct);
        }
        catch (Exception ex) when (attempt < maxRetries && !ct.IsCancellationRequested)
        {
            var delay = TimeSpan.FromMilliseconds(
                Math.Pow(2, attempt) * 100 + Random.Shared.Next(50)); // jitter
            logger?.LogWarning(ex,
                "Attempt {Attempt} failed, retrying in {Delay}ms",
                attempt + 1, delay.TotalMilliseconds);
            await Task.Delay(delay, ct);
        }
    }
}
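
To make the delay formula concrete, the sketch below isolates it in a hypothetical `Backoff.Delay` helper (not part of the original code), with the jitter passed in as a parameter so the base schedule is easy to inspect.

```csharp
using System;

public static class Backoff
{
    // Same formula as RetryAsync: 100 ms * 2^attempt, plus jitter.
    // In RetryAsync the jitter is Random.Shared.Next(50), i.e. 0 to 49 ms.
    public static TimeSpan Delay(int attempt, int jitterMs = 0) =>
        TimeSpan.FromMilliseconds(Math.Pow(2, attempt) * 100 + jitterMs);
}
```

With jitter set to 0, attempts 0 through 4 wait 100, 200, 400, 800 and 1600 ms. With `maxRetries = 5`, that is about 3.1 seconds of waiting in total before the sixth failure is rethrown to the caller.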
Lightweight circuit breaker
public sealed class CircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _resetTimeout;
    private int _failureCount;
    private DateTime _lastFailure;
    private volatile bool _isOpen;

    public CircuitBreaker(int failureThreshold = 5, int resetSeconds = 30)
    {
        _failureThreshold = failureThreshold;
        _resetTimeout = TimeSpan.FromSeconds(resetSeconds);
    }

    public bool IsOpen => _isOpen &&
        (DateTime.UtcNow - _lastFailure) < _resetTimeout;

    public async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        CancellationToken ct)
    {
        if (IsOpen)
            throw new CircuitBreakerOpenException(
                "Circuit open, try again later");

        try
        {
            var result = await operation(ct);
            Interlocked.Exchange(ref _failureCount, 0); // reset on success
            _isOpen = false;
            return result;
        }
        catch
        {
            _lastFailure = DateTime.UtcNow;
            if (Interlocked.Increment(ref _failureCount) >= _failureThreshold)
                _isOpen = true;
            throw;
        }
    }
}

public class CircuitBreakerOpenException : Exception
{
    public CircuitBreakerOpenException(string msg) : base(msg) { }
}

Graceful Shutdown

In a reliable system, shutting down means: stop accepting new events → drain all queues → complete in-flight processing → exit.

Hosted service with graceful shutdown
public sealed class EventProcessingService : BackgroundService
{
    private readonly Channel<Event> _channel;
    private readonly ILogger<EventProcessingService> _logger;

    public EventProcessingService(ILogger<EventProcessingService> logger)
    {
        _logger = logger;
        _channel = Channel.CreateBounded<Event>(new BoundedChannelOptions(5000)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
        });
    }

    // Public API for producers
    public ChannelWriter<Event> Writer => _channel.Writer;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Event processor starting");

        try
        {
            await foreach (var evt in _channel.Reader.ReadAllAsync(stoppingToken))
            {
                try
                {
                    await ProcessAsync(evt, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown requested mid-processing: log this event,
                    // then fall through to the drain below
                    _logger.LogWarning("Shutdown during processing of {Id}", evt.Id);
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Failed {Id}", evt.Id);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // ReadAllAsync was canceled while waiting; fall through to the drain
        }

        // Drain whatever is still buffered. This runs on both exit paths:
        // the break above and a canceled ReadAllAsync.
        _logger.LogInformation("Draining remaining events...");

        while (_channel.Reader.TryRead(out var pending))
        {
            try
            {
                // Process with a short timeout
                using var drainCts = new CancellationTokenSource(
                    TimeSpan.FromSeconds(5));
                await ProcessAsync(pending, drainCts.Token);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed during drain: {Id}", pending.Id);
            }
        }

        _logger.LogInformation("Event processor stopped");
    }

    public override async Task StopAsync(CancellationToken ct)
    {
        // Signal no more writes
        _channel.Writer.TryComplete();
        await base.StopAsync(ct);
    }
}

Part 5 — The Full Pipeline: Observable → Channel → Task

Here's where everything comes together. The pattern is: Observable produces events → Channel provides backpressure → Task consumers process reliably.

The Integrated Architecture

┌───────────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│  IObservable<T>       │     │  Channel<T>      │     │  Task Consumers │
│                       │     │                  │     │                 │
│  • Hot event source   │────▸│  • Bounded       │────▸│  • Exception    │
│  • Thousands/sec      │     │  • Backpressure  │     │    walls        │
│  • No backpressure    │     │  • Buffer spikes │     │  • DLQ on fail  │
│  • Fire and forget    │     │  • Drain on exit │     │  • Ack/Nack     │
└───────────────────────┘     └──────────────────┘     └─────────────────┘
            │                          │                        │
            │  Bridge: Observer writes │  ReadAllAsync loop     │
            │  to ChannelWriter        │                        │
            ▼                          ▼                        ▼
     No events lost             Spikes absorbed         Failures isolated
     (or explicitly             (bounded buffer)        (dead letter queue)
     dropped with log)
The Bridge: Observable → Channel
public sealed class ObservableToChannelBridge<T> : IObserver<T>, IDisposable
{
    private readonly ChannelWriter<T> _writer;
    private readonly ILogger _logger;
    private readonly IDisposable _subscription;
    private long _received;
    private long _dropped;

    public ObservableToChannelBridge(
        IObservable<T> source,
        ChannelWriter<T> writer,
        ILogger logger)
    {
        _writer = writer;
        _logger = logger;
        _subscription = source.Subscribe(this);
    }

    public void OnNext(T value)
    {
        Interlocked.Increment(ref _received);

        // TryWrite is non-blocking — critical for high-frequency sources!
        // If the channel is full, we log and count the drop.
        if (!_writer.TryWrite(value))
        {
            var dropped = Interlocked.Increment(ref _dropped);
            if (dropped % 1000 == 0) // don't spam logs
                _logger.LogWarning(
                    "Channel backpressure: {Dropped} events dropped", dropped);
        }
    }

    public void OnError(Exception error)
    {
        _logger.LogError(error, "Observable source faulted");
        _writer.TryComplete(error);
    }

    public void OnCompleted()
    {
        _logger.LogInformation(
            "Observable completed. Received: {Received}, Dropped: {Dropped}",
            _received, _dropped);
        _writer.TryComplete();
    }

    public void Dispose() => _subscription.Dispose();

    // Diagnostics
    public long Received => Interlocked.Read(ref _received);
    public long Dropped => Interlocked.Read(ref _dropped);
}
Why TryWrite, Not WriteAsync?

OnNext is synchronous — you cannot call await inside it. If the channel is full, TryWrite returns false immediately. You have two choices: drop the event (with a log) or use a secondary unbounded buffer as a spillover. For thousands of events per second, dropping with monitoring is the pragmatic choice — trying to buffer everything just delays the OOM.
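
The spillover option could be sketched as follows. `SpilloverWriter<T>` and its members are hypothetical names, not part of the guide's code. The sketch assumes ordering between direct and spilled items does not matter, and it keeps the trade-off described above: the overflow channel is unbounded, so it postpones memory pressure and does not remove it.

```csharp
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: try the bounded channel first, fall back to an
// unbounded overflow channel, and pump the overflow back as room frees up.
public sealed class SpilloverWriter<T>
{
    private readonly ChannelWriter<T> _primary;
    private readonly Channel<T> _overflow = Channel.CreateUnbounded<T>();
    private long _spilled;

    public SpilloverWriter(ChannelWriter<T> primary) => _primary = primary;

    // Expose this as a metric: a growing value means consumers are too slow.
    public long Spilled => Interlocked.Read(ref _spilled);

    // Safe to call from a synchronous OnNext: never blocks, never awaits.
    public void Write(T value)
    {
        if (_primary.TryWrite(value)) return;

        Interlocked.Increment(ref _spilled);
        _overflow.Writer.TryWrite(value); // unbounded: succeeds until completed
    }

    // Signal that no more items will be written.
    public void Complete() => _overflow.Writer.TryComplete();

    // Run once in the background. WriteAsync waits for room in the primary
    // channel, so backpressure applies to the pump, not to OnNext.
    public async Task PumpAsync(CancellationToken ct)
    {
        await foreach (var item in _overflow.Reader.ReadAllAsync(ct))
            await _primary.WriteAsync(item, ct);

        _primary.TryComplete(); // overflow drained and completed
    }
}
```

An observer would call `Write` from `OnNext` and `Complete` from `OnCompleted`, with `PumpAsync` started alongside the consumers. Because spilled items re-enter later than items written directly, consumers that need strict ordering would need a sequence number on each item.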

Fan-Out / Fan-In With Observables and Channels

Observable → Multiple typed channels → Merged consumer
public sealed class EventRouter<T> : IObserver<T>
{
    private readonly Dictionary<Type, Action<T>> _routes = new();
    private readonly ILogger _logger;
    private Action<T>? _defaultRoute;

    public EventRouter(ILogger logger) => _logger = logger;

    public EventRouter<T> Route<TEvent>(
        ChannelWriter<T> writer) where TEvent : T
    {
        _routes[typeof(TEvent)] = item =>
        {
            if (!writer.TryWrite(item))
                _logger.LogWarning("Route for {Type} full", typeof(TEvent).Name);
        };
        return this;
    }

    public EventRouter<T> Default(ChannelWriter<T> writer)
    {
        _defaultRoute = item =>
        {
            // Log the drop, same as the typed routes
            if (!writer.TryWrite(item))
                _logger.LogWarning("Default route full");
        };
        return this;
    }

    public void OnNext(T value)
    {
        if (_routes.TryGetValue(value!.GetType(), out var route))
            route(value);
        else
            _defaultRoute?.Invoke(value);
    }

    public void OnError(Exception error) =>
        _logger.LogError(error, "Source faulted");
    public void OnCompleted() =>
        _logger.LogInformation("Source completed");
}
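
The router covers the fan-out half. For the fan-in half (the "merged consumer" in the caption), one possible sketch is a helper that pumps several readers into a single bounded channel. `ChannelFanIn.Merge` is a hypothetical name, not a library API, and the sketch makes no ordering guarantee across inputs.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelFanIn
{
    // Hypothetical helper: merges every input reader into one bounded output.
    public static ChannelReader<T> Merge<T>(
        int capacity, CancellationToken ct, params ChannelReader<T>[] inputs)
    {
        var output = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleWriter = false, // one pump per input writes concurrently
        });

        // Fire-and-forget is safe here: the method catches everything itself.
        _ = CompleteWhenAllDoneAsync();
        return output.Reader;

        async Task PumpAsync(ChannelReader<T> input)
        {
            // WriteAsync waits when the output is full, so a slow merged
            // consumer slows every pump instead of dropping items.
            await foreach (var item in input.ReadAllAsync(ct))
                await output.Writer.WriteAsync(item, ct);
        }

        async Task CompleteWhenAllDoneAsync()
        {
            try
            {
                await Task.WhenAll(inputs.Select(input => PumpAsync(input)));
                output.Writer.TryComplete();   // all inputs completed and drained
            }
            catch (Exception ex)
            {
                output.Writer.TryComplete(ex); // surface the failure to the reader
            }
        }
    }
}
```

The merged reader completes only after every input has completed, so a single `await foreach` over it can serve as the consumer for all routed channels.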

High-Frequency Scenarios — Thousands of Events/Second

When your observable fires thousands of times per second, you need batching and throttling between the observable and the channel. Processing items one-by-one at that rate creates too much async overhead.

Time-windowed batch bridge
public sealed class BatchingBridge<T> : IObserver<T>, IDisposable
{
    private readonly ChannelWriter<IReadOnlyList<T>> _writer;
    private readonly ILogger _logger;
    private readonly IDisposable _subscription;
    private readonly Timer _flushTimer;
    private readonly int _maxBatchSize;
    private readonly object _lock = new();
    private List<T> _buffer;

    public BatchingBridge(
        IObservable<T> source,
        ChannelWriter<IReadOnlyList<T>> writer,
        ILogger logger,
        int maxBatchSize = 100,
        TimeSpan? flushInterval = null)
    {
        _writer = writer;
        _logger = logger;
        _maxBatchSize = maxBatchSize;
        _buffer = new List<T>(maxBatchSize);

        var interval = flushInterval ?? TimeSpan.FromMilliseconds(100);
        _flushTimer = new Timer(_ => Flush(), null, interval, interval);
        _subscription = source.Subscribe(this);
    }

    public void OnNext(T value)
    {
        List<T>? batchToSend = null;

        lock (_lock)
        {
            _buffer.Add(value);
            if (_buffer.Count >= _maxBatchSize)
            {
                batchToSend = _buffer;
                _buffer = new List<T>(_maxBatchSize);
            }
        }

        if (batchToSend != null)
            SendBatch(batchToSend);
    }

    private void Flush()
    {
        List<T>? batchToSend = null;

        lock (_lock)
        {
            if (_buffer.Count > 0)
            {
                batchToSend = _buffer;
                _buffer = new List<T>(_maxBatchSize);
            }
        }

        if (batchToSend != null)
            SendBatch(batchToSend);
    }

    private void SendBatch(List<T> batch)
    {
        if (!_writer.TryWrite(batch))
            _logger.LogWarning("Batch channel full, dropped {Count} items", batch.Count);
    }

    public void OnError(Exception error)
    {
        Flush(); // flush remaining before error
        _writer.TryComplete(error);
    }

    public void OnCompleted()
    {
        Flush(); // flush remaining
        _writer.TryComplete();
    }

    public void Dispose()
    {
        _subscription.Dispose();
        _flushTimer.Dispose();
        Flush();
    }
}

Real-World Example: High-Frequency Sensor Processing

Imagine a monitoring agent receiving sensor readings from multiple machines, thousands per second. Each reading needs to be validated, enriched, and stored. Here's the complete pipeline; Enrich, BulkStoreAsync, and ReprocessAsync are placeholders to replace with real logic:

Complete pipeline: Observable → Channel → Processing → DLQ
public sealed class SensorPipeline : IDisposable
{
    private readonly Channel<IReadOnlyList<SensorReading>> _ingestChannel;
    private readonly Channel<EnrichedReading> _processChannel;
    private readonly DeadLetterQueue<SensorReading> _dlq;
    private readonly BatchingBridge<SensorReading> _bridge;
    private readonly CancellationTokenSource _cts = new();
    private readonly ILogger _logger;
    private readonly List<Task> _workers = new();

    public SensorPipeline(
        IObservable<SensorReading> sensorSource,
        ILogger<SensorPipeline> logger)
    {
        _logger = logger;

        // Stage 1: Batched ingest (observable → batched channel)
        _ingestChannel = Channel.CreateBounded<IReadOnlyList<SensorReading>>(
            new BoundedChannelOptions(100) // 100 batches × 100 items = 10K buffer
            {
                SingleReader = false, // multiple enrichment workers
                SingleWriter = true,  // only the bridge writes
                FullMode = BoundedChannelFullMode.Wait,
            });

        // Stage 2: Enriched processing channel
        _processChannel = Channel.CreateBounded<EnrichedReading>(
            new BoundedChannelOptions(5000)
            {
                SingleReader = true,
                SingleWriter = false,
                FullMode = BoundedChannelFullMode.Wait,
            });

        // Dead letter queue for failed items
        _dlq = new DeadLetterQueue<SensorReading>(logger);

        // Bridge: Observable → Batched Channel
        _bridge = new BatchingBridge<SensorReading>(
            sensorSource, _ingestChannel.Writer, logger,
            maxBatchSize: 100,
            flushInterval: TimeSpan.FromMilliseconds(50));
    }

    public Task StartAsync()
    {
        var ct = _cts.Token;

        // Enrichment workers (fan-out from ingest channel)
        for (int i = 0; i < Environment.ProcessorCount; i++)
        {
            _workers.Add(Task.Run(() =>
                EnrichWorkerAsync(_ingestChannel.Reader, _processChannel.Writer, ct), ct));
        }

        // Storage worker (single writer for DB consistency)
        _workers.Add(Task.Run(() =>
            StoreWorkerAsync(_processChannel.Reader, ct), ct));

        // DLQ retry worker
        _workers.Add(Task.Run(() =>
            _dlq.RetryAsync(ReprocessAsync, null, ct), ct));

        _logger.LogInformation(
            "Pipeline started with {Workers} enrichment workers",
            Environment.ProcessorCount);

        return Task.CompletedTask;
    }

    private async Task EnrichWorkerAsync(
        ChannelReader<IReadOnlyList<SensorReading>> reader,
        ChannelWriter<EnrichedReading> writer,
        CancellationToken ct)
    {
        await foreach (var batch in reader.ReadAllAsync(ct))
        {
            foreach (var reading in batch)
            {
                try
                {
                    var enriched = Enrich(reading);
                    await writer.WriteAsync(enriched, ct);
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    return;
                }
                catch (Exception ex)
                {
                    // Exception wall — item goes to DLQ, loop continues
                    _dlq.Enqueue(reading, ex, 0);
                }
            }
        }
    }

    private async Task StoreWorkerAsync(
        ChannelReader<EnrichedReading> reader,
        CancellationToken ct)
    {
        var batch = new List<EnrichedReading>(200);

        await foreach (var item in reader.ReadAllAsync(ct))
        {
            batch.Add(item);

            // Micro-batch: accumulate up to 200 or until channel is empty
            while (batch.Count < 200 && reader.TryRead(out var more))
                batch.Add(more);

            try
            {
                await BulkStoreAsync(batch, ct);
            }
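            catch (Exception ex)
            {
                _logger.LogError(ex, "Bulk store failed for {Count} items", batch.Count);

                // Exception wall: dead-letter the raw readings instead of dropping them
                foreach (var failed in batch)
                    _dlq.Enqueue(failed.Raw, ex, 0);
            }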

            batch.Clear();
        }
    }

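    public async Task StopAsync()
    {
        _logger.LogInformation("Pipeline shutting down...");

        // 1. Stop the observable bridge (no new events)
        _bridge.Dispose();

        // 2. Complete the ingest channel (let workers drain)
        _ingestChannel.Writer.TryComplete();

        try
        {
            // 3. Wait for the enrichment workers. StartAsync adds them first,
            //    so they are the first ProcessorCount entries in _workers.
            var enrichers = _workers.GetRange(
                0, Math.Min(Environment.ProcessorCount, _workers.Count));
            await Task.WhenAll(enrichers).WaitAsync(TimeSpan.FromSeconds(30));

            // 4. Nothing completes these two automatically. Close them so the
            //    storage and DLQ retry workers can leave their ReadAllAsync loops.
            _processChannel.Writer.TryComplete();
            _dlq.Dispose();

            await Task.WhenAll(_workers).WaitAsync(TimeSpan.FromSeconds(30));
        }
        catch (TimeoutException)
        {
            _logger.LogWarning("Shutdown timed out, canceling workers");
            _cts.Cancel();
        }

        _dlq.Dispose();
        _logger.LogInformation("Pipeline stopped");
    }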

    // Placeholder methods
    private EnrichedReading Enrich(SensorReading r) => new(r);
    private Task BulkStoreAsync(List<EnrichedReading> batch, CancellationToken ct) => Task.CompletedTask;
    private Task ReprocessAsync(SensorReading r, CancellationToken ct) => Task.CompletedTask;

    public void Dispose()
    {
        _bridge.Dispose();
        _cts.Dispose();
        _dlq.Dispose();
    }
}

public record SensorReading(string SensorId, double Value, DateTime Timestamp);
public record EnrichedReading(SensorReading Raw);

Production Checklist

Before You Ship — Verify These

Tasks
☐ Every Task is awaited or routed through SafeFireAndForget
☐ TaskScheduler.UnobservedTaskException handler is registered
☐ ConfigureAwait(false) in all library/service code
☐ CancellationToken threaded through every async method
☐ TaskCompletionSource uses RunContinuationsAsynchronously
☐ No sync-over-async (.Result, .Wait()) on ThreadPool threads

Channels
☐ All channels are Bounded with explicit FullMode
☐ SingleReader/SingleWriter set correctly (and truthfully!)
☐ Writer completion is guaranteed (in finally blocks)
☐ Consumers handle ChannelClosedException
☐ Dropped message counter exposed for monitoring

Observables
☐ Observer contract followed: serialized OnNext, terminal is final
☐ IDisposable subscriptions tracked and disposed on shutdown
☐ Exception in observer doesn't kill the source (exception wall)
☐ Multi-threaded sources wrapped with SerializedObserver

Reliability
☐ Every consumer loop has an exception wall (try-catch per item)
☐ Failed items go to a Dead Letter Queue, not nowhere
☐ Graceful shutdown drains all channels before exit
☐ Circuit breaker on external dependencies
☐ Backpressure strategy documented and monitored
☐ Log dropped/failed event counts as metrics