A while ago, I was involved in a project that needed to push messages to a Kafka topic. I found that while the .NET code required for that implementation was relatively straightforward – thanks to Confluent’s .NET client for Kafka – configuring the server was a nightmare. The IT team at the client site was supposed to get the Kafka cluster sorted and dragged the issue out for a month or so. And I understood why when I tried to set up a cluster myself – configuring a Kafka cluster is not a walk in the park. If only we had a managed, one-click solution to implement an event streaming solution based on the Kafka protocol… 😀
When Microsoft announced Event Hubs support for the Kafka protocol last month, I thought a great way to prove it was really interoperable was to take part of the original code I wrote and see if I could connect to Event Hubs without any significant changes. And I was pleasantly surprised! The only changes required were some additions to the producer/consumer configuration. This post shows how I got this working and highlights one of the main gotchas I found along the way.
Setting Up Kafka Support on Event Hubs
First I needed an Event Hubs namespace with Kafka support enabled – this correlates to a Kafka cluster. This can be achieved by simply enabling Kafka support when you create a new Event Hubs namespace. It is that simple to provision!
For a step-by-step guide, you can follow Microsoft Docs documentation.
Once the Event Hubs namespace with Kafka support is deployed, you will need to create a new event hub – which relates to a Kafka topic – and set up an access policy for that event hub (you can always use the RootManageSharedAccessKey, but that is not good practice).
As far as I understand, Kafka support can only be enabled when the namespace is created, so bear that in mind when creating a new Event Hubs namespace.
Creating a New Event Hub Instance
Next I needed a new event hub instance. The simplest way to create a new event hub instance is to navigate to your event hub namespace and click on the (+) Event Hub at the top of the overview blade.
Once the create blade appears, fill in the required information and click create:
A couple of things to take into consideration while creating an event hub:
- Partition count – the partition count indicates the number of parallel consumers you can have processing your events. You can find more information about partitions here.
- Message retention – defines the number of days a message is retained on Event Hubs – messages are not deleted explicitly on Event Hubs, but expire after the message retention period.
Partition count is another design consideration, as it cannot be changed after the event hub is created. So make sure you plan ahead when setting that up.
Setting Up a Shared Access Policy
Finally, I needed a way to connect to the event hub. For that I needed to create a Shared Access Policy – I decided to create a key for a single event hub instance, so I navigated to that instance and selected Shared access policies.
Within Shared access policies, you just need to click on (+) Add.
At the Add SAS Policy, choose a name for your policy and what rights you want to give for that policy.
Acquiring the SAS connection strings
Once a new SAS policy is created, you need access to the connection string – this will be used to configure the Confluent .NET client.
To capture the SAS connection string, click the SAS policy you just created. You will see the following properties blade, which will allow you to copy the SAS policy connection string by clicking on the copy button beside Connection string – primary key or Connection string – secondary key.
Setting up the .NET Client
Once you have the Event Hubs namespace configured, make sure you take note of:
- Namespace endpoint (<mynamespace>.servicebus.windows.net)
- SAS policy connection string for either primary or secondary key.
You will also need:
- A .NET Console Application project (4.x or Core).
- The Confluent.Kafka component added to the project (you can just add it via NuGet). A sample App.config for these settings is sketched below.
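The producer sample later in this post reads these values from appSettings via ConfigurationManager. A minimal App.config sketch is shown below – the key names are simply the ones I chose for the sample, and the values are placeholders you would replace with your own namespace, policy and file paths:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <appSettings>
    <!-- namespace endpoint, including the Kafka SSL port -->
    <add key="eventHubsNamespaceURL" value="mynamespace.servicebus.windows.net:9093" />
    <!-- SAS policy connection string (primary or secondary key) -->
    <add key="eventHubsConnStr" value="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=mypolicy;SharedAccessKey=mysharedaccesskey;EntityPath=myeventhub" />
    <!-- name of the event hub instance (the Kafka topic) -->
    <add key="eventHubName" value="myeventhub" />
    <!-- path to the CA certificate bundle (cacert.pem) -->
    <add key="caCertLocation" value="C:\certs\cacert.pem" />
  </appSettings>
</configuration>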
Configuring Confluent’s .NET Client
Confluent’s .NET Client for Apache Kafka is an open source library that allows developers to send (produce) messages to and receive (consume) messages from an event streaming cluster that speaks the Apache Kafka protocol (like Event Hubs).
The library can be found on NuGet for both .NET 4.x and .NET Standard.
Configuring a Confluent Kafka Producer
Publishing messages to an Event Hub instance using Confluent.Kafka requires very little code:
// required namespaces for this sample (Confluent.Kafka 0.x API)
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

public static async Task Producer()
{
    try
    {
        // <eventhubnamespace>.servicebus.windows.net:9093
        string brokerList = ConfigurationManager.AppSettings["eventHubsNamespaceURL"];
        // connection string - primary or secondary key
        string password = ConfigurationManager.AppSettings["eventHubsConnStr"];
        // <youreventhubinstance>
        string topicName = ConfigurationManager.AppSettings["eventHubName"];
        // a location to a cache of CA certificates
        string caCertLocation = ConfigurationManager.AppSettings["caCertLocation"];

        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", brokerList },
            { "security.protocol", "SASL_SSL" },
            { "sasl.mechanism", "PLAIN" },
            { "sasl.username", "$ConnectionString" },
            { "sasl.password", password },
            { "ssl.ca.location", caCertLocation },
            { "broker.version.fallback", "0.10.0.0" },
            { "api.version.fallback.ms", "0" },
            { "debug", "security,broker,protocol" }
        };

        // instantiates the producer
        using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
        {
            Console.WriteLine("Initiating Execution");

            for (int x = 0; x < 100; x++)
            {
                var msg = string.Format("This is a sample message - msg # {0} at {1}", x, DateTime.Now.ToString("yyyyMMdd_HHmmssfff"));

                // publishes the message to Event Hubs
                var deliveryReport = await producer.ProduceAsync(topicName, null, msg);
                Console.WriteLine(string.Format("Message {0} sent.", x));
            }
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(string.Format("Exception Occurred - {0}", e.Message));
    }
}
The config variable does all the magic here, defining how the producer will connect to a Kafka server (or, in our case, an Event Hubs instance using the Kafka protocol). The Kafka protocol available for Event Hubs uses SASL (Simple Authentication and Security Layer) over SSL (SASL_SSL) as the security protocol, with plain username and password as the authentication method. For this configuration to work, the following configuration items have to be properly defined (a minimal configuration putting them together is sketched after the list):
- bootstrap.servers – this represents a list of servers in the cluster that you want to connect to. In our case this is a single endpoint URL using the following format: <eventhubsnamespace>.servicebus.windows.net:9093 – where <eventhubsnamespace> is the name of your namespace (e.g. eventhubskafka). Another thing to notice here is that it requires connectivity via port 9093 – this port indicates that Event Hubs will connect using SSL – normal Kafka servers usually also provide a “clear text” connection (defaulting to port 9092), but that is not available on Event Hubs for security reasons.
- sasl.username – this represents the username that should be authenticated before the connection is secured. Event Hubs requires this value to be set to $ConnectionString so it understands that it will be using a SAS policy connection string to secure the session.
- sasl.password – this represents the password associated with that username. In this case the password is actually a SAS policy connection string, which usually has the following format: Endpoint=sb://<eventhubsnamespace>.servicebus.windows.net/;SharedAccessKeyName=<policyname>;SharedAccessKey=<sharedaccesskey>;EntityPath=<eventhubname>. This is the connection string for the SAS policy defined earlier.
- ssl.ca.location – this represents the location of the CA certificate associated with the endpoint that you are connecting to. This is required because Confluent.Kafka can’t access the Windows certificate store to verify that the endpoint comes from a verified source. This tripped me up for a while, until I found that I could download a “cache” of CA certificates from the cURL Mozilla project (this was a tip from the Confluent.Kafka maintainers when I opened an issue on the project). You can download this cache, defined in PEM format, from here.
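Putting those four items together, a minimal producer configuration for Event Hubs would look like the sketch below – the namespace, policy name, key and file path are made up for illustration:

// minimal Kafka-over-Event-Hubs configuration – namespace, policy and paths below are hypothetical
var config = new Dictionary<string, object>
{
    // namespace endpoint on port 9093 (SSL only)
    { "bootstrap.servers", "eventhubskafka.servicebus.windows.net:9093" },
    { "security.protocol", "SASL_SSL" },
    { "sasl.mechanism", "PLAIN" },
    // literal value expected by Event Hubs
    { "sasl.username", "$ConnectionString" },
    // the SAS policy connection string acts as the password
    { "sasl.password", "Endpoint=sb://eventhubskafka.servicebus.windows.net/;SharedAccessKeyName=sendpolicy;SharedAccessKey=<sharedaccesskey>;EntityPath=myeventhub" },
    // CA certificate bundle downloaded from the cURL project
    { "ssl.ca.location", @"C:\certs\cacert.pem" }
};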
Once the producer is created, sending a message is as simple as executing the following command:
var deliveryReport = await producer.ProduceAsync(topicName, null, msg);
where topicName is the name of the event hub instance you created before.
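If you want to check where the event landed, the delivery report returned by ProduceAsync can be inspected – a small sketch based on the 0.x Confluent.Kafka API used in this post:

var deliveryReport = await producer.ProduceAsync(topicName, null, msg);

// the delivery report carries the error (if any) and the topic/partition/offset the message was written to
if (deliveryReport.Error.HasError)
{
    Console.WriteLine($"Delivery failed: {deliveryReport.Error.Reason}");
}
else
{
    Console.WriteLine($"Delivered to {deliveryReport.TopicPartitionOffset}");
}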
Configuring a Confluent.Kafka Consumer
Similarly, consuming messages from Event Hubs via Confluent.Kafka requires very little code.
public static void Consumer(string brokerlist, string password, string consumergroup, string topicname, string cacertlocation)
{
    var config = new Dictionary<string, object>
    {
        { "bootstrap.servers", brokerlist },
        { "security.protocol", "SASL_SSL" },
        { "sasl.mechanism", "PLAIN" },
        { "sasl.username", "$ConnectionString" },
        { "sasl.password", password },
        { "ssl.ca.location", cacertlocation },
        { "debug", "security,broker,protocol" },
        { "group.id", consumergroup },
        { "auto.commit.interval.ms", 5000 },
        { "auto.offset.reset", "earliest" },
        { "broker.version.fallback", "0.10.0.0" },
        { "api.version.fallback.ms", "0" }
    };

    using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
    {
        consumer.OnMessage += (_, msg) => Console.WriteLine($"Read '{msg.Value}' from: {msg.TopicPartitionOffset}");
        consumer.OnError += (_, error) => Console.WriteLine($"Error: {error}");
        consumer.OnConsumeError += (_, msg) => Console.WriteLine($"Consume error ({msg.TopicPartitionOffset}): {msg.Error}");

        consumer.Subscribe(topicname);

        while (true)
        {
            consumer.Poll(TimeSpan.FromMilliseconds(100));
        }
    }
}
It also uses a configuration settings approach to define the consumer, with the same authentication settings. When setting up a consumer, you must also define a consumer group. You can think of a consumer group as a view of the event hub, allowing you to consume data from an event hub instance at your own pace, without having to read it all the way from the start again (every event hub comes with a $Default consumer group, but you can create additional ones). You can find more info about consumers and consumer groups here. You configure which consumer group your consumer will use by setting the group.id property of the config settings.
The consumer code deals with new messages arriving by setting a series of event handlers:
- consumer.OnMessage – this handler is triggered when a new message is ready to be processed.
- consumer.OnError – this handler is triggered when any error other than a message consumption error occurs.
- consumer.OnConsumeError – this handler is triggered when an error occurs while consuming a message from the event hub.
The code snippet below binds the event handlers to the consumer client:
consumer.OnMessage += (_, msg) => Console.WriteLine($"Read '{msg.Value}' from: {msg.TopicPartitionOffset}");
consumer.OnError += (_, error) => Console.WriteLine($"Error: {error}");
consumer.OnConsumeError += (_, msg) => Console.WriteLine($"Consume error ({msg.TopicPartitionOffset}): {msg.Error}");
Once the consumer is set up, it needs to subscribe to an event hub instance, which is achieved by the following command:
consumer.Subscribe(topicname);
Finally, we just need to poll Event Hubs for new messages:
consumer.Poll(TimeSpan.FromMilliseconds(100));
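The sample keeps polling in an endless loop; in a real application you will probably want a way to stop it gracefully. A small sketch of my own (not part of the original sample) using a CancellationToken:

// poll until cancellation is requested, e.g. via Ctrl+C (requires System.Threading)
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

while (!cts.IsCancellationRequested)
{
    // each Poll call dispatches the OnMessage / OnError / OnConsumeError handlers for any events received
    consumer.Poll(TimeSpan.FromMilliseconds(100));
}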
In Summary
Connecting to Event Hubs via the Kafka protocol is extremely simple. If you are developing in .NET, you can leverage the Confluent.Kafka client, which will allow you to connect with just a few lines of code.
If you, like me, already had that code running and pointing to an existing Kafka cluster, you can definitely reuse the existing code with a few additional configuration changes.
In case you need a starter, you can find the sample code for this post on my GitHub – just go here.