Skip to content

Commit

Permalink
Update to streamstone v3 with latest Azure Data APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
kzu committed Jun 14, 2024
1 parent f515fa7 commit 1131a0f
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 50 deletions.
1 change: 0 additions & 1 deletion src/CloudActors.Package/CloudActors.Package.msbuildproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NuGetizer" Version="1.2.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CloudActors.CodeAnaysis\CloudActors.CodeAnaysis.csproj" />
Expand Down
5 changes: 2 additions & 3 deletions src/CloudActors.Streamstone/CloudActors.Streamstone.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Devlooped.CloudStorageAccount" Version="1.2.0" />
<PackageReference Include="Microsoft.Orleans.Runtime" Version="7.2.1" />
<PackageReference Include="NuGetizer" Version="1.2.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.Azure.Cosmos.Table" Version="1.0.8" />
<PackageReference Include="Streamstone" Version="2.3.1" />
<PackageReference Include="Streamstone" Version="3.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.ComponentModel;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
Expand Down
49 changes: 27 additions & 22 deletions src/CloudActors.Streamstone/StreamstoneStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Table;
using Azure;
using Azure.Data.Tables;
using Orleans;
using Orleans.Runtime;
using Orleans.Storage;
using Streamstone;
using Streamstone.Utility;

namespace Devlooped.CloudActors;

public class StreamstoneStorage : IGrainStorage
{
// We cache table names to avoid running CreateIfNotExistsAsync on each access.
readonly ConcurrentDictionary<string, Task<CloudTable>> tables = new();
readonly ConcurrentDictionary<string, Task<TableClient>> tables = new();
readonly CloudStorageAccount storage;
readonly StreamstoneOptions options;

Expand All @@ -25,7 +27,7 @@ public StreamstoneStorage(CloudStorageAccount storage, StreamstoneOptions? optio
public async Task ClearStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
{
var table = await GetTable(storage, stateName);
await table.ExecuteAsync(TableOperation.Delete(new TableEntity(table.Name, grainId.Key.ToString()!)));
await table.SubmitTransactionAsync([new TableTransactionAction(TableTransactionActionType.Delete, new TableEntity(table.Name, grainId.Key.ToString()!))]);
}

public async Task ReadStateAsync<T>(string stateName, GrainId grainId, IGrainState<T> grainState)
Expand All @@ -46,9 +48,8 @@ public async Task ReadStateAsync<T>(string stateName, GrainId grainId, IGrainSta
if (options.AutoSnapshot)
{
// See if we can quickly load from most recent snapshot.
var result = await table.ExecuteAsync(TableOperation.Retrieve<EventEntity>(rowId, typeof(T).FullName));
if (result.HttpStatusCode == 200 &&
result.Result is EventEntity entity &&
var result = await table.GetEntityIfExistsAsync<EventEntity>(rowId, typeof(T).FullName ?? typeof(T).Name);
if (result.HasValue && result.Value is EventEntity entity &&
typeof(T).Assembly.GetName() is { } asm &&
// We only apply snapshots where major.minor matches the current version, otherwise,
// we might be losing important business logic changes.
Expand Down Expand Up @@ -76,16 +77,16 @@ entity.Data is string data &&
}
else
{
var result = await table.ExecuteAsync(TableOperation.Retrieve<EventEntity>(table.Name, rowId));
if (result.HttpStatusCode == 404 ||
result.Result is not EventEntity entity ||
var result = await table.GetEntityIfExistsAsync<EventEntity>(table.Name, rowId);
if (!result.HasValue ||
result.Value is not EventEntity entity ||
entity.Data is not string data ||
// TODO: how to deal with versioning in this case?
JsonSerializer.Deserialize<T>(data, options.JsonOptions) is not { } instance)
return;

grainState.State = instance;
grainState.ETag = result.Etag;
grainState.ETag = entity.ETag.ToString("G");
grainState.RecordExists = true;
}
}
Expand All @@ -112,18 +113,17 @@ public async Task WriteStateAsync<T>(string stateName, GrainId grainId, IGrainSt
try
{
var includes = options.AutoSnapshot ?
new ITableEntity[]
{
[
new EventEntity
{
PartitionKey = table.Name,
RowKey = typeof(T).FullName,
RowKey = typeof(T).FullName ?? typeof(T).Name,
Data = JsonSerializer.Serialize(grainState.State, options.JsonOptions),
DataVersion = new Version(asm.Version?.Major ?? 0, asm.Version?.Minor ?? 0).ToString(),
Type = $"{type.FullName}, {asm.Name}",
Version = stream.Version + state.Events.Count
}
} : Array.Empty<ITableEntity>();
] : Array.Empty<ITableEntity>();

await Stream.WriteAsync(partition,
int.TryParse(grainState.ETag, out var version) ? version : 0,
Expand All @@ -141,36 +141,41 @@ await Stream.WriteAsync(partition,
}
else
{
var result = await table.ExecuteAsync(TableOperation.InsertOrReplace(new EventEntity
var result = await table.SubmitTransactionAsync([new TableTransactionAction(TableTransactionActionType.UpsertReplace, new EventEntity
{
PartitionKey = table.Name,
RowKey = rowId,
ETag = grainState.ETag,
RowKey = rowId!,
ETag = new ETag(grainState.ETag),
Data = JsonSerializer.Serialize(grainState.State, options.JsonOptions),
DataVersion = new Version(asm.Version?.Major ?? 0, asm.Version?.Minor ?? 0).ToString(),
Type = $"{type.FullName}, {asm.Name}",
}));
})]);

grainState.ETag = result.Etag;
grainState.ETag = result.Value[0].Headers.ETag?.ToString();
grainState.RecordExists = true;
}
}

async Task<CloudTable> GetTable(CloudStorageAccount storage, string name)
async Task<TableClient> GetTable(CloudStorageAccount storage, string name)
{
var getTable = tables.GetOrAdd(name, async key =>
{
var client = storage.CreateCloudTableClient();
var table = client.GetTableReference(key);
var table = client.GetTableClient(key);
await table.CreateIfNotExistsAsync();
return table;
});

return await getTable;
}

class EventEntity : TableEntity
class EventEntity : ITableEntity
{
public string PartitionKey { get; set; } = "";
public string RowKey { get; set; } = "";
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }

public string? Data { get; set; }
public string? DataVersion { get; set; }
public string? Type { get; set; }
Expand Down
1 change: 0 additions & 1 deletion src/CloudActors/CloudActors.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

<ItemGroup>
<PackageReference Include="NuGetizer" Version="1.2.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="Microsoft.Orleans.Serialization.Abstractions" Version="7.2.1" />
<PackageReference Include="PolySharp" PrivateAssets="All" Version="1.13.2" />
</ItemGroup>
Expand Down
9 changes: 3 additions & 6 deletions src/Tests/Account.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.AspNetCore.Builder;
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
Expand All @@ -32,14 +32,11 @@ public class TestAccounts : IAsyncDisposable
{
public TestAccounts() => CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference(nameof(Account))
.DeleteIfExistsAsync()
.Wait();
.DeleteTable(nameof(Account));

public async ValueTask DisposeAsync() => await CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference(nameof(Account))
.DeleteIfExistsAsync();
.DeleteTableAsync(nameof(Account));

[Fact]
public async Task HostedGrain()
Expand Down
5 changes: 2 additions & 3 deletions src/Tests/Customer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.DependencyInjection;
using Orleans;
using Xunit.Abstractions;
Expand All @@ -19,8 +19,7 @@ public async Task HostedGrain()
{
await CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference("customer")
.DeleteIfExistsAsync();
.DeleteTableAsync("customer");

var bus = fixture.Cluster.ServiceProvider.GetRequiredService<IActorBus>();

Expand Down
9 changes: 1 addition & 8 deletions src/Tests/OrleansTest.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Orleans;
using Orleans.Core;
using Orleans.Hosting;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Storage;
using Orleans.TestingHost;
Expand Down
8 changes: 3 additions & 5 deletions src/Tests/StreamstoneTests.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Devlooped;
using Devlooped.CloudActors;
using Microsoft.Azure.Cosmos.Table;
using Moq;
using Newtonsoft.Json;
using Orleans;
Expand All @@ -18,8 +18,7 @@ public async Task ReadWrite()
{
await CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference(nameof(Account))
.DeleteIfExistsAsync();
.DeleteTableAsync(nameof(Account));

var account = new Account("1");
account.Deposit(new Deposit(100));
Expand All @@ -42,8 +41,7 @@ public async Task ReadWriteComplexObject()
{
await CloudStorageAccount.DevelopmentStorageAccount
.CreateCloudTableClient()
.GetTableReference("CloudActorWallet")
.DeleteIfExistsAsync();
.DeleteTableAsync("CloudActorWallet");

var wallet = new Wallet("1");
wallet.AddFunds("USD", 100);
Expand Down

0 comments on commit 1131a0f

Please sign in to comment.