Skip to content

Commit b245eb8

Browse files
committed
Spiked HttpResponsePipeWriter
1 parent 5fd1db2 commit b245eb8

26 files changed

+616
-77
lines changed
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Buffers;
6+
using System.IO;
7+
using System.IO.Pipelines;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Microsoft.AspNetCore.Internal;
11+
12+
namespace Microsoft.AspNetCore.WebUtilities
13+
{
14+
// TODO: Implement spooling to disk
15+
// For perf it would be nice to write straight to the pipe until a threshold, to optimize performance for small payloads.
16+
// Later switch to a file stream approach, probably keeping a buffer to write in large chunks to disk.
17+
// This approach would be faster for small write but larger memory usage for big ones, so next will require to find new thresholds.
18+
// However it would probbaly not meet the buffering requirement.
19+
/// <summary>
20+
/// A <see cref="Stream"/> that buffers content to be written to disk.
21+
/// </summary>
22+
public sealed class FileBufferingPipeWriter : PipeWriter, IAsyncDisposable
23+
{
24+
private const int DefaultMemoryThreshold = 32 * 1024; // 32k
25+
private readonly PipeWriter _pipeWriter;
26+
private readonly int _memoryThreshold;
27+
private readonly long? _bufferLimit;
28+
private readonly Func<string> _tempFileDirectoryAccessor;
29+
30+
/// <summary>
31+
/// Initializes a new instance of <see cref="FileBufferingWriteStream"/>.
32+
/// </summary>
33+
/// <param name="pipeWriter"></param>
34+
/// <param name="memoryThreshold">
35+
/// The maximum amount of memory in bytes to allocate before switching to a file on disk.
36+
/// Defaults to 32kb.
37+
/// </param>
38+
/// <param name="bufferLimit">
39+
/// The maximum amount of bytes that the <see cref="FileBufferingWriteStream"/> is allowed to buffer.
40+
/// </param>
41+
/// <param name="tempFileDirectoryAccessor">Provides the location of the directory to write buffered contents to.
42+
/// When unspecified, uses the value specified by the environment variable <c>ASPNETCORE_TEMP</c> if available, otherwise
43+
/// uses the value returned by <see cref="Path.GetTempPath"/>.
44+
/// </param>
45+
public FileBufferingPipeWriter(
46+
PipeWriter pipeWriter,
47+
int memoryThreshold = DefaultMemoryThreshold,
48+
long? bufferLimit = null,
49+
Func<string>? tempFileDirectoryAccessor = null)
50+
{
51+
if (memoryThreshold < 0)
52+
{
53+
throw new ArgumentOutOfRangeException(nameof(memoryThreshold));
54+
}
55+
56+
if (bufferLimit != null && bufferLimit < memoryThreshold)
57+
{
58+
// We would expect a limit at least as much as memoryThreshold
59+
throw new ArgumentOutOfRangeException(nameof(bufferLimit), $"{nameof(bufferLimit)} must be larger than {nameof(memoryThreshold)}.");
60+
}
61+
62+
_pipeWriter = pipeWriter;
63+
_memoryThreshold = memoryThreshold;
64+
_bufferLimit = bufferLimit;
65+
_tempFileDirectoryAccessor = tempFileDirectoryAccessor ?? AspNetCoreTempDirectory.TempDirectoryFactory;
66+
PagedByteBuffer = new PagedByteBuffer(ArrayPool<byte>.Shared);
67+
}
68+
69+
internal PagedByteBuffer PagedByteBuffer { get; }
70+
71+
internal FileStream? FileStream { get; private set; }
72+
73+
internal bool Disposed { get; private set; }
74+
75+
private void EnsureFileStream()
76+
{
77+
if (FileStream == null)
78+
{
79+
var tempFileDirectory = _tempFileDirectoryAccessor();
80+
var tempFileName = Path.Combine(tempFileDirectory, "ASPNETCORE_" + Guid.NewGuid() + ".tmp");
81+
FileStream = new FileStream(
82+
tempFileName,
83+
FileMode.Create,
84+
FileAccess.Write,
85+
FileShare.Delete | FileShare.ReadWrite,
86+
bufferSize: 1,
87+
FileOptions.SequentialScan | FileOptions.DeleteOnClose);
88+
}
89+
}
90+
91+
private void ThrowIfDisposed()
92+
{
93+
if (Disposed)
94+
{
95+
throw new ObjectDisposedException(nameof(FileBufferingWriteStream));
96+
}
97+
}
98+
99+
private static void ThrowArgumentException(byte[] buffer, int offset, int count)
100+
{
101+
if (buffer == null)
102+
{
103+
throw new ArgumentNullException(nameof(buffer));
104+
}
105+
106+
if (offset < 0)
107+
{
108+
throw new ArgumentOutOfRangeException(nameof(offset));
109+
}
110+
111+
if (count < 0)
112+
{
113+
throw new ArgumentOutOfRangeException(nameof(count));
114+
}
115+
116+
if (buffer.Length - offset < count)
117+
{
118+
throw new ArgumentOutOfRangeException(nameof(offset));
119+
}
120+
}
121+
122+
public override void Advance(int bytes)
123+
{
124+
_pipeWriter.Advance(bytes);
125+
}
126+
127+
public override void CancelPendingFlush()
128+
{
129+
_pipeWriter.CancelPendingFlush();
130+
}
131+
132+
public override void Complete(Exception? exception = null)
133+
{
134+
_pipeWriter.Complete(exception);
135+
}
136+
137+
public override Memory<byte> GetMemory(int sizeHint = 0)
138+
{
139+
return _pipeWriter.GetMemory(sizeHint);
140+
}
141+
142+
public override Span<byte> GetSpan(int sizeHint = 0)
143+
{
144+
return _pipeWriter.GetSpan(sizeHint);
145+
}
146+
147+
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
148+
{
149+
return _pipeWriter.FlushAsync(cancellationToken);
150+
}
151+
152+
public Task DrainBufferAsync()
153+
{
154+
return Task.CompletedTask;
155+
}
156+
157+
public async ValueTask DisposeAsync()
158+
{
159+
// TODO: perf
160+
await FlushAsync();
161+
Disposed = true;
162+
}
163+
}
164+
}

0 commit comments

Comments
 (0)