227 lines
8.2 KiB
C#
227 lines
8.2 KiB
C#
using System.Collections.Concurrent;
|
|
using System.Runtime.Serialization;
|
|
using System.Text.Json;
|
|
using AccessQueueService.Models;
|
|
using Microsoft.Extensions.Configuration;
|
|
|
|
namespace AccessQueueService.Data
|
|
{
|
|
public class AccessQueueRepository
|
|
{
|
|
private ConcurrentDictionary<string, AccessTicket> _accessTickets = new();
|
|
private ConcurrentDictionary<string, ulong> _queueNumbers = new();
|
|
private ConcurrentDictionary<ulong, AccessTicket> _accessQueue = new();
|
|
|
|
internal ulong _nowServing = 0;
|
|
internal ulong _nextUnusedTicket = 0;
|
|
|
|
private int? _cachedActiveUsers = null;
|
|
private DateTime _cachedActiveUsersTime = DateTime.MinValue;
|
|
|
|
public int GetUnexpiredTicketsCount() => _accessTickets.Count(t => t.Value.ExpiresOn > DateTime.UtcNow);
|
|
public int GetActiveTicketsCount(DateTime activeCutoff) => _accessTickets
|
|
.Count(t => t.Value.ExpiresOn > DateTime.UtcNow && t.Value.LastActive > activeCutoff);
|
|
public int GetQueueCount() => (int)(_nextUnusedTicket - _nowServing);
|
|
public int GetRequestsAhead(string userId)
|
|
{
|
|
if (_queueNumbers.TryGetValue(userId, out var queueNumber))
|
|
{
|
|
if (_accessQueue.TryGetValue(queueNumber, out var ticket))
|
|
{
|
|
ticket.LastActive = DateTime.UtcNow;
|
|
return queueNumber >= _nowServing ? (int)(queueNumber - _nowServing) : -1;
|
|
}
|
|
}
|
|
return -1;
|
|
|
|
}
|
|
|
|
public void Enqueue(AccessTicket ticket)
|
|
{
|
|
if (_nextUnusedTicket >= long.MaxValue)
|
|
{
|
|
// Prevent overflow
|
|
Optimize();
|
|
}
|
|
_queueNumbers[ticket.UserId] = _nextUnusedTicket;
|
|
_accessQueue[_nextUnusedTicket] = ticket;
|
|
_nextUnusedTicket++;
|
|
}
|
|
|
|
public int DeleteExpiredTickets()
|
|
{
|
|
var cutoff = DateTime.UtcNow;
|
|
var expiredTickets = _accessTickets.Where(t => t.Value.ExpiresOn < cutoff).ToList();
|
|
int count = 0;
|
|
foreach (var ticket in expiredTickets)
|
|
{
|
|
count++;
|
|
_accessTickets.TryRemove(ticket.Key, out _);
|
|
}
|
|
return count;
|
|
}
|
|
|
|
private bool HasValidActiveUsersCache(AccessQueueConfig config, DateTime now)
|
|
{
|
|
return config.CacheMilliseconds.HasValue && _cachedActiveUsers.HasValue && (now - _cachedActiveUsersTime).TotalMilliseconds < config.CacheMilliseconds.Value;
|
|
}
|
|
|
|
private int GetOpenSpots(AccessQueueConfig config, int activeUsers)
|
|
{
|
|
return config.CapacityLimit.Value - activeUsers;
|
|
}
|
|
|
|
private int UpdateActiveUsersCache(DateTime now, DateTime activeCutoff)
|
|
{
|
|
_cachedActiveUsers = _accessTickets.Count(t => t.Value.ExpiresOn > now && t.Value.LastActive > activeCutoff);
|
|
_cachedActiveUsersTime = now;
|
|
return _cachedActiveUsers.GetValueOrDefault();
|
|
}
|
|
|
|
private int DequeueUsersUntilFull(int openSpots, DateTime now, DateTime activeCutoff, AccessQueueConfig config)
|
|
{
|
|
int filledSpots = 0;
|
|
while (filledSpots < openSpots && _nowServing < _nextUnusedTicket)
|
|
{
|
|
if (_accessQueue.TryRemove(_nowServing++, out var nextUser))
|
|
{
|
|
_queueNumbers.TryRemove(nextUser.UserId, out _);
|
|
if (nextUser.LastActive < activeCutoff)
|
|
{
|
|
// User is inactive, throw away their ticket
|
|
continue;
|
|
}
|
|
_accessTickets[nextUser.UserId] = new AccessTicket
|
|
{
|
|
UserId = nextUser.UserId,
|
|
ExpiresOn = now.AddSeconds(config.ExpirationSeconds ?? 0),
|
|
LastActive = now
|
|
};
|
|
filledSpots++;
|
|
}
|
|
}
|
|
return filledSpots;
|
|
}
|
|
|
|
public bool DidDequeueUntilFull(AccessQueueConfig config)
|
|
{
|
|
var now = DateTime.UtcNow;
|
|
if (!config.ActivitySeconds.HasValue || !config.ExpirationSeconds.HasValue || !config.CapacityLimit.HasValue)
|
|
{
|
|
throw new Exception("Required config values are not defined.");
|
|
}
|
|
var activeCutoff = now.AddSeconds(-config.ActivitySeconds.Value);
|
|
|
|
int activeUsers;
|
|
if (HasValidActiveUsersCache(config, now))
|
|
{
|
|
activeUsers = _cachedActiveUsers.GetValueOrDefault();
|
|
}
|
|
else
|
|
{
|
|
activeUsers = UpdateActiveUsersCache(now, activeCutoff);
|
|
}
|
|
|
|
var openSpots = GetOpenSpots(config, activeUsers);
|
|
if (openSpots <= 0)
|
|
{
|
|
return true;
|
|
}
|
|
|
|
int filledSpots = DequeueUsersUntilFull(openSpots, now, activeCutoff, config);
|
|
|
|
// Invalidate cache if any users were granted access
|
|
if (filledSpots > 0)
|
|
{
|
|
_cachedActiveUsers = null;
|
|
_cachedActiveUsersTime = DateTime.MinValue;
|
|
}
|
|
return filledSpots == openSpots;
|
|
}
|
|
|
|
public AccessTicket? GetTicket(string userId)
|
|
{
|
|
return _accessTickets.TryGetValue(userId, out var ticket) ? ticket : null;
|
|
}
|
|
|
|
public void UpsertTicket(AccessTicket ticket)
|
|
{
|
|
_accessTickets[ticket.UserId] = ticket;
|
|
_cachedActiveUsers = null;
|
|
_cachedActiveUsersTime = DateTime.MinValue;
|
|
}
|
|
|
|
public bool RemoveUser(string userId)
|
|
{
|
|
if (_queueNumbers.TryRemove(userId, out var queueNumber))
|
|
{
|
|
_accessQueue.TryRemove(queueNumber, out _);
|
|
}
|
|
return _accessTickets.TryRemove(userId, out _);
|
|
}
|
|
|
|
internal void Optimize()
|
|
{
|
|
var newQueue = new ConcurrentDictionary<ulong, AccessTicket>();
|
|
var newQueueNumbers = new ConcurrentDictionary<string, ulong>();
|
|
ulong newIndex = 0;
|
|
for (ulong i = _nowServing; i < _nextUnusedTicket; i++)
|
|
{
|
|
if (_accessQueue.TryGetValue(i, out var user))
|
|
{
|
|
newQueue[newIndex] = user;
|
|
newQueueNumbers[user.UserId] = newIndex++;
|
|
}
|
|
}
|
|
_accessQueue = newQueue;
|
|
_queueNumbers = newQueueNumbers;
|
|
_nowServing = 0;
|
|
_nextUnusedTicket = newIndex;
|
|
}
|
|
|
|
public TakeANumberAccessQueueRepoState ToState()
|
|
{
|
|
var state = new TakeANumberAccessQueueRepoState
|
|
{
|
|
AccessTickets = new Dictionary<string, AccessTicket>(_accessTickets),
|
|
AccessQueue = new Dictionary<ulong, AccessTicket>(_accessQueue),
|
|
};
|
|
return state;
|
|
}
|
|
|
|
public static AccessQueueRepository FromState(TakeANumberAccessQueueRepoState state)
|
|
{
|
|
if (state?.AccessTickets == null || state?.AccessQueue == null)
|
|
{
|
|
return new();
|
|
}
|
|
|
|
var _accessTickets = new ConcurrentDictionary<string, AccessTicket>(state.AccessTickets);
|
|
var _accessQueue = new ConcurrentDictionary<ulong, AccessTicket>(state.AccessQueue);
|
|
var _nextUnusedTicket = 0ul;
|
|
var _nowServing = ulong.MaxValue;
|
|
var _queueNumbers = new ConcurrentDictionary<string, ulong>();
|
|
foreach (var queueItem in state.AccessQueue)
|
|
{
|
|
_queueNumbers[queueItem.Value.UserId] = queueItem.Key;
|
|
_nextUnusedTicket = Math.Max(_nextUnusedTicket, queueItem.Key + 1);
|
|
_nowServing = Math.Min(_nowServing, queueItem.Key);
|
|
}
|
|
|
|
if (_nowServing == ulong.MaxValue)
|
|
{
|
|
_nowServing = 0;
|
|
}
|
|
|
|
return new()
|
|
{
|
|
_accessQueue = _accessQueue,
|
|
_accessTickets = _accessTickets,
|
|
_nextUnusedTicket = _nextUnusedTicket,
|
|
_nowServing = _nowServing,
|
|
_queueNumbers = _queueNumbers
|
|
};
|
|
}
|
|
}
|
|
}
|