// AccessQueueService/AccessQueueService/Data/TakeANumberAccessQueueRepo.cs
// 180 lines, 6.6 KiB, C#

using System.Collections.Concurrent;
using System.Runtime.Serialization;
using System.Text.Json;
using AccessQueueService.Models;
using Microsoft.Extensions.Configuration;
namespace AccessQueueService.Data
{
/// <summary>
/// In-memory <see cref="IAccessQueueRepo"/> that orders waiting users "take a number" style:
/// every enqueued user receives the next ticket number, and users are admitted in ascending
/// number order as <see cref="_nowServing"/> advances.
/// </summary>
/// <remarks>
/// Three maps are kept in step: granted access tickets by user id, queue number by user id,
/// and queued ticket by queue number.
/// NOTE(review): the two counters are plain fields updated without locking, so callers
/// presumably serialize mutating calls — confirm against the owning service.
/// </remarks>
public class TakeANumberAccessQueueRepo : IAccessQueueRepo
{
    // Users that currently hold access, keyed by user id.
    private ConcurrentDictionary<string, AccessTicket> _accessTickets = new();

    // Queue number held by each waiting user, keyed by user id.
    private ConcurrentDictionary<string, ulong> _queueNumbers = new();

    // Waiting users' tickets, keyed by queue number.
    private ConcurrentDictionary<ulong, AccessTicket> _accessQueue = new();

    // Queue number that will be considered for admission next.
    internal ulong _nowServing = 0;

    // Queue number that the next enqueued user will receive.
    internal ulong _nextUnusedTicket = 0;

    /// <summary>Counts granted tickets whose expiration lies after the current UTC time.</summary>
    public int GetUnexpiredTicketsCount()
    {
        // Read the clock once so every ticket is compared against the same instant.
        var now = DateTime.UtcNow;
        return _accessTickets.Count(t => t.Value.ExpiresOn > now);
    }

    /// <summary>
    /// Counts granted tickets that are unexpired and were last active after
    /// <paramref name="activeCutoff"/>.
    /// </summary>
    /// <param name="activeCutoff">Tickets last active at or before this time are not counted.</param>
    public int GetActiveTicketsCount(DateTime activeCutoff) =>
        CountActiveTickets(DateTime.UtcNow, activeCutoff);

    /// <summary>
    /// Returns the number of ticket numbers between the one being served and the next unused
    /// one. Slots vacated by <see cref="RemoveUser"/> still count until the serving counter
    /// passes them.
    /// </summary>
    public int GetQueueCount() => (int)(_nextUnusedTicket - _nowServing);

    /// <summary>
    /// Returns how many ticket numbers are ahead of <paramref name="userId"/> in the queue and
    /// refreshes the queued ticket's <c>LastActive</c> timestamp.
    /// </summary>
    /// <param name="userId">User whose queue position is requested.</param>
    /// <returns>The distance to <see cref="_nowServing"/>, or -1 when the user is not queued.</returns>
    public int GetRequestsAhead(string userId)
    {
        if (!_queueNumbers.TryGetValue(userId, out var queueNumber)
            || !_accessQueue.TryGetValue(queueNumber, out var ticket))
        {
            return -1;
        }

        // Asking for the position counts as activity; inactive users are discarded on dequeue.
        ticket.LastActive = DateTime.UtcNow;
        return queueNumber >= _nowServing ? (int)(queueNumber - _nowServing) : -1;
    }

    /// <summary>
    /// Places <paramref name="ticket"/> at the back of the queue under the next unused number.
    /// If the same user is already waiting, their previous place is given up first so a user
    /// never occupies two slots.
    /// </summary>
    /// <param name="ticket">Ticket of the user joining the queue.</param>
    public void Enqueue(AccessTicket ticket)
    {
        if (_nextUnusedTicket >= long.MaxValue)
        {
            // Renumber the queue from zero long before the counter could wrap around.
            Optimize();
        }

        // Without this, a repeated enqueue would leave the old queue entry behind; dequeuing
        // that stale entry would then delete the mapping that belongs to the new entry.
        if (_queueNumbers.TryRemove(ticket.UserId, out var previousNumber))
        {
            _accessQueue.TryRemove(previousNumber, out _);
        }

        var queueNumber = _nextUnusedTicket;
        _queueNumbers[ticket.UserId] = queueNumber;
        _accessQueue[queueNumber] = ticket;

        // Advance the counter last, so both maps hold the entry before its number falls inside
        // the [_nowServing, _nextUnusedTicket) range walked by the dequeue loop.
        _nextUnusedTicket++;
    }

    /// <summary>Removes every granted ticket whose expiration lies before the current UTC time.</summary>
    /// <returns>The number of tickets actually removed.</returns>
    public int DeleteExpiredTickets()
    {
        var cutoff = DateTime.UtcNow;
        var expired = _accessTickets.Where(t => t.Value.ExpiresOn < cutoff).ToList();

        var removed = 0;
        foreach (var entry in expired)
        {
            // Count only successful removals; the entry may already be gone.
            if (_accessTickets.TryRemove(entry.Key, out _))
            {
                removed++;
            }
        }

        return removed;
    }

    /// <summary>
    /// Admits queued users, in ticket order, until the number of active granted tickets reaches
    /// <paramref name="capacityLimit"/> or the queue runs out. Queued users whose last activity
    /// is older than <paramref name="activeSeconds"/> are dropped without being admitted.
    /// </summary>
    /// <param name="activeSeconds">Seconds since last activity within which a user counts as active.</param>
    /// <param name="expirationSeconds">Lifetime, in seconds, of each newly granted ticket.</param>
    /// <param name="capacityLimit">Maximum number of active granted tickets.</param>
    /// <returns>
    /// <c>true</c> when there were no open spots or all of them were filled; <c>false</c> when
    /// the queue was exhausted first.
    /// </returns>
    public bool DidDequeueUntilFull(int activeSeconds, int expirationSeconds, int capacityLimit)
    {
        var now = DateTime.UtcNow;
        var activeCutoff = now.AddSeconds(-activeSeconds);
        var openSpots = capacityLimit - CountActiveTickets(now, activeCutoff);
        if (openSpots <= 0)
        {
            return true;
        }

        var filledSpots = 0;
        while (filledSpots < openSpots && _nowServing < _nextUnusedTicket)
        {
            // A missing entry means the slot was vacated (e.g. by RemoveUser); just skip it.
            if (_accessQueue.TryRemove(_nowServing, out var nextUser))
            {
                _queueNumbers.TryRemove(nextUser.UserId, out _);

                // Inactive users lose their place: the ticket is discarded, not admitted.
                if (nextUser.LastActive >= activeCutoff)
                {
                    _accessTickets[nextUser.UserId] = new AccessTicket
                    {
                        UserId = nextUser.UserId,
                        ExpiresOn = now.AddSeconds(expirationSeconds),
                        LastActive = now
                    };
                    filledSpots++;
                }
            }

            _nowServing++;
        }

        return filledSpots == openSpots;
    }

    /// <summary>Returns the granted ticket for <paramref name="userId"/>, or <c>null</c> if none exists.</summary>
    public AccessTicket? GetTicket(string userId)
    {
        return _accessTickets.TryGetValue(userId, out var ticket) ? ticket : null;
    }

    /// <summary>Adds or replaces the granted ticket stored under the ticket's user id.</summary>
    public void UpsertTicket(AccessTicket ticket)
    {
        _accessTickets[ticket.UserId] = ticket;
    }

    /// <summary>Removes <paramref name="userId"/> from both the queue and the granted tickets.</summary>
    /// <returns>
    /// <c>true</c> when a granted ticket was removed. Removing only a queue entry still
    /// returns <c>false</c>.
    /// </returns>
    public bool RemoveUser(string userId)
    {
        if (_queueNumbers.TryRemove(userId, out var queueNumber))
        {
            _accessQueue.TryRemove(queueNumber, out _);
        }

        return _accessTickets.TryRemove(userId, out _);
    }

    /// <summary>
    /// Renumbers the waiting users from zero, preserving their relative order and closing any
    /// gaps, then resets both counters to match.
    /// </summary>
    internal void Optimize()
    {
        var compactedQueue = new ConcurrentDictionary<ulong, AccessTicket>();
        var compactedNumbers = new ConcurrentDictionary<string, ulong>();
        ulong nextIndex = 0;

        for (ulong number = _nowServing; number < _nextUnusedTicket; number++)
        {
            if (_accessQueue.TryGetValue(number, out var queued))
            {
                compactedQueue[nextIndex] = queued;
                compactedNumbers[queued.UserId] = nextIndex;
                nextIndex++;
            }
        }

        _accessQueue = compactedQueue;
        _queueNumbers = compactedNumbers;
        _nowServing = 0;
        _nextUnusedTicket = nextIndex;
    }

    /// <summary>
    /// Serializes the granted tickets and the queue to JSON. The counters and the user-to-number
    /// index are not stored; <see cref="FromState"/> rebuilds them from the queue keys.
    /// </summary>
    public string ToState()
    {
        var state = new TakeANumberAccessQueueRepoState
        {
            AccessTickets = new Dictionary<string, AccessTicket>(_accessTickets),
            AccessQueue = new Dictionary<ulong, AccessTicket>(_accessQueue),
        };
        return JsonSerializer.Serialize(state);
    }

    /// <summary>
    /// Rebuilds a repository from JSON produced by <see cref="ToState"/>. The serving counter
    /// becomes the lowest stored queue number and the next unused ticket becomes one past the
    /// highest; both are zero when the stored queue is empty.
    /// </summary>
    /// <param name="stateJson">JSON text to deserialize.</param>
    /// <returns>
    /// The restored repository, or an empty one when the JSON lacks the ticket or queue map.
    /// </returns>
    public static TakeANumberAccessQueueRepo FromState(string stateJson)
    {
        var state = JsonSerializer.Deserialize<TakeANumberAccessQueueRepoState?>(stateJson);
        if (state?.AccessTickets == null || state?.AccessQueue == null)
        {
            return new();
        }

        var restoredTickets = new ConcurrentDictionary<string, AccessTicket>(state.AccessTickets);
        var restoredQueue = new ConcurrentDictionary<ulong, AccessTicket>(state.AccessQueue);
        var restoredNumbers = new ConcurrentDictionary<string, ulong>();

        ulong nextUnusedTicket = 0;
        // ulong.MaxValue doubles as the "no queued users seen" marker for the minimum search.
        ulong nowServing = ulong.MaxValue;
        foreach (var queueItem in state.AccessQueue)
        {
            restoredNumbers[queueItem.Value.UserId] = queueItem.Key;
            nextUnusedTicket = Math.Max(nextUnusedTicket, queueItem.Key + 1);
            nowServing = Math.Min(nowServing, queueItem.Key);
        }

        if (nowServing == ulong.MaxValue)
        {
            nowServing = 0;
        }

        return new()
        {
            _accessQueue = restoredQueue,
            _accessTickets = restoredTickets,
            _nextUnusedTicket = nextUnusedTicket,
            _nowServing = nowServing,
            _queueNumbers = restoredNumbers
        };
    }

    // Counts granted tickets that are unexpired at `now` and were last active after the cutoff.
    private int CountActiveTickets(DateTime now, DateTime activeCutoff) =>
        _accessTickets.Count(t => t.Value.ExpiresOn > now && t.Value.LastActive > activeCutoff);
}
}