前情提要
筆者目前於金融業資訊部門就職,如大家所知,核心系統大都維持使用cobol
程式語言開發,我們這種寫C#
程式語言的,只能攻外圍系統,但不免俗的需要與各核心相關系統做打交道,不管是取得資料或者是更新資料等,比較舊型的架構都是透過TCP/IP
交換資料,也就是透過socket
程式交換資料。界於此,筆者需要實作socket程式基礎框架適應各種Client程式的開發需求,今天就來寫一支socket core
程式吧。
內容
尋找適合nuget套件
筆者堅持不重新造輪子的觀念,若有適合的底層套件則不偏向自己開發,因為基本上現今網路上大神們貢獻的程式碼,因Open Source的關係,也會有同好的大神們加入開發並改善,可以透過
來決定是否使用該套件。筆者想TCP/IP溝通應該算是一個底層程式的議題,應該有不乏現成套件可使用,在茫茫程式碼大海中找到這個
chronoxor/NetCoreServer
主要選擇原因為星星數這個量算多,接著最後編輯時間不超過一個月,表示有持續在更新,看了一下套用範例程式也算簡單。就選擇他了。
撰寫Client/Server程式
NetCoreServer本身提供諸多的介接實作,筆者這邊主要參考TCP
的部份,因此只要參考TCP Chat Server
、TCP Chat Client
範例程式即可。那筆者就照著套件提供的範例程式開發Client Server
程式吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| public class MyTcpClient : NetCoreServer.TcpClient { private static readonly Logger _logger = LogManager.GetCurrentClassLogger(); public event Action<bool> OnConnect; public event Action<byte[]> OnReceive; public Func<List<byte>, Tuple<List<byte>, List<Byte[]>>> SliceByteFunc; public event Action<long, long> OnSend; public event Action OnClose; private List<byte> queue = new List<byte>();
private readonly SocketSettingOption _socketSetting; public MyTcpClient (IPAddress address, int port , IOptions<SocketSettingOption> options) : base(address, port) { if (SliceByteFunc == null) { SliceByteFunc = CommSlicingByteFunc; } _socketSetting = options.Value; }
public void DisconnectAndStop() { _stop = true; DisconnectAsync(); while (IsConnected) Thread.Yield(); }
protected override void OnConnected() { _logger.Info($"Chat TCP client connected a new session with Id {Id}"); OnConnect(this.IsConnected); }
protected override void OnDisconnected() { _logger.Info($"Chat TCP client disconnected a session with Id {Id}"); OnClose();
Thread.Sleep(1000);
if (!_stop && _socketSetting.DisconnectAfterClientReceive == false) ConnectAsync(); }
protected override void OnReceived(byte[] buffer, long offset, long size) { try { if (OnReceive != null) { _logger.Info($"[Buffer] {buffer.Length};[Offset] {offset};[Size] {size}"); var dataBuffer = ByteExtensions.GetSubByte(buffer, (int)offset, (int)size); _logger.Info($"[TcpServer_eventactionReceive:data]: {CoreHelper.Encoding.GetString(dataBuffer)}"); queue.AddRange(dataBuffer); _logger.Info($"[TcpServer_eventactionReceive:queue]: {CoreHelper.Encoding.GetString(queue.ToArray())}"); lock (queue) { var sliceResult = SliceByteFunc(queue); queue = sliceResult.Item1; foreach (var item in sliceResult.Item2) { OnReceive(item); } } } } catch (Exception ex) { _logger.Info($"[TcpServer_eventactionReceive error]: {ex}"); } }
protected override void OnError(SocketError error) { _logger.Info($"Chat TCP client caught an error with code {error}"); }
protected override void OnSent(long sent, long pending) { OnSend(sent, pending); base.OnSent(sent, pending); } }
|
- MyTcpClient的Private Method
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| private bool _stop;
private Tuple<List<byte>, List<byte[]>> CommSlicingByteFunc(List<byte> source) { var result = new Tuple<List<byte>, List<byte[]>>(new List<byte>(), new List<byte[]>()); try { if (source.Count > 4) { var totalLength = int.Parse(CoreHelper.Encoding.GetString(ByteExtensions.GetSubByte(source.ToArray(), 0, 4))); _logger.Info($"[Total Length]: {totalLength}"); while (totalLength <= source.Count) { var subBytes = new byte[totalLength]; System.Buffer.BlockCopy(source.ToArray(), 0, subBytes, 0, totalLength); _logger.Info($"[Sub byte]: {CoreHelper.Encoding.GetString(subBytes)}"); var newQueue = ByteExtensions.GetSubByte(source.ToArray(), totalLength, source.Count - totalLength); _logger.Info($"[newQueue]: {CoreHelper.Encoding.GetString(newQueue)}"); source = new List<byte>(); for (int i = 0; i < newQueue.Length; i++) { source.Add(newQueue[i]); } result.Item2.Add(subBytes); if (newQueue.Length > 4) { totalLength = int.Parse(CoreHelper.Encoding.GetString(ByteExtensions.GetSubByte(newQueue, 0, 4))); } } } } catch (Exception ex) { _logger.Error($"[CommSlicingByteFunc]: {ex}"); result.Item2.Add(source.ToArray()); source = new List<byte>(); } foreach (var item in source) { result.Item1.Add(item); } return result; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public class MyTcpServer : TcpServer { private static readonly Logger _logger = LogManager.GetCurrentClassLogger(); public event Action<Guid> OnAccept; public event Action<Guid, byte[]> OnReceive; public event Action<long, long> OnSend; public event Action<Guid> OnClose; public MyTcpServer(IPAddress address, int port) : base(address, port) { }
protected override TcpSession CreateSession() { _logger.Info($"Create Session!"); var session = new MyTcpSession(this); _logger.Info($"Session Id: {session.Id}"); session.OnAccept += OnAccept; session.OnReceive += OnReceive; session.OnSend += OnSend; session.OnClose += OnClose; return session; }
protected override void OnError(SocketError error) { _logger.Error($"[MyTcpServer:Error] {error}"); }
protected override void OnStarted() { _logger.Info($"[MyTcpServer Started]"); }
protected override void OnStopped() { _logger.Info($"[MyTcpServer Stopped]"); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class MyTcpSession : TcpSession { public event Action<Guid> OnAccept; public event Action<Guid, byte[]> OnReceive; public event Action<long, long> OnSend; public event Action<Guid> OnClose;
public MyTcpSession(MyTcpServer server) : base(server) { }
protected override void OnConnected() { OnAccept(Id); }
protected override void OnDisconnected() { OnClose(Id); }
protected override void OnReceived(byte[] buffer, long offset, long size) { var dataBuffer = ByteExtensions.GetSubByte(buffer, (int)offset, (int)size); OnReceive(Id, dataBuffer); }
protected override void OnSent(long sent, long pending) { OnSend(sent, pending); base.OnSent(sent, pending); }
protected override void OnError(SocketError error) { base.OnError(error); } }
|
撰寫Base Client/Base Server程式
基本上筆者希望使用端不需要在出現new
出相對應的Server
或Client
的程式,基本上連線資訊使用appsetting
設定,底層程式將appsetting
的設定讀出來做設定。因此最上層Client端只要簡單注入就即可使用,接著只要設定好相對應的Event
事件,底層的Base Class
作轉觸發的動作,最上層的Client端程式只要針對Event
事件作對應處理,完美封裝了底層實作。
不免俗的定義一下Interface規格
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public interface IClient { void Client_OnClose(); void Client_OnReceive(byte[] data); void Client_OnConnect(bool obj); void Client_OnSend(long sent, long pending); void Send(byte[] data, int offset, int length); }
public interface IServer { void Send(byte[] data); void Send(Guid clientId, byte[] data); void Server_OnAccept(Guid clientId); void Server_OnSend(long sent, long pending); void Server_OnReceive(Guid clientId, byte[] buffer); void Server_OnClose(Guid clietId); }
|
- BaseServer部份,基本上就是實作IServer相對應的Method
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| public class BaseServer { private static readonly Logger _logger = LogManager.GetCurrentClassLogger(); private MyTcpServer _server; private readonly IAnalyzeService _analyzeService; private SocketSetting _socketSetting = new SocketSetting(); private SocketSettingOption _socketSettingOption; private SocketSettingType _socketSettingType; private string _connectionStr;
public BaseServer(IAnalyzeService analyzeService , IOptions<SocketSettingOption> socketSettingOption , IConfiguration config , SocketSettingType socketSettingType) { _logger.Info("ServerPush Constructor!!"); _analyzeService = analyzeService;
_socketSettingOption = socketSettingOption.Value; switch (socketSettingType) { case SocketSettingType.Main: config.Bind(_socketSettingOption.MainSocketSetting, _socketSetting); break; case SocketSettingType.Sub: config.Bind(_socketSettingOption.SubSocketSetting, _socketSetting); break; default: break; } _socketSettingType = socketSettingType;
_connectionStr = Encoding.UTF8.GetString( Convert.FromBase64String(config.GetConnectionString(_socketSettingOption.ConnectionStringKey)));
var ipAddress = IPAddress.Parse(_socketSetting.Server.ReceiveIP); _server = new MyTcpServer(ipAddress, _socketSetting.Server.ReceivePort); _server.OnAccept += Server_OnAccept; _server.OnClose += Server_OnClose; _server.OnReceive += Server_OnReceive; _server.OnSend += Server_OnSend; _server.Start(); }
public void Send(byte[] data) { if (_server.ConnectedSessions == 0) { _logger.Info("[BaseServer]: No Client to send!"); return;
}
data.Logging(_socketSettingOption, _socketSetting, _connectionStr); _server.Multicast(data); }
public void Send(Guid clientId, byte[] data) { if (_server.ConnectedSessions == 0) { _logger.Info("[BaseServer]: No Client to send!"); return; } var session = _server.FindSession(clientId); if(session != null) { data.Logging(_socketSettingOption, _socketSetting, _connectionStr); session.SendAsync(data); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public void Server_OnAccept(Guid clientId) { _logger.Info($"Server_OnAccept {clientId}"); }
public void Server_OnClose(Guid clientId) { _logger.Info($"Server OnClose {clientId}"); }
public void Server_OnReceive(Guid clientId, byte[] data) { data.Logging(_socketSettingOption, _socketSetting, _connectionStr);
var receiveData = CoreHelper.Encoding.GetString(data); _logger.Info($"Server_OnReceive: {receiveData}"); _analyzeService.ServerAnalyze(clientId, data); }
public void Server_OnSend(long arg1, long arg2) { _logger.Info("Server OnSend"); }
|
- BaseClient部份,基本上就是實作IClient相對應的Method
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| public class BaseClient { private static readonly Logger _logger = LogManager.GetCurrentClassLogger(); private readonly IAnalyzeService _analyzeService; private readonly MyTcpClient _client; private SocketSetting _socketSetting = new SocketSetting(); private SocketSettingOption _socketSettingOption; private SocketSettingType _socketSettingType; private string _connectionStr; public BaseClient(IAnalyzeService analyzeService , IOptions<SocketSettingOption> socketSettingOption , IConfiguration config , SocketSettingType socketSettingType) { _analyzeService = analyzeService; _socketSettingOption = socketSettingOption.Value; var options = socketSettingOption.Value; switch (socketSettingType) { case SocketSettingType.Main: config.Bind(options.MainSocketSetting, _socketSetting); break; case SocketSettingType.Sub: config.Bind(options.SubSocketSetting, _socketSetting); break; default: break; } _socketSettingType = socketSettingType;
_connectionStr = Encoding.UTF8.GetString( Convert.FromBase64String(config.GetConnectionString(_socketSettingOption.ConnectionStringKey)));
var ipAddress = IPAddress.Parse(_socketSetting.Client.RemoteIP); _client = new MyTcpClient(ipAddress, _socketSetting.Client.RemotePort, socketSettingOption); _client.OnClose += Client_OnClose; _client.OnConnect += Client_OnConnect; _client.OnReceive += Client_OnReceive; _client.OnSend += Client_OnSend; if (options.ClientSliceByteFunc != null) { _client.SliceByteFunc = options.ClientSliceByteFunc; } _client.ConnectAsync(); _logger.Info($"[Client Connect]: {_client.IsConnected}"); } public void Client_OnClose() { _logger.Info("Client OnClose"); }
public void Client_OnConnect(bool obj) { _logger.Info($"Client OnConnect: {obj}"); }
public void Client_OnReceive(byte[] data) { if (string.IsNullOrEmpty(CoreHelper.Encoding.GetString(data).Trim())) return; _logger.Info($"Client OnReceive[Hex String]: {data.ToHexString()}");
data.Logging(_socketSettingOption, _socketSetting, _connectionStr);
_logger.Info($"Client OnReceive: {CoreHelper.Encoding.GetString(data)}"); _analyzeService.ClientAnalyze(data); }
public void Client_OnSend(long sent, long pending) { _logger.Info("Client OnSend"); }
public void Send(byte[] data, int offset, int length) { if (_socketSettingOption.DisconnectAfterClientReceive) { _client.DisconnectAndStop(); Thread.Sleep(500); } if (PoolingService.RetryUntilSuccessOrTimeout(() => { if (!_client.IsConnected) _client.ConnectAsync(); Thread.Sleep(500); return _client.IsConnected; }, TimeSpan.FromSeconds(10))) { _logger.Info($"Client Send: {CoreHelper.Encoding.GetString(data)}"); data.Logging(_socketSettingOption, _socketSetting, _connectionStr); var sent = _client.SendAsync(data); _logger.Info($"Client Send Length: {sent}"); } else { _logger.Error($"Client Connect Timeout Error!!"); } } }
|
Client端使用Server、Client物件
到這邊,我們已經有BaseServer
及BaseClient
物件可以使用了,我們為了Client端方便使用,再包裝一個對外的Server、Client物件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class MyClient : BaseClient, IClient { public MyClient (IAnalyzeService analyzeService , IOptions<SocketSettingOption> socketSettingOption , IConfiguration config) : base(analyzeService, socketSettingOption, config, SocketSettingType.Main) { } }
public class MyServer : BaseServer, IServer { public MyServer(IAnalyzeService analyzeService , IOptions<SocketSettingOption> socketSettingOption , IConfiguration config) : base(analyzeService, socketSettingOption, config, SocketSettingType.Main) { } }
|
Service Registration Extension實作
最後的最後,基本上socket
程式本身不像一般的Service跟著http request
執行,而是一個singleton
的實體,底層套件就直接做完注入new
出一個實體的作業。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static class SocketServiceRegistration { public static IServiceCollection AddMySocket(this IServiceCollection services , Action<SocketSettingOption> setupOptions) { services.AddSingleton<IServer, MyServer>(); services.AddSingleton<IClient, MyClient>();
var options = new SocketSettingOption(); setupOptions(options); services.Configure(setupOptions);
services.AddHostedService<SocketWorker>();
return services; } }
|
以上述程式碼中可以看到有一個SocketWorker
,可以想成是一個Socket Initial Worker
,就是硬生生給他new
出來放著,基本上最上層Client端使用時就是一個ready好的Server
及Client
實體。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class SocketWorker : BackgroundService { private static readonly Logger _logger = LogManager.GetCurrentClassLogger(); private readonly IClient _client; private readonly IServer _server; private readonly IExchangeService _exchangeService; private readonly IConfiguration _config; private readonly SocketSettingOption _socketSetting; public SocketWorker(IClient client , IServer server , IExchangeService exchangeService , IConfiguration config , IOptions<SocketSettingOption> options) { _client = client; _server = server; _exchangeService = exchangeService; _socketSetting = options.Value; _config = config; } protected override Task ExecuteAsync(CancellationToken stoppingToken) { return Task.CompletedTask; } }
|
最上層Client端套用
套用部份,只要於Program中,呼叫上述Extension中的方法即可,當然宣告的同時要設定好Socket相關設定值。
1 2 3 4
| services.AddMySocket(option => { option.MainSocketSetting = "Socket"; });
|
也順便列出appsetting
設定值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| "Socket": { "Client": { "SendBufferSize": 1024, "RemoteIP": "RemoteIP", "RemotePort": RemotePort }, "Server": { "NumConnections": 20000, "ReceiveBufferSize": 1024, "ReceiveIP": "MyIP", "ReceivePort": MyPort, "OverTime": 0 }, "Encoding": "Big5" }
|
結論
雖然包了很多層,筆者追求底層邏輯封裝,讓最上層Client使用端易用性高,基本上底層邏輯封裝做得越好,若使用的底層套件有問題,還可轉用其他的,對於Client使用端來說只要接口、使用方式不變,不需調整任何程式,就可享有效能更好的底層套件體驗。筆者下一篇來解釋最上層Client使用端如何串接解析處裡封包。