Xây dựng Microservices bằng ASP.NET Core – Transaction Outbox với RabbitMQ

Xây dựng Microservices bằng ASP.NET Core – Transaction Outbox với RabbitMQ

Danh sách các bài viết:

  • Phần 1: Lập kế hoạch.
  • Phần 2: Định hình kiến ​​trúc Microservices với CQRS và MediatR.
  • Phần 3: Khám phá dịch vụ với Eureka.
  • Phần 4: Xây dựng API Gateway với Ocelot.
  • Phần 5: Thao tác với database PostgreSQL bằng Marten.
  • Phần 6: Giao tiếp máy chủ thời gian thực với SignalR và RabbitMQ.
  • Phần 7: Transaction outbox với RabbitMQ (bài viết này).

Giờ đây, các dịch vụ microservices của chúng ta giao tiếp thông qua việc xuất bản thông tin về các sự kiện kinh doanh quan trọng như: sản phẩm được kích hoạt hoặc ngừng cung cấp, chính sách đang được bán hoặc chấm dứt, thanh toán được thực hiện hoặc bỏ lỡ.

Tất cả đều có vẻ tuyệt vời, nhưng chúng ta đã bỏ lỡ một số điều quan trọng – tính nhất quán của dữ liệu và ranh giới transaction.

Khi chúng ta quyết định đi theo con đường sử dụng kiến trúc microservices, trong đó mỗi dịch vụ là một ứng dụng độc lập với kho dữ liệu riêng, chúng ta đã rời khỏi vùng đất an toàn của transaction trong cơ sở dữ liệu mà chúng ta có trong các giải pháp nguyên khối của mình.

Không còn ACID?

Hãy xem đoạn mã sau:

public async Task Handle(CreatePolicyCommand request, CancellationToken cancellationToken)
{
    using (var uow = uowProvider.Create())
    {
        var offer = await uow.Offers.WithNumber(request.OfferNumber);
        var customer =  …;
        var policy = offer.Buy(customer);

        uow.Policies.Add(policy);
        await uow.CommitChanges();  

        await eventPublisher.PublishMessage(PolicyCreated(policy));

        return new CreatePolicyResult
        {
            PolicyNumber = policy.Number
        };    
    }
}

Như bạn có thể thấy, chúng ta commit các thay đổi trong Unit of Work trước khi gọi  phương thức PublishMessage của nhà xuất bản sự kiện. Tại sao lại làm vậy?

Hãy tưởng tượng chúng ta thay đổi thứ tự của các hoạt động, chúng ta thêm các đối tượng miền mới vào Unit of Work. Chúng ta xuất bản tin nhắn tới message broker (trong trường hợp của chúng ta là RabbitMQ) và cuối cùng chúng ta commit các thay đổi dẫn đến việc thực thi các câu lệnh INSERT và UPDATE thực tế đối với cơ sở dữ liệu của chúng ta.

Nếu có gì sai – sự cố kết nối cơ sở dữ liệu, vi phạm ràng buộc cơ sở dữ liệu, bế tắc hoặc bất cứ điều gì – thì giao dịch cơ sở dữ liệu của chúng ta sẽ được khôi phục và không có thay đổi nào được duy trì.

Điều đó thật tuyệt, nhưng còn những tin nhắn chúng ta đã gửi tới message broker của mình thì sao? Chúng đã được gửi đi khiến toàn bộ hệ thống ở trạng thái không nhất quán.

Trong trường hợp này, PolicyService của chúng ta không thể tạo và lưu trữ chính sách với số lượng nhất định, nhưng tất cả các dịch vụ khác đều được thông báo rằng chính sách đó đã được tạo. Điều này không đúng.

Vì vậy, chúng tôi đã quay lại thứ tự ban đầu. Bây giờ, nếu có bất kỳ sự cố nào xảy ra trong quá trình lưu thay đổi vào cơ sở dữ liệu, một ngoại lệ sẽ được ném ra, các thay đổi được khôi phục và trình xuất bản  sự kiện không được thực thi.

Nhưng điều gì sẽ xảy ra nếu message broker không khả dụng hoặc chúng tôi gặp sự cố mạng và việc xuất bản tin nhắn gây ra ngoại lệ?

Bây giờ chúng ta có chính sách mới trong  cơ sở dữ liệu PolicyService, nhưng các dịch vụ khác không biết về nó, vì vậy trạng thái của toàn bộ hệ thống  cuối cùng sẽ không nhất quán.

Chúng ta có thể làm gì để giải quyết vấn đề này? Tôi biết RabbitMQ chạy trên OTP và được viết bằng Erlang. Nhưng ngay cả các chương trình Erlang cũng có lỗi và không miễn nhiễm 100% với các lỗi cấu hình và mạng hoặc lỗi phần cứng.

Chúng ta có thể bắt đầu với việc bọc mã chịu trách nhiệm gửi với một số loại cơ chế thử lại (retry) bằng cách sử dụng thư viện như Polly, nhưng điều này không giải quyết được vấn đề hoàn toàn, nó chỉ làm giảm xác suất thất bại.

Có một giải pháp tốt hơn – sử dụng mẫu hộp thư đi (outbox pattern).

Mẫu hộp thư đi

Mẫu hộp thư đi (outbox pattern) là một công cụ đơn giản nhưng mạnh mẽ. Hãy xem nó hoạt động như thế nào và kết quả của việc áp dụng nó là gì.

Với outbox pattern, thay vì gửi tin nhắn tới message broker, chúng ta lưu tin nhắn vào cơ sở dữ liệu microservices của mình như một phần của giao dịch kinh doanh hiện tại.

Bằng cách này, chúng ta đạt được sự nhất quán nội bộ bên trong dịch vụ của mình. Sẽ không có tin nhắn nào được gửi nếu một giao dịch được khôi phục.

Bây giờ khi tin nhắn của chúng ta được lưu trữ trong hộp thư đi, chúng ta cần một quy trình sẽ chuyển tiếp nó đến message broker. Đó là phần thứ hai của outbox pattern – một quy trình chạy không đồng bộ và cố gắng gửi tin nhắn đến message broker.

Quá trình này đọc tin nhắn từ bảng hộp thư đi theo đúng thứ tự mà chúng đã được tạo, sau đó nó cố gắng gửi đến message broker, nếu một tin nhắn được gửi thành công, thì quy trình sẽ xóa tin nhắn khỏi cơ sở dữ liệu (hoặc đánh dấu là đã xử lý).

Kết quả của cách tiếp cận này là gì: như đã đề cập, chúng ta không gửi tin nhắn nếu giao dịch được khôi phục, theo cách này, chúng ta không gửi tin nhắn “ma” liên quan đến những thứ không thực sự xảy ra, quan trọng hơn là nếu message broker tạm thời không có sẵn hoặc mạng có vấn đề không làm ảnh hướng đến các giao dịch kinh doanh của chúng ta.

Bằng cách này, chúng ta đạt được ít nhất một lần đảm bảo giao hàng. Điều này có nghĩa là chúng ta chắc chắn rằng tin nhắn của chúng ta sẽ được gửi ít nhất một lần.

Trường hợp rất hiếm, tin nhắn của chúng ta có thể được gửi hai lần hoặc nhiều lần. Điều này chỉ có thể xảy ra nếu tin nhắn được gửi thành công đến message broker nhưng không bị xóa khỏi cơ sở dữ liệu microservices của chúng ta.

Tôi nghĩ rằng điều này ít xảy ra hơn nhiều so với các lỗi khác được mô tả trước đây. Để hệ thống của chúng ta luôn nhất quán, phía người nhận có trách nhiệm xử lý thông báo theo cách cung cấp tính idempotent.

Điều này có nghĩa là người nhận của chúng ta phải có khả năng phát hiện ra rằng tin nhắn nhất định đã được xử lý hoặc phải có khả năng xử lý nó theo cách không vi phạm tính nhất quán của dữ liệu.

Trong ví dụ của chúng ta khi PaymentService nhận được tin nhắn liên quan đến việc bán chính sách mới, nó sẽ kiểm tra xem tài khoản cho chính sách đã tồn tại hay chưa trước khi tạo một chính sách mới.

Triển khai RabbitMQ và NHibernate

Bước đầu tiên là lưu trữ tin nhắn thay vì gửi ngay lập tức tới message broker. Để làm được điều này, chúng ta phải tạo một lớp chứa thông điệp và siêu dữ liệu của nó. Chúng ta sẽ lưu trữ các instance của lớp này như một phần của giao dịch kinh doanh của chúng ta.

public class Message
{
    public virtual long? Id { get; protected set; }
    
    public virtual string Type { get; protected set; }
    
    public virtual string Payload { get; protected set; }

    public Message(object message)
    {
        Type = message.GetType().FullName + ", " + message.GetType().Assembly.GetName().Name;
        Payload = JsonConvert.SerializeObject(message);
    }

    public virtual object RecreateMessage() => JsonConvert.DeserializeObject(Payload, System.Type.GetType(Type));
}

Lớp tin nhắn của chúng ta lưu trữ dữ liệu sẽ được gửi dưới dạng JSON được serialize thành một chuỗi. Nó cũng lưu trữ kiểu tin nhắn để có thể được tái tạo lại. Hãy nhớ lại ở bài viết trước! Chúng ta đang sử dụng tên lớp làm khóa định tuyến trong RabbitMQ, vì vậy phải đảm bảo rằng chúng ta không để mất thông tin này.

Chúng ta cũng cần định nghĩa ánh xạ NHibernate cho lớp này trong file Message.hbm.xml

<class name="Message" table="outbox_messages">
    <id name="Id" column="id">
        <generator class="identity" />
    </id>
    <property name="Type" column="type" length="500"/>
    <property name="Payload" column="json_payload" length="8000"/>
</class>

Bây giờ chúng ta cần loại bỏ lớp RabbitEventPublisher và cung cấp triển khai mới của interface IEventPublisher sẽ lưu trữ Message trong cơ sở dữ liệu.

public class OutboxEventPublisher : IEventPublisher
{
    private readonly ISession session;

    public OutboxEventPublisher(ISession session)
    {
        this.session = session;
    }

    public async Task PublishMessage(T msg)
    {
        await session.SaveAsync(new Message(msg));
    }
}

Để có thể đưa vào instance của NHibernate hiện tại, chúng ta cần sửa đổi triển khai UnitOfWork và trình cài đặt NHibernate của chúng ta. Các dòng quan trọng nhất là:

services.AddSingleton(cfg.BuildSessionFactory());

services.AddScoped(s => s.GetService().OpenSession());

services.AddScoped<IUnitOfWork, UnitOfWork>();

Ở đây chúng ta đăng ký một singleton của session factory và factory method để tạo instance có phạm vi cho session NHibernate. Bằng cách này, chúng ta chia sẻ phiên của mình giữa UnitOfWork được sử dụng để quản lý các thay đổi của các thực thể và OutboxEventPublisher của chúng ta.

Với thiết lập này, tin nhắn của chúng ta được lưu trữ như một phần của các giao dịch kinh doanh và sẽ không được lưu trong cơ sở dữ liệu nếu giao dịch kinh doanh được khôi phục.

Bây giờ chúng tôi đã sẵn sàng thực hiện phần thứ hai – gửi tin nhắn ra khỏi hộp thư đi.

Chúng tôi có thể sử dụng bộ lập lịch Hangfire hoặc Quartz.net để thực hiện quy trình không đồng bộ của mình, nhưng tôi nghĩ rằng nó hơi quá đối với giải pháp nhỏ của chúng ta.

Thay vào đó, chúng tôi sẽ tận dụng một trong những tính năng mới của .NET Core – IHostedService.

Các dịch vụ được lưu trữ là một cách để triển khai các công việc nền trong ASP.NET Core. Bạn có thể đọc thêm về trong tài liệu chính thức.

public class OutboxSendingService : IHostedService
{
    private readonly Outbox outbox;
    private Timer timer;
    private static object locker = new object();

    public OutboxSendingService(Outbox outbox)
    {
        this.outbox = outbox;
    }


    public Task StartAsync(CancellationToken cancellationToken)
    {
        timer = new Timer
        (
            PushMessages,
            null,
            TimeSpan.Zero,
            TimeSpan.FromSeconds(1)
        );
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        timer?.Change(Timeout.Infinite, 0);
        return Task.CompletedTask;
    }
    
    
    private async void PushMessages(object state)
    {
        var hasLock = false;

        try
        {
            Monitor.TryEnter(locker, ref hasLock);

            if (!hasLock)
            {
                return;
            }
            
            await outbox.PushPendingMessages();

        }
        finally
        {
            if (hasLock)
            {
                Monitor.Exit(locker);
            }
        }
    }
}

Ở đây chúng ta có dịch vụ OutboxSendingService sử dụng timer để kích hoạt xử lý hộp thư đi sau mỗi 1 giây (để sử dụng sản xuất, bạn có thể chuyển cài đặt này vào file cấu hình ứng dụng appsettings.json).

Chức năng PushMessage được gọi sau mỗi 1 giây, vì vậy chúng ta cần đảm bảo rằng chúng ta không chạy nó hai lần cùng một lúc, tránh gửi cùng một tin nhắn hai lần.

Chúng tôi đã sử dụng Monitor để đồng bộ hóa việc thực thi mã chịu trách nhiệm gửi tin nhắn. Mã này được chuyển đến lớp Outbox được trình bày bên dưới.

public class Outbox
{
    private readonly IBusClient busClient;
    private readonly ISessionFactory sessionFactory;
    private readonly OutboxLogger logger;

    public Outbox(IBusClient busClient, ISessionFactory sessionFactory, ILogger logger)
    {
        this.busClient = busClient;
        this.sessionFactory = sessionFactory;
        this.logger = new OutboxLogger(logger);
    }


    public async Task PushPendingMessages()
    {
        var messagesToPush = FetchPendingMessages();
        logger.LogPending(messagesToPush);

        foreach (var msg in messagesToPush)
        {
            if (!await TryPush(msg))
            {
                break;
            }
        }
    }

    private IList FetchPendingMessages()
    {
        List messagesToPush;
        using (var session = sessionFactory.OpenStatelessSession())
        {
            messagesToPush = session.Query()
                .OrderBy(m => m.Id)
                .Take(50)
                .ToList();
        }

        return messagesToPush;
    }

    private async Task TryPush(Message msg)
    {
        using (var session = sessionFactory.OpenStatelessSession())
        {
            var tx = session.BeginTransaction();
            try
            {
                await PublishMessage(msg);
                
                session
                    .CreateQuery("delete Message where id=:id")
                    .SetParameter("id", msg.Id)
                    .ExecuteUpdate();
                
                tx.Commit();
                logger.LogSuccessPush();
                return true;
            }
            catch (Exception e)
            {
                logger.LogFailedPush(e);
                tx?.Rollback();
                return false;
            }
        }
    }

    private async Task PublishMessage(Message msg)
    {
        var deserializedMsg = msg.RecreateMessage();
        var messageKey = deserializedMsg.GetType().Name.ToLower();
        await busClient.BasicPublishAsync(deserializedMsg,
            cfg =>
            {
                cfg.OnExchange("lab-dotnet-micro").WithRoutingKey(messageKey);
            });
    }

}

Khi phương thứcPushPendingMessages của Outbox được gọi, nó sẽ đọc 50 tin nhắn cũ nhất và cố gắng gửi từng tin nhắn đó. Đối với mỗi tin nhắn, chúng ta mở một giao dịch cơ sở dữ liệu mới, chúng ta cố gắng gửi tin nhắn bằng RawRabbit và nếu điều này thành công thì chúng ta xóa tin nhắn khỏi cơ sở dữ liệu. Nếu có vấn đề gì xảy ra, chúng ta sẽ ngừng xử lý và đợi lần sau bộ đếm thời gian gọi mã của chúng tôi để chúng tôi có thể thử lại.

Bây giờ, để kết thúc, chúng ta cần sửa đổi RawRabbitInstaller để cài đặt tất cả các thành phần mới mà chúng ta vừa phát triển.

services.AddScoped<IEventPublisher,OutboxEventPublisher>();
services.AddSingleton<Outbox.Outbox>();
services.AddHostedService<OutboxSendingService>();

Tóm tắt

Microservices không phải là bữa ăn trưa miễn phí. Khi chúng ta bước ra khỏi vùng an toàn của mình được bảo vệ bởi cơ sở dữ liệu quan hệ tuân thủ ACID, có nhiều vấn đề chúng ta phải giải quyết.

Trong bài viết này, chúng tôi đã trình bày outbox pattern và một trong những cách triển khai có thể có của nó. Outbox pattern giúp chúng ta duy trì tính nhất quán khi sử dụng giao tiếp dựa trên sự kiện không đồng bộ giữa các dịch vụ và sử dụng RDBMS để quản lý trạng thái nội bộ của dịch vụ nhỏ nhất định.

Tất nhiên, có nhiều cách mà mô hình này có thể được thực hiện. Nếu bạn quan tâm, bạn có thể kiểm tra triển khai dựa trên Entity Framework được xây dựng dựa trên khái niệm tổng hợp DDD trong bài viết của Kamil Grzybek.

Cũng lưu ý rằng có các framework đã được thử nghiệm và sẵn sàng sản xuất như NServiceBus cung cấp chức năng này.

Bài viết gốc.

Trả lời

Email của bạn sẽ không được hiển thị công khai. Các trường bắt buộc được đánh dấu *