Removed ConcurrentDictionary in favor of locked dictionary, and added… (#287)
* Removed ConcurrentDictionary in favor of locked dictionary, and added reconnection behaviour to OnConnection * Refactored code to handle multiple ws connected at the same time and offer graceful disconnection of appropriate sockets * removed unused usings --------- Co-authored-by: Alex <clodanSPT@hotmail.com> Co-authored-by: Chomp <27521899+chompDev@users.noreply.github.com>
This commit is contained in:
@@ -722,6 +722,7 @@
|
||||
"websocket-not_ready_message_not_sent": "[WS] Socket not ready for %s, message not sent",
|
||||
"websocket-pinging_player": "[WS] Pinging player: %s",
|
||||
"websocket-player_connected": "[WS] Player: %s has connected",
|
||||
"websocket-player_reconnect": "[WS] Player: %s reconnection received, closing previous active socket",
|
||||
"websocket-received_message": "[WS] Received message from user %s ",
|
||||
"websocket-socket_lost_deleting_handle": "[WS] Socket lost, deleting handle",
|
||||
"websocket-started": "Started websocket at %s",
|
||||
|
||||
@@ -34,6 +34,8 @@ public class WebSocketServer(
|
||||
return;
|
||||
}
|
||||
|
||||
var sessionIdContext = DateTime.UtcNow.ToString("yyyyMMddHHmmssfff");
|
||||
|
||||
foreach (var wsh in socketHandlers)
|
||||
{
|
||||
if (webSocket.State == WebSocketState.Open)
|
||||
@@ -44,7 +46,7 @@ public class WebSocketServer(
|
||||
}
|
||||
}
|
||||
|
||||
await wsh.OnConnection(webSocket, context);
|
||||
await wsh.OnConnection(webSocket, context, sessionIdContext);
|
||||
}
|
||||
|
||||
// Discard this task, we dont need to await it.
|
||||
@@ -79,7 +81,7 @@ public class WebSocketServer(
|
||||
foreach (var wsh in socketHandlers)
|
||||
{
|
||||
await cts.CancelAsync();
|
||||
await wsh.OnClose(webSocket, context);
|
||||
await wsh.OnClose(webSocket, context, sessionIdContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ public interface IWebSocketConnectionHandler
|
||||
{
|
||||
string GetHookUrl();
|
||||
string GetSocketId();
|
||||
Task OnConnection(WebSocket ws, HttpContext context);
|
||||
Task OnConnection(WebSocket ws, HttpContext context, string sessionIdContext);
|
||||
Task OnMessage(byte[] rawData, WebSocketMessageType messageType, WebSocket ws, HttpContext context);
|
||||
Task OnClose(WebSocket ws, HttpContext context);
|
||||
Task OnClose(WebSocket ws, HttpContext context, string sessionIdContext);
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
using SPTarkov.DI.Annotations;
|
||||
@@ -18,12 +17,11 @@ public class SptWebSocketConnectionHandler(
|
||||
LocalisationService _localisationService,
|
||||
JsonUtil _jsonUtil,
|
||||
ProfileHelper _profileHelper,
|
||||
ConfigServer _configServer,
|
||||
IEnumerable<ISptWebSocketMessageHandler> _messageHandlers
|
||||
) : IWebSocketConnectionHandler
|
||||
{
|
||||
protected WsPing _defaultNotification = new();
|
||||
protected ConcurrentDictionary<string, WebSocket> _sockets = new();
|
||||
protected Dictionary<string, Dictionary<string, WebSocket>> _sockets = new();
|
||||
protected Lock _socketsLock = new();
|
||||
|
||||
public string GetHookUrl()
|
||||
{
|
||||
@@ -35,29 +33,57 @@ public class SptWebSocketConnectionHandler(
|
||||
return "SPT WebSocket Handler";
|
||||
}
|
||||
|
||||
public Task OnConnection(WebSocket ws, HttpContext context)
|
||||
public Task OnConnection(WebSocket ws, HttpContext context, string sessionIdContext)
|
||||
{
|
||||
var splitUrl = context.Request.Path.Value.Split("/");
|
||||
var sessionID = splitUrl.Last();
|
||||
var playerProfile = _profileHelper.GetFullProfile(sessionID);
|
||||
var playerInfoText = $"{playerProfile.ProfileInfo.Username} ({sessionID})";
|
||||
|
||||
_logger.Info(_localisationService.GetText("websocket-player_connected", playerInfoText));
|
||||
|
||||
if (!_sockets.TryAdd(sessionID, ws) && _logger.IsLogEnabled(LogLevel.Debug))
|
||||
lock (_socketsLock)
|
||||
{
|
||||
_logger.Debug($"[ws] player: {playerInfoText} has already connected");
|
||||
}
|
||||
if (_sockets.TryGetValue(sessionID, out var sessionSockets) && sessionSockets.Any())
|
||||
{
|
||||
if (_logger.IsLogEnabled(LogLevel.Debug))
|
||||
{
|
||||
_logger.Debug(_localisationService.GetText("websocket-player_reconnect", playerInfoText));
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
foreach (var oldSocket in sessionSockets)
|
||||
{
|
||||
if (_logger.IsLogEnabled(LogLevel.Debug))
|
||||
{
|
||||
_logger.Debug($"[ws] Removing websocket reference {oldSocket.Key} for session {sessionID}");
|
||||
}
|
||||
|
||||
oldSocket.Value.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).Wait();
|
||||
}
|
||||
|
||||
sessionSockets.Clear();
|
||||
}
|
||||
else
|
||||
{
|
||||
sessionSockets = new Dictionary<string, WebSocket>();
|
||||
_sockets.Add(sessionID, sessionSockets);
|
||||
}
|
||||
|
||||
sessionSockets.Add(sessionIdContext, ws);
|
||||
if (_logger.IsLogEnabled(LogLevel.Info))
|
||||
{
|
||||
_logger.Info(_localisationService.GetText("websocket-player_connected", playerInfoText));
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task OnMessage(byte[] receivedMessage, WebSocketMessageType messageType, WebSocket ws, HttpContext context)
|
||||
public async Task OnMessage(
|
||||
byte[] receivedMessage,
|
||||
WebSocketMessageType messageType,
|
||||
WebSocket ws,
|
||||
HttpContext context)
|
||||
{
|
||||
var splitUrl = context.Request.Path.Value.Split("/");
|
||||
var sessionID = splitUrl.Last();
|
||||
var playerProfile = _profileHelper.GetFullProfile(sessionID);
|
||||
var playerInfoText = $"{playerProfile.ProfileInfo.Username} ({sessionID})";
|
||||
|
||||
foreach (var sptWebSocketMessageHandler in _messageHandlers)
|
||||
{
|
||||
@@ -65,19 +91,31 @@ public class SptWebSocketConnectionHandler(
|
||||
}
|
||||
}
|
||||
|
||||
public async Task OnClose(WebSocket ws, HttpContext context)
|
||||
public async Task OnClose(WebSocket ws, HttpContext context, string sessionIdContext)
|
||||
{
|
||||
var splitUrl = context.Request.Path.Value.Split("/");
|
||||
var sessionID = splitUrl.Last();
|
||||
|
||||
if (!_sockets.Remove(sessionID, out _) && _logger.IsLogEnabled(LogLevel.Debug))
|
||||
lock (_socketsLock)
|
||||
{
|
||||
_logger.Debug($"[ws] Error removing socket for session: {sessionID}");
|
||||
if (_sockets.TryGetValue(sessionID, out var sessionSockets) && sessionSockets.Any())
|
||||
{
|
||||
if (!sessionSockets.TryGetValue(sessionIdContext, out _) && _logger.IsLogEnabled(LogLevel.Info))
|
||||
{
|
||||
_logger.Info($"[ws] The websocket session {sessionID} with reference {sessionIdContext} has already been removed or reconnected");
|
||||
}
|
||||
else
|
||||
{
|
||||
sessionSockets.Remove(sessionIdContext);
|
||||
if (_logger.IsLogEnabled(LogLevel.Info))
|
||||
{
|
||||
var playerProfile = _profileHelper.GetFullProfile(sessionID);
|
||||
var playerInfoText = $"{playerProfile.ProfileInfo.Username} ({sessionID})";
|
||||
_logger.Info($"[ws] player: {playerInfoText} has disconnected");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var playerProfile = _profileHelper.GetFullProfile(sessionID);
|
||||
var playerInfoText = $"{playerProfile.ProfileInfo.Username} ({sessionID})";
|
||||
_logger.Info($"[ws] player: {playerInfoText} has disconnected");
|
||||
}
|
||||
|
||||
public void SendMessage(string sessionID, WsNotificationEvent output)
|
||||
@@ -86,15 +124,19 @@ public class SptWebSocketConnectionHandler(
|
||||
{
|
||||
if (IsWebSocketConnected(sessionID))
|
||||
{
|
||||
var ws = GetSessionWebSocket(sessionID);
|
||||
var webSockets = GetSessionWebSocket(sessionID);
|
||||
|
||||
foreach (var webSocket in webSockets)
|
||||
{
|
||||
var sendTask = webSocket.SendAsync(
|
||||
Encoding.UTF8.GetBytes(_jsonUtil.Serialize(output, output.GetType())),
|
||||
WebSocketMessageType.Text,
|
||||
true,
|
||||
CancellationToken.None
|
||||
);
|
||||
sendTask.Wait();
|
||||
}
|
||||
|
||||
var sendTask = ws.SendAsync(
|
||||
Encoding.UTF8.GetBytes(_jsonUtil.Serialize(output, output.GetType())),
|
||||
WebSocketMessageType.Text,
|
||||
true,
|
||||
CancellationToken.None
|
||||
);
|
||||
sendTask.Wait();
|
||||
if (_logger.IsLogEnabled(LogLevel.Debug))
|
||||
{
|
||||
_logger.Debug(_localisationService.GetText("websocket-message_sent"));
|
||||
@@ -116,11 +158,17 @@ public class SptWebSocketConnectionHandler(
|
||||
|
||||
public bool IsWebSocketConnected(string sessionID)
|
||||
{
|
||||
return _sockets.TryGetValue(sessionID, out var socket) && socket.State == WebSocketState.Open;
|
||||
lock (_socketsLock)
|
||||
{
|
||||
return _sockets.TryGetValue(sessionID, out var sockets) && sockets.Any(s => s.Value.State == WebSocketState.Open);
|
||||
}
|
||||
}
|
||||
|
||||
public WebSocket GetSessionWebSocket(string sessionID)
|
||||
public IEnumerable<WebSocket> GetSessionWebSocket(string sessionID)
|
||||
{
|
||||
return _sockets.GetValueOrDefault(sessionID);
|
||||
lock (_socketsLock)
|
||||
{
|
||||
return _sockets.GetValueOrDefault(sessionID)?.Values.Where(s => s.State == WebSocketState.Open);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user