diff --git a/FastGithub.FlowAnalyze/FlowAnalyzer.cs b/FastGithub.FlowAnalyze/FlowAnalyzer.cs index 6fee15b..fd2b8d0 100644 --- a/FastGithub.FlowAnalyze/FlowAnalyzer.cs +++ b/FastGithub.FlowAnalyze/FlowAnalyzer.cs @@ -8,12 +8,8 @@ namespace FastGithub.FlowAnalyze sealed class FlowAnalyzer : IFlowAnalyzer { private const int INTERVAL_SECONDS = 5; - private long totalRead = 0; - private long totalWrite = 0; - private readonly ConcurrentQueue readQueue = new(); - private readonly ConcurrentQueue writeQueue = new(); - - private record QueueItem(long Ticks, int Length); + private readonly FlowQueues readQueues = new(INTERVAL_SECONDS); + private readonly FlowQueues writeQueues = new(INTERVAL_SECONDS); /// /// 收到数据 @@ -24,65 +20,85 @@ namespace FastGithub.FlowAnalyze { if (flowType == FlowType.Read) { - Interlocked.Add(ref this.totalRead, length); - Add(this.readQueue, length); + this.readQueues.OnFlow(length); } else { - Interlocked.Add(ref this.totalWrite, length); - Add(this.writeQueue, length); + this.writeQueues.OnFlow(length); } } - private static void Add(ConcurrentQueue quques, int length) - { - var ticks = Flush(quques); - quques.Enqueue(new QueueItem(ticks, length)); - } - - /// - /// 刷新队列 - /// - /// - /// - private static long Flush(ConcurrentQueue quques) - { - var ticks = Environment.TickCount64; - while (quques.TryPeek(out var item)) - { - if (ticks - item.Ticks < INTERVAL_SECONDS * 1000) - { - break; - } - else - { - quques.TryDequeue(out _); - } - } - return ticks; - } - - - /// /// 获取流量分析 /// /// public FlowStatistics GetFlowStatistics() { - Flush(this.readQueue); - var readRate = (double)this.readQueue.Sum(item => item.Length) / INTERVAL_SECONDS; - - Flush(this.writeQueue); - var writeRate = (double)this.writeQueue.Sum(item => item.Length) / INTERVAL_SECONDS; - return new FlowStatistics { - TotalRead = this.totalRead, - TotalWrite = this.totalWrite, - ReadRate = readRate, - WriteRate = writeRate + TotalRead = this.readQueues.TotalBytes, + TotalWrite = this.writeQueues.TotalBytes, + ReadRate = this.readQueues.GetRate(), + WriteRate = this.writeQueues.GetRate() }; } + + private class FlowQueues + { + private int cleaning = 0; + private long totalBytes = 0L; + private record QueueItem(long Ticks, int Length); + private readonly ConcurrentQueue queues = new(); + + private readonly int intervalSeconds; + + public long TotalBytes => this.totalBytes; + + public FlowQueues(int intervalSeconds) + { + this.intervalSeconds = intervalSeconds; + } + + public void OnFlow(int length) + { + Interlocked.Add(ref this.totalBytes, length); + this.CleanInvalidRecords(); + this.queues.Enqueue(new QueueItem(Environment.TickCount64, length)); + } + + public double GetRate() + { + this.CleanInvalidRecords(); + return (double)this.queues.Sum(item => item.Length) / this.intervalSeconds; + } + + /// + /// 清除无效记录 + /// + /// + private bool CleanInvalidRecords() + { + if (Interlocked.CompareExchange(ref this.cleaning, 1, 0) != 0) + { + return false; + } + + var ticks = Environment.TickCount64; + while (this.queues.TryPeek(out var item)) + { + if (ticks - item.Ticks < this.intervalSeconds * 1000) + { + break; + } + else + { + this.queues.TryDequeue(out _); + } + } + + Interlocked.Exchange(ref this.cleaning, 0); + return true; + } + } } }