From 4f6d0d001014187313a27d241e7bd4179d588cfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=9B=BD=E4=BC=9F?= <366193849@qq.com> Date: Tue, 9 Nov 2021 14:11:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0DelegatingStream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DelegatingDuplexPipe.cs | 44 +++++ FastGithub.FlowAnalyze/DelegatingStream.cs | 142 ++++++++++++++ FastGithub.FlowAnalyze/DuplexPipeStream.cs | 167 ----------------- .../DuplexPipeStreamAdapter.cs | 53 ------ .../DuplexPipeStreamExtensions.cs | 175 ++++++++++++++++++ .../FlowAnalyzeDuplexPipe.cs | 6 +- FastGithub.FlowAnalyze/FlowAnalyzeStream.cs | 108 +---------- 7 files changed, 374 insertions(+), 321 deletions(-) create mode 100644 FastGithub.FlowAnalyze/DelegatingDuplexPipe.cs create mode 100644 FastGithub.FlowAnalyze/DelegatingStream.cs delete mode 100644 FastGithub.FlowAnalyze/DuplexPipeStream.cs delete mode 100644 FastGithub.FlowAnalyze/DuplexPipeStreamAdapter.cs create mode 100644 FastGithub.FlowAnalyze/DuplexPipeStreamExtensions.cs diff --git a/FastGithub.FlowAnalyze/DelegatingDuplexPipe.cs b/FastGithub.FlowAnalyze/DelegatingDuplexPipe.cs new file mode 100644 index 0000000..8489b2d --- /dev/null +++ b/FastGithub.FlowAnalyze/DelegatingDuplexPipe.cs @@ -0,0 +1,44 @@ +using System; +using System.IO; +using System.IO.Pipelines; +using System.Threading.Tasks; + +namespace FastGithub.FlowAnalyze +{ + class DelegatingDuplexPipe : IDuplexPipe, IAsyncDisposable where TDelegatingStream : DelegatingStream + { + private bool disposed; + private readonly object syncRoot = new(); + + public PipeReader Input { get; } + + public PipeWriter Output { get; } + + public DelegatingDuplexPipe(IDuplexPipe duplexPipe, Func delegatingStreamFactory) : + this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), delegatingStreamFactory) + { + } + + public DelegatingDuplexPipe(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func delegatingStreamFactory) + { + var delegatingStream = delegatingStreamFactory(duplexPipe.AsStream()); + this.Input = PipeReader.Create(delegatingStream, readerOptions); + this.Output = PipeWriter.Create(delegatingStream, writerOptions); + } + + public virtual async ValueTask DisposeAsync() + { + lock (this.syncRoot) + { + if (this.disposed == true) + { + return; + } + this.disposed = true; + } + + await this.Input.CompleteAsync(); + await this.Output.CompleteAsync(); + } + } +} \ No newline at end of file diff --git a/FastGithub.FlowAnalyze/DelegatingStream.cs b/FastGithub.FlowAnalyze/DelegatingStream.cs new file mode 100644 index 0000000..0e36d0a --- /dev/null +++ b/FastGithub.FlowAnalyze/DelegatingStream.cs @@ -0,0 +1,142 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace FastGithub.FlowAnalyze +{ + abstract class DelegatingStream : Stream + { + public Stream Inner { get; } + + public DelegatingStream(Stream inner) + { + this.Inner = inner; + } + + public override bool CanRead + { + get + { + return this.Inner.CanRead; + } + } + + public override bool CanSeek + { + get + { + return this.Inner.CanSeek; + } + } + + public override bool CanWrite + { + get + { + return this.Inner.CanWrite; + } + } + + public override long Length + { + get + { + return this.Inner.Length; + } + } + + public override long Position + { + get + { + return this.Inner.Position; + } + + set + { + this.Inner.Position = value; + } + } + + public override void Flush() + { + this.Inner.Flush(); + } + + public override Task FlushAsync(CancellationToken cancellationToken) + { + return this.Inner.FlushAsync(cancellationToken); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return this.Inner.Read(buffer, offset, count); + } + + public override int Read(Span destination) + { + return this.Inner.Read(destination); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return this.Inner.ReadAsync(buffer, offset, count, cancellationToken); + } + + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + return this.Inner.ReadAsync(destination, cancellationToken); + } + + public override long Seek(long offset, SeekOrigin origin) + { + return this.Inner.Seek(offset, origin); + } + + public override void SetLength(long value) + { + this.Inner.SetLength(value); + } + + public override void Write(byte[] buffer, int offset, int count) + { + this.Inner.Write(buffer, offset, count); + } + + public override void Write(ReadOnlySpan source) + { + this.Inner.Write(source); + } + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + return this.Inner.WriteAsync(buffer, offset, count, cancellationToken); + } + + public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + return this.Inner.WriteAsync(source, cancellationToken); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return TaskToApm.End(asyncResult); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + TaskToApm.End(asyncResult); + } + } +} diff --git a/FastGithub.FlowAnalyze/DuplexPipeStream.cs b/FastGithub.FlowAnalyze/DuplexPipeStream.cs deleted file mode 100644 index bb839a9..0000000 --- a/FastGithub.FlowAnalyze/DuplexPipeStream.cs +++ /dev/null @@ -1,167 +0,0 @@ -using System; -using System.Buffers; -using System.IO; -using System.IO.Pipelines; -using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; - -namespace FastGithub.FlowAnalyze -{ - class DuplexPipeStream : Stream - { - private readonly PipeReader _input; - private readonly PipeWriter _output; - private readonly bool _throwOnCancelled; - private volatile bool _cancelCalled; - - public DuplexPipeStream(PipeReader input, PipeWriter output, bool throwOnCancelled = false) - { - _input = input; - _output = output; - _throwOnCancelled = throwOnCancelled; - } - - public void CancelPendingRead() - { - _cancelCalled = true; - _input.CancelPendingRead(); - } - - public override bool CanRead => true; - - public override bool CanSeek => false; - - public override bool CanWrite => true; - - public override long Length - { - get - { - throw new NotSupportedException(); - } - } - - public override long Position - { - get - { - throw new NotSupportedException(); - } - set - { - throw new NotSupportedException(); - } - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } - - public override void SetLength(long value) - { - throw new NotSupportedException(); - } - - public override int Read(byte[] buffer, int offset, int count) - { - ValueTask vt = ReadAsyncInternal(new Memory(buffer, offset, count), default); - return vt.IsCompleted ? - vt.Result : - vt.AsTask().GetAwaiter().GetResult(); - } - - public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) - { - return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); - } - - public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) - { - return ReadAsyncInternal(destination, cancellationToken); - } - - public override void Write(byte[] buffer, int offset, int count) - { - WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); - } - - public override async Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken) - { - await _output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken) ; - } - - public override async ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) - { - await _output.WriteAsync(source, cancellationToken); - } - - public override void Flush() - { - FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); - } - - public override async Task FlushAsync(CancellationToken cancellationToken) - { - await _output.FlushAsync(cancellationToken); - } - - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] - private async ValueTask ReadAsyncInternal(Memory destination, CancellationToken cancellationToken) - { - while (true) - { - var result = await _input.ReadAsync(cancellationToken); - var readableBuffer = result.Buffer; - try - { - if (_throwOnCancelled && result.IsCanceled && _cancelCalled) - { - // Reset the bool - _cancelCalled = false; - throw new OperationCanceledException(); - } - - if (!readableBuffer.IsEmpty) - { - // buffer.Count is int - var count = (int)Math.Min(readableBuffer.Length, destination.Length); - readableBuffer = readableBuffer.Slice(0, count); - readableBuffer.CopyTo(destination.Span); - return count; - } - - if (result.IsCompleted) - { - return 0; - } - } - finally - { - _input.AdvanceTo(readableBuffer.End, readableBuffer.End); - } - } - } - - public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) - { - return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); - } - - public override int EndRead(IAsyncResult asyncResult) - { - return TaskToApm.End(asyncResult); - } - - public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) - { - return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); - } - - public override void EndWrite(IAsyncResult asyncResult) - { - TaskToApm.End(asyncResult); - } - } -} diff --git a/FastGithub.FlowAnalyze/DuplexPipeStreamAdapter.cs b/FastGithub.FlowAnalyze/DuplexPipeStreamAdapter.cs deleted file mode 100644 index 4f4ffa0..0000000 --- a/FastGithub.FlowAnalyze/DuplexPipeStreamAdapter.cs +++ /dev/null @@ -1,53 +0,0 @@ -using System; -using System.IO; -using System.IO.Pipelines; -using System.Threading.Tasks; - -namespace FastGithub.FlowAnalyze -{ - class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream - { - private bool _disposed; - private readonly object _disposeLock = new object(); - - public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : - this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) - { - } - - public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : - base(duplexPipe.Input, duplexPipe.Output) - { - var stream = createStream(this); - Stream = stream; - Input = PipeReader.Create(stream, readerOptions); - Output = PipeWriter.Create(stream, writerOptions); - } - - public TStream Stream { get; } - - public PipeReader Input { get; } - - public PipeWriter Output { get; } - - public override async ValueTask DisposeAsync() - { - lock (_disposeLock) - { - if (_disposed) - { - return; - } - _disposed = true; - } - - await Input.CompleteAsync(); - await Output.CompleteAsync(); - } - - protected override void Dispose(bool disposing) - { - throw new NotSupportedException(); - } - } -} \ No newline at end of file diff --git a/FastGithub.FlowAnalyze/DuplexPipeStreamExtensions.cs b/FastGithub.FlowAnalyze/DuplexPipeStreamExtensions.cs new file mode 100644 index 0000000..adebfa8 --- /dev/null +++ b/FastGithub.FlowAnalyze/DuplexPipeStreamExtensions.cs @@ -0,0 +1,175 @@ +using System; +using System.Buffers; +using System.IO; +using System.IO.Pipelines; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; + +namespace FastGithub.FlowAnalyze +{ + static class DuplexPipeStreamExtensions + { + public static Stream AsStream(this IDuplexPipe duplexPipe, bool throwOnCancelled = false) + { + return new DuplexPipeStream(duplexPipe, throwOnCancelled); + } + + private class DuplexPipeStream : Stream + { + private readonly PipeReader input; + private readonly PipeWriter output; + private readonly bool throwOnCancelled; + private volatile bool cancelCalled; + + public DuplexPipeStream(IDuplexPipe duplexPipe, bool throwOnCancelled = false) + { + this.input = duplexPipe.Input; + this.output = duplexPipe.Output; + this.throwOnCancelled = throwOnCancelled; + } + + public void CancelPendingRead() + { + this.cancelCalled = true; + this.input.CancelPendingRead(); + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + public override long Length + { + get + { + throw new NotSupportedException(); + } + } + + public override long Position + { + get + { + throw new NotSupportedException(); + } + set + { + throw new NotSupportedException(); + } + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new NotSupportedException(); + } + + public override void SetLength(long value) + { + throw new NotSupportedException(); + } + + public override int Read(byte[] buffer, int offset, int count) + { + ValueTask vt = ReadAsyncInternal(new Memory(buffer, offset, count), default); + return vt.IsCompleted ? + vt.Result : + vt.AsTask().GetAwaiter().GetResult(); + } + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(new Memory(buffer, offset, count), cancellationToken).AsTask(); + } + + public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) + { + return ReadAsyncInternal(destination, cancellationToken); + } + + public override void Write(byte[] buffer, int offset, int count) + { + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + + public override async Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken) + { + await this.output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken); + } + + public override async ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + await this.output.WriteAsync(source, cancellationToken); + } + + public override void Flush() + { + FlushAsync(CancellationToken.None).GetAwaiter().GetResult(); + } + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + await this.output.FlushAsync(cancellationToken); + } + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] + private async ValueTask ReadAsyncInternal(Memory destination, CancellationToken cancellationToken) + { + while (true) + { + var result = await this.input.ReadAsync(cancellationToken); + var readableBuffer = result.Buffer; + try + { + if (this.throwOnCancelled && result.IsCanceled && this.cancelCalled) + { + // Reset the bool + this.cancelCalled = false; + throw new OperationCanceledException(); + } + + if (!readableBuffer.IsEmpty) + { + // buffer.Count is int + var count = (int)Math.Min(readableBuffer.Length, destination.Length); + readableBuffer = readableBuffer.Slice(0, count); + readableBuffer.CopyTo(destination.Span); + return count; + } + + if (result.IsCompleted) + { + return 0; + } + } + finally + { + this.input.AdvanceTo(readableBuffer.End, readableBuffer.End); + } + } + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); + } + + public override int EndRead(IAsyncResult asyncResult) + { + return TaskToApm.End(asyncResult); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); + } + + public override void EndWrite(IAsyncResult asyncResult) + { + TaskToApm.End(asyncResult); + } + } + } +} diff --git a/FastGithub.FlowAnalyze/FlowAnalyzeDuplexPipe.cs b/FastGithub.FlowAnalyze/FlowAnalyzeDuplexPipe.cs index e7bbdec..15e3861 100644 --- a/FastGithub.FlowAnalyze/FlowAnalyzeDuplexPipe.cs +++ b/FastGithub.FlowAnalyze/FlowAnalyzeDuplexPipe.cs @@ -2,10 +2,10 @@ namespace FastGithub.FlowAnalyze { - sealed class FlowAnalyzeDuplexPipe : DuplexPipeStreamAdapter + sealed class FlowAnalyzeDuplexPipe : DelegatingDuplexPipe { - public FlowAnalyzeDuplexPipe(IDuplexPipe transport, IFlowAnalyzer flowAnalyzer) : - base(transport, stream => new FlowAnalyzeStream(stream, flowAnalyzer)) + public FlowAnalyzeDuplexPipe(IDuplexPipe duplexPipe, IFlowAnalyzer flowAnalyzer) : + base(duplexPipe, stream => new FlowAnalyzeStream(stream, flowAnalyzer)) { } } diff --git a/FastGithub.FlowAnalyze/FlowAnalyzeStream.cs b/FastGithub.FlowAnalyze/FlowAnalyzeStream.cs index d9a9afe..b0a6fc5 100644 --- a/FastGithub.FlowAnalyze/FlowAnalyzeStream.cs +++ b/FastGithub.FlowAnalyze/FlowAnalyzeStream.cs @@ -5,155 +5,67 @@ using System.Threading.Tasks; namespace FastGithub.FlowAnalyze { - - sealed class FlowAnalyzeStream : Stream + sealed class FlowAnalyzeStream : DelegatingStream { - private readonly Stream inner; private readonly IFlowAnalyzer flowAnalyzer; public FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer) + : base(inner) { - this.inner = inner; this.flowAnalyzer = flowAnalyzer; } - public override bool CanRead - { - get - { - return inner.CanRead; - } - } - - public override bool CanSeek - { - get - { - return inner.CanSeek; - } - } - - public override bool CanWrite - { - get - { - return inner.CanWrite; - } - } - - public override long Length - { - get - { - return inner.Length; - } - } - - public override long Position - { - get - { - return inner.Position; - } - - set - { - inner.Position = value; - } - } - - public override void Flush() - { - inner.Flush(); - } - - public override Task FlushAsync(CancellationToken cancellationToken) - { - return inner.FlushAsync(cancellationToken); - } - public override int Read(byte[] buffer, int offset, int count) { - int read = inner.Read(buffer, offset, count); + int read = base.Read(buffer, offset, count); this.flowAnalyzer.OnFlow(FlowType.Read, read); return read; } public override int Read(Span destination) { - int read = inner.Read(destination); + int read = base.Read(destination); this.flowAnalyzer.OnFlow(FlowType.Read, read); return read; } public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - int read = await inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken); + int read = await base.ReadAsync(buffer.AsMemory(offset, count), cancellationToken); this.flowAnalyzer.OnFlow(FlowType.Read, read); return read; } public override async ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) { - int read = await inner.ReadAsync(destination, cancellationToken); + int read = await base.ReadAsync(destination, cancellationToken); this.flowAnalyzer.OnFlow(FlowType.Read, read); return read; } - public override long Seek(long offset, SeekOrigin origin) - { - return inner.Seek(offset, origin); - } - - public override void SetLength(long value) - { - inner.SetLength(value); - } public override void Write(byte[] buffer, int offset, int count) { this.flowAnalyzer.OnFlow(FlowType.Wirte, count); - inner.Write(buffer, offset, count); + base.Write(buffer, offset, count); } public override void Write(ReadOnlySpan source) { this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length); - inner.Write(source); + base.Write(source); } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { this.flowAnalyzer.OnFlow(FlowType.Wirte, count); - return inner.WriteAsync(buffer, offset, count, cancellationToken); + return base.WriteAsync(buffer, offset, count, cancellationToken); } public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) { this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length); - return inner.WriteAsync(source, cancellationToken); - } - - - // The below APM methods call the underlying Read/WriteAsync methods which will still be logged. - public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) - { - return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state); - } - - public override int EndRead(IAsyncResult asyncResult) - { - return TaskToApm.End(asyncResult); - } - - public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) - { - return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state); - } - - public override void EndWrite(IAsyncResult asyncResult) - { - TaskToApm.End(asyncResult); + return base.WriteAsync(source, cancellationToken); } } }