@@ -37,47 +37,52 @@ internal sealed class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination
3737 // joined is the final cache
3838 var joinedCache = new ChangeAwareCache < TDestination , ( TLeftKey , TRightKey ) > ( ) ;
3939
40- var leftLoader = leftCache . Connect ( ) . Select ( changes =>
41- {
42- foreach ( var change in changes . ToConcreteType ( ) )
43- {
44- var leftCurrent = change . Current ;
45- var rightLookup = rightGrouped . Lookup ( change . Key ) ;
40+ var hasInitialized = false ;
4641
47- if ( rightLookup . HasValue )
42+ var leftLoader = leftCache . Connect ( )
43+ . Select ( changes =>
44+ {
45+ foreach ( var change in changes . ToConcreteType ( ) )
4846 {
49- switch ( change . Reason )
47+ var leftCurrent = change . Current ;
48+ var rightLookup = rightGrouped . Lookup ( change . Key ) ;
49+
50+ if ( rightLookup . HasValue )
5051 {
51- case ChangeReason . Add :
52- case ChangeReason . Update :
53- foreach ( var keyvalue in rightLookup . Value . KeyValues )
54- {
55- joinedCache . AddOrUpdate ( _resultSelector ( ( change . Key , keyvalue . Key ) , leftCurrent , keyvalue . Value ) , ( change . Key , keyvalue . Key ) ) ;
56- }
57-
58- break ;
59-
60- case ChangeReason . Remove :
61- foreach ( var keyvalue in rightLookup . Value . KeyValues )
62- {
63- joinedCache . Remove ( ( change . Key , keyvalue . Key ) ) ;
64- }
65-
66- break ;
67-
68- case ChangeReason . Refresh :
69- foreach ( var key in rightLookup . Value . Keys )
70- {
71- joinedCache . Refresh ( ( change . Key , key ) ) ;
72- }
73-
74- break ;
52+ switch ( change . Reason )
53+ {
54+ case ChangeReason . Add :
55+ case ChangeReason . Update :
56+ foreach ( var keyvalue in rightLookup . Value . KeyValues )
57+ {
58+ joinedCache . AddOrUpdate ( _resultSelector ( ( change . Key , keyvalue . Key ) , leftCurrent , keyvalue . Value ) , ( change . Key , keyvalue . Key ) ) ;
59+ }
60+
61+ break ;
62+
63+ case ChangeReason . Remove :
64+ foreach ( var keyvalue in rightLookup . Value . KeyValues )
65+ {
66+ joinedCache . Remove ( ( change . Key , keyvalue . Key ) ) ;
67+ }
68+
69+ break ;
70+
71+ case ChangeReason . Refresh :
72+ foreach ( var key in rightLookup . Value . Keys )
73+ {
74+ joinedCache . Refresh ( ( change . Key , key ) ) ;
75+ }
76+
77+ break ;
78+ }
7579 }
7680 }
77- }
7881
79- return joinedCache . CaptureChanges ( ) ;
80- } ) ;
82+ return joinedCache . CaptureChanges ( ) ;
83+ } )
84+ // Don't forward initial changesets from the left side, only the right
85+ . Where ( _ => hasInitialized ) ;
8186
8287 var rightLoader = rightCache . Connect ( ) . Select ( changes =>
8388 {
@@ -119,7 +124,11 @@ internal sealed class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination
119124
120125 lock ( locker )
121126 {
122- return new CompositeDisposable ( leftLoader . Merge ( rightLoader ) . SubscribeSafe ( observer ) , leftCache , rightCache , rightShare . Connect ( ) ) ;
127+ var observerSubscription = leftLoader . Merge ( rightLoader ) . SubscribeSafe ( observer ) ;
128+
129+ hasInitialized = true ;
130+
131+ return new CompositeDisposable ( observerSubscription , leftCache , rightCache , rightShare . Connect ( ) ) ;
123132 }
124133 } ) ;
125134}
0 commit comments