Skip to content

Commit ff91f78

Browse files
committed
fix(subscription): remove database semantics from payload api
1 parent 893e2ea commit ff91f78

File tree

4 files changed

+13
-49
lines changed

4 files changed

+13
-49
lines changed

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ private Optional<SubscriptionMessage> pollFile(
724724
// construct temporary message to nack
725725
nack(
726726
Collections.singletonList(
727-
new SubscriptionMessage(commitContext, file.getAbsolutePath(), null)));
727+
new SubscriptionMessage(commitContext, file.getAbsolutePath())));
728728
throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
729729
}
730730
}
@@ -874,8 +874,7 @@ private Optional<SubscriptionMessage> pollFileInternal(
874874

875875
// generate subscription message
876876
inFlightFilesCommitContextSet.remove(commitContext);
877-
return Optional.of(
878-
new SubscriptionMessage(commitContext, file.getAbsolutePath(), null));
877+
return Optional.of(new SubscriptionMessage(commitContext, file.getAbsolutePath()));
879878
}
880879
case ERROR:
881880
{
@@ -925,7 +924,7 @@ private Optional<SubscriptionMessage> pollTablets(
925924
// construct temporary message to nack
926925
nack(
927926
Collections.singletonList(
928-
new SubscriptionMessage(response.getCommitContext(), Collections.emptyMap())));
927+
new SubscriptionMessage(response.getCommitContext(), Collections.emptyList())));
929928
throw new SubscriptionRuntimeNonCriticalException(e.getMessage(), e);
930929
}
931930
}
@@ -946,8 +945,7 @@ private Optional<SubscriptionMessage> pollTabletsInternal(
946945
LOGGER.warn(errorMessage);
947946
throw new SubscriptionRuntimeNonCriticalException(errorMessage);
948947
}
949-
return Optional.of(
950-
new SubscriptionMessage(commitContext, Collections.singletonMap(null, tablets)));
948+
return Optional.of(new SubscriptionMessage(commitContext, tablets));
951949
}
952950

953951
timer.update();

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionMessage.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,10 @@
2222
import org.apache.iotdb.rpc.subscription.exception.SubscriptionIncompatibleHandlerException;
2323
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
2424

25-
import org.apache.thrift.annotation.Nullable;
2625
import org.apache.tsfile.write.record.Tablet;
2726

2827
import java.util.Iterator;
2928
import java.util.List;
30-
import java.util.Map;
3129
import java.util.Objects;
3230

3331
public class SubscriptionMessage implements Comparable<SubscriptionMessage> {
@@ -39,19 +37,17 @@ public class SubscriptionMessage implements Comparable<SubscriptionMessage> {
3937
private final SubscriptionMessageHandler handler;
4038

4139
public SubscriptionMessage(
42-
final SubscriptionCommitContext commitContext, final Map<String, List<Tablet>> tablets) {
40+
final SubscriptionCommitContext commitContext, final List<Tablet> tablets) {
4341
this.commitContext = commitContext;
4442
this.messageType = SubscriptionMessageType.RECORD_HANDLER.getType();
4543
this.handler = new SubscriptionRecordHandler(tablets);
4644
}
4745

4846
public SubscriptionMessage(
49-
final SubscriptionCommitContext commitContext,
50-
final String absolutePath,
51-
@Nullable final String databaseName) {
47+
final SubscriptionCommitContext commitContext, final String absolutePath) {
5248
this.commitContext = commitContext;
5349
this.messageType = SubscriptionMessageType.TS_FILE.getType();
54-
this.handler = new SubscriptionTsFileHandler(absolutePath, databaseName);
50+
this.handler = new SubscriptionTsFileHandler(absolutePath);
5551
}
5652

5753
public SubscriptionCommitContext getCommitContext() {

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionRecordHandler.java

Lines changed: 5 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.isession.ISessionDataSet;
2323

24-
import org.apache.thrift.annotation.Nullable;
2524
import org.apache.tsfile.enums.TSDataType;
2625
import org.apache.tsfile.read.common.Field;
2726
import org.apache.tsfile.read.common.RowRecord;
@@ -36,7 +35,6 @@
3635
import java.util.Collections;
3736
import java.util.Iterator;
3837
import java.util.List;
39-
import java.util.Map;
4038
import java.util.Objects;
4139
import java.util.stream.Collectors;
4240

@@ -46,20 +44,13 @@ public class SubscriptionRecordHandler
4644

4745
private final List<SubscriptionResultSet> resultSets;
4846

49-
public SubscriptionRecordHandler(final Map<String, List<Tablet>> tablets) {
47+
public SubscriptionRecordHandler(final List<Tablet> tablets) {
5048
final List<SubscriptionResultSet> resultSets = new ArrayList<>();
51-
for (final Map.Entry<String, List<Tablet>> entry : tablets.entrySet()) {
52-
final String databaseName = entry.getKey();
53-
final List<Tablet> tabletList = entry.getValue();
54-
if (Objects.isNull(tabletList)) {
49+
for (final Tablet tablet : tablets) {
50+
if (Objects.isNull(tablet)) {
5551
continue;
5652
}
57-
for (final Tablet tablet : tabletList) {
58-
if (Objects.isNull(tablet)) {
59-
continue;
60-
}
61-
resultSets.add(new SubscriptionResultSet(tablet, databaseName));
62-
}
53+
resultSets.add(new SubscriptionResultSet(tablet));
6354
}
6455
this.resultSets = Collections.unmodifiableList(resultSets);
6556
}
@@ -77,8 +68,6 @@ public static class SubscriptionResultSet implements ISessionDataSet {
7768

7869
private Tablet tablet;
7970

80-
@Nullable private final String databaseName;
81-
8271
private final List<RowPosition> sortedRowPositions;
8372

8473
private int rowIndex = -1;
@@ -87,20 +76,15 @@ public static class SubscriptionResultSet implements ISessionDataSet {
8776

8877
private List<String> columnTypeList;
8978

90-
private SubscriptionResultSet(final Tablet tablet, @Nullable final String databaseName) {
79+
private SubscriptionResultSet(final Tablet tablet) {
9180
this.tablet = tablet;
92-
this.databaseName = databaseName;
9381
this.sortedRowPositions = generateSortedRowPositions(tablet);
9482
}
9583

9684
public Tablet getTablet() {
9785
return tablet;
9886
}
9987

100-
public String getDatabaseName() {
101-
return databaseName;
102-
}
103-
10488
@Override
10589
public List<String> getColumnNames() {
10690
if (Objects.nonNull(columnNameList)) {
@@ -111,12 +95,6 @@ public List<String> getColumnNames() {
11195
columnNameList.add("Time");
11296

11397
final List<MeasurementSchema> schemas = tablet.getSchemas();
114-
if (Objects.nonNull(databaseName)) {
115-
columnNameList.addAll(
116-
schemas.stream().map(MeasurementSchema::getMeasurementId).collect(Collectors.toList()));
117-
return columnNameList;
118-
}
119-
12098
final String deviceId = tablet.deviceId;
12199
columnNameList.addAll(
122100
schemas.stream()

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionTsFileHandler.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,18 @@
1919

2020
package org.apache.iotdb.session.subscription.payload;
2121

22-
import org.apache.thrift.annotation.Nullable;
2322
import org.apache.tsfile.read.TsFileReader;
2423
import org.apache.tsfile.read.TsFileSequenceReader;
2524

2625
import java.io.IOException;
2726

2827
public class SubscriptionTsFileHandler extends SubscriptionFileHandler {
2928

30-
@Nullable private final String databaseName;
31-
32-
public SubscriptionTsFileHandler(final String absolutePath, @Nullable final String databaseName) {
29+
public SubscriptionTsFileHandler(final String absolutePath) {
3330
super(absolutePath);
34-
this.databaseName = databaseName;
3531
}
3632

3733
public TsFileReader openReader() throws IOException {
3834
return new TsFileReader(new TsFileSequenceReader(absolutePath));
3935
}
40-
41-
public String getDatabaseName() {
42-
return databaseName;
43-
}
4436
}

0 commit comments

Comments
 (0)