Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Producer

Trong loạt bài này:

  1. Môi trường phát triển và Event Producer (bài viết này)
  2. Event Consumer.
  3. Tích hợp Azure Event Hubs.

Kiến trúc hướng sự kiện (Event Driven Architecture) sử dụng các sự kiện để kích hoạt và giao tiếp giữa các microservices. Sự kiện là sự thay đổi trạng thái của service, chẳng hạn như một mặt hàng được thêm vào giỏ hàng. Khi một sự kiện xảy ra, service sẽ tạo ra một thông báo sự kiện là một gói thông tin về sự kiện đó.

Kiến trúc bao gồm một event producer, một bộ định tuyến sự kiện và một event consumer. Producer gửi các sự kiện đến bộ định tuyến và consumer nhận các sự kiện từ bộ định tuyến.

Tùy thuộc vào khả năng, bộ định tuyến có thể đẩy các sự kiện đến consumer hoặc gửi các sự kiện đến consumer theo yêu cầu. Các service của producer và consumer được tách biệt, điều này cho phép chúng mở rộng quy mô, triển khai và cập nhật một cách độc lập.

Apache Kafka là một trong những nền tảng phát trực tuyến sự kiện nguồn mở phổ biến nhất. Nó có thể mở rộng theo chiều ngang, phân tán và chịu lỗi. Mô hình lập trình của Kafka dựa trên mô hình publish - subscribe.

Với Kafka, các publisher gửi tin nhắn đến các topic được gọi là các kênh logic. Subcriber đăng ký một topic sẽ nhận được tất cả các tin nhắn được xuất bản cho topic đó.

Trong kiến ​​trúc hướng sự kiện, Kafka được sử dụng như một bộ định tuyến sự kiện và các microservices publish và subscribe các sự kiện.

Trong bài viết này, chúng ta sẽ tìm hiểu cách chuẩn bị môi trường cục bộ để phát triển và xuất bản tin nhắn tới Kafka. Các bài viết tiếp theo của tôi sẽ tập trung vào việc xây dựng các thành phần của một ứng dụng end-to-end giúp bạn xây dựng các microservices hướng sự kiện.

Các thành phần Kafka

Tôi sẽ thảo luận ngắn gọn về các thành phần của Kafka có liên quan đến việc chúng ta sử dụng Kafka làm message broker. Ngoài mô hình publish-subscribe, Kafka cũng hỗ trợ các API:

  • Streams API để chuyển dữ liệu từ topic này sang topic khác.
  • Connect API giúp bạn triển khai các trình kết nối kéo dữ liệu từ các hệ thống bên ngoài vào Kafka hoặc đẩy dữ liệu từ Kafka ra bên ngoài các hệ thống.

Các API này nằm ngoài phạm vi của bài viết này. Để hiểu chi tiết về kiến ​​trúc của Kafka, vui lòng đọc bài viết giới thiệu về Kafka trên trang web Confluent docs.

Chúng ta hiểu rằng Kafka hoạt động như một trung gian cho phép trao đổi thông tin từ producer đến consumer. Kafka có thể được thiết lập trên nhiều máy chủ, được gọi là Kafka broker. Với nhiều broker, bạn nhận được lợi ích của việc sao chép dữ liệu, khả năng chịu lỗi và tính khả dụng cao của cụm Kafka của bạn.

Sau đây là thiết kế hệ thống cấp cao của một cụm Kafka:

Kafka Cluster
Kafka Cluster

Siêu dữ liệu của các quy trình cụm Kafka được lưu trữ trong một hệ thống độc lập có tên Apache Zookeeper . Zookeeper giúp Kafka thực hiện một số chức năng quan trọng, chẳng hạn như bầu người lãnh đạo trong trường hợp nút bị lỗi. Nó cũng duy trì danh sách người tiêu dùng trong một nhóm người tiêu dùng và quản lý danh sách kiểm soát truy cập của các chủ đề Kafka.

Sự phân tách cấp độ đầu tiên của các sự kiện/tin nhắn trong Kafka xảy ra thông qua một đối tượng Kafka được gọi là topic. Event producer xuất bản các sự kiện theo một topic mà sau đó Kafka có thể chuyển cho những consumer quan tâm.

Hãy nghĩ về một topic như một tập hợp các hàng đợi FIFO (First In First Out). Bạn có thể lưu trữ ngẫu nhiên một tin nhắn vào một trong các hàng đợi hoặc đặt các tin nhắn có liên quan vào một hàng đợi duy nhất để đảm bảo FIFO.

Mỗi hàng đợi trong một topic được gọi là một phân vùng chủ đề (topic partition). Mỗi tin nhắn trong hàng đợi được đặt ở một vị trí duy nhất được gọi là phần offset.

Partition và offset
Partition và offset

Bạn có thể kết hợp nhiều consumer trong một nhóm consumer (consumer group) để mở rộng mức xử lý tin nhắn từ một topic. Một nhóm consumer được xác định thông qua một id nhóm duy nhất. Kafka cân bằng việc phân bổ các phân vùng giữa những consumer cá nhân của một nhóm consumer để tránh việc xử lý trùng lặp các tin nhắn.

Sau khi consumer sử dụng một tin nhắn được lưu trữ tại vị trí offset, nó sẽ commit tin nhắn cho Kafka rằng chúng đã xử lý xong. Đối với yêu cầu tiếp theo, consumer sẽ nhận được tin nhắn ở vị trí offset tiếp theo và cứ tiếp tục như vậy.

Thiết lập môi trường cục bộ

Việc thiết lập môi trường phát triển để làm việc với Kafka khá dễ dàng với Docker Compose. Bạn có thể chia sẻ thông số kỹ thuật Docker Compose với các nhà phát triển khác trong nhóm của mình để đảm bảo tính nhất quán của môi trường. Chúng ta sẽ sử dụng Docker Compose để thiết lập một cụm Kafka bao gồm các thành phần sau:

  1. Apache Zookeeper: Sự phụ thuộc của Zookeeper sẽ bị xóa khỏi Kafka trong tương lai bởi một số nhà cung cấp như Confluent. Đọc tài liệu mới nhất từ ​​nhà cung cấp mà bạn định sử dụng cho Kafka.
  2. Kafka
  3. Kafdrop: Kafdrop là một giao diện người dùng dựa trên web phổ biến để xem các topic Kafka và duyệt các nhóm consumer. Nó làm cho cụm Kafka của bạn có thể quan sát được, giúp bạn chẩn đoán các vấn đề và giúp bạn phát triển.
  4. Schema Registry: là một dịch vụ nằm bên ngoài cụm của bạn và cho phép các nhà phát triển quản lý các lược đồ tin nhắn. Kafka hỗ trợ các tin nhắn ở các định dạng Avro, JSON và Protobuf. Schema Registry hỗ trợ lưu trữ và truy xuất các lược đồ được phiên bản ở tất cả các định dạng đó. Bạn có thể đọc thêm về đăng ký lược đồ trên trang web tài liệu hợp lưu.

Một số nhà cung cấp xuất bản Docker image cho Zookeeper và Kafka với sự khác biệt nhỏ về hành vi và cấu hình. Tôi thường sử dụng các bản phân phối từ Bitnami. Tuy nhiên, bạn cũng có thể sử dụng các bản phân phối từ Confluent, SpotifyWurstmeister. Bitnami và Confluent xây dựng và kiểm tra image hàng đêm và chúng cũng tương thích với nhau, vì vậy tôi khuyên bạn nên sử dụng chúng.

Tạo một tệp có tên là docker-compose.yml và điền vào tệp với nội dung sau:

version: "2"

networks:
  kafka-net:
    driver: bridge

services:
  zookeeper-server:
    image: bitnami/zookeeper:latest
    networks:
      - kafka-net
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafdrop:
    image: obsidiandynamics/kafdrop
    networks:
      - kafka-net
    restart: "no"
    ports:
      - 9000:9000
    environment:
      KAFKA_BROKERCONNECT: PLAINTEXT://kafka-server:29092
      JVM_OPTS: -Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify
      SCHEMAREGISTRY_CONNECT: http://schema-registry:8081
    depends_on:
      - kafka-server
  kafka-server:
    image: bitnami/kafka:latest
    networks:
      - kafka-net
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server:29092,PLAINTEXT_HOST://127.0.0.1:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper-server
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    networks:
      - kafka-net
    ports:
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka-server:29092
      - SCHEMA_REGISTRY_HOST_NAME=localhost
      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
    depends_on:
      - kafka-server

Chúng ta đã tạo các dịch vụ mà chúng ta đã thảo luận trước đó trong cùng một mạng để có khả năng khám phá. Các giá trị cài đặt biến môi trường của các dịch vụ được đặt với các giá trị được đề xuất của chúng, vì vậy hãy sửa đổi chúng một cách thận trọng.

Từ thiết bị đầu cuối của bạn, hãy chuyển sang thư mục nơi bạn đã lưu trữ tệp này và chạy lệnh sau để bắt đầu các dịch vụ:

docker-compose up -d

Sau khi cài đặt xong, hãy thực hiện lệnh sau để xác minh xem các dịch vụ có hoạt động tốt hay không.

docker-compose ps

Ảnh chụp màn hình sau đây mô tả đầu ra của lệnh:

Cài đặt kafka bằng Docker Compose

Tiện ích giám sát Kafdrop của chúng ta có thể truy cập tại http://localhost:9000.

Kafdrop

Bạn chỉ có thể xem hai topic đặc biệt tại thời điểm này là: topic __consumer_offsets ghi lại các offset được xử lý bởi consumer và topic _schemas mà Schema registry sử dụng để lưu trữ các lược đồ được phiên bản của tin nhắn. Bạn sẽ thấy nhiều topic hơn ở đây khi chúng ta tạo chúng theo chương trình.

Ứng dụng Demo: TimeOff

Để khám phá cách chúng ta có thể sử dụng Kafka để xây dựng các ứng dụng hướng sự kiện, chúng ta sẽ xây dựng một hệ thống quản lý nhân viên nghỉ việc rất đơn giản. Ứng dụng của chúng ta bao gồm các dịch vụ sau:

  1. Employee service: Một nhân viên có thể sử dụng dịch vụ này để nộp đơn xin nghỉ việc. Dịch vụ này sẽ gửi đơn xin nghỉ việc đã nhận vào một topic là leave-applications.
  2. Manager service: Dịch vụ này sử dụng các sự kiện từ topic leave-applications và ghi lại thông tin đầu vào của người quản lý. Kết quả của ứng dụng được gửi dưới dạng một sự kiện có tên đơn xin nghỉ việc được xử lý tới topic leave-applications-results.
  3. Result reader service: Dịch vụ này hiển thị các đơn xin nghỉ việc đã được phê duyệt hoặc chưa được phê duyệt bằng cách sử dụng các tin nhắn từ topic Kafka leave-applications-results.

Sau đây là sơ đồ trình tự của sự tương tác giữa các dịch vụ:

Sơ đồ luồng nghiệp vụ giữa các service

Ví dụ về producer: Employee Service

Sử dụng Visual Studio hoặc VS Code để tạo ứng dụng .NET Core Console mới và đặt tên là TimeOff.Employee.

Trước khi chúng ta tiếp tục, tôi muốn thảo luận ngắn gọn về các định dạng tin nhắn có sẵn trong Kafka. Kafka hỗ trợ các tin nhắn có định dạng Avro, Protobuf và JSON. Các định dạng tin nhắn này cũng được hỗ trợ bởi Schema Registry.

Avro là một định dạng được ưu tiên hơn những định dạng khác nếu tất cả các dịch vụ trong hệ sinh thái có thể hỗ trợ nó. Bạn có thể đọc về lý do tại sao Avro là định dạng tuần tự hóa dữ liệu tốt hơn cho dữ liệu luồng trên trang web Confluent.

Để cho phép ứng dụng của chúng ta hoạt động với Kafka, bạn cần có Kafka .NET Client. Ngoài ra, vì chúng ta sẽ sử dụng Confluent Schema Registry để thực thi hợp đồng giữa producer và consumer, nên chúng ta cần serializer (cho producer) và deserializer (cho consumer) cho các ứng dụng của chúng ta.

Chúng ta sẽ sử dụng các tin nhắn có định dạng Avro trong ứng dụng của mình và vì vậy chúng ta sẽ cài đặt Avro serializer trong dự án của mình. Sử dụng các lệnh sau để cài đặt các gói NuGet cần thiết trong dự án của bạn:

Install-Package Confluent.Kafka
Install-Package Confluent.SchemaRegistry.Serdes.Avro

Mở tệp lớp Program trong trình soạn thảo của bạn và bắt đầu viết code phương thức Main theo chỉ dẫn. Hãy bắt đầu với việc khởi tạo Admin client (IAdminClient) để tạo topic, Producer client ( IProducer) để publish tin nhắn tới Kafka và Schema Registry client ( CachedSchemaRegistryClient) để thực thi các ràng buộc lược đồ đối với producer.

Mỗi client yêu cầu các tham số khởi tạo nhất định, chẳng hạn như máy chủ Bootstrap, là danh sách các broker mà client sẽ kết nối ban đầu. Sau kết nối ban đầu, client tự động phát hiện ra phần còn lại của các broker. Schema registry yêu cầu địa chỉ của server Schema Registry. Sử dụng mã sau để tạo cấu hình sẽ được sử dụng để khởi tạo client.

var adminConfig = new AdminClientConfig 
{ 
    BootstrapServers = "127.0.0.1:9092" 
};
var schemaRegistryConfig = new SchemaRegistryConfig 
{ 
    Url = "http://127.0.0.1:8081" 
};
var producerConfig = new ProducerConfig
{
    BootstrapServers = "127.0.0.1:9092",
    // Guarantees delivery of message to topic.
    EnableDeliveryReports = true,
    ClientId = Dns.GetHostName()
};

Vui lòng truy cập trang web Tài liệu hợp lưu để đọc thêm vềcấu hình Admin client, cấu hình Producercấu hình Schema registry.

Đầu tiên chúng ta hãy tạo topic sẽ nhận được tin nhắn của chúng ta. Thêm mã sau vào chương trình của bạn để tạo một topic mới có tên leave-applications với ba phân vùng.

using var adminClient = new AdminClientBuilder(adminConfig).Build();
try
{
    await adminClient.CreateTopicsAsync(new[]
    {
        new TopicSpecification
        {
            Name = "leave-applications",
            ReplicationFactor = 1,
            NumPartitions = 3
        }
    });
}
catch (CreateTopicsException e) when (e.Results.Select(r => r.Error.Code)
    .Any(el => el == ErrorCode.TopicAlreadyExists))
{
    Console.WriteLine($"Topic {e.Results[0].Topic} already exists");
}

Chắc hẳn bạn đang thắc mắc tại sao chúng ta lại tạo ra ba phân vùng? Chúng ta muốn khám phá cách producer có thể viết vào các phân vùng khác nhau của một topic.

Một nhân viên thuộc một bộ phận, vì vậy chúng ta sẽ tạo một phân vùng cho từng bộ phận trong topic leave-applications. Đơn xin nghỉ việc của nhân viên sẽ được xếp hàng tuần tự trong từng bộ phận. Hãy tạo một enum có tên Department để sử dụng sau này trong logic của producer.

public enum Departments : byte
{
    HR = 0,
    IT = 1,
    OPS = 2
}

Bây giờ chúng ta đã sẵn sàng để ghi vào topic của chúng ta. Nhưng trước khi làm điều đó, chúng ta cần xác định lược đồ của tin nhắn mà chúng ta sẽ viết cho chủ đề của mình.

Lược đồ Avro và Avrogen

Tạo một tệp lược đồ Avro có tên là LeaveApplicationReceived.avsc trong một project class library có tên TimeOff.Models để chúng ta có thể chia sẻ tệp đó giữa producer và consumer. Thêm đặc tả lược đồ sau vào tệp:

{
  "namespace": "TimeOff.Models",
  "type": "record",
  "name": "LeaveApplicationReceived",
  "fields": [
    {
      "name": "EmpEmail",
      "type": "string"
    },
    {
      "name": "EmpDepartment",
      "type": "string"
    },
    {
      "name": "LeaveDurationInHours",
      "type": "int"
    },
    {
      "name": "LeaveStartDateTicks",
      "type": "long"
    }
  ]
}

Chúng ta sẽ chuyển đổi lược đồ Avro thành tệp lớp C# để nó được hiểu bởi các trình serializer và deserializer của .NET Core Avro. Chúng ta sẽ sử dụng công cụ avrogen từ Confluent để tự động tạo tệp lớp C# từ đặc tả Avro của chúng ta.

Để cài đặt công cụ avrogen, hãy thực hiện lệnh sau:

dotnet tool install --global Apache.Avro.Tools

Tiếp theo, trong thiết bị đầu cuối của bạn, thay đổi thư mục chứa tệp LeaveApplicationReceive.avsc và thực hiện lệnh sau để tạo tệp C#.

avrogen -s LeaveApplicationReceived.avsc . --namespace TimeOff.Models:TimeOff.Models

Sao chép tệp được tạo LeaveApplicationReceived.cs từ thư mục đầu ra và dán vào thư mục gốc của project.

Message Producer

Hãy quay lại lớp Program và tiếp tục chỉnh sửa phương thức Main để viết logic trình tạo thông báo (Message Producer) như sau:

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
using var producer = new ProducerBuilder<string, LeaveApplicationReceived>(producerConfig)
    .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
    .SetValueSerializer(new AvroSerializer<LeaveApplicationReceived>(schemaRegistry))
    .Build();
while (true)
{
    var empEmail = ReadLine.Read("Enter your employee Email (e.g. none@example-company.com): ",
        "none@example.com").ToLowerInvariant();
    var empDepartment = ReadLine.Read("Enter your department code (HR, IT, OPS): ").ToUpperInvariant();
    var leaveDurationInHours =
        int.Parse(ReadLine.Read("Enter number of hours of leave requested (e.g. 8): ", "8"));
    var leaveStartDate = DateTime.ParseExact(ReadLine.Read("Enter vacation start date (dd-mm-yy): ",
        $"{DateTime.Today:dd-MM-yy}"), "dd-mm-yy", CultureInfo.InvariantCulture);

    var leaveApplication = new LeaveApplicationReceived
    {
        EmpDepartment = empDepartment,
        EmpEmail = empEmail,
        LeaveDurationInHours = leaveDurationInHours,
        LeaveStartDateTicks = leaveStartDate.Ticks
    };
    var partition = new TopicPartition(
        ApplicationConstants.LeaveApplicationsTopicName,
        new Partition((int) Enum.Parse<Departments>(empDepartment)));
    var result = await producer.ProduceAsync(partition,
        new Message<string, LeaveApplicationReceived>
        {
            Key = $"{empEmail}-{DateTime.UtcNow.Ticks}",
            Value = leaveApplication
        });
    Console.WriteLine(
        $"\nMsg: Your leave request is queued at offset {result.Offset.Value} in the Topic {result.Topic}:{result.Partition.Value}\n\n");
}
Readline là một thư viện .NET đơn giản cung cấp trải nghiệm nhập liệu bằng bàn phím phong phú cho người dùng ứng dụng console.

Chúng ta hãy cùng nhau xem qua mã này. Chúng ta đã tạo một thể hiện của lớp CachedSchemaRegistryClient, cho phép chúng ta truy cập sổ đăng ký lược đồ. Kafka đưa ra các thể hiện của nhà sản xuất tin nhắn thông qua interface IProducer.

Chúng ta đã nhúng các trình serializer Avro cho khóa và giá trị vào thể hiện của IProducer. Trình serializer Avro sử dụng đăng ký lược đồ client để đăng ký một lược đồ mới và chúng ghi lại id lược đồ với tin nhắn được gửi đến topic Kafka.

Việc CachedSchemaRegistryClient duy trì một bộ đệm ẩn cục bộ của các lược đồ để xác thực nhằm giảm thiểu số lượng lệnh gọi đến Schema Registry. Phương thức ProduceAsync chấp nhận các chỉ số phân vùng và tin nhắn để gửi tin nhắn đến các phân vùng có liên quan của topic.

Bây giờ chúng ta hãy thực thi ứng dụng để ghi lại một vài đơn xin nghỉ việc như sau:

Chạy ứng dụng TimeOff

Hãy sử dụng Kafdrop để xem lược đồ mới được đăng ký.

Xem lược đồ trong Kafdrop

Hãy xem các tin nhắn mới được thêm vào.

Xem tin nhắn trong Kafdrop

Phần kết luận

Trong bài viết này, chúng ta đã tìm hiểu những kiến ​​thức cơ bản về Kafka với tư cách là message mediator. Chúng ta đã thiết lập môi trường Kafka cục bộ và học cách sử dụng Schema Registry và Kafka Producer API để gửi tin nhắn đến một topic Kafka. Chúng ta đã sử dụng Kafdrop để kiểm tra lược đồ và các tin nhắn trong Kafka.

Trong bài viết sau, chúng ta sẽ tìm hiểu cách ghi một tin nhắn cho consumer bằng cách sử dụng Kafka Consumer API.

Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Consumer
Trong bài viết này, chúng ta sẽ tìm hiểu cách tạo consumer xử lý tin nhắn sử dụng Consumer API của Kafka bằng .NET.
Event Driven ArchitectureMicroservicesApache KafkaSoftware ArchitectureDockerLập Trình C#
Bài Viết Liên Quan:
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Azure Event Hubs
Trung Nguyen 01/05/2021
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Azure Event Hubs

Trong bài viết này, chúng ta sẽ mở rộng ứng dụng của mình sử dụng Event Hubs để nhắn tin thay thế cho Kafka.

Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Consumer
Trung Nguyen 01/05/2021
Tạo Event Driven Architecture với Apache Kafka bằng .NET: Event Consumer

Trong bài viết này, chúng ta sẽ tìm hiểu cách tạo consumer xử lý tin nhắn sử dụng Consumer API của Kafka bằng .NET.

Event Driven Architecture: Queue vs Log - Nghiên cứu điển hình
Trung Nguyen 30/04/2021
Event Driven Architecture: Queue vs Log - Nghiên cứu điển hình

Trong bài viết này, chúng ta sẽ tìm hiểu một số case study về hệ thống dựa trên queue (RabbitMQ) và log (Kafka) trong kiến trúc Event Driven Architecture.

Event Driven Architecture: Queue vs Log
Trung Nguyen 30/04/2021
Event Driven Architecture: Queue vs Log

Trong hướng dẫn này, chúng tôi sẽ so sánh hệ thống tin nhắn dựa trên queue và dựa trên log trong kiến trúc hệ thống hướng sự kiện (event driven architecture)