Remove stream-like functionality from ContentReader/Writer

Use streams instead
This commit is contained in:
2023-09-15 21:46:17 +02:00
parent f8d759f044
commit d8c9929a97
31 changed files with 313 additions and 310 deletions

View File

@@ -70,7 +70,8 @@ public class ToolUserCommandHandlerService : UserCommandHandlerServiceBase
new TypeUserCommandHandler<AddRemoteContentProviderCommand>(AddRemoteContentProvider),
new TypeUserCommandHandler<OpenInDefaultFileExplorerCommand>(OpenInDefaultFileExplorer),
new TypeUserCommandHandler<CopyNativePathCommand>(CopyNativePath),
new TypeUserCommandHandler<CopyBase64Command>(CopyBase64),
//TODO: this should rather base hash instead of base64
//new TypeUserCommandHandler<CopyBase64Command>(CopyBase64),
new TypeUserCommandHandler<EditCommand>(Edit),
new TypeUserCommandHandler<SearchCommand>(Search),
new TypeUserCommandHandler<ScanSizeCommand>(ScanSize),
@@ -92,10 +93,10 @@ public class ToolUserCommandHandlerService : UserCommandHandlerServiceBase
var path = containerNameInput.Value!;
Func<Task<IRemoteConnection>>? connection = null;
Func<ValueTask<IRemoteConnection>>? connection = null;
if (path.StartsWith("http"))
{
connection = async () => await SignalRConnection.GetOrCreateForAsync(path, providerName.Value);
connection = () => SignalRConnection.GetOrCreateForAsync(path, providerName.Value);
}
if (connection is null)
@@ -192,7 +193,7 @@ public class ToolUserCommandHandlerService : UserCommandHandlerServiceBase
await _currentSelectedTab.Tab.Ordering.SetValue(sortItemsCommand.Ordering);
}
private async Task CopyBase64()
/*private async Task CopyBase64()
{
var item = _currentSelectedItem?.Value?.BaseItem;
if (item?.Type != AbsolutePathType.Element || item is not IElement element) return;
@@ -209,7 +210,7 @@ public class ToolUserCommandHandlerService : UserCommandHandlerServiceBase
var base64Hash = Convert.ToBase64String(ms.ToArray());
await _systemClipboardService.CopyToClipboardAsync(base64Hash);
}
}*/
private async Task Search(SearchCommand searchCommand)
{

View File

@@ -49,7 +49,7 @@ public partial class ElementPreviewViewModel : IElementPreviewViewModel, IAsyncI
{
var readerFactory = _contentAccessorFactory.GetContentReaderFactory(element.Provider);
var reader = await readerFactory.CreateContentReaderAsync(element);
await using var inputStream = reader.AsStream();
await using var inputStream = reader.GetStream();
using var pdfDocument = PdfDocument.Open(inputStream);
var contentBuilder = new StringBuilder();

View File

@@ -1,80 +0,0 @@
namespace FileTime.Core.ContentAccess;
public class ContentAccessStream : Stream
{
private readonly IContentReader? _contentReader;
private readonly IContentWriter? _contentWriter;
public override bool CanRead => _contentReader != null;
public override bool CanSeek => _contentReader != null;
public override bool CanWrite => _contentWriter != null;
public override long Length => throw new NotImplementedException();
public override long Position
{
get => throw new NotImplementedException();
set => throw new NotImplementedException();
}
public ContentAccessStream(IContentReader contentReader)
{
_contentReader = contentReader;
}
public ContentAccessStream(IContentWriter contentWriter)
{
_contentWriter = contentWriter;
}
public override void Flush()
{
if (_contentWriter == null) throw new NotSupportedException();
Task.Run(async () => await _contentWriter.FlushAsync()).Wait();
}
public override int Read(byte[] buffer, int offset, int count)
{
if (_contentReader == null) throw new IOException("This stream is not readable");
var dataTask = Task.Run(async () => await _contentReader.ReadBytesAsync(count, offset));
dataTask.Wait();
var data = dataTask.Result;
if (data.Length > count) throw new Exception("More bytes has been read than requested");
Array.Copy(data, buffer, data.Length);
return data.Length;
}
public override long Seek(long offset, SeekOrigin origin)
{
if (_contentReader == null) throw new NotSupportedException();
var newPosition = origin switch
{
SeekOrigin.Begin => offset,
SeekOrigin.Current => _contentReader.Position ?? 0 + offset,
_ => throw new NotSupportedException()
};
_contentReader.SetPosition(newPosition);
return newPosition;
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
if (_contentWriter == null) throw new NotSupportedException();
var data = buffer;
if (buffer.Length != count)
{
data = new byte[count];
Array.Copy(buffer, data, count);
}
Task.Run(async () => await _contentWriter.WriteBytesAsync(data, offset)).Wait();
}
}

View File

@@ -7,8 +7,6 @@ namespace FileTime.Core.ContentAccess;
public interface IContentProvider : IContainer, IOnContainerEnter
{
bool SupportsContentStreams { get; }
Task<IItem> GetItemByFullNameAsync(
FullName fullName,
PointInTime pointInTime,

View File

@@ -1,11 +1,5 @@
namespace FileTime.Core.ContentAccess;
public interface IContentReader : IDisposable
public interface IContentReader : IStreamContainer
{
int PreferredBufferSize { get; }
long? Position { get; }
Task<byte[]> ReadBytesAsync(int bufferSize, int? offset = null);
void SetPosition(long position);
Stream AsStream();
}

View File

@@ -1,10 +1,5 @@
namespace FileTime.Core.ContentAccess;
public interface IContentWriter : IDisposable
public interface IContentWriter : IStreamContainer
{
int PreferredBufferSize { get; }
Task WriteBytesAsync(byte[] data, int? index = null, CancellationToken cancellationToken = default);
Task FlushAsync(CancellationToken cancellationToken = default);
Stream AsStream();
}

View File

@@ -0,0 +1,6 @@
namespace FileTime.Core.ContentAccess;
public interface IStreamContainer : IDisposable
{
Stream GetStream();
}

View File

@@ -7,44 +7,14 @@ namespace FileTime.Core.CommandHandlers;
public class StreamCopyCommandHandler : ICommandHandler
{
private readonly IContentProviderRegistry _contentProviderRegistry;
private readonly IContentAccessorFactory _contentAccessorFactory;
public StreamCopyCommandHandler(
IContentProviderRegistry contentProviderRegistry,
IContentAccessorFactory contentAccessorFactory)
public StreamCopyCommandHandler(IContentAccessorFactory contentAccessorFactory)
{
_contentProviderRegistry = contentProviderRegistry;
_contentAccessorFactory = contentAccessorFactory;
}
public async Task<bool> CanHandleAsync(ICommand command)
{
if (command is not CopyCommand copyCommand) return false;
var targetSupportsContentStream =
(await _contentProviderRegistry
.ContentProviders
.ToAsyncEnumerable()
.FirstOrDefaultAwaitAsync(async p => await p.CanHandlePathAsync(copyCommand.Target))
)?.SupportsContentStreams ?? false;
var allSourcesSupportsContentStream =
(await copyCommand
.Sources
.ToAsyncEnumerable()
.SelectAwait(s =>
_contentProviderRegistry
.ContentProviders
.ToAsyncEnumerable()
.FirstOrDefaultAwaitAsync(async p => await p.CanHandlePathAsync(s))
)
.ToListAsync()
)
.All(p => p?.SupportsContentStreams ?? false);
return targetSupportsContentStream && allSourcesSupportsContentStream;
}
public Task<bool> CanHandleAsync(ICommand command) => Task.FromResult(command is CopyCommand);
public async Task ExecuteAsync(ICommand command)
{
@@ -53,7 +23,7 @@ public class StreamCopyCommandHandler : ICommandHandler
await copyCommand.ExecuteAsync(CopyElement);
}
public async Task CopyElement(AbsolutePath sourcePath, AbsolutePath targetPath, CopyCommandContext copyCommandContext)
private async Task CopyElement(AbsolutePath sourcePath, AbsolutePath targetPath, CopyCommandContext copyCommandContext)
{
if (copyCommandContext.CancellationToken.IsCancellationRequested) return;
@@ -72,25 +42,25 @@ public class StreamCopyCommandHandler : ICommandHandler
using var reader = await _contentAccessorFactory.GetContentReaderFactory(source.Provider).CreateContentReaderAsync(source);
using var writer = await _contentAccessorFactory.GetContentWriterFactory(target.Provider).CreateContentWriterAsync(target);
byte[] dataRead;
var readerStream = reader.GetStream();
var writerStream = writer.GetStream();
var dataRead = new byte[1024 * 1024];
long currentProgress = 0;
do
while (true)
{
if (copyCommandContext.CancellationToken.IsCancellationRequested) return;
dataRead = await reader.ReadBytesAsync(writer.PreferredBufferSize);
if (dataRead.Length > 0)
{
await writer.WriteBytesAsync(dataRead, cancellationToken: copyCommandContext.CancellationToken);
await writer.FlushAsync(copyCommandContext.CancellationToken);
currentProgress += dataRead.LongLength;
if (copyCommandContext.CurrentProgress is not null)
{
copyCommandContext.CurrentProgress.SetProgressSafe(currentProgress);
}
var readLength = await readerStream.ReadAsync(dataRead);
var actualDataRead = dataRead[..readLength];
if (actualDataRead.Length == 0) break;
await copyCommandContext.UpdateProgressAsync();
}
} while (dataRead.Length > 0);
await writerStream.WriteAsync(actualDataRead, cancellationToken: copyCommandContext.CancellationToken);
await writerStream.FlushAsync(copyCommandContext.CancellationToken);
currentProgress += actualDataRead.LongLength;
copyCommandContext.CurrentProgress?.SetProgressSafe(currentProgress);
await copyCommandContext.UpdateProgressAsync();
}
}
}

View File

@@ -5,44 +5,14 @@ namespace FileTime.Providers.Local;
public class LocalContentReader : IContentReader
{
private readonly FileStream _readerStream;
private readonly BinaryReader _binaryReader;
private bool _disposed;
public int PreferredBufferSize => 1024 * 1024;
public long? Position { get; private set; }
public LocalContentReader(FileStream readerStream)
{
_readerStream = readerStream;
_binaryReader = new BinaryReader(_readerStream);
}
public Task<byte[]> ReadBytesAsync(int bufferSize, int? offset = null)
{
var max = bufferSize > 0 && bufferSize < PreferredBufferSize ? bufferSize : PreferredBufferSize;
if (offset != null)
{
if (Position == null) Position = 0;
var buffer = new byte[max];
var bytesRead = _binaryReader.Read(buffer, offset.Value, max);
Position += bytesRead;
if (buffer.Length != bytesRead)
{
Array.Resize(ref buffer, bytesRead);
}
return Task.FromResult(buffer);
}
else
{
return Task.FromResult(_binaryReader.ReadBytes(max));
}
}
public void SetPosition(long position) => Position = position;
public Stream AsStream() => _binaryReader.BaseStream;
public Stream GetStream() => _readerStream;
~LocalContentReader()
{
@@ -62,7 +32,6 @@ public class LocalContentReader : IContentReader
if (disposing)
{
_readerStream.Dispose();
_binaryReader.Dispose();
}
}
_disposed = true;

View File

@@ -5,36 +5,14 @@ namespace FileTime.Providers.Local;
public class LocalContentWriter : IContentWriter
{
private readonly FileStream _writerStream;
private readonly BinaryWriter _binaryWriter;
private bool _disposed;
public int PreferredBufferSize => 1024 * 1024;
public LocalContentWriter(FileStream writerStream)
{
_writerStream = writerStream;
_binaryWriter = new BinaryWriter(_writerStream);
}
public Task WriteBytesAsync(byte[] data, int? index = null, CancellationToken cancellationToken = default)
{
if (index != null)
{
_binaryWriter.Write(data, index.Value, data.Length);
}
else
{
_binaryWriter.Write(data);
}
return Task.CompletedTask;
}
public Task FlushAsync(CancellationToken cancellationToken = default)
{
_binaryWriter.Flush();
return Task.CompletedTask;
}
public Stream AsStream() => _binaryWriter.BaseStream;
public Stream GetStream() => _writerStream;
~LocalContentWriter()
{
@@ -54,7 +32,6 @@ public class LocalContentWriter : IContentWriter
if (disposing)
{
_writerStream.Dispose();
_binaryWriter.Dispose();
}
}
_disposed = true;

View File

@@ -5,7 +5,7 @@ namespace FileTime.Providers.Remote;
public interface IRemoteContentProvider : IContentProvider
{
Task<IRemoteConnection> GetRemoteConnectionAsync();
ValueTask<IRemoteConnection> GetRemoteConnectionAsync();
string RemoteProviderName { get; }
Task InitializeChildren();
}

View File

@@ -0,0 +1,58 @@
namespace FileTime.Providers.Remote;
internal sealed class ProxyStream : Stream
{
private readonly RemoteContentWriter _remoteContentWriter;
public ProxyStream(RemoteContentWriter remoteContentWriter)
{
_remoteContentWriter = remoteContentWriter;
}
public override void Flush() => InitializeRemoteWriterAndRun(() => _remoteContentWriter.FlushAsync());
public override int Read(byte[] buffer, int offset, int count)
=> InitializeRemoteWriterAndRun(() => _remoteContentWriter.ReadAsync(buffer, offset, count));
public override long Seek(long offset, SeekOrigin origin)
=> InitializeRemoteWriterAndRun(() => _remoteContentWriter.SeekAsync(offset, origin));
public override void SetLength(long value) => InitializeRemoteWriterAndRun(() => _remoteContentWriter.SetLengthAsync(value));
public override void Write(byte[] buffer, int offset, int count) => InitializeRemoteWriterAndRun(() => _remoteContentWriter.WriteAsync(buffer, offset, count));
public override bool CanRead => InitializeRemoteWriterAndRun(() => _remoteContentWriter.CanReadAsync());
public override bool CanSeek => InitializeRemoteWriterAndRun(() => _remoteContentWriter.CanSeekAsync());
public override bool CanWrite => InitializeRemoteWriterAndRun(() => _remoteContentWriter.CanWriteAsync());
public override long Length => InitializeRemoteWriterAndRun(() => _remoteContentWriter.GetLengthAsync());
public override long Position
{
get => InitializeRemoteWriterAndRun(() => _remoteContentWriter.GetPositionAsync());
set => InitializeRemoteWriterAndRun(() => _remoteContentWriter.SetPositionAsync(value));
}
private void InitializeRemoteWriterAndRun(Func<Task> func)
{
var task = Task.Run(async () =>
{
await _remoteContentWriter.InitializeRemoteWriterAsync();
await func();
});
task.Wait();
}
private T InitializeRemoteWriterAndRun<T>(Func<Task<T>> func)
{
var task = Task.Run(async () =>
{
await _remoteContentWriter.InitializeRemoteWriterAsync();
return await func();
});
task.Wait();
return task.Result;
}
}

View File

@@ -11,7 +11,7 @@ namespace FileTime.Providers.Remote;
public sealed class RemoteContentProvider : ContentProviderBase, IRemoteContentProvider
{
private readonly IServiceProvider _serviceProvider;
private readonly Func<Task<IRemoteConnection>> _remoteConnectionProvider;
private readonly Func<ValueTask<IRemoteConnection>> _remoteConnectionProvider;
private readonly SemaphoreSlim _initializeSemaphore = new(1, 1);
private bool _initialized;
@@ -20,7 +20,7 @@ public sealed class RemoteContentProvider : ContentProviderBase, IRemoteContentP
public RemoteContentProvider(
ITimelessContentProvider timelessContentProvider,
IServiceProvider serviceProvider,
Func<Task<IRemoteConnection>> remoteConnectionProvider,
Func<ValueTask<IRemoteConnection>> remoteConnectionProvider,
string remoteName,
string name)
: base(name, timelessContentProvider)
@@ -30,8 +30,7 @@ public sealed class RemoteContentProvider : ContentProviderBase, IRemoteContentP
_remoteConnectionProvider = remoteConnectionProvider;
}
public async Task<IRemoteConnection> GetRemoteConnectionAsync()
=> await _remoteConnectionProvider();
public ValueTask<IRemoteConnection> GetRemoteConnectionAsync() => _remoteConnectionProvider();
public async Task InitializeChildren()
{

View File

@@ -30,25 +30,78 @@ public class RemoteContentWriter : IContentWriter, IInitable<IRemoteContentProvi
Task.Run(async () => await (await _remoteContentProvider.GetRemoteConnectionAsync()).CloseWriterAsync(_transactionId));
}
public int PreferredBufferSize => 10 * 1024 * 1024;
public Stream GetStream() => new ProxyStream(this);
public async Task WriteBytesAsync(byte[] data, int? index = null, CancellationToken cancellationToken = default)
{
if (!_isRemoteWriterInitialized) await InitializeRemoteWriter(_nativePath);
await (await _remoteContentProvider.GetRemoteConnectionAsync()).WriteBytesAsync(_transactionId, data, index, cancellationToken);
}
public async Task FlushAsync(CancellationToken cancellationToken = default)
{
if (!_isRemoteWriterInitialized) return;
await (await _remoteContentProvider.GetRemoteConnectionAsync()).FlushWriterAsync(_transactionId, cancellationToken);
}
public Stream AsStream() => new ContentAccessStream(this);
private async Task InitializeRemoteWriter(NativePath nativePath)
public async Task InitializeRemoteWriterAsync()
{
if (_isRemoteWriterInitialized) return;
_isRemoteWriterInitialized = true;
await (await _remoteContentProvider.GetRemoteConnectionAsync()).InitializeRemoteWriter(_remoteContentProviderId, _transactionId, nativePath);
await (await _remoteContentProvider.GetRemoteConnectionAsync()).InitializeRemoteWriter(_remoteContentProviderId, _transactionId, _nativePath);
}
public async Task FlushAsync()
{
await InitializeRemoteWriterAsync();
await (await _remoteContentProvider.GetRemoteConnectionAsync()).FlushAsync(_transactionId);
}
public async Task<int> ReadAsync(byte[] buffer, int offset, int count)
{
await InitializeRemoteWriterAsync();
return await (await _remoteContentProvider.GetRemoteConnectionAsync()).ReadAsync(_transactionId, buffer, offset, count);
}
public async Task<long> SeekAsync(long offset, SeekOrigin origin)
{
await InitializeRemoteWriterAsync();
return await (await _remoteContentProvider.GetRemoteConnectionAsync()).SeekAsync(_transactionId, offset, origin);
}
public async Task SetLengthAsync(long value)
{
await InitializeRemoteWriterAsync();
await (await _remoteContentProvider.GetRemoteConnectionAsync()).SetLengthAsync(_transactionId, value);
}
public async Task WriteAsync(byte[] buffer, int offset, int count)
{
await InitializeRemoteWriterAsync();
await (await _remoteContentProvider.GetRemoteConnectionAsync()).WriteAsync(_transactionId, buffer, offset, count);
}
public async Task<bool> CanReadAsync()
{
await InitializeRemoteWriterAsync();
return await (await _remoteContentProvider.GetRemoteConnectionAsync()).CanReadAsync(_transactionId);
}
public async Task<bool> CanSeekAsync()
{
await InitializeRemoteWriterAsync();
return await (await _remoteContentProvider.GetRemoteConnectionAsync()).CanSeekAsync(_transactionId);
}
public async Task<bool> CanWriteAsync()
{
await InitializeRemoteWriterAsync();
return await (await _remoteContentProvider.GetRemoteConnectionAsync()).CanWriteAsync(_transactionId);
}
public async Task<long> GetLengthAsync()
{
await InitializeRemoteWriterAsync();
return await (await _remoteContentProvider.GetRemoteConnectionAsync()).GetLengthAsync(_transactionId);
}
public async Task<long> GetPositionAsync()
{
await InitializeRemoteWriterAsync();
return await (await _remoteContentProvider.GetRemoteConnectionAsync()).GetPositionAsync(_transactionId);
}
public async Task SetPositionAsync(long position)
{
await InitializeRemoteWriterAsync();
await (await _remoteContentProvider.GetRemoteConnectionAsync()).SetPositionAsync(_transactionId, position);
}
}

View File

@@ -4,7 +4,7 @@ namespace FileTime.Server.Common.ContentAccess;
public interface IContentAccessManager
{
void AddContentWriter(string transactionId, IContentWriter contentWriter);
IContentWriter GetContentWriter(string transactionId);
void RemoveContentWriter(string transactionId);
void AddContentStreamContainer(string transactionId, IStreamContainer streamContainer);
Stream GetContentStream(string transactionId);
void RemoveContentStreamContainer(string transactionId);
}

View File

@@ -13,11 +13,21 @@ public interface IRemoteConnection
Task DeleteItemAsync(string contentProviderId, FullName fullName);
Task MoveItemAsync(string contentProviderId, FullName fullName, FullName newPath);
Task InitializeRemoteWriter(string contentProviderId, string transactionId, NativePath nativePath);
Task WriteBytesAsync(string transactionId, byte[] data, int? index, CancellationToken cancellationToken = default);
Task FlushWriterAsync(string transactionId, CancellationToken cancellationToken = default);
Task CloseWriterAsync(string transactionId);
Task<NativePath> GetNativePathAsync(string contentProviderId, FullName fullName);
Task FlushAsync(string transactionId);
Task<int> ReadAsync(string transactionId, byte[] buffer, int offset, int count);
Task<long> SeekAsync(string transactionId, long offset, SeekOrigin origin);
Task SetLengthAsync(string transactionId, long value);
Task WriteAsync(string transactionId, byte[] buffer, int offset, int count);
Task<bool> CanReadAsync(string transactionId);
Task<bool> CanSeekAsync(string transactionId);
Task<bool> CanWriteAsync(string transactionId);
Task<long> GetLengthAsync(string transactionId);
Task<long> GetPositionAsync(string transactionId);
Task SetPositionAsync(string transactionId, long position);
Task<ISerialized> GetItemByNativePathAsync(
string contentProviderId,
NativePath nativePath,

View File

@@ -13,15 +13,23 @@ public interface ISignalRHub
Task CreateElementAsync(string contentProviderId, string fullName);
Task DeleteItemAsync(string contentProviderId, string fullName);
Task MoveItemAsync(string contentProviderId, string fullName, string newPath);
//TODO: CancellationToken https://github.com/nenoNaninu/TypedSignalR.Client/issues/120
Task FlushWriterAsync(string transactionId);
Task InitializeRemoteWriter(string contentProviderId, string transactionId, string nativePath);
//TODO: CancellationToken https://github.com/nenoNaninu/TypedSignalR.Client/issues/120
Task WriteBytesAsync(string transactionId, string data, int index);
Task CloseWriterAsync(string transactionId);
Task<string> GetNativePathAsync(string contentProviderId, string fullNamePath);
Task FlushAsync(string transactionId);
Task<string> ReadAsync(string transactionId, int dataLength);
Task<long> SeekAsync(string transactionId, long offset, SeekOrigin origin);
Task SetLengthAsync(string transactionId, long value);
Task WriteAsync(string transactionId, string data);
Task<bool> CanReadAsync(string transactionId);
Task<bool> CanSeekAsync(string transactionId);
Task<bool> CanWriteAsync(string transactionId);
Task<long> GetLengthAsync(string transactionId);
Task<long> GetPositionAsync(string transactionId);
Task SetPositionAsync(string transactionId, long position);
Task<ISerialized> GetItemByNativePathAsync(
string contentProviderId,
NativePath nativePath,

View File

@@ -32,7 +32,7 @@ public class SignalRConnection : IRemoteConnection, IAsyncInitable<string, strin
await _client.SetClientIdentifier(providerName);
}
public static async Task<SignalRConnection> GetOrCreateForAsync(string baseUrl, string providerName)
public static async ValueTask<IRemoteConnection> GetOrCreateForAsync(string baseUrl, string providerName)
{
SignalRConnection? connection;
lock (ConnectionsLock)
@@ -70,12 +70,6 @@ public class SignalRConnection : IRemoteConnection, IAsyncInitable<string, strin
public async Task MoveItemAsync(string contentProviderId, FullName fullName, FullName newPath)
=> await _client.MoveItemAsync(contentProviderId, fullName.Path, newPath.Path);
public async Task WriteBytesAsync(string transactionId, byte[] data, int? index, CancellationToken cancellationToken = default)
=> await _client.WriteBytesAsync(transactionId, Convert.ToBase64String(data), index ?? -1);
public async Task FlushWriterAsync(string transactionId, CancellationToken cancellationToken = default)
=> await _client.FlushWriterAsync(transactionId);
public async Task InitializeRemoteWriter(string contentProviderId, string transactionId, NativePath nativePath)
=> await _client.InitializeRemoteWriter(contentProviderId, transactionId, nativePath.Path);
@@ -88,6 +82,40 @@ public class SignalRConnection : IRemoteConnection, IAsyncInitable<string, strin
return new NativePath(path);
}
public Task FlushAsync(string transactionId) => _client.FlushAsync(transactionId);
public async Task<int> ReadAsync(string transactionId, byte[] buffer, int offset, int count)
{
var dataString = await _client.ReadAsync(transactionId, count);
var data = GetDataFromString(dataString);
data.CopyTo(buffer.AsSpan(offset, data.Length));
return data.Length;
}
public Task<long> SeekAsync(string transactionId, long offset, SeekOrigin origin) => _client.SeekAsync(transactionId, offset, origin);
public Task SetLengthAsync(string transactionId, long value) => _client.SetLengthAsync(transactionId, value);
public Task WriteAsync(string transactionId, byte[] buffer, int offset, int count)
{
var data = GetStringFromData(buffer.AsSpan(offset, count));
return _client.WriteAsync(transactionId, data);
}
public Task<bool> CanReadAsync(string transactionId) => _client.CanReadAsync(transactionId);
public Task<bool> CanSeekAsync(string transactionId) => _client.CanSeekAsync(transactionId);
public Task<bool> CanWriteAsync(string transactionId) => _client.CanWriteAsync(transactionId);
public Task<long> GetLengthAsync(string transactionId) => _client.GetLengthAsync(transactionId);
public Task<long> GetPositionAsync(string transactionId) => _client.GetPositionAsync(transactionId);
public Task SetPositionAsync(string transactionId, long position) => _client.SetPositionAsync(transactionId, position);
public async Task<ISerialized> GetItemByNativePathAsync(
string contentProviderId,
NativePath nativePath,
@@ -110,4 +138,7 @@ public class SignalRConnection : IRemoteConnection, IAsyncInitable<string, strin
public async Task<SerializedAbsolutePath[]> GetChildren(string contentProviderId, string fullName)
=> await _client.GetChildren(contentProviderId, fullName);
private static byte[] GetDataFromString(string data) => Convert.FromBase64String(data);
private static string GetStringFromData(ReadOnlySpan<byte> data) => Convert.ToBase64String(data);
}

View File

@@ -4,10 +4,18 @@ namespace FileTime.Server.Common.ContentAccess;
public class ContentAccessManager : IContentAccessManager
{
private readonly Dictionary<string, IContentWriter> _contentWriters = new();
public void AddContentWriter(string transactionId, IContentWriter contentWriter)
=> _contentWriters.Add(transactionId, contentWriter);
private readonly Dictionary<string, IStreamContainer> _contentStreamContainers = new();
public IContentWriter GetContentWriter(string transactionId) => _contentWriters[transactionId];
public void RemoveContentWriter(string transactionId) => _contentWriters.Remove(transactionId);
public void AddContentStreamContainer(string transactionId, IStreamContainer streamContainer)
=> _contentStreamContainers.Add(transactionId, streamContainer);
public Stream GetContentStream(string transactionId) => _contentStreamContainers[transactionId].GetStream();
public void RemoveContentStreamContainer(string transactionId)
{
if (!_contentStreamContainers.TryGetValue(transactionId, out var streamContainer)) return;
streamContainer.Dispose();
_contentStreamContainers.Remove(transactionId);
}
}

View File

@@ -103,20 +103,13 @@ public class ConnectionHub : Hub<ISignalRClient>, ISignalRHub
throw new FileNotFoundException("Item is not an element", nativePath);
var contentWriter = await _contentAccessorFactory.GetContentWriterFactory(contentProvider).CreateContentWriterAsync(element);
_contentAccessManager.AddContentWriter(transactionId, contentWriter);
_contentAccessManager.AddContentStreamContainer(transactionId, contentWriter);
}
public async Task WriteBytesAsync(string transactionId, string data, int index)
=> await _contentAccessManager.GetContentWriter(transactionId).WriteBytesAsync(Convert.FromBase64String(data), index == -1 ? null : index);
public async Task FlushWriterAsync(string transactionId)
=> await _contentAccessManager.GetContentWriter(transactionId).FlushAsync();
public Task CloseWriterAsync(string transactionId)
{
_contentAccessManager.GetContentWriter(transactionId).Dispose();
_contentAccessManager.RemoveContentWriter(transactionId);
_contentAccessManager.GetContentStream(transactionId).Dispose();
_contentAccessManager.RemoveContentStreamContainer(transactionId);
return Task.CompletedTask;
}
@@ -167,8 +160,54 @@ public class ConnectionHub : Hub<ISignalRClient>, ISignalRHub
throw new NotSupportedException();
}
public async Task FlushAsync(string transactionId)
=> await _contentAccessManager.GetContentStream(transactionId).FlushAsync();
public async Task<string> ReadAsync(string transactionId, int dataLength)
{
// this might be stack allocated when dataLength is small
var data = new byte[dataLength];
var dataRead = await _contentAccessManager.GetContentStream(transactionId).ReadAsync(data);
return GetStringFromData(data.AsSpan()[..dataRead]);
}
public Task<long> SeekAsync(string transactionId, long offset, SeekOrigin origin)
=> Task.FromResult(_contentAccessManager.GetContentStream(transactionId).Seek(offset, origin));
public Task SetLengthAsync(string transactionId, long value)
{
_contentAccessManager.GetContentStream(transactionId).SetLength(value);
return Task.CompletedTask;
}
public async Task WriteAsync(string transactionId, string buffer)
{
var data = GetDataFromString(buffer);
await _contentAccessManager.GetContentStream(transactionId).WriteAsync(data);
}
public Task<bool> CanReadAsync(string transactionId) => Task.FromResult(_contentAccessManager.GetContentStream(transactionId).CanRead);
public Task<bool> CanSeekAsync(string transactionId) => Task.FromResult(_contentAccessManager.GetContentStream(transactionId).CanSeek);
public Task<bool> CanWriteAsync(string transactionId) => Task.FromResult(_contentAccessManager.GetContentStream(transactionId).CanWrite);
public Task<long> GetLengthAsync(string transactionId) => Task.FromResult(_contentAccessManager.GetContentStream(transactionId).Length);
public Task<long> GetPositionAsync(string transactionId) => Task.FromResult(_contentAccessManager.GetContentStream(transactionId).Position);
public Task SetPositionAsync(string transactionId, long position)
{
_contentAccessManager.GetContentStream(transactionId).Position = position;
return Task.CompletedTask;
}
private IContentProvider GetContentProvider(string contentProviderId)
=> _contentProviderRegistry
.ContentProviders
.First(p => p.Name == contentProviderId);
private static byte[] GetDataFromString(string data) => Convert.FromBase64String(data);
private static string GetStringFromData(ReadOnlySpan<byte> data) => Convert.ToBase64String(data);
}

View File

@@ -27,18 +27,11 @@ public class CompressOperation<TEntry, TVolume> : ICompressOperation
public async Task<IEnumerable<IDisposable>> CompressElement(IElement element, string key)
{
if (element.Provider.SupportsContentStreams)
{
var contentReader = await _contentAccessorFactory.GetContentReaderFactory(element.Provider).CreateContentReaderAsync(element);
var contentReader = await _contentAccessorFactory.GetContentReaderFactory(element.Provider).CreateContentReaderAsync(element);
var contentReaderStream = contentReader.GetStream();
_archive.AddEntry(key, contentReaderStream);
var contentReaderStream = contentReader.AsStream();
_archive.AddEntry(key, contentReaderStream);
return new IDisposable[] {contentReader, contentReaderStream};
}
return Enumerable.Empty<IDisposable>();
return new IDisposable[] {contentReader, contentReaderStream};
}
public void SaveTo(Stream stream)

View File

@@ -113,7 +113,7 @@ public class CompressCommand : CommandBase, IExecutableCommand, ITransportationC
await _contentAccessorFactory.GetItemCreator(resolvedParent.Provider).CreateElementAsync(resolvedParent.Provider, newItemName);
var targetElement = (IElement) await _timelessContentProvider.GetItemByFullNameAsync(newItemName, PointInTime.Present);
using var contentWriter = await _contentAccessorFactory.GetContentWriterFactory(resolvedParent.Provider).CreateContentWriterAsync(targetElement);
await using var contentWriterStream = contentWriter.AsStream();
await using var contentWriterStream = contentWriter.GetStream();
compressOperation.SaveTo(contentWriterStream);
await contentWriterStream.FlushAsync(_cancellationTokenSource.Token);

View File

@@ -29,7 +29,7 @@ public sealed class CompressedContentProvider : SubContentProviderBase, ICompres
var reader = parentElementContext.ContentReader;
var subPath = parentElementContext.SubNativePath.Path;
await using var readerStream = reader.AsStream();
await using var readerStream = reader.GetStream();
using var archive = ArchiveFactory.Open(readerStream);
var entry = archive.Entries.First(e => e.Key == subPath);

View File

@@ -8,9 +8,6 @@ public sealed class CompressedContentReader : IContentReader
private readonly IDisposable[] _disposables;
private readonly Stream _stream;
public int PreferredBufferSize => 1024 * 1024;
public long? Position => _stream.Position;
public CompressedContentReader(IArchiveEntry entry, IDisposable[] disposables)
{
_disposables = disposables;
@@ -26,15 +23,5 @@ public sealed class CompressedContentReader : IContentReader
}
}
public async Task<byte[]> ReadBytesAsync(int bufferSize, int? offset = null)
{
var data = new byte[bufferSize];
var read = await _stream.ReadAsync(data, offset ?? 0, bufferSize);
return data[..read].ToArray();
}
public void SetPosition(long position) => _stream.Seek(position, SeekOrigin.Begin);
public Stream AsStream() => _stream;
public Stream GetStream() => _stream;
}

View File

@@ -22,7 +22,7 @@ public sealed class CompressedContentReaderFactory : SubContentReaderBase<Compre
var reader = parentElementReaderContext.ContentReader;
var subPath = parentElementReaderContext.SubNativePath;
var readerStream = reader.AsStream();
var readerStream = reader.GetStream();
var archive = ArchiveFactory.Open(readerStream);
var entry = archive.Entries.First(e => e.Key == subPath.Path);

View File

@@ -41,7 +41,7 @@ public sealed class CompressedSubContentProvider : ICompressedSubContentProvider
ItemInitializationSettings itemInitializationSettings = default)
{
var parentContentReader = await _contentAccessorFactory.GetContentReaderFactory(parentElement.Provider).CreateContentReaderAsync(parentElement);
var parentContentReaderStream = parentContentReader.AsStream();
var parentContentReaderStream = parentContentReader.GetStream();
var archive = ArchiveFactory.Open(parentContentReaderStream);
var disposables = new IDisposable[] {parentContentReader, parentContentReaderStream, archive};

View File

@@ -95,7 +95,7 @@ public class DecompressCommand : CommandBase, IExecutableCommand, ITransportatio
var newItem = (IElement) await _timelessContentProvider.GetItemByFullNameAsync(entryPath, PointInTime.Present);
using var writer = await contentWriterFactory.CreateContentWriterAsync(newItem);
archiveEntry.WriteTo(writer.AsStream());
archiveEntry.WriteTo(writer.GetStream());
}
}
}
@@ -107,7 +107,7 @@ public class DecompressCommand : CommandBase, IExecutableCommand, ITransportatio
{
var targetElement = (IElement) await _timelessContentProvider.GetItemByFullNameAsync(source, PointInTime.Present);
var contentReader = await _contentAccessorFactory.GetContentReaderFactory(targetElement.Provider).CreateContentReaderAsync(targetElement);
var contentReaderStream = contentReader.AsStream();
var contentReaderStream = contentReader.GetStream();
_disposables.Add(contentReader);
using var archive = ArchiveFactory.Open(contentReaderStream);

View File

@@ -29,7 +29,7 @@ public sealed class VirtualDiskContentProvider : SubContentProviderBase, IVirtua
var reader = parentElementContext.ContentReader;
var subPath = parentElementContext.SubNativePath.Path;
await using var readerStream = reader.AsStream();
await using var readerStream = reader.GetStream();
using var discReader = new UdfReader(readerStream);
var fileInfo = discReader.GetFileInfo(subPath);

View File

@@ -6,8 +6,6 @@ public sealed class VirtualDiskContentReader : IContentReader
{
private readonly Stream _stream;
private readonly ICollection<IDisposable> _disposables;
public int PreferredBufferSize => 1024 * 1024;
public long? Position => _stream.Position;
public VirtualDiskContentReader(Stream stream, ICollection<IDisposable> disposables)
{
@@ -15,18 +13,7 @@ public sealed class VirtualDiskContentReader : IContentReader
_disposables = disposables;
}
public async Task<byte[]> ReadBytesAsync(int bufferSize, int? offset = null)
{
var data = new byte[bufferSize];
var read = await _stream.ReadAsync(data, offset ?? 0, bufferSize);
return data[..read].ToArray();
}
public void SetPosition(long position) => _stream.Seek(position, SeekOrigin.Begin);
public Stream AsStream() => _stream;
public Stream GetStream() => _stream;
public void Dispose()
{

View File

@@ -22,7 +22,7 @@ public sealed class VirtualDiskContentReaderFactory : SubContentReaderBase<Virtu
var reader = parentElementReaderContext.ContentReader;
var subPath = parentElementReaderContext.SubNativePath;
var readerStream = reader.AsStream();
var readerStream = reader.GetStream();
var discReader = new UdfReader(readerStream);
var fileInfo = discReader.GetFileInfo(subPath.Path);

View File

@@ -38,7 +38,7 @@ public sealed class VirtualDiskSubContentProvider : IVirtualDiskSubContentProvid
var contentReaderFactory = _contentAccessorFactory.GetContentReaderFactory(parentElement.Provider);
var reader = await contentReaderFactory.CreateContentReaderAsync(parentElement);
await using var readerStream = reader.AsStream();
await using var readerStream = reader.GetStream();
var discReader = new UdfReader(readerStream);
if (itemPath.Path.Length == 0 || itemPath.Path == Constants.SubContentProviderRootContainer)