Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ envoy_stream_intel PlatformBridgeFilter::streamIntel() {
RELEASE_ASSERT(decoder_callbacks_, "StreamInfo accessed before filter callbacks are set");
auto& info = decoder_callbacks_->streamInfo();
// FIXME: Stream handle cannot currently be set from the filter context.
envoy_stream_intel stream_intel{-1, -1, 0};
envoy_stream_intel stream_intel{-1, -1, 0, 0};
if (info.upstreamInfo()) {
stream_intel.connection_id = info.upstreamInfo()->upstreamConnectionId().value_or(-1);
}
Expand Down
11 changes: 7 additions & 4 deletions library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void Client::DirectStreamCallbacks::encodeHeaders(const ResponseHeaderMap& heade

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
direct_stream_.saveLatestStreamIntel(headerBytesReceived());
if (end_stream) {
closeStream();
}
Expand Down Expand Up @@ -84,7 +84,7 @@ void Client::DirectStreamCallbacks::encodeData(Buffer::Instance& data, bool end_

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
direct_stream_.saveLatestStreamIntel(bytesReceived());
if (end_stream) {
closeStream();
}
Expand Down Expand Up @@ -147,7 +147,7 @@ void Client::DirectStreamCallbacks::encodeTrailers(const ResponseTrailerMap& tra

ASSERT(http_client_.getStream(direct_stream_.stream_handle_,
GetStreamFilters::ALLOW_FOR_ALL_STREAMS));
direct_stream_.saveLatestStreamIntel();
direct_stream_.saveLatestStreamIntel(bytesReceived());
closeStream(); // Trailers always indicate the end of the stream.

// For explicit flow control, don't send data unless prompted.
Expand Down Expand Up @@ -291,20 +291,23 @@ envoy_final_stream_intel& Client::DirectStreamCallbacks::finalStreamIntel() {
return direct_stream_.envoy_final_stream_intel_;
}

void Client::DirectStream::saveLatestStreamIntel() {
void Client::DirectStream::saveLatestStreamIntel(uint64_t received_byte_count) {
const auto& info = request_decoder_->streamInfo();
if (info.upstreamInfo()) {
stream_intel_.connection_id = info.upstreamInfo()->upstreamConnectionId().value_or(-1);
}
stream_intel_.stream_id = static_cast<uint64_t>(stream_handle_);
stream_intel_.attempt_count = info.attemptCount().value_or(0);
stream_intel_.received_byte_count = received_byte_count;
}

void Client::DirectStream::saveFinalStreamIntel() {
if (!request_decoder_ || !parent_.getStream(stream_handle_, ALLOW_ONLY_FOR_OPEN_STREAMS)) {
return;
}
StreamInfo::setFinalStreamIntel(request_decoder_->streamInfo(), envoy_final_stream_intel_);
// stream_intel_ may have an outdated received_byte_count - the final one is correct.
stream_intel_.received_byte_count = envoy_final_stream_intel_.received_byte_count;
}

envoy_error Client::DirectStreamCallbacks::streamError() {
Expand Down
17 changes: 15 additions & 2 deletions library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ class Client : public Logger::Loggable<Logger::Id::http> {

private:
bool hasBufferedData() { return response_data_.get() && response_data_->length() != 0; }
const StreamInfo::StreamInfo& streamInfo() {
return direct_stream_.request_decoder_->streamInfo();
}
uint64_t headerBytesReceived() {
return streamInfo().getUpstreamBytesMeter()
? streamInfo().getUpstreamBytesMeter()->headerBytesReceived()
: 0;
}
uint64_t bytesReceived() {
return streamInfo().getUpstreamBytesMeter()
? streamInfo().getUpstreamBytesMeter()->wireBytesReceived()
: 0;
}

void sendDataToBridge(Buffer::Instance& data, bool end_stream);
void sendTrailersToBridge(const ResponseTrailerMap& trailers);
Expand Down Expand Up @@ -248,7 +261,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
}

// Latches stream information as it may not be available when accessed.
void saveLatestStreamIntel();
void saveLatestStreamIntel(uint64_t received_byte_count);

// Latches latency info from stream info before it goes away.
void saveFinalStreamIntel();
Expand Down Expand Up @@ -279,7 +292,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
// read faster than the mobile caller can process it.
bool explicit_flow_control_ = false;
// Latest intel data retrieved from the StreamInfo.
envoy_stream_intel stream_intel_{-1, -1, 0};
envoy_stream_intel stream_intel_{-1, -1, 0, 0};
envoy_final_stream_intel envoy_final_stream_intel_{-1, -1, -1, -1, -1, -1, -1,
-1, -1, -1, -1, 0, 0, 0};
StreamInfo::BytesMeterSharedPtr bytes_meter_;
Expand Down
3 changes: 2 additions & 1 deletion library/common/jni/jni_utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,13 @@ jbyteArray native_data_to_array(JNIEnv* env, envoy_data data) {
}

jlongArray native_stream_intel_to_array(JNIEnv* env, envoy_stream_intel stream_intel) {
jlongArray j_array = env->NewLongArray(3);
jlongArray j_array = env->NewLongArray(4);
jlong* critical_array = static_cast<jlong*>(env->GetPrimitiveArrayCritical(j_array, nullptr));
RELEASE_ASSERT(critical_array != nullptr, "unable to allocate memory in jni_utility");
critical_array[0] = static_cast<jlong>(stream_intel.stream_id);
critical_array[1] = static_cast<jlong>(stream_intel.connection_id);
critical_array[2] = static_cast<jlong>(stream_intel.attempt_count);
critical_array[3] = static_cast<jlong>(stream_intel.received_byte_count);
// Here '0' (for which there is no named constant) indicates we want to commit the changes back
// to the JVM and free the c array, where applicable.
env->ReleasePrimitiveArrayCritical(j_array, critical_array, 0);
Expand Down
2 changes: 2 additions & 0 deletions library/common/types/c_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ typedef struct {
int64_t connection_id;
// The number of internal attempts to carry out a request/operation. 0 if not present.
uint64_t attempt_count;
// The number of bytes received from upstream.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is close but not 100% accurate right?
Generally we'll include all the body bytes read, except on the call to pass headers up at which point we only send up header bytes. Do you think clarifying is going to cause more confusion than it solves?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concocted something - done.

uint64_t received_byte_count;
} envoy_stream_intel;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ class EnvoyStreamIntelImpl implements EnvoyStreamIntel {
private long streamId;
private long connectionId;
private long attemptCount;
private long receivedByteCount;

EnvoyStreamIntelImpl(long[] values) {
streamId = values[0];
connectionId = values[1];
attemptCount = values[2];
receivedByteCount = values[3];
}

@Override
Expand All @@ -27,4 +29,9 @@ public long getConnectionId() {
public long getAttemptCount() {
return attemptCount;
}

@Override
public long getReceivedByteCount() {
return receivedByteCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

/**
* Exposes internal HTTP stream metrics, context, and other details sent once on stream end.
*
* Note: a value of -1 means "not present" for any field where the name is suffixed with "Ms".
*/
public interface EnvoyFinalStreamIntel {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ public interface EnvoyStreamIntel {
* The number of internal attempts to carry out a request/operation.
*/
public long getAttemptCount();
/*
* The number of bytes received from upstream.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update commments?

Copy link
Copy Markdown
Contributor Author

@carloseltuerto carloseltuerto Jan 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments updated, and also using the same names everywhere. Thanks.

*/
public long getReceivedByteCount();
}
Loading