Skip to content
41 changes: 41 additions & 0 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,47 @@ public void MergeManyChangeSetsWorksCorrectlyWithValueTypes()
results.Summary.Overall.Removes.Should().Be(PricesPerMarket);
}


[Theory]
[InlineData(true)]
[InlineData(false)]
public void OrderOfChangesIsPreserved(bool removeFirst)
{
// Arrange
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
AddUniquePrices(markets);
_marketCache.AddOrUpdate(markets);
var markets2 = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
AddUniquePrices(markets2);
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
(var firstReason, var nextReason, int expectedChanges) = removeFirst
? (ChangeReason.Remove, ChangeReason.Add, 2 * MarketCount * PricesPerMarket)
: (ChangeReason.Add, ChangeReason.Remove, 3 * MarketCount * PricesPerMarket);

// Act
_marketCache.Edit(updater =>
{
if (removeFirst)
{
updater.Clear();
updater.AddOrUpdate(markets2);
}
else
{

updater.AddOrUpdate(markets2);
updater.Clear();
}
});

// Assert
results.Messages.Count.Should().Be(2);
results.Messages[0].All(change => change.Reason is ChangeReason.Add).Should().BeTrue();
results.Messages[1].Count.Should().Be(expectedChanges);
results.Messages[1].Take(MarketCount * PricesPerMarket).All(change => change.Reason == firstReason).Should().BeTrue();
results.Messages[1].Skip(MarketCount * PricesPerMarket).All(change => change.Reason == nextReason).Should().BeTrue();
}

public void Dispose()
{
_marketCacheResults.Dispose();
Expand Down
2 changes: 1 addition & 1 deletion src/DynamicData/Cache/Internal/ChangeSetCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal sealed class ChangeSetCache<TObject, TKey>
where TKey : notnull
{
public ChangeSetCache(IObservable<IChangeSet<TObject, TKey>> source) =>
Source = source.IgnoreSameReferenceUpdate().Do(Cache.Clone);
Source = source.Do(Cache.Clone);

public Cache<TObject, TKey> Cache { get; } = new();

Expand Down
57 changes: 32 additions & 25 deletions src/DynamicData/Cache/Internal/DynamicGrouper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,37 +44,44 @@ public void ProcessChangeSet(IChangeSet<TObject, TKey> changeSet, IObserver<IGro

foreach (var change in changeSet.ToConcreteType())
{
switch (change.Reason)
{
case ChangeReason.Add when _groupSelector is not null:
PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
break;
ProcessChange(change, suspendTracker);
}

if (observer != null)
{
EmitChanges(observer);
}
}

case ChangeReason.Remove:
PerformRemove(change.Key, suspendTracker);
break;
public void ProcessChange(Change<TObject, TKey> change) => ProcessChange(change, _suspendTracker);

case ChangeReason.Update when _groupSelector is not null:
PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
break;
private void ProcessChange(Change<TObject, TKey> change, SuspendTracker? suspendTracker)
{
switch (change.Reason)
{
case ChangeReason.Add when _groupSelector is not null:
PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
break;

case ChangeReason.Update:
PerformUpdate(change.Key, suspendTracker);
break;
case ChangeReason.Remove:
PerformRemove(change.Key, suspendTracker);
break;

case ChangeReason.Refresh when _groupSelector is not null:
PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
break;
case ChangeReason.Update when _groupSelector is not null:
PerformAddOrUpdate(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
break;

case ChangeReason.Refresh:
PerformRefresh(change.Key, suspendTracker);
break;
}
}
case ChangeReason.Update:
PerformUpdate(change.Key, suspendTracker);
break;

if (observer != null)
{
EmitChanges(observer);
case ChangeReason.Refresh when _groupSelector is not null:
PerformRefresh(change.Key, _groupSelector(change.Current, change.Key), change.Current, suspendTracker);
break;

case ChangeReason.Refresh:
PerformRefresh(change.Key, suspendTracker);
break;
}
}

Expand Down
94 changes: 51 additions & 43 deletions src/DynamicData/Cache/Internal/GroupOnObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;

Expand All @@ -13,49 +12,58 @@ internal sealed class GroupOnObservable<TObject, TKey, TGroupKey>(IObservable<IC
where TKey : notnull
where TGroupKey : notnull
{
public IObservable<IGroupChangeSet<TObject, TKey, TGroupKey>> Run() => Observable.Create<IGroupChangeSet<TObject, TKey, TGroupKey>>(observer =>
public IObservable<IGroupChangeSet<TObject, TKey, TGroupKey>> Run() =>
Observable.Create<IGroupChangeSet<TObject, TKey, TGroupKey>>(observer => new Subscription(source, selectGroup, observer));

// Maintains state for a single subscription
private sealed class Subscription : CacheParentSubscription<TObject, TKey, (TGroupKey, TObject), IGroupChangeSet<TObject, TKey, TGroupKey>>
{
var grouper = new DynamicGrouper<TObject, TKey, TGroupKey>();
var locker = InternalEx.NewLock();
var parentUpdate = false;

IObservable<TGroupKey> CreateGroupObservable(TObject item, TKey key) =>
selectGroup(item, key)
.DistinctUntilChanged()
.Synchronize(locker!)
.Do(
onNext: groupKey => grouper!.AddOrUpdate(key, groupKey, item, !parentUpdate ? observer : null),
onError: observer.OnError);

// Create a shared connection to the source
var shared = source
.Synchronize(locker)
.Do(_ => parentUpdate = true)
.Publish();

// First process the changesets
var subChanges = shared
.SubscribeSafe(
onNext: changeSet => grouper.ProcessChangeSet(changeSet),
onError: observer.OnError);

// Next process the Grouping observables created for each item
var subMergeMany = shared
.MergeMany(CreateGroupObservable)
.SubscribeSafe(
onError: observer.OnError,
onCompleted: observer.OnCompleted);

// Finally, emit the results
var subResults = shared
.SubscribeSafe(
onNext: _ =>
private readonly DynamicGrouper<TObject, TKey, TGroupKey> _grouper = new();
private readonly Func<TObject, TKey, IObservable<TGroupKey>> _selectGroup;

public Subscription(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<TGroupKey>> selectGroup, IObserver<IGroupChangeSet<TObject, TKey, TGroupKey>> observer)
: base(observer)
{
_selectGroup = selectGroup;
CreateParentSubscription(source);
}

protected override void ParentOnNext(IChangeSet<TObject, TKey> changes)
{
// Process all the changes at once to preserve the changeset order
foreach (var change in changes.ToConcreteType())
{
_grouper.ProcessChange(change);

switch (change.Reason)
{
grouper.EmitChanges(observer);
parentUpdate = false;
},
onError: observer.OnError);
// Shutdown existing sub (if any) and create a new one that
// Will update the group key for the current item
case ChangeReason.Add or ChangeReason.Update:
AddGroupSubscription(change.Current, change.Key);
break;

// Shutdown the existing subscription
case ChangeReason.Remove:
RemoveChildSubscription(change.Key);
break;
}
}
}

protected override void ChildOnNext((TGroupKey, TObject) tuple, TKey parentKey) =>
_grouper.AddOrUpdate(parentKey, tuple.Item1, tuple.Item2);

protected override void EmitChanges(IObserver<IGroupChangeSet<TObject, TKey, TGroupKey>> observer) =>
_grouper.EmitChanges(observer);

protected override void Dispose(bool disposing)
{
_grouper.Dispose();
base.Dispose(disposing);
}

return new CompositeDisposable(shared.Connect(), subMergeMany, subChanges, grouper);
});
private void AddGroupSubscription(TObject obj, TKey key) =>
AddChildSubscription(MakeChildObservable(_selectGroup(obj, key).DistinctUntilChanged().Select(groupKey => (groupKey, obj))), key);
}
}
4 changes: 2 additions & 2 deletions src/DynamicData/Cache/Internal/MergeChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ public IObservable<IChangeSet<TObject, TKey>> Run() => Observable.Create<IChange
// Can optimize for the Add case because that's the only one that applies
#if NET9_0_OR_GREATER
private static Change<ChangeSetCache<TObject, TKey>, int> CreateChange(IObservable<IChangeSet<TObject, TKey>> source, int index, Lock locker) =>
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.Synchronize(locker)));
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.IgnoreSameReferenceUpdate().Synchronize(locker)));

// Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable
private static IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> CreateContainerObservable(IObservable<IObservable<IChangeSet<TObject, TKey>>> source, Lock locker) =>
source.Select((src, index) => new ChangeSet<ChangeSetCache<TObject, TKey>, int>(new[] { CreateChange(src, index, locker) }));
#else
private static Change<ChangeSetCache<TObject, TKey>, int> CreateChange(IObservable<IChangeSet<TObject, TKey>> source, int index, object locker) =>
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.Synchronize(locker)));
new(ChangeReason.Add, index, new ChangeSetCache<TObject, TKey>(source.IgnoreSameReferenceUpdate().Synchronize(locker)));

// Create a ChangeSet Observable that produces ChangeSets with a single Add event for each new sub-observable
private static IObservable<IChangeSet<ChangeSetCache<TObject, TKey>, int>> CreateContainerObservable(IObservable<IObservable<IChangeSet<TObject, TKey>>> source, object locker) =>
Expand Down
98 changes: 56 additions & 42 deletions src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;

Expand All @@ -11,53 +10,68 @@ namespace DynamicData.Cache.Internal;
/// <summary>
/// Operator that is similiar to MergeMany but intelligently handles Cache ChangeSets.
/// </summary>
internal sealed class MergeManyCacheChangeSets<TObject, TKey, TDestination, TDestinationKey>(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> selector, IEqualityComparer<TDestination>? equalityComparer, IComparer<TDestination>? comparer)
internal sealed class MergeManyCacheChangeSets<TObject, TKey, TDestination, TDestinationKey>(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> changeSetSelector, IEqualityComparer<TDestination>? equalityComparer, IComparer<TDestination>? comparer)
where TObject : notnull
where TKey : notnull
where TDestination : notnull
where TDestinationKey : notnull
{
public IObservable<IChangeSet<TDestination, TDestinationKey>> Run() => Observable.Create<IChangeSet<TDestination, TDestinationKey>>(
observer =>
observer => new Subscription(source, changeSetSelector, observer, equalityComparer, comparer));

// Maintains state for a single subscription
private sealed class Subscription : CacheParentSubscription<ChangeSetCache<TDestination, TDestinationKey>, TKey, IChangeSet<TDestination, TDestinationKey>, IChangeSet<TDestination, TDestinationKey>>
{
private readonly Cache<ChangeSetCache<TDestination, TDestinationKey>, TKey> _cache = new();
private readonly ChangeSetMergeTracker<TDestination, TDestinationKey> _changeSetMergeTracker;

public Subscription(
IObservable<IChangeSet<TObject, TKey>> source,
Func<TObject, TKey, IObservable<IChangeSet<TDestination, TDestinationKey>>> changeSetSelector,
IObserver<IChangeSet<TDestination, TDestinationKey>> observer,
IEqualityComparer<TDestination>? equalityComparer,
IComparer<TDestination>? comparer)
: base(observer)
{
_changeSetMergeTracker = new(() => _cache.Items, comparer, equalityComparer);

// Child Observable has to go into the ChangeSetCache so the locking protects it
CreateParentSubscription(source.Transform((obj, key) =>
new ChangeSetCache<TDestination, TDestinationKey>(MakeChildObservable(changeSetSelector(obj, key).IgnoreSameReferenceUpdate()))));
}

protected override void ParentOnNext(IChangeSet<ChangeSetCache<TDestination, TDestinationKey>, TKey> changes)
{
var locker = InternalEx.NewLock();
var cache = new Cache<ChangeSetCache<TDestination, TDestinationKey>, TKey>();
var parentUpdate = false;

// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TDestination, TDestinationKey>(() => cache.Items, comparer, equalityComparer);

// Transform to a cache changeset of child caches, synchronize, update the local copy, and publish.
var shared = source
.Transform((obj, key) => new ChangeSetCache<TDestination, TDestinationKey>(selector(obj, key).Synchronize(locker)))
.Synchronize(locker)
.Do(changes =>
// Process all the changes at once to preserve the changeset order
foreach (var change in changes.ToConcreteType())
{
switch (change.Reason)
{
cache.Clone(changes);
parentUpdate = true;
})
.Publish();

// Merge the child changeset changes together and apply to the tracker
var subMergeMany = shared
.MergeMany(cacheChangeSet => cacheChangeSet.Source)
.SubscribeSafe(
changes => changeTracker.ProcessChangeSet(changes, !parentUpdate ? observer : null),
observer.OnError,
observer.OnCompleted);

// When a source item is removed, all of its sub-items need to be removed
var subRemove = shared
.OnItemRemoved(changeSetCache => changeTracker.RemoveItems(changeSetCache.Cache.KeyValues), invokeOnUnsubscribe: false)
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues))
.SubscribeSafe(
_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError);

return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove);
});
// Shutdown existing sub (if any) and create a new one that
// Will update the cache and emit the changes
case ChangeReason.Add or ChangeReason.Update:
_cache.AddOrUpdate(change.Current, change.Key);
AddChildSubscription(change.Current.Source, change.Key);
if (change.Previous.HasValue)
{
_changeSetMergeTracker.RemoveItems(change.Previous.Value.Cache.KeyValues);
}
break;

// Shutdown the existing subscription and remove from the cache
case ChangeReason.Remove:
_cache.Remove(change.Key);
RemoveChildSubscription(change.Key);
_changeSetMergeTracker.RemoveItems(change.Current.Cache.KeyValues);
break;
}
}
}

protected override void ChildOnNext(IChangeSet<TDestination, TDestinationKey> changes, TKey parentKey) =>
_changeSetMergeTracker.ProcessChangeSet(changes, null);

protected override void EmitChanges(IObserver<IChangeSet<TDestination, TDestinationKey>> observer) =>
_changeSetMergeTracker.EmitChanges(observer);
}
}
Loading
Loading