Implement Producer/Consumer patterns using Channel in C#
What is a channel?
Channels in C# are based on the concept of message passing, which means that the producer sends messages to the channel, and the consumer receives them. The messages can be of any type and they are stored in the channel until the consumer is ready to receive them. Channels can also have a bounded capacity, meaning that they can hold a fixed number of messages before blocking the producer until the consumer makes room for new messages.
In this way, channels provide a simple yet powerful way to implement asynchronous communication and coordination between different parts of a program, and they are becoming increasingly popular in C# and other modern programming languages.
Channels are implemented using the System.Threading.Channels namespace, which provides 2 types of Channels, including:
- UnboundedChannel: This type of Channel can hold unlimited messages.
- BoundedChannel: This type of Channel can only hold a limited amount of messages. We can configure the behavior when the queue is full using BoundedChannelOptions.FullMode. Besides, the library also let us configure more properties such as:
- SingleWriter: This doesn't force the runtime to accept only one writer and throw an exception if there is more than one writer to the Channel. If this property is set to true, the channel may be able to optimize some certain operations based on the assumption that there is only one writer.
- SingleReader: Same as SingleWriter but for reading side
- AllowSynchronousContinuations: Indicates that continuation could be invoked synchronously or not. The option is false by default. We should be careful and perform some measurements before and after turning this flag on.
In order to use the Channel. First, we create a Channel instance via Channel type’s static method like below.
Figure 01: Creating a Channel instance
In the preceding example, we created an unbounded channel, it can send messages of type int.
Figure 02: Sending messages using Channel.
Then, we can write some data on the writer part of the channel and read it using the reader part. We can also use channels for synchronization in a multi-threaded context.
Figure 03: using Channel in a multi-threaded context.
The preceding code created 2 threads, the first one reads from the channel and the second one writes to the channel. Only after the value is written into the channel by the writer thread, the reader thread can continue its work. We can think of Channel as a queue in this case, the reader attempts to read the message. At the time of reading, there's no message yet. Once the message arrives from the writer, the reader could continue to work on its task.
UnboundedChannel is a type of channel that provides an unbounded buffer for messages of type T. It is available in the System.Threading.Channels namespace and can be used to create a channel that can hold an unlimited number of messages. An UnboundedChannel can be useful in scenarios where the number of messages being produced is not known in advance, or where the producer and consumer are processing at different speeds. However, it is important to note that an unbounded channel can potentially consume a large amount of memory if the rate of production exceeds the rate of consumption, so it should be used with care.
Figure 04: UnboundedChannel example
The example code above should print out "Received item 0", .... "Received item 100" as the same order messages were written.
Same as UnboundedChannel, BoundedChannel is also a type of channel in C#. The only difference is that it provides a buffer of a fixed size for messages of type T. A bounded channel can be used in scenarios where there is a producer that is generating data at a faster rate than a consumer is processing it, or where a consumer is able to handle a fixed number of items at a time. By specifying a limit on the buffer size, a bounded channel can help to control the amount of memory that is consumed by the channel.
Figure 05: BoundedChannel example
If you notice, the example for BoundedChannel is exactly the same as UnboundedChannel in the previous section. The output is also exactly the same as the UnboundedChannel section.
The same behavior as UnboundedChannel occurred because the consumer is so fast that it could process all the messages as fast as the producer's speed. Let's tweak our code a bit to get some different behaviors. First, we could tweak the BoundedChannelOptions upon creating the channel and decide what happens to our messages when our buffer is full. Instead of using the constructor that takes an int as parameter.
Figure 06: BoundedChannel which takes int as param in constructor
We use the other overload which takes BoundedChannelOptions
Figure 07: Using the other overload for constructor
Then, we also tweak the speed of our consumer to make the queue full
Figure 08: Tweaking the consumer’s consumption speed
With the changes applied, the code should output only "Receive item: 0"..."Received item: 9". The BoundedChannelFullMode.DropWrite configures the channel to drop any new messages when the channel is full. Because the consumer is 10 times slower than the consumer, the queue is still holding 0...9 and being processed by our consumer. All the messages from 10..99 are dropped.
We can also use other options to configure FullMode like DropOldest. Using this option will drop the first oldest message in the queue. Our code now should output only "90...99". We have another slightly different option is DropNewest. The newest message will be dropped when the queue is full and more messages arrive. In this case, the output is 0...8,99.
In the next sections, I will give you some patterns that could be useful for real world scenarios.
Single producer, Single consumer
Figure 09: Single producer single consumer
In this example, we create a producer and a consumer that barely keep up with each other. Just like the example in the previous section, we let them run concurrently and consume all the messages till complete. Once completed, we mark the writer of the channel as completed.
Multiple producer, single consumers
Figure 10: Multi producer single consumer
In this example, we first create 1 producer and 1 consumer. Since the consumer's speed is much faster than the producer's speed, we add another producer to balance things better. In a real world scenario, we could use this pattern to utilize resources and increase throughput because the consumer is faster and could be idle most of the time.
Single producer, Multiple consumer
Figure 11: Single producer multiple consumers
This example demonstrates a quite common scenario, the producer is fast and the consumer is much slower. In such cases, we could add more consumers to balance the differences between the consumer and producer. In our case above, by scaling up to 3 consumers, we could process 3 messages in parallel. Note that when we scale the consumer up to 3 instances, the consumer can't keep up the pace entirely with the producer. Since the channel is unbounded, running this for a long time could overwhelm the consumers.
In conclusion, channels in C# provide a powerful mechanism for implementing concurrent and asynchronous communication between different parts of a program. They allow for safe and efficient sharing of data between threads or asynchronous operations, without the need for complex synchronization mechanisms.
Using channels, we can create flexible and responsive applications that can handle a high degree of concurrency and parallelism, while avoiding common pitfalls such as race conditions or deadlocks.
The source code used in this post can be found here.