前情提要 筆者公司有需求是系統跟系統串接,前人是使用資料庫交換的方式進行,簡單來講就是系統A有需求要請系統B做事情,簡單作法就是系統A於共同資料庫中的資料表中寫一筆資料,系統B則Database Pooling
的方式進行監聽,若有撈出一筆Status = N
的資料則進行邏輯處裡,進行完成後壓回Status = Y 或 N
的註記方式來完成該次的串接需求,當然途中也會壓Status = P
作為下次Pooling
會撈到的防呆,大概就是這麼簡單的技術完成串接的需求。
筆者因前一份公司有接觸過Message Queue,主要是Rabbit MQ ,覺得使用Message Queue當作中間交換資訊平台,確實有其發展性,不論是之後需求變大時可以做分散式的處理,系統間也間接解偶了。
內容 那怎麼會有產生此篇文章呢,是因為筆者在上一份工作有寫過監聽Rabbit MQ
的Queue
以及publish
的程式,基本上是不難,照著官網範例打,應該都寫得出來,只是落落等,抽共用方法是可以解決,但會有Persistance
的需求,就要自幹了。
那找到這個套件也是因為筆者在熟悉一種框架時會找相對應的developer roadmap
,當作自己要完成的目標,參考asp.net core developer roadmap ,裡頭有一個分類是Message Bus,介紹了幾個推薦用的套件,其中包含CAP ,其實這個命名有一個玄機,由CAP理論 而來,意思就是此套件也是照著CAP理論來達成的。
套件安裝 主要套件安裝
1 PM> Install-Package DotNetCore.CAP
接著要考慮的你要串接的是哪一個Message Queue,CAP套件支援
Kafka
Rabbit MQ
AzureServiceBus
AmazonSQS
依照選擇的對象不同而安裝對應的套件
1 2 3 4 5 PM> Install-Package DotNetCore.CAP.Kafka PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.AzureServiceBus PM> Install-Package DotNetCore.CAP.AmazonSQS
接著關係到Persistance
,依你選擇的Persistance
不同而安裝對應的套件
1 2 3 4 5 PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql PM> Install-Package DotNetCore.CAP.PostgreSql PM> Install-Package DotNetCore.CAP.MongoDB //need MongoDB 4.0+ cluster
安裝完以上三個套件後就完成安裝作業了。筆者再列一下筆者安裝的項目
1 2 3 PM> Install-Package DotNetCore.CAP PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.SqlServer
初始化套件 安裝套件完畢後,該是時候初始化它了,我們宣告於Startup.cs
中
1 2 3 4 5 6 7 8 9 10 11 12 services.AddCap(x => { x.UseSqlServer($"{連線字串} " ); x.UseRabbitMQ(cfg => { cfg.HostName = $"{RabbitMQ Server IP} " ; cfg.UserName = $"{RabbitMQ Server Account} " ; cfg.Password = $"{RabbitMQ Server Password} " ; }); x.FailedRetryCount = 5 ; });
發佈Queue 這個發布只有一種模式,叫做ICapPublisher
,任何Service物件皆可進行發佈,只要能使用建構式注入的物件,例: Controller/Action
,一般Service
,WorkerService
,只要是能藉由建構式注入ICapPublisher
即可使用發佈功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class PublishController : Controller { private readonly ICapPublisher _capBus; public PublishController (ICapPublisher capPublisher ) { _capBus = capPublisher; } [Route("~/adonet/transaction" ) ] public IActionResult AdonetWithTransaction () { using (var connection = new MySqlConnection(ConnectionString)) { using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true )) { _capBus.Publish("xxx.services.show.time" , DateTime.Now); } } return Ok(); } [Route("~/ef/transaction" ) ] public IActionResult EntityFrameworkWithTransaction ([FromServices]AppDbContext dbContext ) { using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true )) { _capBus.Publish("xxx.services.show.time" , DateTime.Now); } return Ok(); } }
當然發佈之於,需要理解一下Rabbit MQ
中的發佈定義,Publish/Subscribe
部份有四種模式
Fanout
Routing
Topics: CAP套件採用此類交換模式
RPC
詳細定義請參考五分鐘了解RabbitMQ運作
監聽Queue 透過CAP
套件監聽Queue
也是非常簡單,只要在你在邏輯處裡的Method
上面套上相對應的Attribute
即可完成。
1 2 3 4 5 6 7 8 public class PublishController : Controller { [CapSubscribe("xxx.services.show.time" ) ] public void CheckReceivedMessage (DateTime datetime ) { Console.WriteLine(datetime); } }
若想要在一般的Service層套用監聽則
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 namespace BusinessCode.Service { public interface ISubscriberService { void CheckReceivedMessage (DateTime datetime ) ; } public class SubscriberService : ISubscriberService , ICapSubscribe { [CapSubscribe("xxx.services.show.time" ) ] public void CheckReceivedMessage (DateTime datetime ) { } } }
結論 透過CAP套件,你也可以很容易地使用Message Queue
當作系統間的橋樑,這是跨入Microservice
世界的一小步,也可以達成物件導向追求的解偶。