Skip to content

Commit 307c047

Browse files
branch-2.1: [fix](Prepared Statment) Fix exec prepared insert stmt in non master error (#48689) (#52265)
backport: #48689 Co-authored-by: Lijia Liu <liutang123@yeah.net>
1 parent f0ce208 commit 307c047

8 files changed

Lines changed: 85 additions & 9 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PrepareCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
109109
}
110110
ctx.addPreparedStatementContext(name,
111111
new PreparedStatementContext(this, ctx, ctx.getStatementContext(), name));
112-
if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE) {
112+
if (ctx.getCommand() == MysqlCommand.COM_STMT_PREPARE && !ctx.isProxy()) {
113113
executor.sendStmtPrepareOK(Integer.parseInt(name), labels);
114114
}
115115
}

fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,16 @@
6969
import com.google.common.collect.Maps;
7070
import com.google.common.collect.Sets;
7171
import io.netty.util.concurrent.FastThreadLocal;
72+
import lombok.Getter;
73+
import lombok.Setter;
7274
import org.apache.commons.lang3.StringUtils;
7375
import org.apache.logging.log4j.LogManager;
7476
import org.apache.logging.log4j.Logger;
7577
import org.json.JSONObject;
7678
import org.xnio.StreamConnection;
7779

7880
import java.io.IOException;
81+
import java.nio.ByteBuffer;
7982
import java.util.HashMap;
8083
import java.util.List;
8184
import java.util.Map;
@@ -239,6 +242,10 @@ public enum ConnectType {
239242
// it's default thread-safe
240243
private boolean isProxy = false;
241244

245+
@Getter
246+
@Setter
247+
private ByteBuffer prepareExecuteBuffer;
248+
242249
private MysqlHandshakePacket mysqlHandshakePacket;
243250

244251
public void setUserQueryTimeout(int queryTimeout) {

fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.apache.doris.nereids.parser.NereidsParser;
6262
import org.apache.doris.nereids.stats.StatsErrorEstimator;
6363
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
64+
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
6465
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
6566
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
6667
import org.apache.doris.plugin.DialectConverterPlugin;
@@ -87,6 +88,7 @@
8788
import java.io.IOException;
8889
import java.io.StringReader;
8990
import java.nio.ByteBuffer;
91+
import java.nio.ByteOrder;
9092
import java.util.ArrayList;
9193
import java.util.List;
9294
import java.util.Map;
@@ -732,8 +734,24 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
732734
UUID uuid = UUID.randomUUID();
733735
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
734736
}
735-
736-
executor.execute(queryId);
737+
if (request.isSetPrepareExecuteBuffer()) {
738+
ctx.setCommand(MysqlCommand.COM_STMT_PREPARE);
739+
executor.execute();
740+
ctx.setCommand(MysqlCommand.COM_STMT_EXECUTE);
741+
String preparedStmtId = executor.getPrepareStmtName();
742+
PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(preparedStmtId);
743+
if (preparedStatementContext == null) {
744+
if (LOG.isDebugEnabled()) {
745+
LOG.debug("Something error, just support nereids preparedStmtId:{}", preparedStmtId);
746+
}
747+
throw new RuntimeException("Prepare failed when proxy execute");
748+
}
749+
handleExecute(preparedStatementContext.command, Long.parseLong(preparedStmtId),
750+
preparedStatementContext,
751+
ByteBuffer.wrap(request.getPrepareExecuteBuffer()).order(ByteOrder.LITTLE_ENDIAN), queryId);
752+
} else {
753+
executor.execute(queryId);
754+
}
737755
} catch (IOException e) {
738756
// Client failed.
739757
LOG.warn("Process one query failed because IOException: ", e);
@@ -796,4 +814,10 @@ private Map<String, LiteralExpr> userVariableFromThrift(Map<String, TExprNode> t
796814
throw new TException(e.getMessage());
797815
}
798816
}
817+
818+
819+
protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx,
820+
ByteBuffer packetBuf, TUniqueId queryId) {
821+
throw new NotSupportedException("Just MysqlConnectProcessor support execute");
822+
}
799823
}

fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.doris.common.ClientPool;
2424
import org.apache.doris.common.DdlException;
2525
import org.apache.doris.common.ErrorCode;
26+
import org.apache.doris.mysql.MysqlCommand;
2627
import org.apache.doris.thrift.FrontendService;
2728
import org.apache.doris.thrift.TExpr;
2829
import org.apache.doris.thrift.TExprNode;
@@ -183,6 +184,12 @@ private TMasterOpRequest buildStmtForwardParams() {
183184
if (null != ctx.queryId()) {
184185
params.setQueryId(ctx.queryId());
185186
}
187+
188+
if (ctx.getCommand() == MysqlCommand.COM_STMT_EXECUTE) {
189+
if (null != ctx.getPrepareExecuteBuffer()) {
190+
params.setPrepareExecuteBuffer(ctx.getPrepareExecuteBuffer());
191+
}
192+
}
186193
return params;
187194
}
188195

fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.doris.nereids.trees.plans.PlaceholderId;
4747
import org.apache.doris.nereids.trees.plans.commands.ExecuteCommand;
4848
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
49+
import org.apache.doris.thrift.TUniqueId;
4950

5051
import com.google.common.base.Preconditions;
5152
import com.google.common.base.Strings;
@@ -170,14 +171,31 @@ private void handleExecute(PrepareStmt prepareStmt, long stmtId) {
170171
}
171172
}
172173

173-
private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx) {
174+
private String getHexStr(ByteBuffer packetBuf) {
175+
byte[] bytes = packetBuf.array();
176+
StringBuilder hex = new StringBuilder();
177+
for (int i = packetBuf.position(); i < packetBuf.limit(); ++i) {
178+
hex.append(String.format("%02X ", bytes[i]));
179+
}
180+
return hex.toString();
181+
}
182+
183+
@Override
184+
protected void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedStatementContext prepCtx,
185+
ByteBuffer packetBuf, TUniqueId queryId) {
174186
int paramCount = prepareCommand.placeholderCount();
175187
LOG.debug("execute prepared statement {}, paramCount {}", stmtId, paramCount);
176188
// null bitmap
177189
String stmtStr = "";
178190
try {
179191
StatementContext statementContext = prepCtx.statementContext;
180192
if (paramCount > 0) {
193+
if (LOG.isDebugEnabled()) {
194+
LOG.debug("execute param buf: {}, array: {}", packetBuf, getHexStr(packetBuf));
195+
}
196+
if (!ctx.isProxy()) {
197+
ctx.setPrepareExecuteBuffer(packetBuf.duplicate());
198+
}
181199
byte[] nullbitmapData = new byte[(paramCount + 7) / 8];
182200
packetBuf.get(nullbitmapData);
183201
// new_params_bind_flag
@@ -218,7 +236,11 @@ private void handleExecute(PrepareCommand prepareCommand, long stmtId, PreparedS
218236
stmt.setOrigStmt(prepareCommand.getOriginalStmt());
219237
executor = new StmtExecutor(ctx, stmt);
220238
ctx.setExecutor(executor);
221-
executor.execute();
239+
if (null != queryId) {
240+
executor.execute(queryId);
241+
} else {
242+
executor.execute();
243+
}
222244
if (ctx.getSessionVariable().isEnablePreparedStmtAuditLog()) {
223245
stmtStr = executeStmt.toSql();
224246
stmtStr = stmtStr + " /*originalSql = " + prepareCommand.getOriginalStmt().originStmt + "*/";
@@ -266,7 +288,7 @@ private void handleExecute() {
266288
"msg: Not supported such prepared statement");
267289
return;
268290
}
269-
handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext);
291+
handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext, packetBuf, null);
270292
}
271293
}
272294

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@
143143
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
144144
import org.apache.doris.nereids.minidump.MinidumpUtils;
145145
import org.apache.doris.nereids.parser.NereidsParser;
146+
import org.apache.doris.nereids.trees.expressions.Placeholder;
146147
import org.apache.doris.nereids.trees.plans.commands.Command;
147148
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
148149
import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
@@ -269,6 +270,7 @@ public class StmtExecutor {
269270
private boolean isCached;
270271
private String stmtName;
271272
private StatementBase prepareStmt = null;
273+
private String prepareStmtName; // for prox
272274
private String mysqlLoadId;
273275
// Distinguish from prepare and execute command
274276
private boolean isExecuteStmt = false;
@@ -682,8 +684,12 @@ private void executeByNereids(TUniqueId queryId) throws Exception {
682684
}
683685
long stmtId = Config.prepared_stmt_start_id > 0
684686
? Config.prepared_stmt_start_id : context.getPreparedStmtId();
685-
logicalPlan = new PrepareCommand(String.valueOf(stmtId),
686-
logicalPlan, statementContext.getPlaceholders(), originStmt);
687+
this.prepareStmtName = String.valueOf(stmtId);
688+
// When proxy executing, this.statementContext is created in constructor.
689+
// But context.statementContext is created in LogicalPlanBuilder.
690+
List<Placeholder> placeholders = context == null
691+
? statementContext.getPlaceholders() : context.getStatementContext().getPlaceholders();
692+
logicalPlan = new PrepareCommand(prepareStmtName, logicalPlan, placeholders, originStmt);
687693
}
688694
// when we in transaction mode, we only support insert into command and transaction command
689695
if (context.isTxnModel()) {
@@ -3488,4 +3494,8 @@ public void sendProxyQueryResult() throws IOException {
34883494
context.getMysqlChannel().sendOnePacket(byteBuffer);
34893495
}
34903496
}
3497+
3498+
public String getPrepareStmtName() {
3499+
return this.prepareStmtName;
3500+
}
34913501
}

gensrc/thrift/FrontendService.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ struct TMasterOpRequest {
571571
// transaction load
572572
29: optional TTxnLoadInfo txnLoadInfo
573573
30: optional TGroupCommitInfo groupCommitInfo
574+
31: optional binary prepareExecuteBuffer
574575
}
575576

576577
struct TColumnDefinition {

regression-test/suites/query_p0/test_forward_qeury.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,12 @@ suite("test_forward_query", 'docker') {
4343

4444
cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.forward_all_queries' : [forwardAllQueries:true, execute:1]])
4545

46-
def ret = sql """ SELECT * FROM ${tbl} """
46+
def stmt = prepareStatement("""INSERT INTO ${tbl} VALUES(?);""")
47+
stmt.setInt(1, 2)
48+
stmt.executeUpdate()
49+
50+
def ret = sql """ SELECT * FROM ${tbl} order by k1"""
4751
assertEquals(ret[0][0], 1)
52+
assertEquals(ret[1][0], 2)
4853
}
4954
}

0 commit comments

Comments
 (0)