Skip to content

Commit e21b1c3

Browse files
committed
Retire SemigroupVariable
1 parent 50ff048 commit e21b1c3

7 files changed

Lines changed: 24 additions & 75 deletions

File tree

differential-dataflow/examples/monoid-bfs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,11 @@ where
129129
// repeatedly update minimal distances each node can be reached from each root
130130
roots.scope().iterative::<u32,_,_>(|scope| {
131131

132-
use differential_dataflow::operators::iterate::SemigroupVariable;
132+
use differential_dataflow::operators::iterate::Variable;
133133
use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder};
134134

135135
use timely::order::Product;
136-
let (variable, collection) = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
136+
let (variable, collection) = Variable::new(scope, Product::new(Default::default(), 1));
137137

138138
let edges = edges.enter(scope);
139139
let roots = roots.enter(scope);

differential-dataflow/src/algorithms/graphs/propagate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,15 @@ where
7979

8080
nodes.scope().iterative::<usize,_,_>(|scope| {
8181

82-
use crate::operators::iterate::SemigroupVariable;
82+
use crate::operators::iterate::Variable;
8383
use crate::trace::implementations::{ValBuilder, ValSpine};
8484

8585
use timely::order::Product;
8686

8787
let edges = edges.enter(scope);
8888
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));
8989

90-
let (proposals_bind, proposals) = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize));
90+
let (proposals_bind, proposals) = Variable::new(scope, Product::new(Default::default(), 1usize));
9191

9292
let labels =
9393
proposals

differential-dataflow/src/operators/iterate.rs

Lines changed: 6 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl<G: Scope<Timestamp: Lattice>, D: Ord+Data+Debug, R: Semigroup+'static> Iter
110110
// wrapped by `variable`, but it also results in substantially more
111111
// diffs produced; `result` is post-consolidation, and means fewer
112112
// records are yielded out of the loop.
113-
let (variable, collection) = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1));
113+
let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1));
114114
let result = logic(subgraph.clone(), collection);
115115
variable.set(result.clone());
116116
result.leave()
@@ -166,7 +166,7 @@ pub type VecVariable<G, D, R> = Variable<G, Vec<(D, <G as ScopeParent>::Timestam
166166
impl<G, C: Container> Variable<G, C>
167167
where
168168
G: Scope<Timestamp: Lattice>,
169-
C: crate::collection::containers::Negate + crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
169+
C: crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
170170
{
171171
/// Creates a new initially empty `Variable`.
172172
///
@@ -179,10 +179,10 @@ where
179179
}
180180

181181
/// Creates a new `Variable` from a supplied `source` stream.
182-
pub fn new_from(source: Collection<G, C>, step: <G::Timestamp as Timestamp>::Summary) -> (Self, Collection<G, C>) where C: Clone {
182+
pub fn new_from(source: Collection<G, C>, step: <G::Timestamp as Timestamp>::Summary) -> (Self, Collection<G, C>) where C: Clone + crate::collection::containers::Negate {
183183
let (feedback, updates) = source.inner.scope().feedback(step.clone());
184184
let collection = Collection::<G, C>::new(updates).concat(source.clone());
185-
(Variable { feedback, source: Some(source), step }, collection)
185+
(Variable { feedback, source: Some(source.negate()), step }, collection)
186186
}
187187

188188
/// Set the definition of the `Variable` to a collection.
@@ -191,61 +191,10 @@ where
191191
/// which may be recursively defined in terms of the variable itself.
192192
pub fn set(mut self, mut result: Collection<G, C>) {
193193
if let Some(source) = self.source.take() {
194-
result = result.concat(source.negate());
194+
result = result.concat(source);
195195
}
196-
self.set_concat(result)
197-
}
198-
199-
/// Set the definition of the `Variable` to a collection concatenated to `self`.
200-
///
201-
/// This method is a specialization of `set` which has the effect of concatenating
202-
/// `result` and `self` before calling `set`. This method avoids some dataflow
203-
/// complexity related to retracting the initial input, and will do less work in
204-
/// that case.
205-
///
206-
/// This behavior can also be achieved by using `new` to create an empty initial
207-
/// collection, and then using `self.set(self.concat(result))`.
208-
pub fn set_concat(self, result: Collection<G, C>) {
209-
let step = self.step;
210-
result
211-
.results_in(step)
212-
.inner
213-
.connect_loop(self.feedback);
214-
}
215-
}
216-
217-
/// A recursively defined collection that only "grows".
218-
///
219-
/// `SemigroupVariable` is a weakening of `Variable` to allow difference types
220-
/// that do not implement `Abelian` and only implement `Semigroup`. This means
221-
/// that it can be used in settings where the difference type does not support
222-
/// negation.
223-
pub struct SemigroupVariable<G, C>
224-
where
225-
G: Scope<Timestamp: Lattice>,
226-
C: Container,
227-
{
228-
feedback: Handle<G, C>,
229-
step: <G::Timestamp as Timestamp>::Summary,
230-
}
231-
232-
impl<G, C: Container> SemigroupVariable<G, C>
233-
where
234-
G: Scope<Timestamp: Lattice>,
235-
C: crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
236-
{
237-
/// Creates a new initially empty `SemigroupVariable`.
238-
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> (Self, Collection<G, C>) {
239-
let (feedback, updates) = scope.feedback(step.clone());
240-
let collection = Collection::<G,C>::new(updates);
241-
(SemigroupVariable { feedback, step }, collection)
242-
}
243-
244-
/// Adds a new source of data to `self`.
245-
pub fn set(self, result: Collection<G, C>) {
246-
let step = self.step;
247196
result
248-
.results_in(step)
197+
.results_in(self.step)
249198
.inner
250199
.connect_loop(self.feedback);
251200
}

experiments/src/bin/deals.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatch
1010
use differential_dataflow::operators::arrange::TraceAgent;
1111
use differential_dataflow::operators::arrange::Arranged;
1212
use differential_dataflow::operators::arrange::Arrange;
13-
use differential_dataflow::operators::iterate::SemigroupVariable;
13+
use differential_dataflow::operators::iterate::Variable;
1414
use differential_dataflow::difference::Present;
1515

1616
type EdgeArranged<G, K, V, R> = Arranged<G, TraceAgent<ValSpine<K, V, <G as ScopeParent>::Timestamp, R>>>;
@@ -88,7 +88,7 @@ fn tc<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> V
8888
// repeatedly update minimal distances each node can be reached from each root
8989
edges.stream.scope().iterative::<Iter,_,_>(|scope| {
9090

91-
let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
91+
let inner = Variable::new(scope, Product::new(Default::default(), 1));
9292
let edges = edges.enter(&inner.scope());
9393

9494
let result =
@@ -115,7 +115,7 @@ fn sg<G: Scope<Timestamp=()>>(edges: &EdgeArranged<G, Node, Node, Present>) -> V
115115
// repeatedly update minimal distances each node can be reached from each root
116116
peers.scope().iterative::<Iter,_,_>(|scope| {
117117

118-
let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
118+
let inner = Variable::new(scope, Product::new(Default::default(), 1));
119119
let edges = edges.enter(&inner.scope());
120120
let peers = peers.enter(&inner.scope());
121121

experiments/src/bin/graphs-static.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use timely::dataflow::operators::ToStream;
66
use differential_dataflow::input::Input;
77
use differential_dataflow::VecCollection;
88
use differential_dataflow::operators::*;
9-
use differential_dataflow::operators::iterate::SemigroupVariable;
9+
use differential_dataflow::operators::iterate::Variable;
1010
use differential_dataflow::AsCollection;
1111

1212
use graph_map::GraphMMap;
@@ -118,7 +118,7 @@ fn reach<G: Scope<Timestamp = ()>> (
118118
let graph = graph.enter(scope);
119119
let roots = roots.enter(scope);
120120

121-
let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
121+
let inner = Variable::new(scope, Product::new(Default::default(), 1));
122122

123123
let result =
124124
graph.join_core(inner.arrange_by_self(), |_src,&dst,&()| Some(dst))
@@ -144,7 +144,7 @@ fn bfs<G: Scope<Timestamp = ()>> (
144144
let graph = graph.enter(scope);
145145
let roots = roots.enter(scope);
146146

147-
let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
147+
let inner = Variable::new(scope, Product::new(Default::default(), 1));
148148
let result =
149149
graph.join_core(inner.arrange_by_key(), |_src,&dest,&dist| [(dest, dist+1)])
150150
.concat(roots)
@@ -176,7 +176,7 @@ fn connected_components<G: Scope<Timestamp = ()>>(
176176
let reverse = reverse.enter(scope);
177177
let nodes = nodes.enter(scope);
178178

179-
let inner = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
179+
let inner = Variable::new(scope, Product::new(Default::default(), 1));
180180

181181
let labels = inner.arrange_by_key();
182182
let f_prop = labels.join_core(forward, |_k,l,d| Some((*d,*l)));

experiments/src/bin/graspan1.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use differential_dataflow::input::Input;
99
use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine};
1010
use differential_dataflow::operators::*;
1111
use differential_dataflow::operators::arrange::Arrange;
12-
use differential_dataflow::operators::iterate::SemigroupVariable;
12+
use differential_dataflow::operators::iterate::Variable;
1313

1414
type Node = u32;
1515
type Time = ();
@@ -41,7 +41,7 @@ fn main() {
4141
let nodes = nodes.enter(inner).map(|(a,b)| (b,a));
4242
let edges = edges.enter(inner);
4343

44-
let labels = SemigroupVariable::new(inner, Product::new(Default::default(), 1));
44+
let labels = Variable::new(inner, Product::new(Default::default(), 1));
4545

4646
let next =
4747
labels.join_core(edges, |_b, a, c| Some((*c, *a)))

experiments/src/bin/graspan2.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::fs::File;
44
use timely::dataflow::Scope;
55
use timely::order::Product;
66

7-
use differential_dataflow::operators::iterate::SemigroupVariable;
7+
use differential_dataflow::operators::iterate::Variable;
88

99
use differential_dataflow::VecCollection;
1010
use differential_dataflow::input::Input;
@@ -57,8 +57,8 @@ fn unoptimized() {
5757
let assignment = assignment.enter(scope);
5858
let dereference = dereference.enter(scope);
5959

60-
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
61-
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
60+
let value_flow = Variable::new(scope, Product::new(Default::default(), 1));
61+
let memory_alias = Variable::new(scope, Product::new(Default::default(), 1));
6262

6363
let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();
6464
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();
@@ -182,8 +182,8 @@ fn optimized() {
182182
let assignment = assignment.enter(scope);
183183
let dereference = dereference.enter(scope);
184184

185-
let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
186-
let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
185+
let value_flow = Variable::new(scope, Product::new(Default::default(), 1));
186+
let memory_alias = Variable::new(scope, Product::new(Default::default(), 1));
187187

188188
let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();
189189
let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>();

0 commit comments

Comments
 (0)