0%

[DotnetCore]Socket程式實作

前情提要

筆者目前於金融業資訊部門就職,如大家所知,核心系統大都維持使用cobol程式語言開發,我們這種寫C#程式語言的,只能攻外圍系統,但不免俗的需要與各核心相關系統做打交道,不管是取得資料或者是更新資料等,比較舊型的架構都是透過TCP/IP交換資料,也就是透過socket程式交換資料。界於此,筆者需要實作socket程式基礎框架適應各種Client程式的開發需求,今天就來寫一支socket core程式吧。

內容

尋找適合nuget套件

筆者堅持不重新造輪子的觀念,若有適合的底層套件則不偏向自己開發,因為基本上現今網路上大神們貢獻的程式碼,因Open Source的關係,也會有同好的大神們加入開發並改善,可以透過

  • 最新維護日期
  • 星星數
  • 網路上資源多寡程度

來決定是否使用該套件。筆者想TCP/IP溝通應該算是一個底層程式的議題,應該有不乏現成套件可使用,在茫茫程式碼大海中找到這個

chronoxor/NetCoreServer

主要選擇原因為星星數這個量算多,接著最後編輯時間不超過一個月,表示有持續在更新,看了一下套用範例程式也算簡單。就選擇他了。

撰寫Client/Server程式

NetCoreServer本身提供諸多的介接實作,筆者這邊主要參考TCP的部份,因此只要參考TCP Chat ServerTCP Chat Client範例程式即可。那筆者就照著套件提供的範例程式開發Client Server程式吧

  • MyTcpClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
public class MyTcpClient : NetCoreServer.TcpClient
{
private static readonly Logger _logger = LogManager.GetCurrentClassLogger();
/// <summary>
/// 連接成功事件 item1:是否連接成功
/// </summary>
public event Action<bool> OnConnect;
/// <summary>
/// 接收通知事件 item1:數據
/// </summary>
public event Action<byte[]> OnReceive;
/// <summary>
/// 切割byte內容
/// </summary>
public Func<List<byte>, Tuple<List<byte>, List<Byte[]>>> SliceByteFunc;
/// <summary>
/// 已發送通知事件 item1:長度
/// </summary>
public event Action<long, long> OnSend;
/// <summary>
/// 斷開連接通知事件
/// </summary>
public event Action OnClose;
/// <summary>
/// 接收到的数据缓存
/// </summary>
private List<byte> queue = new List<byte>();

private readonly SocketSettingOption _socketSetting;
public MyTcpClient (IPAddress address, int port
, IOptions<SocketSettingOption> options) : base(address, port)
{
if (SliceByteFunc == null)
{
SliceByteFunc = CommSlicingByteFunc;
}
_socketSetting = options.Value;
}

public void DisconnectAndStop()
{
_stop = true;
DisconnectAsync();
while (IsConnected)
Thread.Yield();
}

protected override void OnConnected()
{
_logger.Info($"Chat TCP client connected a new session with Id {Id}");
OnConnect(this.IsConnected);
}

protected override void OnDisconnected()
{
_logger.Info($"Chat TCP client disconnected a session with Id {Id}");
OnClose();

// Wait for a while...
Thread.Sleep(1000);

// Try to connect again
if (!_stop && _socketSetting.DisconnectAfterClientReceive == false)
ConnectAsync();
}

protected override void OnReceived(byte[] buffer, long offset, long size)
{
try
{
if (OnReceive != null)
{
_logger.Info($"[Buffer] {buffer.Length};[Offset] {offset};[Size] {size}");
var dataBuffer = ByteExtensions.GetSubByte(buffer, (int)offset, (int)size);
_logger.Info($"[TcpServer_eventactionReceive:data]: {CoreHelper.Encoding.GetString(dataBuffer)}");
queue.AddRange(dataBuffer);
_logger.Info($"[TcpServer_eventactionReceive:queue]: {CoreHelper.Encoding.GetString(queue.ToArray())}");
lock (queue)
{
var sliceResult = SliceByteFunc(queue);
queue = sliceResult.Item1;
foreach (var item in sliceResult.Item2)
{
OnReceive(item);
}
}
}
}
catch (Exception ex)
{
_logger.Info($"[TcpServer_eventactionReceive error]: {ex}");
}
}

protected override void OnError(SocketError error)
{
_logger.Info($"Chat TCP client caught an error with code {error}");
}

protected override void OnSent(long sent, long pending)
{
OnSend(sent, pending);
base.OnSent(sent, pending);
}
}
  • MyTcpClient的Private Method
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private bool _stop;

private Tuple<List<byte>, List<byte[]>> CommSlicingByteFunc(List<byte> source)
{
var result = new Tuple<List<byte>, List<byte[]>>(new List<byte>(), new List<byte[]>());
try
{
if (source.Count > 4)
{
var totalLength = int.Parse(CoreHelper.Encoding.GetString(ByteExtensions.GetSubByte(source.ToArray(), 0, 4)));
_logger.Info($"[Total Length]: {totalLength}");
while (totalLength <= source.Count)
{
var subBytes = new byte[totalLength];
System.Buffer.BlockCopy(source.ToArray(), 0, subBytes, 0, totalLength);
_logger.Info($"[Sub byte]: {CoreHelper.Encoding.GetString(subBytes)}");
var newQueue = ByteExtensions.GetSubByte(source.ToArray(), totalLength, source.Count - totalLength);
_logger.Info($"[newQueue]: {CoreHelper.Encoding.GetString(newQueue)}");
source = new List<byte>();
for (int i = 0; i < newQueue.Length; i++)
{
source.Add(newQueue[i]);
}
result.Item2.Add(subBytes);
if (newQueue.Length > 4)
{
totalLength = int.Parse(CoreHelper.Encoding.GetString(ByteExtensions.GetSubByte(newQueue, 0, 4)));
}
}
}
}
catch (Exception ex)
{
_logger.Error($"[CommSlicingByteFunc]: {ex}");
//throw;
result.Item2.Add(source.ToArray());
source = new List<byte>();
}
foreach (var item in source)
{
result.Item1.Add(item);
}
return result;
}
  • MyTcpServer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class MyTcpServer : TcpServer
{
private static readonly Logger _logger = LogManager.GetCurrentClassLogger();
/// <summary>
/// 連接成功事件 item1:connectId
/// </summary>
public event Action<Guid> OnAccept;
/// <summary>
/// 接收通知事件 item1:connectId,item2:數據
/// </summary>
public event Action<Guid, byte[]> OnReceive;
/// <summary>
/// 已發送通知事件 item1:connectId,item2:長度
/// </summary>
public event Action<long, long> OnSend;
/// <summary>
/// 斷開連接通知事件 item1:connectId,
/// </summary>
public event Action<Guid> OnClose;
public MyTcpServer(IPAddress address, int port) : base(address, port)
{
}

protected override TcpSession CreateSession()
{
_logger.Info($"Create Session!");
var session = new MyTcpSession(this);
_logger.Info($"Session Id: {session.Id}");
session.OnAccept += OnAccept;
session.OnReceive += OnReceive;
session.OnSend += OnSend;
session.OnClose += OnClose;
return session;
}

protected override void OnError(SocketError error)
{
_logger.Error($"[MyTcpServer:Error] {error}");
}

protected override void OnStarted()
{
_logger.Info($"[MyTcpServer Started]");
}

protected override void OnStopped()
{
_logger.Info($"[MyTcpServer Stopped]");
}

}
  • MyTcpSession
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyTcpSession : TcpSession
{
/// <summary>
/// 連接成功事件 item1:connectId
/// </summary>
public event Action<Guid> OnAccept;
/// <summary>
/// 接收通知事件 item1:connectId,item2:數據
/// </summary>
public event Action<Guid, byte[]> OnReceive;
/// <summary>
/// 已發送通知事件 item1:connectId,item2:長度
/// </summary>
public event Action<long, long> OnSend;
/// <summary>
/// 斷開連接通知事件 item1:connectId,
/// </summary>
public event Action<Guid> OnClose;

public MyTcpSession(MyTcpServer server) : base(server)
{
}

protected override void OnConnected()
{
OnAccept(Id);
}

protected override void OnDisconnected()
{
OnClose(Id);
}

protected override void OnReceived(byte[] buffer, long offset, long size)
{
var dataBuffer = ByteExtensions.GetSubByte(buffer, (int)offset, (int)size);
OnReceive(Id, dataBuffer);
}

protected override void OnSent(long sent, long pending)
{
OnSend(sent, pending);
base.OnSent(sent, pending);
}

protected override void OnError(SocketError error)
{
base.OnError(error);
}
}

撰寫Base Client/Base Server程式

基本上筆者希望使用端不需要在出現new出相對應的ServerClient的程式,基本上連線資訊使用appsetting設定,底層程式將appsetting的設定讀出來做設定。因此最上層Client端只要簡單注入就即可使用,接著只要設定好相對應的Event事件,底層的Base Class作轉觸發的動作,最上層的Client端程式只要針對Event事件作對應處理,完美封裝了底層實作。

不免俗的定義一下Interface規格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// IClient
public interface IClient
{
void Client_OnClose();
void Client_OnReceive(byte[] data);
void Client_OnConnect(bool obj);
void Client_OnSend(long sent, long pending);
void Send(byte[] data, int offset, int length);
}
// IServer
public interface IServer
{
/// <summary>
/// 傳送封包
/// </summary>
/// <param name="data"></param>
void Send(byte[] data);
/// <summary>
/// 回傳特定Client封包
/// </summary>
/// <param name="clientId"></param>
/// <param name="data"></param>
void Send(Guid clientId, byte[] data);
/// <summary>
/// Client連線事件
/// </summary>
/// <param name="clientId"></param>
void Server_OnAccept(Guid clientId);
/// <summary>
/// 傳送事件
/// </summary>
/// <param name="sent"></param>
/// <param name="pending"></param>
void Server_OnSend(long sent, long pending);
/// <summary>
/// 特定Client封包接受事件
/// </summary>
/// <param name="clientId"></param>
/// <param name="buffer"></param>
void Server_OnReceive(Guid clientId, byte[] buffer);
/// <summary>
/// 特定Client關閉
/// </summary>
/// <param name="clietId"></param>
void Server_OnClose(Guid clietId);
}
  • BaseServer部份,基本上就是實作IServer相對應的Method
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class BaseServer
{
private static readonly Logger _logger = LogManager.GetCurrentClassLogger();
private MyTcpServer _server;
private readonly IAnalyzeService _analyzeService;
private SocketSetting _socketSetting = new SocketSetting();
private SocketSettingOption _socketSettingOption;
private SocketSettingType _socketSettingType;
private string _connectionStr;

public BaseServer(IAnalyzeService analyzeService
, IOptions<SocketSettingOption> socketSettingOption
, IConfiguration config
, SocketSettingType socketSettingType)
{
_logger.Info("ServerPush Constructor!!");
_analyzeService = analyzeService;

_socketSettingOption = socketSettingOption.Value;
switch (socketSettingType)
{
case SocketSettingType.Main:
config.Bind(_socketSettingOption.MainSocketSetting, _socketSetting);
break;
case SocketSettingType.Sub:
config.Bind(_socketSettingOption.SubSocketSetting, _socketSetting);
break;
default:
break;
}
_socketSettingType = socketSettingType;

_connectionStr = Encoding.UTF8.GetString(
Convert.FromBase64String(config.GetConnectionString(_socketSettingOption.ConnectionStringKey)));

var ipAddress = IPAddress.Parse(_socketSetting.Server.ReceiveIP);
_server = new MyTcpServer(ipAddress, _socketSetting.Server.ReceivePort);
_server.OnAccept += Server_OnAccept;
_server.OnClose += Server_OnClose;
_server.OnReceive += Server_OnReceive;
_server.OnSend += Server_OnSend;
_server.Start();
}

public void Send(byte[] data)
{
if (_server.ConnectedSessions == 0)
{
_logger.Info("[BaseServer]: No Client to send!");
return;

}

data.Logging(_socketSettingOption, _socketSetting, _connectionStr);
_server.Multicast(data);
}

public void Send(Guid clientId, byte[] data)
{
if (_server.ConnectedSessions == 0)
{
_logger.Info("[BaseServer]: No Client to send!");
return;
}
var session = _server.FindSession(clientId);
if(session != null)
{
data.Logging(_socketSettingOption, _socketSetting, _connectionStr);
session.SendAsync(data);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 其餘事件
public void Server_OnAccept(Guid clientId)
{
_logger.Info($"Server_OnAccept {clientId}");
}

public void Server_OnClose(Guid clientId)
{
_logger.Info($"Server OnClose {clientId}");
}

public void Server_OnReceive(Guid clientId, byte[] data)
{
data.Logging(_socketSettingOption, _socketSetting, _connectionStr);

var receiveData = CoreHelper.Encoding.GetString(data);
_logger.Info($"Server_OnReceive: {receiveData}");
_analyzeService.ServerAnalyze(clientId, data);
}

public void Server_OnSend(long arg1, long arg2)
{
_logger.Info("Server OnSend");
}
  • BaseClient部份,基本上就是實作IClient相對應的Method
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
public class BaseClient
{
private static readonly Logger _logger = LogManager.GetCurrentClassLogger();
private readonly IAnalyzeService _analyzeService;
private readonly MyTcpClient _client;
private SocketSetting _socketSetting = new SocketSetting();
private SocketSettingOption _socketSettingOption;
private SocketSettingType _socketSettingType;
private string _connectionStr;
public BaseClient(IAnalyzeService analyzeService
, IOptions<SocketSettingOption> socketSettingOption
, IConfiguration config
, SocketSettingType socketSettingType)
{
_analyzeService = analyzeService;
_socketSettingOption = socketSettingOption.Value;
var options = socketSettingOption.Value;
switch (socketSettingType)
{
case SocketSettingType.Main:
config.Bind(options.MainSocketSetting, _socketSetting);
break;
case SocketSettingType.Sub:
config.Bind(options.SubSocketSetting, _socketSetting);
break;
default:
break;
}
_socketSettingType = socketSettingType;

_connectionStr = Encoding.UTF8.GetString(
Convert.FromBase64String(config.GetConnectionString(_socketSettingOption.ConnectionStringKey)));

var ipAddress = IPAddress.Parse(_socketSetting.Client.RemoteIP);
_client = new MyTcpClient(ipAddress, _socketSetting.Client.RemotePort, socketSettingOption);
_client.OnClose += Client_OnClose;
_client.OnConnect += Client_OnConnect;
_client.OnReceive += Client_OnReceive;
_client.OnSend += Client_OnSend;
if (options.ClientSliceByteFunc != null)
{
_client.SliceByteFunc = options.ClientSliceByteFunc;
}
_client.ConnectAsync();
_logger.Info($"[Client Connect]: {_client.IsConnected}");
}
public void Client_OnClose()
{
_logger.Info("Client OnClose");
}

public void Client_OnConnect(bool obj)
{
_logger.Info($"Client OnConnect: {obj}");
}

public void Client_OnReceive(byte[] data)
{
if (string.IsNullOrEmpty(CoreHelper.Encoding.GetString(data).Trim())) return;
_logger.Info($"Client OnReceive[Hex String]: {data.ToHexString()}");

data.Logging(_socketSettingOption, _socketSetting, _connectionStr);

_logger.Info($"Client OnReceive: {CoreHelper.Encoding.GetString(data)}");
_analyzeService.ClientAnalyze(data);
}

public void Client_OnSend(long sent, long pending)
{
_logger.Info("Client OnSend");
}

public void Send(byte[] data, int offset, int length)
{
if (_socketSettingOption.DisconnectAfterClientReceive)
{
_client.DisconnectAndStop();
Thread.Sleep(500);
}
if (PoolingService.RetryUntilSuccessOrTimeout(() =>
{
if (!_client.IsConnected)
_client.ConnectAsync();
Thread.Sleep(500);
return _client.IsConnected;
}, TimeSpan.FromSeconds(10)))
{
_logger.Info($"Client Send: {CoreHelper.Encoding.GetString(data)}");
data.Logging(_socketSettingOption, _socketSetting, _connectionStr);
var sent = _client.SendAsync(data);
_logger.Info($"Client Send Length: {sent}");
}
else
{
_logger.Error($"Client Connect Timeout Error!!");
}
}
}

Client端使用Server、Client物件

到這邊,我們已經有BaseServerBaseClient物件可以使用了,我們為了Client端方便使用,再包裝一個對外的Server、Client物件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// MyClient
public class MyClient : BaseClient, IClient
{
public MyClient (IAnalyzeService analyzeService
, IOptions<SocketSettingOption> socketSettingOption
, IConfiguration config)
: base(analyzeService, socketSettingOption, config, SocketSettingType.Main)
{
}
}
// MyServer
public class MyServer : BaseServer, IServer
{
public MyServer(IAnalyzeService analyzeService
, IOptions<SocketSettingOption> socketSettingOption
, IConfiguration config)
: base(analyzeService, socketSettingOption, config, SocketSettingType.Main)
{
}
}

Service Registration Extension實作

最後的最後,基本上socket程式本身不像一般的Service跟著http request執行,而是一個singleton的實體,底層套件就直接做完注入new出一個實體的作業。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static class SocketServiceRegistration
{
public static IServiceCollection AddMySocket(this IServiceCollection services
, Action<SocketSettingOption> setupOptions)
{
services.AddSingleton<IServer, MyServer>();
services.AddSingleton<IClient, MyClient>();

var options = new SocketSettingOption();
setupOptions(options);
services.Configure(setupOptions);

services.AddHostedService<SocketWorker>();

return services;
}
}

以上述程式碼中可以看到有一個SocketWorker,可以想成是一個Socket Initial Worker,就是硬生生給他new出來放著,基本上最上層Client端使用時就是一個ready好的ServerClient實體。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SocketWorker : BackgroundService
{
private static readonly Logger _logger = LogManager.GetCurrentClassLogger();
private readonly IClient _client;
private readonly IServer _server;
private readonly IExchangeService _exchangeService;
private readonly IConfiguration _config;
private readonly SocketSettingOption _socketSetting;
public SocketWorker(IClient client
, IServer server
, IExchangeService exchangeService
, IConfiguration config
, IOptions<SocketSettingOption> options)
{
_client = client;
_server = server;
_exchangeService = exchangeService;
_socketSetting = options.Value;
_config = config;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.CompletedTask;
}
}

最上層Client端套用

套用部份,只要於Program中,呼叫上述Extension中的方法即可,當然宣告的同時要設定好Socket相關設定值。

1
2
3
4
services.AddMySocket(option =>
{
option.MainSocketSetting = "Socket";
});

也順便列出appsetting設定值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
"Socket": 
{
"Client": {
"SendBufferSize": 1024,
"RemoteIP": "RemoteIP",
"RemotePort": RemotePort
},
"Server": {
"NumConnections": 20000,
"ReceiveBufferSize": 1024,
"ReceiveIP": "MyIP",
"ReceivePort": MyPort,
"OverTime": 0
},
"Encoding": "Big5"
}

結論

雖然包了很多層,筆者追求底層邏輯封裝,讓最上層Client使用端易用性高,基本上底層邏輯封裝做得越好,若使用的底層套件有問題,還可轉用其他的,對於Client使用端來說只要接口、使用方式不變,不需調整任何程式,就可享有效能更好的底層套件體驗。筆者下一篇來解釋最上層Client使用端如何串接解析處裡封包。