Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Consumer

Trong loạt bài này:

  1. Môi trường phát triển và Event Producer.
  2. Event Consumer (bài viết này).
  3. Tích hợp Azure Event Hubs.

Hãy tiếp tục cuộc thảo luận của chúng ta ở bài viết trước và triển khai consumer cho những sự kiện được publish bởi service Employee đến topic leave-applications. Chúng ta sẽ mở rộng ứng dụng mà chúng ta đã phát triển trước đó để thêm hai service mới để chứng minh cách hoạt động của consumer Kafka: service Manager service Result reader.

Ví dụ về consumer: Service Manager

Service Manager đóng vai trò vừa là consumer vừa là producer các sự kiện. Dịch vụ này đọc các đơn xin nghỉ việc từ topic leave-applications (consumer), nhận quyết định approve hay reject của người quản lý và publish kết quả như là một sự kiện mang tên leave application processed đến topic Kafka leave-applications-results (producer).

Vì trước đây chúng ta đã thảo luận chi tiết về Publisher API và việc triển khai nó trong service Employee trong bài viết trước, nên tôi sẽ không đề cập lại tính năng của event producer. Tôi khuyến khích bạn cố gắng xây dựng tính năng publisher của dịch vụ bằng cách sử dụng phiên bản mã nguồn của tôi làm hướng dẫn.

Khởi chạy Visual Studio hoặc VS Code của bạn để tạo ứng dụng .NET Core Console mới có tên TimeOff.Manager trong cùng solution với service Employee.

Như trước đây, chúng ta sẽ cài đặt các gói NuGet sau vào project của mình để cho phép ứng dụng của chúng ta hiểu cách tạo và sử dụng tin nhắn:

Install-Package Confluent.Kafka
Install-Package Confluent.SchemaRegistry.Serdes.Avro

Mở tệp lớp Program trong trình soạn thảo của bạn và bắt đầu viết code vào phương thức Main theo chỉ dẫn. Bạn có thể truy cập Consumer API của Kafka thông qua một thể hiện của lớp kế thừa interface IConsumer. Như trước đây, chúng ta cần đăng ký lược đồ client (CachedSchemaRegistryClient) để thực thi các ràng buộc lược đồ đối với consumer.

Giống như producer client, consumer client yêu cầu một số tham số khởi tạo nhất định, chẳng hạn như danh sách các máy chủ Bootstrap, các broker mà client sẽ kết nối ban đầu. Sử dụng mã sau để tạo các cấu hình sẽ được sử dụng để khởi tạo consumer client.

var schemaRegistryConfig = new SchemaRegistryConfig 
{ 
    Url = "http://127.0.0.1:8081" 
};
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "127.0.0.1:9092",
    GroupId = "manager",
    EnableAutoCommit = false,
    EnableAutoOffsetStore = false,
    // Read messages from start if no commit exists.
    AutoOffsetReset = AutoOffsetReset.Earliest,
    MaxPollIntervalMs = 10000
};

Hãy thảo luận chi tiết hơn về các thuộc tính khởi tạo và giá trị của chúng. Nhiều consumer có thể được nhóm lại thành một nhóm consumer với một GroupId được xác định duy nhất. Kafka sẽ tự động cân đối phân bổ các phân vùng cho những consumer thuộc cùng một nhóm consumer.

Khi consumer đọc tin nhắn từ một phân vùng, chúng lưu trữ một con trỏ đến vị trí của chúng trong phân vùng (được gọi là offset) trong Kafka. Kafka lưu trữ thông tin này trong một topic có tên __consumer_offsets. Nếu consumer tiếp tục xử lý sau một thời gian trì hoãn do các lần tắt máy theo lịch trình hoặc ứng dụng gặp sự cố, chúng có thể tiếp tục xử lý các tin nhắn từ nơi đã dừng trước đó.

Kafka .NET client có thể tự động ghi lại và lưu trữ định kỳ các offset trong Kafka. Chúng ta có thể tắt quy trình duy trì offset tự động bằng cách đặt giá trị của thuộc tính EnableAutoCommit thành false để kiểm soát tốt hơn. Tính năng duy trì offset tự động sử dụng cơ sở dữ liệu trong bộ nhớ để ghi lại các offset. Chúng ta có thể tắt tính năng cơ sở dữ liệu một cách an toàn bằng cách đặt giá trị của thuộc tính EnableAutoOffsetStore thành false.

Khi bạn bắt đầu một consumer, bạn có thể cấu hình nó để bắt đầu đọc dữ liệu từ offset được ghi cuối cùng hoặc từ phần bắt đầu của phân vùng. Theo mặc định, consumer nhận được các tin nhắn được xếp hàng đợi vào các phân vùng của nó sau khi tiến trình consumer được bắt đầu. Chúng ta không muốn mất tin nhắn nếu client của chúng ta gặp sự cố, vì vậy chúng ta sẽ đặt giá trị của thuộc tính AutoOffsetReset thành AutoOffsetReset.Earliest.

Về cơ bản, phương thức MaxPollIntervalMs chỉ định thời lượng tính bằng mili giây mà sau đó bạn phải gọi phương thức IConsumer.Consume. Nếu vượt quá khoảng này, Kafka sẽ coi consumer là không thành công và nó sẽ cân bằng lại các phân vùng để gán các phân vùng bị ảnh hưởng cho consumer khỏe mạnh.

Vì việc sử dụng tin nhắn nhạy cảm với thời gian, bạn phải ghi lại và lưu trữ các khoảng thời gian chênh lệch trong khoảng thời gian mà bạn chỉ định. Đối với các tiến trình có thể yêu cầu một lượng thời gian thay đổi để xử lý thư, tôi khuyên bạn nên ghi lại tin nhắn trong cơ sở dữ liệu và xử lý nó đồng bộ, thay vì giữ tin nhắn và đợi quá trình xử lý hoàn tất.

Hãy tiếp tục viết mã xử lý logic tin nhắn của consumer như sau:

record KafkaMessage(string Key, int Partition, LeaveApplicationReceived Message);
var leaveApplicationReceivedMessages = new Queue<KafkaMessage>();

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
using var consumer = new ConsumerBuilder<string, LeaveApplicationReceived>(consumerConfig)
    .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
    .SetValueDeserializer(new AvroDeserializer<LeaveApplicationReceived>(schemaRegistry).AsSyncOverAsync())
    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
    .Build();
{
    try
    {
        consumer.Subscribe("leave-applications");
        Console.WriteLine("Consumer loop started...\n");
        while (true)
        {
            try
            {
                // We will give the process 1 second to commit the message and store its offset.
                var result = consumer.Consume(TimeSpan.FromMilliseconds(consumerConfig.MaxPollIntervalMs - 1000));
                var leaveRequest = result?.Message?.Value;
                if (leaveRequest == null)
                {
                    continue;
                }

                // Adding message to a list just for the demo.
                // You should persist the message in database and process it later.
                leaveApplicationReceivedMessages.Add(new KafkaMessage(result.Message.Key, result.Partition.Value, result.Message.Value));

                consumer.Commit(result);
                consumer.StoreOffset(result);
            }
            catch (ConsumeException e) when (!e.Error.IsFatal)
            {
                Console.WriteLine($"Non fatal error: {e}");
            }
        }
    }
    finally
    {
        consumer.Close();
    }
}

Hãy thảo luận chi tiết về đoạn mã trên. Chúng ta đã tạo một thể hiện của lớp CachedSchemaRegistryClient để truy cập vào Schema Registry. Tiếp theo, chúng ta tạo một thể hiện của lớp Consumer, lớp này thực thi interface IConsumer. Kafka đưa ra khả năng consumer tin nhắn thông qua interface IConsumer. Vì consumer chỉ cần hiểu cơ chế giải mã khóa và giá trị của tin nhắn, chúng ta đã gắn trình deserializer Avro cho khóa và giá trị vào IConsumer.

Để liên kết consumer với một topic, bạn cần gọi phương thức Subscribe với tên của topic. Bạn cũng có thể chuyển một danh sách tên topic sang một phương thức quá tải khác của Subscribe nếu bạn quan tâm đến việc sử dụng đồng thời các tin nhắn từ một số topic.

Sau khi phương thức Subscribe được gọi, ứng dụng Kafka .NET client tạo ra một số luồng nền có thể tìm nạp trước các tin nhắn từ các topic của consumer. Bạn có thể truy xuất từng tin nhắn một bằng cách gọi phương thức Consume.

Phương thức Consume phải được áp dụng trong một vòng lặp để nhận được tất cả các tin nhắn. Trong trường hợp xảy ra lỗi trong quá trình xử lý, phương thức Close sẽ giữ nguyên offset trong bộ nhớ và hướng dẫn Kafka cân bằng lại các phân vùng để các phân vùng gắn với consumer này được phân bổ cho một client khác.

Phương thức Commit sẽ commit offset của tin nhắn được xử lý, và phương thức  StoreOffset ngay lập tức ghi lại offsets trong Kafka. Commit và lưu trữ các giá trị chênh lệch thường xuyên là cách tiếp cận ưa thích của tôi vì nó đảm bảo rằng chúng ta sẽ không xử lý lại một số thông báo sau khi gián đoạn dịch vụ.

Service Manager yêu cầu thu thập sự chấp thuận của người quản lý cho mỗi đơn xin nghỉ việc có thể mất một khoảng thời gian vô hạn và xuất bản kết quả của đơn đăng ký cho topic leave-applications-results. Quá trình nhận được sự chấp thuận của người quản lý và publish các sự kiện phải diễn ra song song với việc consume các thông điệp được quản lý. Các tác vụ của producer và consumer được thực hiện song song bằng cách sử dụng Task trong  C#.

Hãy khởi chạy ứng dụng và xác minh xem nó có xử lý các tin nhắn mà chúng ta đã publish tới topic trước đó hay không (bài viết trước).

Service Manager nhận tin nhắn

Tái cân bằng phân vùng

Chúng ta đã thảo luận trước đó rằng bất kỳ sự thay đổi nào về số lượng phân vùng hoặc consumer trong một nhóm consumer đều dẫn đến việc Kafka tái cân bằng quyền sở hữu của các phân vùng đối với consumer. Để kiểm tra hành vi này, hãy khởi chạy ba phiên bản của ứng dụng console của service Manager.

Tiếp theo, hãy bắt đầu một phiên bản của ứng dụng console của service Employee và gửi ba đơn xin nghỉ việc, mỗi đơn từ mỗi bộ phận. Bạn sẽ thấy rằng các sự kiện nằm trong các phiên bản ứng dụng dành cho consumer được chỉ định của Kafka. Lưu ý rằng hành vi này được quản lý nội bộ bởi Kafka và do đó bạn có thể thấy hành vi phân bổ khác với hành vi của tôi.

Quyền sở hữu phân vùng của consumer

Hãy gửi thêm ba đơn xin nghỉ việc từ service Employee. Kafka sẽ đảm bảo rằng các tin nhắn tiếp theo trên một phân vùng chỉ đến được ứng dụng client tương ứng.

Consumer nhận tập tin nhắn con từ phân vùng

Hãy thử tắt một số phiên bản của service Manager và bắt đầu những phiên bản mới để xem cách Kafka cân bằng lại quyền sở hữu của các phân vùng cho client. Ngoài ra, vui lòng thử tạo một sự kiện thăm dò ​​nhóm consumer khác từ cùng một topic Kafka bằng cách tạo một bản sao của service Manager và gán cho nó một GroupId khác. Bạn sẽ thấy rằng các nhóm consumer hoàn toàn độc lập với nhau và sự thay đổi cấu trúc liên kết của một nhóm consumer này không ảnh hưởng đến nhóm consumer khác.

Service Result Reader

Dịch vụ đọc kết quả (Service Result Reader) là một ứng dụng đơn giản sử dụng Consumer API của Kafka để thăm dò ​​về topic leave-applications-results và hiển thị các sự kiện leave application processed trên console. Ứng dụng này được đặt tên là TimeOff.ResultReader, dưới đây là mã của ứng dụng.

private static void Main()
{
    CachedSchemaRegistryClient cachedSchemaRegistryClient = null;
    KafkaAvroDeserializer<string> kafkaAvroKeyDeserializer = null;
    KafkaAvroDeserializer<LeaveApplicationProcessed> kafkaAvroValueDeserializer = null;

    Console.WriteLine("TimeOff Results Terminal\n");

    var configuration = new ConfigurationBuilder()
        .AddJsonFile("appsettings.json", true, true)
        .Build();

    var configReader = new ConfigReader(configuration);

    var schemaRegistryConfig = configReader.GetSchemaRegistryConfig();
    var consumerConfig = configReader.GetConsumerConfig();

    if (configReader.IsLocalEnvironment)
    {
        cachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);
    }
    else
    {
        var schemaRegistryClientAz =
            new SchemaRegistryClient(configuration["SchemaRegistryUrlAz"], new DefaultAzureCredential());
        var schemaGroupName = configuration["SchemaRegistryGroupNameAz"];
        kafkaAvroKeyDeserializer =
            new KafkaAvroDeserializer<string>(schemaRegistryClientAz, schemaGroupName);
        kafkaAvroValueDeserializer =
            new KafkaAvroDeserializer<LeaveApplicationProcessed>(schemaRegistryClientAz, schemaGroupName);
    }

    using var consumer = new ConsumerBuilder<string, LeaveApplicationProcessed>(consumerConfig)
        .SetKeyDeserializer(configReader.IsLocalEnvironment
            ? new AvroDeserializer<string>(cachedSchemaRegistryClient).AsSyncOverAsync()
            : kafkaAvroKeyDeserializer)
        .SetValueDeserializer(configReader.IsLocalEnvironment
            ? new AvroDeserializer<LeaveApplicationProcessed>(cachedSchemaRegistryClient).AsSyncOverAsync()
            : kafkaAvroValueDeserializer)
        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
        .Build();
    {
        try
        {
            Console.WriteLine("");
            consumer.Subscribe(ApplicationConstants.LeaveApplicationResultsTopicName);
            while (true)
            {
                var result = consumer.Consume();
                var leaveRequest = result.Message.Value;
                Console.WriteLine(
                    $"Received message: {result.Message.Key} Value: {JsonSerializer.Serialize(leaveRequest)}");
                consumer.Commit(result);
                consumer.StoreOffset(result);
                Console.WriteLine("\nOffset committed");
                Console.WriteLine("----------\n\n");
            }
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Consume error: {e.Error.Reason}");
        }
        finally
        {
            consumer.Close();
        }
    }
}

Ảnh chụp màn hình sau đây trình bày kết quả của việc chạy ứng dụng này để xem tình trạng của các đơn xin nghỉ việc mà chúng ta đã gửi cho đến thời điểm hiện tại:

Chạy ứng dụng Result Reader

Phần kết luận

Trong bài viết này, chúng ta đã học cách sử dụng Consumer API của Kafka để xây dựng consumer xử lý tin nhắn. Chúng ta đã sử dụng Schema Registry để quản lý lược đồ của các tin nhắn được sử dụng. Cuối cùng, chúng ta đã kiểm tra cách Kafka cân bằng lại các phân vùng và xây dựng service Result Reader để hoàn thành ứng dụng của chúng ta.

Giờ đây, chúng ta đã có một ứng dụng hướng sự kiện hoàn chỉnh sử dụng Kafka làm messaging backplane. Trong bài viết tiếp theo, chúng ta sẽ tìm hiểu về những thay đổi mà chúng ta cần thực hiện đối với ứng dụng của mình để sử dụng Azure Event Hubs làm messaging backplane.Azure Event Hubs hỗ trợ các API của Publisher và Consumer Kafka, vì vậy quá trình này sẽ dễ dàng.

Tạo Event Driven Architecture với Apache Kafka bằng .NET: Azure Event Hubs
Trong bài viết này, chúng ta sẽ mở rộng ứng dụng của mình sử dụng Event Hubs để nhắn tin thay thế cho Kafka.
Event Driven ArchitectureSoftware ArchitectureApache Kafka
Bài Viết Liên Quan:
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Azure Event Hubs
Trung Nguyen 01/05/2021
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Azure Event Hubs

Trong bài viết này, chúng ta sẽ mở rộng ứng dụng của mình sử dụng Event Hubs để nhắn tin thay thế cho Kafka.

Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Producer
Trung Nguyen 01/05/2021
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Producer

Trong bài viết này, chúng ta sẽ tìm hiểu cách chuẩn bị môi trường cục bộ để phát triển và xuất bản tin nhắn tới Kafka bằng .NET.

Event Driven Architecture: Queue vs Log - Nghiên cứu điển hình
Trung Nguyen 30/04/2021
Event Driven Architecture: Queue vs Log - Nghiên cứu điển hình

Trong bài viết này, chúng ta sẽ tìm hiểu một số case study về hệ thống dựa trên queue (RabbitMQ) và log (Kafka) trong kiến trúc Event Driven Architecture.

Event Driven Architecture: Queue vs Log
Trung Nguyen 30/04/2021
Event Driven Architecture: Queue vs Log

Trong hướng dẫn này, chúng tôi sẽ so sánh hệ thống tin nhắn dựa trên queue và dựa trên log trong kiến trúc hệ thống hướng sự kiện (event driven architecture)