0%

[DotnetCore]EventBus套件CAP:以RabbitMQ為例

前提情要

筆者公司有需求是系統跟系統串接,前人是使用資料庫交換的方式進行,簡單來講就是系統A有需求要請系統B做事情,簡單作法就是系統A於共同資料庫中的資料表中寫一筆資料,系統B則Database Pooling的方式進行監聽,若有撈出一筆Status = N的資料則進行邏輯處裡,進行完成後壓回Status = Y 或 N的註記方式來完成該次的串接需求,當然途中也會壓Status = P作為下次Pooling會撈到的防呆,大概就是這麼簡單的技術完成串接的需求。

筆者因前一份公司有接觸過Message Queue,主要是Rabbit MQ,覺得使用Message Queue當作中間交換資訊平台,確實有其發展性,不論是之後需求變大時可以做分散式的處理,系統間也間接解偶了。

內容

那怎麼會有產生此篇文章呢,是因為筆者在上一份工作有寫過監聽Rabbit MQQueue以及publish的程式,基本上是不難,照著官網範例打,應該都寫得出來,只是落落等,抽共用方法是可以解決,但會有Persistance的需求,就要自幹了。

那找到這個套件也是因為筆者在熟悉一種框架時會找相對應的developer roadmap,當作自己要完成的目標,參考asp.net core developer roadmap,裡頭有一個分類是Message Bus,介紹了幾個推薦用的套件,其中包含CAP,其實這個命名有一個玄機,由CAP理論而來,意思就是此套件也是照著CAP理論來達成的。

套件安裝

主要套件安裝

1
PM> Install-Package DotNetCore.CAP

接著要考慮的你要串接的是哪一個Message Queue,CAP套件支援

  • Kafka
  • Rabbit MQ
  • AzureServiceBus
  • AmazonSQS

依照選擇的對象不同而安裝對應的套件

1
2
3
4
5
PM> Install-Package DotNetCore.CAP.Kafka
# 筆者因為串接RabbitMQ,因此使用這則
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
PM> Install-Package DotNetCore.CAP.AmazonSQS

接著關係到Persistance,依你選擇的Persistance不同而安裝對應的套件

1
2
3
4
5
# 筆者因為SqlServer當作Persistance Target,因此使用這則
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB //need MongoDB 4.0+ cluster

安裝完以上三個套件後就完成安裝作業了。筆者再列一下筆者安裝的項目

1
2
3
PM> Install-Package DotNetCore.CAP
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.SqlServer

初始化套件

安裝套件完畢後,該是時候初始化它了,我們宣告於Startup.cs

1
2
3
4
5
6
7
8
9
10
11
12
// ConfigureServices
services.AddCap(x =>
{
x.UseSqlServer($"{連線字串}");
x.UseRabbitMQ(cfg =>
{
cfg.HostName = $"{RabbitMQ Server IP}";
cfg.UserName = $"{RabbitMQ Server Account}";
cfg.Password = $"{RabbitMQ Server Password}";
});
x.FailedRetryCount = 5;
});

發佈Queue

這個發布只有一種模式,叫做ICapPublisher,任何Service物件皆可進行發佈,只要能使用建構式注入的物件,例: Controller/Action,一般ServiceWorkerService,只要是能藉由建構式注入ICapPublisher即可使用發佈功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class PublishController : Controller
{
private readonly ICapPublisher _capBus;

public PublishController(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}

[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
{
//your business logic code

_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
}

return Ok();
}

[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
{
//your business logic code

_capBus.Publish("xxx.services.show.time", DateTime.Now);
}

return Ok();
}
}

當然發佈之於,需要理解一下Rabbit MQ中的發佈定義,Publish/Subscribe部份有四種模式

  • Fanout
  • Routing
  • Topics: CAP套件採用此類交換模式
  • RPC

詳細定義請參考五分鐘了解RabbitMQ運作

監聽Queue

透過CAP套件監聽Queue也是非常簡單,只要在你在邏輯處裡的Method上面套上相對應的Attribute即可完成。

1
2
3
4
5
6
7
8
public class PublishController : Controller
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
Console.WriteLine(datetime);
}
}

若想要在一般的Service層套用監聽則

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
namespace BusinessCode.Service
{
public interface ISubscriberService
{
void CheckReceivedMessage(DateTime datetime);
}

// 主要是你的Service要實作ICapSubscribe
public class SubscriberService: ISubscriberService, ICapSubscribe
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
}
}
}

結論

透過CAP套件,你也可以很容易地使用Message Queue當作系統間的橋樑,這是跨入Microservice世界的一小步,也可以達成物件導向追求的解偶。