Trong loạt bài này:
Azure Event Hubs là một dịch vụ nhận sự kiện có thể mở rộng theo chiều ngang có khả năng nhận và xử lý hàng triệu sự kiện mỗi giây. Nó hỗ trợ Apache Kafka Producer và Consumer API mà bạn có thể sử dụng thay thế cho việc chạy một cụm Apache Kafka tự quản lý.
Giờ đây, bạn có thể tích hợp các ứng dụng hệ sinh thái Kafka như Kafdrop và nhiều ứng dụng khác với Event Hubs. Vui lòng truy cập trang web tài liệu của Microsoft để đọc chi tiết về cách sử dụng Azure Event Hub làm messaging backplane cho ứng dụng Apache Kafka.
Bây giờ chúng ta sẽ mở rộng ứng dụng TimeOff để sử dụng Kafka chạy trong Docker trong môi trường cục bộ và sử dụng Event Hub trong các môi trường khác. Bằng cách xây dựng các tính năng mới của ứng dụng, bạn cũng sẽ hiểu quá trình di chuyển ứng dụng sử dụng cụm Kafka tự lưu trữ hoặc được quản lý sang Event Hub.
Một trong những thành phần quan trọng của ứng dụng của chúng ta là Schema Registry, lưu trữ và thực thi các lược đồ giữa producer và consumer các sự kiện. Azure Schema Registry là một tính năng mới tùy chọn của Event Hubs hoạt động như một kho lưu trữ trung tâm cho các tài liệu lược đồ. Về mặt chức năng, nó tương tự như Confluent Schema Registry và bạn có thể sử dụng nó để thực thi các ràng buộc trong quá trình serialization và deserialization tin nhắn.
Chúng ta sẽ mở rộng ứng dụng của mình để sử dụng Azure Schema Registry trong môi trường không phải cục bộ và Confluent Schema Registry trong môi trường cục bộ.
Di chuyển đến Azure Management Portal và tạo namespace Event Hubs mới. Tôi đã đặt tên cho namespace Event Hubs của mình là - timeoff-eh. Bạn có thể chọn bất kỳ tên không trùng lặp nào có sẵn cho namespace của mình. Lưu ý rằng dịch vụ đăng ký lược đồ chỉ có sẵn trong các gói tiêu chuẩn hoặc cao hơn. Vì vậy, vui lòng chọn mức giá cho phù hợp.
Hãy sao chép chuỗi kết nối của namespace từ cổng thông tin, mà sau này chúng ta sẽ sử dụng trong ứng dụng của mình.
Chúng ta đã sử dụng Admin API của Kafka trong ứng dụng của mình để tạo các topic Kafka mới một cách nhanh chóng. Vì Event Hubs không hỗ trợ Admin API của Kafka, chúng ta sẽ tạo hai topic mà chúng ta yêu cầu - leave-applications (3 phân vùng) và leave-applications-results (1 phân vùng) dưới dạng các bản sao của Event Hubs như sau:
Hãy nhớ rằng với Event Hubs, một topic Kafka tương ứng với một thể hiện của Event Hubs. Phần còn lại của các khái niệm, chẳng hạn như phân vùng, nhóm consumer và người nhận sự kiện, vẫn giữ nguyên. Bạn có thể thêm nhóm consumer theo cách thủ công hoặc để Consumer API của Kafka tự động tạo nhóm consumer (không cần thay đổi trong ứng dụng của chúng ta).
Bây giờ, bạn sẽ có hai phiên bản Event Hubs với số lượng phân vùng thích hợp trong namespace của bạn như sau:
Bây giờ chúng ta hãy tạo Schema Registry.
Hãy tạo một Schema Registry mới trong namespace Event Hubs của chúng ta. Lưu ý rằng Schema Registry là một tính năng độc lập mà bạn cũng có thể sử dụng với các dịch vụ nhắn tin khác như Service Bus và Azure Queues. Đầu tiên chúng ta sẽ tạo một Schema Group được sử dụng để quản lý một tập hợp các lược đồ. Bạn có thể tạo Schema Group cho mỗi miền hoặc ứng dụng.
Nhấp vào tùy chọn Schema Registry trong namespace Event Hubs của bạn, bên dưới Entities và chọn tùy chọn để tạo Schema Group mới như sau:
Nhập tên Schema Group là time-off-schema, định dạng serialization là Avro và chế độ tương thích.
Bây giờ chúng ta cần cấp quyền truy cập vào ứng dụng của mình để giao tiếp với Schema Registry để thêm các lược đồ còn thiếu và đọc các lược đồ hiện có. Azure khuyên bạn nên sử dụng Role-Based Access Control (RBAC) để cấp quyền truy cập cần thiết vào tài nguyên. Hãy đăng ký ứng dụng TimeOff trong Active Directory của chúng ta. Bạn có thể đọc chi tiết về quy trình đăng ký ứng dụng trên trang web tài liệu của Microsoft .
Di chuyển đến menu Active Directory của bạn và nhấp vào App registrations và chọn tùy chọn New registration như sau:
Nhập tên ứng dụng và bấm vào nút Register.
Bây giờ chúng ta hãy tạo một client secret mới cho ứng dụng của chúng ta bằng cách nhấp vào Certificates and secrets và tiếp theo vào tùy chọn New secret. Điền vào các chi tiết được yêu cầu cho secret như sau:
Ghi lại secret ở màn hình tiếp theo. Secret sẽ được hiển thị một lần, hãy đảm bảo rằng bạn không làm mất giá trị secret.
Ngoài ra, hãy ghi lại Client Id và Tenant Id của ứng dụng từ trang tổng quan như sau:
Bây giờ chúng ta hãy gán quyền Schema Registry cho ứng dụng TimeOff. Gán quyền Schema Registry Contributor cho ứng dụng TimeOff như sau, cho phép thêm lược đồ vào nhóm theo chương trình nếu nó không tồn tại:
Bây giờ, hãy cập nhật mã ứng dụng để chuyển sang Event Hubs dựa trên cài đặt cấu hình.
Mở rộng dự án TimeOff.Eaffee và thêm thiết lập IsLocalEnvironment với giá trị false trong tệp appsettings.json. Giá trị của thiết lập IsLocalEnvironment sẽ xác định xem chúng ta sử dụng Event Hub và Azure Schema Registry hay Kafka và Confluent Schema Registry trong ứng dụng của mình hay không.
Dựa trên giá trị của thiết lập, hãy tạo một đối tượng mới ProducerConfig
được khởi tạo từ các chi tiết của Event Hubs như sau:
var config = new ProducerConfig()
{
BootstrapServers = "<EH namespace>.servicebus.windows.net=9093",
EnableDeliveryReports = true,
SaslUsername = "$ConnectionString",
SaslPassword = "<EH namespace connection string>"
};
config.ClientId = Dns.GetHostName();
config.SecurityProtocol = SecurityProtocol.SaslSsl;
config.SaslMechanism = SaslMechanism.Plain;
Tiếp theo, chúng ta cần khởi tạo Schema Registry client SchemaRegistryClient
mà chúng ta sẽ sử dụng để tương tác với Azure Schema Registry như sau:
var schemaRegistryClientAz = new SchemaRegistryClient("<EH namespace>.servicebus.windows.net", new DefaultAzureCredential());
DefaultAzureCredential
cho phép bạn chuyển đổi định danh dựa trên môi trường. Lớp DefaultAzureCredential
kết hợp vài lớp định danh được sử dụng để lấy định danh Azure Active Directory.
Trong thời gian chạy, DefaultAzureCredential
bắt đầu cố gắng khởi tạo một trong các lớp định danh bắt đầu bằng EnvironmentCredential
và kết thúc bằng InteractiveBrowserCredential
.
Bất kỳ lớp định danh nào được khởi tạo trước sẽ được sử dụng để xác thực các lệnh gọi API tài nguyên. Bạn có thể đọc thêm về thư viện Azure Schema Registry cho .NET từ tệp ReadMe của SDK.
Hãy thêm các biến môi trường sau được sử dụng bởi lớp EnvironmentCredential
vào ứng dụng của chúng ta trong tệp launchsettings.json. Schema Registry client sẽ sử dụng thông tin đăng nhập để xác thực các yêu cầu được gửi để tạo hoặc đọc lược đồ từ sổ đăng ký.
Lưu ý rằng chúng ta đang thêm các thông tin xác thực này vào tệp thiết lập khởi chạy để hỗ trợ chúng ta trong quá trình gỡ lỗi và mô phỏng hành vi của ứng dụng trong môi trường không cục bộ.
{
"profiles": {
"TimeOff.Employee": {
"commandName": "Project",
"environmentVariables": {
"AZURE_CLIENT_SECRET": "<TimeOff application client secret>",
"AZURE_CLIENT_ID": "<TimeOff application client ID>",
"AZURE_TENANT_ID": "<TimeOff application tenant ID>"
}
}
}
}
Nếu bạn quan sát phương thức SetKeySerializer
trong lớp ProducerBuilder
của Kafka Producer API, bạn sẽ nhận thấy rằng nó yêu cầu một đối tượng kiểu IAsyncSerializer
. Việc triển khai serializer nhận một đối tượng và trả về một mảng byte. Hãy tạo một triển khai của interface IAsyncSerializer
để serialize một đối tượng với lược đồ và trả về một mảng byte như sau:
public class KafkaAvroAsyncSerializer<T> : IAsyncSerializer<T>
{
private readonly SchemaRegistryAvroObjectSerializer _serializer;
public KafkaAvroAsyncSerializer(SchemaRegistryClient schemaRegistryClient, string schemaGroup,
bool autoRegisterSchemas = true)
{
_serializer = new SchemaRegistryAvroObjectSerializer(
schemaRegistryClient,
schemaGroup,
new SchemaRegistryAvroObjectSerializerOptions
{
AutoRegisterSchemas = autoRegisterSchemas
});
}
public async Task<byte[]> SerializeAsync(T data, SerializationContext context)
{
if (data == null)
{
return null;
}
// SchemaRegistryAvroObjectSerializer can only serialize GenericRecord or ISpecificRecord.
if (data is string s)
{
return Encoding.ASCII.GetBytes(s);
}
await using var stream = new MemoryStream();
await _serializer.SerializeAsync(stream, data, typeof(T), CancellationToken.None);
return stream.ToArray();
}
}
Lưu ý rằng Azure Schema Registry serializer chỉ có thể serialize các đối tượng của một trong hai kiểu là GenericRecord
hoặc ISpecificRecord
. Vì các khóa tin nhắn của chúng ta là kiểu string
, chúng ta đã xử lý trường hợp đặc biệt là serialize kiểu dữ liệu string
.
Tôi sẽ lạc đề một chút và thu hút sự chú ý của bạn đến phương thức SetKeyDeserializer
của lớp ConsumerBuilder
sử dụng kiểu triển khai của interface IDeserializer
để deserialize các tin nhắn nhận được từ topic Kafka. Hãy viết một triển khai tùy chỉnh của interface IDeserializer
cho ứng dụng của chúng ta như sau:
public class KafkaAvroDeserializer<T> : IDeserializer<T>
{
private readonly SchemaRegistryAvroObjectSerializer _serializer;
public KafkaAvroDeserializer(SchemaRegistryClient schemaRegistryClient, string schemaGroup)
{
_serializer = new SchemaRegistryAvroObjectSerializer(schemaRegistryClient, schemaGroup);
}
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (data.IsEmpty)
{
return default;
}
// SchemaRegistryAvroObjectSerializer can only serialize GenericRecord or ISpecificRecord.
if (typeof(T) == typeof(string))
{
return (T) Convert.ChangeType(Encoding.ASCII.GetString(data.ToArray()), typeof(T));
}
return (T) _serializer.Deserialize(new MemoryStream(data.ToArray()), typeof(T), CancellationToken.None);
}
}
Sau đây là mã hoàn chỉnh tạo một Schema Registry client thích hợp dựa trên môi trường của ứng dụng. Dựa trên Schema Registry đã chọn, IProducer
client được tạo để có thể gửi tin nhắn đến topic Kafka:
CachedSchemaRegistryClient cachedSchemaRegistryClient = null;
KafkaAvroAsyncSerializer<string> kafkaAvroAsyncKeySerializer = null;
KafkaAvroAsyncSerializer<LeaveApplicationReceived> kafkaAvroAsyncValueSerializer = null;
if (Convert.ToBoolean(Configuration["IsLocalEnvironment"]))
{
cachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
}
else
{
var schemaRegistryClientAz =
new SchemaRegistryClient(Configuration["SchemaRegistryUrlAz"], new DefaultAzureCredential());
var schemaGroupName = "time-off-schema";
kafkaAvroAsyncKeySerializer =
new KafkaAvroAsyncSerializer<string>(schemaRegistryClientAz, schemaGroupName);
kafkaAvroAsyncValueSerializer =
new KafkaAvroAsyncSerializer<LeaveApplicationReceived>(schemaRegistryClientAz, schemaGroupName);
}
using var producer = new ProducerBuilder<string, LeaveApplicationReceived>(config)
.SetKeySerializer(Convert.ToBoolean(Configuration["IsLocalEnvironment"])
? new AvroSerializer<string>(cachedSchemaRegistryClient)
: kafkaAvroAsyncKeySerializer)
.SetValueSerializer(Convert.ToBoolean(Configuration["IsLocalEnvironment"])
? new AvroSerializer<LeaveApplicationReceived>(cachedSchemaRegistryClient)
: kafkaAvroAsyncValueSerializer)
.Build();
Chúng ta không cần thực hiện bất kỳ thay đổi nào khác đối với phần còn lại của project vì các API Kafka Producer và Client API hoàn toàn tương thích với Event Hubs. Các API Schema Registry là độc quyền của Confluent và do đó chúng ta phải thực hiện các thay đổi đối với việc triển khai serializer và deserializer.
Microsoft đã xác định các vấn đề với các API Schema Registry độc quyền và gửi đặc tả API trung lập với nhà cung cấp cho CNCF. Nếu các nhà cung cấp Schema Registry tuân thủ một thông số kỹ thuật tiêu chuẩn, chúng ta sẽ không cần thực hiện bất kỳ thay đổi nào đối với ứng dụng.
Service Manager quản lý những thay đổi mà tôi đã mô tả trước đây, chúng ta cần thực hiện một thay đổi nhỏ để tích hợp deserializer Schema Registry trong IConsumer
như sau:
CachedSchemaRegistryClient cachedSchemaRegistryClient = null;
KafkaAvroDeserializer<string> kafkaAvroKeyDeserializer = null;
KafkaAvroDeserializer<LeaveApplicationReceived> kafkaAvroValueDeserializer = null;
if (Convert.ToBoolean(Configuration["IsLocalEnvironment"]))
{
cachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
}
else
{
var schemaRegistryClientAz =
new SchemaRegistryClient(Configuration["SchemaRegistryUrlAz"], new DefaultAzureCredential());
var schemaGroupName = Configuration["SchemaRegistryGroupNameAz"];
kafkaAvroKeyDeserializer =
new KafkaAvroDeserializer<string>(schemaRegistryClientAz, schemaGroupName);
kafkaAvroValueDeserializer =
new KafkaAvroDeserializer<LeaveApplicationReceived>(schemaRegistryClientAz, schemaGroupName);
}
using var consumer = new ConsumerBuilder<string, LeaveApplicationReceived>(consumerConfig)
.SetKeyDeserializer(
Convert.ToBoolean(Configuration["IsLocalEnvironment"])
? new AvroDeserializer<string>(cachedSchemaRegistryClient).AsSyncOverAsync()
: kafkaAvroKeyDeserializer)
.SetValueDeserializer(Convert.ToBoolean(Configuration["IsLocalEnvironment"])
? new AvroDeserializer<LeaveApplicationReceived>(cachedSchemaRegistryClient).AsSyncOverAsync()
: kafkaAvroValueDeserializer)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
Hãy kiểm tra xem mọi thứ có hoạt động như mong đợi hay không. Hãy khởi chạy service Employee và gửi đơn xin nghỉ việc mới.
Tính năng Azure Event Hubs Capture cho phép bạn nắm bắt định kỳ và liên tục dữ liệu truyền trực tuyến từ các Trung tâm sự kiện đến bộ lưu trữ Azure Blob hoặc Azure Data Lake Store. Bạn có thể dễ dàng cấu hình tính năng này thông qua cổng quản lý bằng cách làm theo hướng dẫn trên trang web tài liệu của Microsoft.
Sau đây là ảnh chụp màn hình của tệp được lưu trữ trong bộ nhớ Azure Blob có chứa sự kiện mà chúng ta đã ghi lại trong Event Hubs. Lưu ý rằng trình định dạng cú pháp Avro không có sẵn trong cổng thông tin. Do đó, bạn sẽ thấy một số văn bản Unicode trong cửa sổ trình soạn thảo.
Bây giờ chúng ta hãy khởi chạy service Manager để xem đơn xin nghỉ việc đã được ghi lại và thực hiện nó như sau:
Cuối cùng, hãy xem trạng thái của đơn xin nghỉ việc bằng cách khởi chạy một phiên bản mới của dịch vụ ResultReader như sau:
Bạn có thể xem các tệp lược đồ Avro trong nhóm time-off-schema như sau:
Trong bài viết này, chúng ta đã mở rộng ứng dụng của mình để sử dụng Event Hubs để nhắn tin. Event Hubs hỗ trợ các API của Kafka Producer và Consumer, vì vậy chúng ta không phải thực hiện bất kỳ thay đổi nào đối với các phần của ứng dụng tạo ra và sử dụng các sự kiện.
Tuy nhiên, chúng ta phải tạo các thể hiện của Event Hubs thông qua portal vì Event Hubs không hỗ trợ các API Kafka Admin. Chúng ta đã cắm một serializer và deserializer tùy chỉnh vào ứng dụng của mình để thay thế các API Confluent Schema Registry độc quyền.
Tôi hy vọng bạn thích đọc các bài viết trong loạt bài này và bạn đã có được sự tự tin để di chuyển các ứng dụng hiện có hoặc tạo các ứng dụng mới sử dụng Kafka để nhắn tin.
Bạn có thể vui lòng tắt trình chặn quảng cáo ❤️ để hỗ trợ chúng tôi duy trì hoạt động của trang web.
Trong bài viết này, chúng ta sẽ tìm hiểu cách tạo consumer xử lý tin nhắn sử dụng Consumer API của Kafka bằng .NET.
Trong bài viết này, chúng ta sẽ tìm hiểu cách chuẩn bị môi trường cục bộ để phát triển và xuất bản tin nhắn tới Kafka bằng .NET.
Trong bài viết này, chúng ta sẽ tìm hiểu một số case study về hệ thống dựa trên queue (RabbitMQ) và log (Kafka) trong kiến trúc Event Driven Architecture.
Trong hướng dẫn này, chúng tôi sẽ so sánh hệ thống tin nhắn dựa trên queue và dựa trên log trong kiến trúc hệ thống hướng sự kiện (event driven architecture)