Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ envoy_stream_intel PlatformBridgeFilter::streamIntel() {
RELEASE_ASSERT(decoder_callbacks_, "StreamInfo accessed before filter callbacks are set");
auto& info = decoder_callbacks_->streamInfo();
// FIXME: Stream handle cannot currently be set from the filter context.
envoy_stream_intel stream_intel{-1, -1, 0};
envoy_stream_intel stream_intel{-1, -1, 0, 0};
if (info.upstreamInfo()) {
stream_intel.connection_id = info.upstreamInfo()->upstreamConnectionId().value_or(-1);
}
Expand Down
30 changes: 18 additions & 12 deletions library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,27 @@ void Client::DirectStreamCallbacks::encodeHeaders(const ResponseHeaderMap& heade

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
if (end_stream) {
closeStream();
}

// Capture some metadata before potentially closing the stream.
absl::string_view alpn = "";
uint64_t response_status = Utility::getResponseStatus(headers);
if (direct_stream_.request_decoder_ &&
direct_stream_.request_decoder_->streamInfo().upstreamInfo() &&
direct_stream_.request_decoder_->streamInfo().upstreamInfo()->upstreamSslConnection()) {
alpn = direct_stream_.request_decoder_->streamInfo()
.upstreamInfo()
->upstreamSslConnection()
->alpn();
if (direct_stream_.request_decoder_) {
direct_stream_.saveLatestStreamIntel();
const auto& info = direct_stream_.request_decoder_->streamInfo();
// Set the initial number of bytes consumed for the non terminal callbacks.
direct_stream_.stream_intel_.consumed_bytes_from_response =
info.getUpstreamBytesMeter() ? info.getUpstreamBytesMeter()->headerBytesReceived() : 0;
// Capture the alpn if available.
if (info.upstreamInfo() && info.upstreamInfo()->upstreamSslConnection()) {
alpn = info.upstreamInfo()->upstreamSslConnection()->alpn();
}
}

if (end_stream) {
closeStream();
}

// Track success for later bookkeeping (stream could still be reset).
uint64_t response_status = Utility::getResponseStatus(headers);
success_ = CodeUtility::is2xx(response_status);

ENVOY_LOG(debug, "[S{}] dispatching to platform response headers for stream (end_stream={}):\n{}",
Expand Down Expand Up @@ -123,6 +127,8 @@ void Client::DirectStreamCallbacks::sendDataToBridge(Buffer::Instance& data, boo

// Cap by bytes_to_send_ if and only if applying explicit flow control.
uint32_t bytes_to_send = calculateBytesToSend(data, bytes_to_send_);
// Update the number of bytes consumed by this non terminal callback.
direct_stream_.stream_intel_.consumed_bytes_from_response += bytes_to_send;
// Only send end stream if all data is being sent.
bool send_end_stream = end_stream && (bytes_to_send == data.length());

Expand Down
2 changes: 1 addition & 1 deletion library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
// read faster than the mobile caller can process it.
bool explicit_flow_control_ = false;
// Latest intel data retrieved from the StreamInfo.
envoy_stream_intel stream_intel_{-1, -1, 0};
envoy_stream_intel stream_intel_{-1, -1, 0, 0};
envoy_final_stream_intel envoy_final_stream_intel_{-1, -1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, 0, 0, 0, 0};
StreamInfo::BytesMeterSharedPtr bytes_meter_;
Expand Down
3 changes: 2 additions & 1 deletion library/common/jni/jni_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ jbyteArray native_data_to_array(JNIEnv* env, envoy_data data) {
}

jlongArray native_stream_intel_to_array(JNIEnv* env, envoy_stream_intel stream_intel) {
jlongArray j_array = env->NewLongArray(3);
jlongArray j_array = env->NewLongArray(4);
jlong* critical_array = static_cast<jlong*>(env->GetPrimitiveArrayCritical(j_array, nullptr));
RELEASE_ASSERT(critical_array != nullptr, "unable to allocate memory in jni_utility");
critical_array[0] = static_cast<jlong>(stream_intel.stream_id);
critical_array[1] = static_cast<jlong>(stream_intel.connection_id);
critical_array[2] = static_cast<jlong>(stream_intel.attempt_count);
critical_array[3] = static_cast<jlong>(stream_intel.consumed_bytes_from_response);
// Here '0' (for which there is no named constant) indicates we want to commit the changes back
// to the JVM and free the c array, where applicable.
env->ReleasePrimitiveArrayCritical(j_array, critical_array, 0);
Expand Down
6 changes: 6 additions & 0 deletions library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ typedef struct {
int64_t connection_id;
// The number of internal attempts to carry out a request/operation. 0 if not present.
uint64_t attempt_count;
// Number of bytes consumed by the non terminal callbacks out of the response.
// NOTE: on terminal callbacks (on_complete, on_error_, on_cancel), this value will not be equal
// to envoy_final_stream_intel.received_byte_count. The latter represents the real number
// of bytes received before decompression. consumed_bytes_from_response omits the number
// number of bytes related to the Status Line, and is after decompression.
uint64_t consumed_bytes_from_response;
} envoy_stream_intel;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ class EnvoyStreamIntelImpl implements EnvoyStreamIntel {
private long streamId;
private long connectionId;
private long attemptCount;
private long consumedBytesFromResponse;

EnvoyStreamIntelImpl(long[] values) {
streamId = values[0];
connectionId = values[1];
attemptCount = values[2];
consumedBytesFromResponse = values[3];
}

@Override
Expand All @@ -27,4 +29,9 @@ public long getConnectionId() {
public long getAttemptCount() {
return attemptCount;
}

@Override
public long getConsumedBytesFromResponse() {
return consumedBytesFromResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

/**
* Exposes internal HTTP stream metrics, context, and other details sent once on stream end.
*
* Note: a value of -1 means "not present" for any field where the name is suffixed with "Ms".
*/
public interface EnvoyFinalStreamIntel {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Exposes internal HTTP stream metrics, context, and other details.
*/
public interface EnvoyStreamIntel {

/**
* An internal identifier for the stream.
*/
Expand All @@ -18,4 +19,14 @@ public interface EnvoyStreamIntel {
* The number of internal attempts to carry out a request/operation.
*/
public long getAttemptCount();

/**
* The number of bytes consumed by the non terminal callbacks, from the response.
*
* <p>>NOTE: on terminal callbacks (on_complete, on_error_, on_cancel), this value will not be
* equal to {@link EnvoyFinalStreamIntel#getReceivedByteCount()}. The latter represents the real
* number of bytes received before decompression. getConsumedBytesFromResponse() omits the number
* number of bytes related to the Status Line, and is after decompression.
*/
public long getConsumedBytesFromResponse();
}
Loading