Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain(),
element.getOpenTelemetryContext()));
element.getOpenTelemetryContext(),
element.getValueKind()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,8 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp)
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain(),
element.getOpenTelemetryContext()));
element.getOpenTelemetryContext(),
element.getValueKind()));
}

@Override
Expand All @@ -476,7 +477,8 @@ public <T> void outputWindowedValue(
element.getRecordId(),
element.getRecordOffset(),
element.causedByDrain(),
element.getOpenTelemetryContext()));
element.getOpenTelemetryContext(),
element.getValueKind()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ public String getErrorContext() {
read.getRecordId(),
read.getRecordOffset(),
CausedByDrain.CAUSED_BY_DRAIN,
read.getOpenTelemetryContext());
read.getOpenTelemetryContext(),
read.getValueKind());
}
elementAndRestriction = KV.of(read, restrictionState.read());
watermarkEstimatorStateT = watermarkEstimatorState.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
Expand Down Expand Up @@ -1415,6 +1416,11 @@ public PaneInfo getPaneInfo() {
return null;
}

/** Returns the constant {@link ValueKind#INSERT}; the result does not depend on any state. */
@Override
public ValueKind getValueKind() {
return ValueKind.INSERT;
}

@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.ValueKindUtil;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
Expand Down Expand Up @@ -139,13 +141,15 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
* drain happened upstream
*/
CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
ValueKind valueKind = ValueKind.INSERT;
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
drainingValueFromUpstream =
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
}
if (valueCoder instanceof KvCoder) {
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
Expand All @@ -164,7 +168,8 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
null,
null,
drainingValueFromUpstream,
null);
null,
valueKind);
} else {
notifyElementRead(data.available() + metadata.available());
// todo #37030 parse context from previous stage
Expand All @@ -176,7 +181,8 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
null,
null,
drainingValueFromUpstream,
null);
null,
valueKind);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.ValueKindUtil;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate;
Expand Down Expand Up @@ -148,18 +150,28 @@ public Iterable<TimerData> timersIterable() {
* drain happened upstream
*/
CausedByDrain drainingValueFromUpstream = CausedByDrain.NORMAL;
ValueKind valueKind = ValueKind.INSERT;
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
drainingValueFromUpstream =
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
}
InputStream inputStream = message.getData().newInput();
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
return WindowedValues.of(
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream, null);
value,
timestamp,
windows,
paneInfo,
null,
null,
drainingValueFromUpstream,
null,
valueKind);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please cover this with a test in runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See testDrainPropagated for an example.
I also noticed a bug: the test class is missing the following teardown:

@After
  public void tearDown() {
    WindowedValues.WindowedValueCoder.setMetadataNotSupported();
  }

Copy link
Copy Markdown
Contributor Author

@ahmedabu98 ahmedabu98 May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test; both tests now end with a call to setMetadataNotSupported().

} catch (RuntimeException | IOException e) {
if (!skipUndecodableElements) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -71,6 +72,11 @@ public CausedByDrain causedByDrain() {
return null;
}

/** Returns the constant {@link ValueKind#INSERT}; the result does not depend on any state. */
@Override
public ValueKind getValueKind() {
return ValueKind.INSERT;
}

@Override
public Iterable<WindowedValue<T>> explodeWindows() {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
Expand Down Expand Up @@ -374,6 +375,48 @@ public void testDrainPropagated() throws Exception {
assertThat(
keyedWorkItem.timersIterable(),
Matchers.contains(makeDrainingTimer(STATE_NAMESPACE_2, 3, TimeDomain.EVENT_TIME)));
WindowedValues.WindowedValueCoder.setMetadataNotSupported();
}

/**
 * Verifies that the {@code ValueKind} written into each message's element metadata is exposed by
 * {@code getValueKind()} on the elements returned from {@code elementsIterable()}.
 *
 * <p>The test builds a work item with two elements tagged {@code UPDATE_AFTER} and {@code DELETE}
 * and checks that the decoded elements report those kinds in the same order.
 */
@Test
public void testValueKindPropagated() throws Exception {
  // Metadata support is toggled through a static setter, so it outlives this test unless reset.
  WindowedValues.WindowedValueCoder.setMetadataSupported();
  try {
    Windmill.WorkItem.Builder workItem =
        Windmill.WorkItem.newBuilder().setKey(SERIALIZED_KEY).setWorkToken(17);
    Windmill.InputMessageBundle.Builder chunk1 = workItem.addMessageBundlesBuilder();
    chunk1.setSourceComputationId("computation");
    addElementWithMetadata(
        chunk1,
        5,
        "hello",
        WINDOW_1,
        paneInfo(0),
        BeamFnApi.Elements.ElementMetadata.newBuilder()
            .setValueKind(BeamFnApi.Elements.ValueKind.Enum.UPDATE_AFTER)
            .build());
    addElementWithMetadata(
        chunk1,
        7,
        "world",
        WINDOW_2,
        paneInfo(2),
        BeamFnApi.Elements.ElementMetadata.newBuilder()
            .setValueKind(BeamFnApi.Elements.ValueKind.Enum.DELETE)
            .build());
    KeyedWorkItem<String, String> keyedWorkItem =
        new WindmillKeyedWorkItem<>(
            KEY,
            workItem.build(),
            WINDOW_CODER,
            WINDOWS_CODER,
            VALUE_CODER,
            windmillTagEncoding,
            true);

    Iterator<WindowedValue<String>> iterator = keyedWorkItem.elementsIterable().iterator();
    Assert.assertEquals(ValueKind.UPDATE_AFTER, iterator.next().getValueKind());
    Assert.assertEquals(ValueKind.DELETE, iterator.next().getValueKind());
  } finally {
    // Reset the static flag even when an assertion or decode step above throws; otherwise the
    // enabled state would carry over into whichever test runs next.
    WindowedValues.WindowedValueCoder.setMetadataNotSupported();
  }
}

private static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueKind;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -127,6 +128,11 @@ public CausedByDrain causedByDrain() {
return CausedByDrain.NORMAL;
}

/** Returns the constant {@link ValueKind#INSERT}; the result does not depend on any state. */
@Override
public ValueKind getValueKind() {
return ValueKind.INSERT;
}

@Override
public @Nullable Long getRecordOffset() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,7 @@ public interface OutputBuilder<T> extends WindowedValue<T> {

OutputBuilder<T> setOpenTelemetryContext(@Nullable Context openTelemetryContext);

OutputBuilder<T> setValueKind(ValueKind valueKind);

void output();
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public T getValue() {

public abstract @Nullable Context getOpenTelemetryContext();

public abstract ValueKind getValueKind();

// todo #33176 specify additional metadata in the future
public static <T> ValueInSingleWindow<T> of(
T value,
Expand All @@ -81,6 +83,28 @@ public static <T> ValueInSingleWindow<T> of(
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
@Nullable Context openTelemetryContext) {
return of(
value,
timestamp,
window,
paneInfo,
currentRecordId,
currentRecordOffset,
causedByDrain,
openTelemetryContext,
ValueKind.INSERT);
}

public static <T> ValueInSingleWindow<T> of(
T value,
Instant timestamp,
BoundedWindow window,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain,
@Nullable Context openTelemetryContext,
ValueKind valueKind) {
return new AutoValue_ValueInSingleWindow<>(
value,
timestamp,
Expand All @@ -89,12 +113,22 @@ public static <T> ValueInSingleWindow<T> of(
currentRecordId,
currentRecordOffset,
causedByDrain,
openTelemetryContext);
openTelemetryContext,
valueKind);
}

public static <T> ValueInSingleWindow<T> of(
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL, null);
return of(
value,
timestamp,
window,
paneInfo,
null,
null,
CausedByDrain.NORMAL,
null,
ValueKind.INSERT);
}

/** A coder for {@link ValueInSingleWindow}. */
Expand Down Expand Up @@ -144,9 +178,9 @@ public void encode(
io.opentelemetry.context.Context openTelemetryContext =
windowedElem.getOpenTelemetryContext();
if (openTelemetryContext != null) {

OpenTelemetryContextPropagator.set(openTelemetryContext, builder);
}
builder.setValueKind(ValueKindUtil.toProto(windowedElem.getValueKind()));
BeamFnApi.Elements.ElementMetadata metadata = builder.build();
ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
}
Expand All @@ -168,6 +202,7 @@ public ValueInSingleWindow<T> decode(
PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
CausedByDrain causedByDrain = CausedByDrain.NORMAL;
io.opentelemetry.context.@Nullable Context openTelemetryContext = null;
ValueKind valueKind = ValueKind.INSERT;
if (WindowedValues.WindowedValueCoder.isMetadataSupported() && paneInfo.isElementMetadata()) {
BeamFnApi.Elements.ElementMetadata elementMetadata =
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
Expand All @@ -176,12 +211,21 @@ public ValueInSingleWindow<T> decode(
? CausedByDrain.CAUSED_BY_DRAIN
: CausedByDrain.NORMAL;
openTelemetryContext = OpenTelemetryContextPropagator.read(elementMetadata);
valueKind = ValueKindUtil.fromProto(elementMetadata.getValueKind());
}

T value = valueCoder.decode(inStream, context);
// todo #33176 specify additional metadata in the future
return new AutoValue_ValueInSingleWindow<>(
value, timestamp, window, paneInfo, null, null, causedByDrain, openTelemetryContext);
value,
timestamp,
window,
paneInfo,
null,
null,
causedByDrain,
openTelemetryContext,
valueKind);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.values;

/**
 * Classifies the change operation that a Change Data Capture (CDC) record stands for.
 *
 * <p>An update has two sides, each with its own kind: {@link #UPDATE_BEFORE} describes the record
 * as it was and {@link #UPDATE_AFTER} describes the record as it is after the change.
 */
public enum ValueKind {
  /** A record that was newly created in the source system. */
  INSERT,

  /**
   * The state of a record immediately <b>before</b> an update was applied. Typically used to see
   * the prior values of the modified columns, or to find the record by its primary key.
   */
  UPDATE_BEFORE,

  /**
   * The state of a record immediately <b>after</b> an update was applied, i.e. the current, valid
   * state of the record once the change has taken effect.
   */
  UPDATE_AFTER,

  /** An existing record that was removed from the source system. */
  DELETE
}
Loading
Loading