Tạo Event Driven Architecture với Apache Kafka bằng .NET: Azure Event Hubs

Trong loạt bài này:

  1. Môi trường phát triển và Event Producer.
  2. Event Consumer.
  3. Tích hợp Azure Event Hubs (bài viết này).


Azure Event Hubs là một dịch vụ nhận sự kiện có thể mở rộng theo chiều ngang có khả năng nhận và xử lý hàng triệu sự kiện mỗi giây. Nó hỗ trợ Apache Kafka Producer và Consumer API mà bạn có thể sử dụng thay thế cho việc chạy một cụm Apache Kafka tự quản lý.

Giờ đây, bạn có thể tích hợp các ứng dụng hệ sinh thái Kafka như Kafdrop và nhiều ứng dụng khác với Event Hubs. Vui lòng truy cập trang web tài liệu của Microsoft để đọc chi tiết về cách sử dụng Azure Event Hub làm messaging backplane cho ứng dụng Apache Kafka.

Bây giờ chúng ta sẽ mở rộng ứng dụng TimeOff để sử dụng Kafka chạy trong Docker trong môi trường cục bộ và sử dụng Event Hub trong các môi trường khác. Bằng cách xây dựng các tính năng mới của ứng dụng, bạn cũng sẽ hiểu quá trình di chuyển ứng dụng sử dụng cụm Kafka tự lưu trữ hoặc được quản lý sang Event Hub.

Một trong những thành phần quan trọng của ứng dụng của chúng ta là Schema Registry, lưu trữ và thực thi các lược đồ giữa producer và consumer các sự kiện. Azure Schema Registry là một tính năng mới tùy chọn của Event Hubs hoạt động như một kho lưu trữ trung tâm cho các tài liệu lược đồ. Về mặt chức năng, nó tương tự như Confluent Schema Registry và bạn có thể sử dụng nó để thực thi các ràng buộc trong quá trình serialization và deserialization tin nhắn.

Chúng ta sẽ mở rộng ứng dụng của mình để sử dụng Azure Schema Registry trong môi trường không phải cục bộ và Confluent Schema Registry trong môi trường cục bộ.

Thiết lập dịch vụ Azure: Event Hubs

Di chuyển đến Azure Management Portal và tạo namespace Event Hubs mới. Tôi đã đặt tên cho namespace Event Hubs của mình là - timeoff-eh. Bạn có thể chọn bất kỳ tên không trùng lặp nào có sẵn cho namespace của mình. Lưu ý rằng dịch vụ đăng ký lược đồ chỉ có sẵn trong các gói tiêu chuẩn hoặc cao hơn. Vì vậy, vui lòng chọn mức giá cho phù hợp.

Tạo namespace cho Event Hubs

Hãy sao chép chuỗi kết nối của namespace từ cổng thông tin, mà sau này chúng ta sẽ sử dụng trong ứng dụng của mình.

Copy chuỗi kết nối namespace của Event Hubs

Chúng ta đã sử dụng Admin API của Kafka trong ứng dụng của mình để tạo các topic Kafka mới một cách nhanh chóng. Vì Event Hubs không hỗ trợ Admin API của Kafka, chúng ta sẽ tạo hai topic mà chúng ta yêu cầu - leave-applications (3 phân vùng) và leave-applications-results (1 phân vùng) dưới dạng các bản sao của Event Hubs như sau:

Tạo một Event Hub mới

Hãy nhớ rằng với Event Hubs, một topic Kafka tương ứng với một thể hiện của Event Hubs. Phần còn lại của các khái niệm, chẳng hạn như phân vùng, nhóm consumer và người nhận sự kiện, vẫn giữ nguyên. Bạn có thể thêm nhóm consumer theo cách thủ công hoặc để Consumer API của Kafka tự động tạo nhóm consumer (không cần thay đổi trong ứng dụng của chúng ta).

Nhóm consumer của một Event Hub

Bây giờ, bạn sẽ có hai phiên bản Event Hubs với số lượng phân vùng thích hợp trong namespace của bạn như sau:

Các Event Hub trong namespace

Bây giờ chúng ta hãy tạo Schema Registry.

Thiết lập dịch vụ Azure: Schema Registry

Hãy tạo một Schema Registry mới trong namespace Event Hubs của chúng ta. Lưu ý rằng Schema Registry là một tính năng độc lập mà bạn cũng có thể sử dụng với các dịch vụ nhắn tin khác như Service Bus và Azure Queues. Đầu tiên chúng ta sẽ tạo một Schema Group được sử dụng để quản lý một tập hợp các lược đồ. Bạn có thể tạo Schema Group cho mỗi miền hoặc ứng dụng.

Nhấp vào tùy chọn Schema Registry trong namespace Event Hubs của bạn, bên dưới Entities và chọn tùy chọn để tạo Schema Group mới như sau:

Tạo mới Schema Group

Nhập tên Schema Group là time-off-schema, định dạng serialization là Avro và chế độ tương thích.

Bây giờ chúng ta cần cấp quyền truy cập vào ứng dụng của mình để giao tiếp với Schema Registry để thêm các lược đồ còn thiếu và đọc các lược đồ hiện có. Azure khuyên bạn nên sử dụng Role-Based Access Control (RBAC) để cấp quyền truy cập cần thiết vào tài nguyên. Hãy đăng ký ứng dụng TimeOff trong Active Directory của chúng ta. Bạn có thể đọc chi tiết về quy trình đăng ký ứng dụng trên trang web tài liệu của Microsoft .

Di chuyển đến menu Active Directory của bạn và nhấp vào App registrations và chọn tùy chọn New registration như sau:

Đăng ký ứng dụng

Nhập tên ứng dụng và bấm vào nút Register.

Đăng ký ứng dụng TimeOff

Bây giờ chúng ta hãy tạo một client secret mới cho ứng dụng của chúng ta bằng cách nhấp vào Certificates and secrets và tiếp theo vào tùy chọn New secret. Điền vào các chi tiết được yêu cầu cho secret như sau:

Tạo client secret

Ghi lại secret ở màn hình tiếp theo. Secret sẽ được hiển thị một lần, hãy đảm bảo rằng bạn không làm mất giá trị secret.

Lưu lại giá trị của client secret

Ngoài ra, hãy ghi lại Client Id và Tenant Id của ứng dụng từ trang tổng quan như sau:

Ghi lại giá trị Client Id và Tenant Id

Bây giờ chúng ta hãy gán quyền Schema Registry cho ứng dụng TimeOff. Gán quyền Schema Registry Contributor cho ứng dụng TimeOff như sau, cho phép thêm lược đồ vào nhóm theo chương trình nếu nó không tồn tại:

Gán quyền cho ứng dụng TimeOff

Bây giờ, hãy cập nhật mã ứng dụng để chuyển sang Event Hubs dựa trên cài đặt cấu hình.

Cập nhật Producer: Service Employee

Mở rộng dự án TimeOff.Eaffee và thêm thiết lập IsLocalEnvironment với giá trị false trong tệp appsettings.json. Giá trị của thiết lập IsLocalEnvironment sẽ xác định xem chúng ta sử dụng Event Hub và Azure Schema Registry hay Kafka và Confluent Schema Registry trong ứng dụng của mình hay không.

Dựa trên giá trị của thiết lập, hãy tạo một đối tượng mới ProducerConfig được khởi tạo từ các chi tiết của Event Hubs như sau:

var config = new ProducerConfig()
{
    BootstrapServers = "<EH namespace>.servicebus.windows.net=9093",
    EnableDeliveryReports = true,
    SaslUsername = "$ConnectionString",
    SaslPassword = "<EH namespace connection string>"
};

config.ClientId = Dns.GetHostName();
config.SecurityProtocol = SecurityProtocol.SaslSsl;
config.SaslMechanism = SaslMechanism.Plain;

Tiếp theo, chúng ta cần khởi tạo Schema Registry client SchemaRegistryClient mà chúng ta sẽ sử dụng để tương tác với Azure Schema Registry như sau:

var schemaRegistryClientAz = new SchemaRegistryClient("<EH namespace>.servicebus.windows.net", new DefaultAzureCredential());

DefaultAzureCredential cho phép bạn chuyển đổi định danh dựa trên môi trường. Lớp DefaultAzureCredential kết hợp vài lớp định danh được sử dụng để lấy định danh Azure Active Directory.

Trong thời gian chạy, DefaultAzureCredential bắt đầu cố gắng khởi tạo một trong các lớp định danh bắt đầu bằng EnvironmentCredential và kết thúc bằng InteractiveBrowserCredential.

Bất kỳ lớp định danh nào được khởi tạo trước sẽ được sử dụng để xác thực các lệnh gọi API tài nguyên. Bạn có thể đọc thêm về thư viện Azure Schema Registry cho .NET từ tệp ReadMe của SDK.

Hãy thêm các biến môi trường sau được sử dụng bởi lớp EnvironmentCredential vào ứng dụng của chúng ta trong tệp launchsettings.json. Schema Registry client sẽ sử dụng thông tin đăng nhập để xác thực các yêu cầu được gửi để tạo hoặc đọc lược đồ từ sổ đăng ký.

Lưu ý rằng chúng ta đang thêm các thông tin xác thực này vào tệp thiết lập khởi chạy để hỗ trợ chúng ta trong quá trình gỡ lỗi và mô phỏng hành vi của ứng dụng trong môi trường không cục bộ.

{
  "profiles": {
    "TimeOff.Employee": {
      "commandName": "Project",
      "environmentVariables": {
        "AZURE_CLIENT_SECRET": "<TimeOff application client secret>",
        "AZURE_CLIENT_ID": "<TimeOff application client ID>",
        "AZURE_TENANT_ID": "<TimeOff application tenant ID>"
      }
    }
  }
}

Nếu bạn quan sát phương thức SetKeySerializer trong lớp ProducerBuilder của Kafka Producer API, bạn sẽ nhận thấy rằng nó yêu cầu một đối tượng kiểu IAsyncSerializer. Việc triển khai serializer nhận một đối tượng và trả về một mảng byte. Hãy tạo một triển khai của interface IAsyncSerializer để serialize một đối tượng với lược đồ và trả về một mảng byte như sau:

public class KafkaAvroAsyncSerializer<T> : IAsyncSerializer<T>
{
    private readonly SchemaRegistryAvroObjectSerializer _serializer;

    public KafkaAvroAsyncSerializer(SchemaRegistryClient schemaRegistryClient, string schemaGroup,
        bool autoRegisterSchemas = true)
    {
        _serializer = new SchemaRegistryAvroObjectSerializer(
            schemaRegistryClient,
            schemaGroup,
            new SchemaRegistryAvroObjectSerializerOptions
            {
                AutoRegisterSchemas = autoRegisterSchemas
            });
    }

    public async Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        if (data == null)
        {
            return null;
        }

        // SchemaRegistryAvroObjectSerializer can only serialize GenericRecord or ISpecificRecord.
        if (data is string s)
        {
            return Encoding.ASCII.GetBytes(s);
        }

        await using var stream = new MemoryStream();
        await _serializer.SerializeAsync(stream, data, typeof(T), CancellationToken.None);
        return stream.ToArray();
    }
}

Lưu ý rằng Azure Schema Registry serializer chỉ có thể serialize các đối tượng của một trong hai kiểu là GenericRecord hoặc ISpecificRecord. Vì các khóa tin nhắn của chúng ta là kiểu string, chúng ta đã xử lý trường hợp đặc biệt là serialize kiểu dữ liệu string.

Tôi sẽ lạc đề một chút và thu hút sự chú ý của bạn đến phương thức SetKeyDeserializer của lớp ConsumerBuilder sử dụng kiểu triển khai của interface IDeserializer để deserialize các tin nhắn nhận được từ topic Kafka. Hãy viết một triển khai tùy chỉnh của interface IDeserializer cho ứng dụng của chúng ta như sau:

public class KafkaAvroDeserializer<T> : IDeserializer<T>
{
    private readonly SchemaRegistryAvroObjectSerializer _serializer;

    public KafkaAvroDeserializer(SchemaRegistryClient schemaRegistryClient, string schemaGroup)
    {
        _serializer = new SchemaRegistryAvroObjectSerializer(schemaRegistryClient, schemaGroup);
    }

    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (data.IsEmpty)
        {
            return default;
        }

        // SchemaRegistryAvroObjectSerializer can only serialize GenericRecord or ISpecificRecord.
        if (typeof(T) == typeof(string))
        {
            return (T) Convert.ChangeType(Encoding.ASCII.GetString(data.ToArray()), typeof(T));
        }

        return (T) _serializer.Deserialize(new MemoryStream(data.ToArray()), typeof(T), CancellationToken.None);
    }
}

Sau đây là mã hoàn chỉnh tạo một Schema Registry client thích hợp dựa trên môi trường của ứng dụng. Dựa trên Schema Registry đã chọn, IProducer client được tạo để có thể gửi tin nhắn đến topic Kafka:

CachedSchemaRegistryClient cachedSchemaRegistryClient = null;
KafkaAvroAsyncSerializer<string> kafkaAvroAsyncKeySerializer = null;
KafkaAvroAsyncSerializer<LeaveApplicationReceived> kafkaAvroAsyncValueSerializer = null;

if (Convert.ToBoolean(Configuration["IsLocalEnvironment"]))
{
    cachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
}
else
{
    var schemaRegistryClientAz =
        new SchemaRegistryClient(Configuration["SchemaRegistryUrlAz"], new DefaultAzureCredential());
    var schemaGroupName = "time-off-schema";
    kafkaAvroAsyncKeySerializer =
        new KafkaAvroAsyncSerializer<string>(schemaRegistryClientAz, schemaGroupName);
    kafkaAvroAsyncValueSerializer =
        new KafkaAvroAsyncSerializer<LeaveApplicationReceived>(schemaRegistryClientAz, schemaGroupName);
}

using var producer = new ProducerBuilder<string, LeaveApplicationReceived>(config)
    .SetKeySerializer(Convert.ToBoolean(Configuration["IsLocalEnvironment"])
        ? new AvroSerializer<string>(cachedSchemaRegistryClient)
        : kafkaAvroAsyncKeySerializer)
    .SetValueSerializer(Convert.ToBoolean(Configuration["IsLocalEnvironment"])
        ? new AvroSerializer<LeaveApplicationReceived>(cachedSchemaRegistryClient)
        : kafkaAvroAsyncValueSerializer)
    .Build();

Chúng ta không cần thực hiện bất kỳ thay đổi nào khác đối với phần còn lại của project vì các API Kafka Producer và Client API hoàn toàn tương thích với Event Hubs. Các API Schema Registry là độc quyền của Confluent và do đó chúng ta phải thực hiện các thay đổi đối với việc triển khai serializer và deserializer.

Microsoft đã xác định các vấn đề với các API Schema Registry độc quyền và gửi đặc tả API trung lập với nhà cung cấp cho CNCF. Nếu các nhà cung cấp Schema Registry tuân thủ một thông số kỹ thuật tiêu chuẩn, chúng ta sẽ không cần thực hiện bất kỳ thay đổi nào đối với ứng dụng.

Cập nhật Consumer: Service Manager

Service Manager quản lý những thay đổi mà tôi đã mô tả trước đây, chúng ta cần thực hiện một thay đổi nhỏ để tích hợp deserializer Schema Registry trong IConsumer như sau:

CachedSchemaRegistryClient cachedSchemaRegistryClient = null;
KafkaAvroDeserializer<string> kafkaAvroKeyDeserializer = null;
KafkaAvroDeserializer<LeaveApplicationReceived> kafkaAvroValueDeserializer = null;

if (Convert.ToBoolean(Configuration["IsLocalEnvironment"]))
{
    cachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
}
else
{
    var schemaRegistryClientAz =
        new SchemaRegistryClient(Configuration["SchemaRegistryUrlAz"], new DefaultAzureCredential());
    var schemaGroupName = Configuration["SchemaRegistryGroupNameAz"];
    kafkaAvroKeyDeserializer =
        new KafkaAvroDeserializer<string>(schemaRegistryClientAz, schemaGroupName);
    kafkaAvroValueDeserializer =
        new KafkaAvroDeserializer<LeaveApplicationReceived>(schemaRegistryClientAz, schemaGroupName);
}

using var consumer = new ConsumerBuilder<string, LeaveApplicationReceived>(consumerConfig)
    .SetKeyDeserializer(
        Convert.ToBoolean(Configuration["IsLocalEnvironment"])
            ? new AvroDeserializer<string>(cachedSchemaRegistryClient).AsSyncOverAsync()
            : kafkaAvroKeyDeserializer)
    .SetValueDeserializer(Convert.ToBoolean(Configuration["IsLocalEnvironment"])
        ? new AvroDeserializer<LeaveApplicationReceived>(cachedSchemaRegistryClient).AsSyncOverAsync()
        : kafkaAvroValueDeserializer)
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build();

Bản trình diễn ứng dụng

Hãy kiểm tra xem mọi thứ có hoạt động như mong đợi hay không. Hãy khởi chạy service Employee và gửi đơn xin nghỉ việc mới.

Tính năng Azure Event Hubs Capture cho phép bạn nắm bắt định kỳ và liên tục dữ liệu truyền trực tuyến từ các Trung tâm sự kiện đến bộ lưu trữ Azure Blob hoặc Azure Data Lake Store. Bạn có thể dễ dàng cấu hình tính năng này thông qua cổng quản lý bằng cách làm theo hướng dẫn trên trang web tài liệu của Microsoft.

Sau đây là ảnh chụp màn hình của tệp được lưu trữ trong bộ nhớ Azure Blob có chứa sự kiện mà chúng ta đã ghi lại trong Event Hubs. Lưu ý rằng trình định dạng cú pháp Avro không có sẵn trong cổng thông tin. Do đó, bạn sẽ thấy một số văn bản Unicode trong cửa sổ trình soạn thảo.

Xem event trong Azure Blob Storage

Bây giờ chúng ta hãy khởi chạy service Manager để xem đơn xin nghỉ việc đã được ghi lại và thực hiện nó như sau:

Chạy service Manager

Cuối cùng, hãy xem trạng thái của đơn xin nghỉ việc bằng cách khởi chạy một phiên bản mới của dịch vụ ResultReader như sau:

Chạy service Result Reader

Bạn có thể xem các tệp lược đồ Avro trong nhóm time-off-schema như sau:

Xem lược đồ

Phần kết luận

Trong bài viết này, chúng ta đã mở rộng ứng dụng của mình để sử dụng Event Hubs để nhắn tin. Event Hubs hỗ trợ các API của Kafka Producer và Consumer, vì vậy chúng ta không phải thực hiện bất kỳ thay đổi nào đối với các phần của ứng dụng tạo ra và sử dụng các sự kiện.

Tuy nhiên, chúng ta phải tạo các thể hiện của Event Hubs thông qua portal vì Event Hubs không hỗ trợ các API Kafka Admin. Chúng ta đã cắm một serializer và deserializer tùy chỉnh vào ứng dụng của mình để thay thế các API Confluent Schema Registry độc quyền.

Tôi hy vọng bạn thích đọc các bài viết trong loạt bài này và bạn đã có được sự tự tin để di chuyển các ứng dụng hiện có hoặc tạo các ứng dụng mới sử dụng Kafka để nhắn tin.

Event Driven ArchitectureSoftware ArchitectureAzure Event Hubs
Bài Viết Liên Quan:
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Consumer
Trung Nguyen 01/05/2021
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Consumer

Trong bài viết này, chúng ta sẽ tìm hiểu cách tạo consumer xử lý tin nhắn sử dụng Consumer API của Kafka bằng .NET.

Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Producer
Trung Nguyen 01/05/2021
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Producer

Trong bài viết này, chúng ta sẽ tìm hiểu cách chuẩn bị môi trường cục bộ để phát triển và xuất bản tin nhắn tới Kafka bằng .NET.

Event Driven Architecture: Queue vs Log - Nghiên cứu điển hình
Trung Nguyen 30/04/2021
Event Driven Architecture: Queue vs Log - Nghiên cứu điển hình

Trong bài viết này, chúng ta sẽ tìm hiểu một số case study về hệ thống dựa trên queue (RabbitMQ) và log (Kafka) trong kiến trúc Event Driven Architecture.

Event Driven Architecture: Queue vs Log
Trung Nguyen 30/04/2021
Event Driven Architecture: Queue vs Log

Trong hướng dẫn này, chúng tôi sẽ so sánh hệ thống tin nhắn dựa trên queue và dựa trên log trong kiến trúc hệ thống hướng sự kiện (event driven architecture)