Merged

Changes from 16 commits (34 commits total)
- 1292ae6 parsing both json formats (untested), not handling final return format (Sep 16, 2017)
- b2cc750 reverting to develop code (Sep 19, 2017)
- 0e5f5b5 decode_and_handle_predict changes only (Sep 19, 2017)
- 9e37507 initial changes to add_application (Sep 19, 2017)
- a0fd8d2 code compiles, not tested (Sep 19, 2017)
- 5a4a287 batch prediction working for type double inputs (Nov 13, 2017)
- 0d42e43 supporting floats and ints (Nov 13, 2017)
- 1b144d7 byte and string types (Nov 17, 2017)
- 8c02958 old frontend tests now compatible with new decode_and_handle_predict (Nov 17, 2017)
- f43114c added tests (passing) for int, double and string types (Nov 17, 2017)
- 7a9cfd3 added test for wrong input type error handling (Nov 20, 2017)
- 94e2e5a Merge branch 'develop' into batch_predict (Nov 20, 2017)
- b3ec5c2 reverted some docker files, scripts back to versions on develop branch (Nov 20, 2017)
- 51ecf46 ran ./bin/format_code.sh, passing ./bin/check_format.sh (Nov 21, 2017)
- 4404547 formatted code with correct version of clang-format (dcrankshaw, Nov 27, 2017)
- 3f5a431 Merge remote-tracking branch 'ucbrise/develop' into batch_predict (dcrankshaw, Nov 27, 2017)
- fe5fb5d revert files to develop branch (Nov 30, 2017)
- 0d9ca9f revert more files to develop (Nov 30, 2017)
- efd2d49 back to fe5fb5dfcd36f2cd53bebf0681f518b170bf3091 (Nov 30, 2017)
- ddb9c9f back to remote version (Nov 30, 2017)
- dbfac94 revert some files to develop branch (Nov 30, 2017)
- 08191c8 refactored code, not formatted (Nov 30, 2017)
- 6c923be revert redis.hpp (Dec 1, 2017)
- 6eaa7f4 Merge branch 'develop' into batch_predict (Dec 1, 2017)
- d59f0fd single json response, array in output field (Dec 1, 2017)
- 3461df5 batch prediction integration test (Dec 1, 2017)
- 9d23155 typo fix (Dec 1, 2017)
- bed6827 format code (dcrankshaw, Dec 11, 2017)
- 3f06542 new model and app name for extra test (Dec 11, 2017)
- 9a29ef1 Merge branch 'batch_predict' of github.com:tatevikstep/clipper into b… (Dec 11, 2017)
- c35f793 batch query example (Dec 11, 2017)
- d31c9fa Merge branch 'develop' into batch_predict (dcrankshaw, Jan 3, 2018)
- 7aa122c format code (dcrankshaw, Jan 3, 2018)
- 0db4c2b Merge branch 'develop' into batch_predict (dcrankshaw, Jan 4, 2018)
4 changes: 2 additions & 2 deletions src/benchmarks/src/bench_utils.hpp
@@ -56,8 +56,8 @@ int get_int(const std::string &key,
std::unordered_map<std::string, std::string> &config);

/**
* Returns the value corresponding to `key` in `config` as a long
*/
* Returns the value corresponding to `key` in `config` as a long
*/
long get_long(const std::string &key,
Contributor comment: Revert these changes (to keep the commit history clean).

std::unordered_map<std::string, std::string> &config);

6 changes: 3 additions & 3 deletions src/container/include/container/container_rpc.hpp
@@ -155,9 +155,9 @@ class RPC {
std::shared_ptr<boost::circular_buffer<RPCLogItem>> event_log_;

/**
* @return `true` if the received heartbeat is a request for container metadata.
* `false` otherwise.
*/
* @return `true` if the received heartbeat is a request for container
* metadata. `false` otherwise.
*/
Contributor comment: Revert

bool handle_heartbeat(zmq::socket_t& socket) const;

void send_heartbeat(zmq::socket_t& socket) const;
2 changes: 1 addition & 1 deletion src/container/include/container/util.hpp
@@ -39,6 +39,6 @@ class CircularBuffer {
std::vector<T> items_;
const size_t capacity_;
};
}
} // namespace container

#endif // CLIPPER_UTIL_HPP
73 changes: 47 additions & 26 deletions src/frontends/src/query_frontend.hpp
@@ -318,25 +318,35 @@ class RequestHandler {
}
}

folly::Future<Response> prediction = decode_and_handle_predict(
request->content.string(), name, versioned_models, policy,
latency_slo_micros, input_type);

prediction
.then([response, app_metrics](Response r) {
// Update metrics
if (r.output_is_default_) {
app_metrics.default_pred_ratio_->increment(1, 1);
} else {
app_metrics.default_pred_ratio_->increment(0, 1);
folly::Future<std::vector<folly::Try<Response>>> predictions =
decode_and_handle_predict(request->content.string(), name,
versioned_models, policy,
latency_slo_micros, input_type);

predictions
.then([response,
app_metrics](std::vector<folly::Try<Response>> tries) {
std::string final_content;
Contributor comment: Use a std::stringstream to accumulate the full response.

for (auto t : tries) {
try {
Response r = t.value();
if (r.output_is_default_) {
app_metrics.default_pred_ratio_->increment(1, 1);
} else {
app_metrics.default_pred_ratio_->increment(0, 1);
}
app_metrics.latency_->insert(r.duration_micros_);
app_metrics.num_predictions_->increment(1);
app_metrics.throughput_->mark(1);

std::string content = get_prediction_response_content(r);
final_content += content + "\n";
Contributor comment: This will then turn into stringstream appends (e.g. final_content << content << "\n";)

} catch (const std::exception& e) {
// case: returned a response before all predictions in the
// batch were ready
Contributor comment: Log this error.

}
}
app_metrics.latency_->insert(r.duration_micros_);
app_metrics.num_predictions_->increment(1);
app_metrics.throughput_->mark(1);

std::string content = get_prediction_response_content(r);
respond_http(content, "200 OK", response);

respond_http(final_content, "200 OK", response);
})
.onError([response](const std::exception& e) {
clipper::log_error_formatted(clipper::LOGGING_TAG_CLIPPER,
@@ -479,22 +489,33 @@ class RequestHandler {
* JSON format for prediction query request:
* {
* "input" := [double] | [int] | [string] | [byte] | [float]
* "input_batch" := [[double] | [int] | [byte] | [float] | string]
* }
*/
folly::Future<Response> decode_and_handle_predict(
folly::Future<std::vector<folly::Try<Response>>> decode_and_handle_predict(
std::string json_content, std::string name,
std::vector<VersionedModelId> models, std::string policy,
long latency_slo_micros, InputType input_type) {
rapidjson::Document d;
clipper::json::parse_json(json_content, d);
long uid = 0;
// NOTE: We will eventually support personalization again so this commented
// out code is intentionally left in as a placeholder.
// long uid = clipper::json::get_long(d, "uid");
std::shared_ptr<Input> input = clipper::json::parse_input(input_type, d);
auto prediction = query_processor_.predict(
Query{name, uid, input, latency_slo_micros, policy, models});
return prediction;
std::vector<folly::Future<Response>> predictions;

if (d.HasMember("input")) {
std::shared_ptr<Input> input = clipper::json::parse_input(input_type, d);
auto prediction = query_processor_.predict(
Query{name, uid, input, latency_slo_micros, policy, models});
predictions.push_back(std::move(prediction));
} else { // d.HasMember("input_batch") instead
Contributor comment: Does it make sense to explicitly check for d.HasMember("input_batch") here, rather than leaving it as an else statement?

std::vector<std::shared_ptr<Input>> input_batch =
clipper::json::parse_input_batch(input_type, d);
for (auto input : input_batch) {
auto prediction = query_processor_.predict(
Query{name, uid, input, latency_slo_micros, policy, models});
predictions.push_back(std::move(prediction));
}
}
return folly::collectAll(predictions);
}

/*
87 changes: 83 additions & 4 deletions src/frontends/src/query_frontend_tests.cpp
@@ -64,10 +64,11 @@ class QueryFrontendTest : public ::testing::Test {

TEST_F(QueryFrontendTest, TestDecodeCorrectInputInts) {
std::string test_json_ints = "{\"input\": [1,2,3,4]}";
Response response =
std::vector<folly::Try<Response>> responses =
rh_.decode_and_handle_predict(test_json_ints, "test", {}, "test_policy",
30000, InputType::Ints)
.get();
Response response = responses[0].value();

Query parsed_query = response.query_;

Expand All @@ -80,12 +81,34 @@ TEST_F(QueryFrontendTest, TestDecodeCorrectInputInts) {
EXPECT_EQ(parsed_query.selection_policy_, "test_policy");
}

TEST_F(QueryFrontendTest, TestDecodeCorrectInputIntsBatch) {
std::string test_json_ints =
"{\"input_batch\": [[1, 2], [10, 20], [100, 200]]}";
std::vector<std::vector<int>> expected_input{{1, 2}, {10, 20}, {100, 200}};
std::vector<folly::Try<Response>> responses =
rh_.decode_and_handle_predict(test_json_ints, "test", {}, "test_policy",
30000, InputType::Ints)
.get();
for (size_t index = 0; index < responses.size(); ++index) {
Response response = responses[index].value();
Query parsed_query = response.query_;

const std::vector<int>& parsed_input =
std::static_pointer_cast<IntVector>(parsed_query.input_)->get_data();
EXPECT_EQ(parsed_input, expected_input[index]);
EXPECT_EQ(parsed_query.label_, "test");
EXPECT_EQ(parsed_query.latency_budget_micros_, 30000);
EXPECT_EQ(parsed_query.selection_policy_, "test_policy");
}
}

TEST_F(QueryFrontendTest, TestDecodeCorrectInputDoubles) {
std::string test_json_doubles = "{\"input\": [1.4,2.23,3.243242,0.3223424]}";
Response response =
std::vector<folly::Try<Response>> responses =
rh_.decode_and_handle_predict(test_json_doubles, "test", {},
"test_policy", 30000, InputType::Doubles)
.get();
Response response = responses[0].value();

Query parsed_query = response.query_;

Expand All @@ -98,14 +121,37 @@ TEST_F(QueryFrontendTest, TestDecodeCorrectInputDoubles) {
EXPECT_EQ(parsed_query.selection_policy_, "test_policy");
}

TEST_F(QueryFrontendTest, TestDecodeCorrectInputDoublesBatch) {
std::string test_json_doubles =
"{\"input_batch\": [[1.1, 2.2], [10.1, 20.2], [100.1, 200.2]]}";
std::vector<std::vector<double>> expected_input{
{1.1, 2.2}, {10.1, 20.2}, {100.1, 200.2}};
std::vector<folly::Try<Response>> responses =
rh_.decode_and_handle_predict(test_json_doubles, "test", {},
"test_policy", 30000, InputType::Doubles)
.get();
for (size_t index = 0; index < responses.size(); ++index) {
Response response = responses[index].value();
Query parsed_query = response.query_;

const std::vector<double>& parsed_input =
std::static_pointer_cast<DoubleVector>(parsed_query.input_)->get_data();
EXPECT_EQ(parsed_input, expected_input[index]);
EXPECT_EQ(parsed_query.label_, "test");
EXPECT_EQ(parsed_query.latency_budget_micros_, 30000);
EXPECT_EQ(parsed_query.selection_policy_, "test_policy");
}
}

TEST_F(QueryFrontendTest, TestDecodeCorrectInputString) {
std::string test_json_string =
"{\"input\": \"hello world. This is a test string with "
"punctionation!@#$Y#;}#\"}";
Response response =
std::vector<folly::Try<Response>> responses =
rh_.decode_and_handle_predict(test_json_string, "test", {}, "test_policy",
30000, InputType::Strings)
.get();
Response response = responses[0].value();

Query parsed_query = response.query_;

Expand All @@ -120,6 +166,28 @@ TEST_F(QueryFrontendTest, TestDecodeCorrectInputString) {
EXPECT_EQ(parsed_query.selection_policy_, "test_policy");
}

TEST_F(QueryFrontendTest, TestDecodeCorrectInputStringBatch) {
std::string test_json_strings =
"{\"input_batch\": [ \"this\", \"is\", \"a\", \"test\" ]}";
std::vector<std::string> expected_input{"this", "is", "a", "test"};
std::vector<folly::Try<Response>> responses =
rh_.decode_and_handle_predict(test_json_strings, "test", {},
"test_policy", 30000, InputType::Strings)
.get();
for (size_t index = 0; index < responses.size(); ++index) {
Response response = responses[index].value();
Query parsed_query = response.query_;

const std::string& parsed_input =
std::static_pointer_cast<SerializableString>(parsed_query.input_)
->get_data();
EXPECT_EQ(parsed_input, expected_input[index]);
EXPECT_EQ(parsed_query.label_, "test");
EXPECT_EQ(parsed_query.latency_budget_micros_, 30000);
EXPECT_EQ(parsed_query.selection_policy_, "test_policy");
}
}

TEST_F(QueryFrontendTest, TestDecodeMalformedJSON) {
std::string gibberish_string1 =
"{\"uid\": 2hkdshfdshffhkj32kjhh{dskjfh32r\"3r32";
@@ -155,6 +223,15 @@ TEST_F(QueryFrontendTest, TestDecodeWrongInputType) {
json_semantic_error);
}

TEST_F(QueryFrontendTest, TestDecodeWrongInputTypeInBatch) {
std::string test_json_doubles =
"{\"uid\": 23, \"input_batch\": [[1,2], [3.243242,0.3223424]]}";
ASSERT_THROW(
rh_.decode_and_handle_predict(test_json_doubles, "test", {},
"test_policy", 30000, InputType::Ints),
json_semantic_error);
}

TEST_F(QueryFrontendTest, TestDecodeCorrectUpdate) {
std::string update_json =
"{\"uid\": 23, \"input\": [1.4,2.23,3.243242,0.3223424], \"label\": 1.0}";
@@ -200,10 +277,12 @@ TEST_F(QueryFrontendTest, TestAddManyApplications) {
TEST_F(QueryFrontendTest,
TestJsonResponseForSuccessfulPredictionFormattedCorrectly) {
std::string test_json = "{\"uid\": 1, \"input\": [1,2,3]}";
Response response =
std::vector<folly::Try<Response>> responses =
rh_.decode_and_handle_predict(test_json, "test", {}, "test_policy", 30000,
InputType::Ints)
.get();
Response response = responses[0].value();

std::string json_response = rh_.get_prediction_response_content(response);
rapidjson::Document parsed_response;
json::parse_json(json_response, parsed_response);
2 changes: 1 addition & 1 deletion src/libclipper/include/clipper/containers.hpp
@@ -89,6 +89,6 @@ class ActiveContainers {
std::map<int, std::shared_ptr<ModelContainer>>>
containers_;
};
}
} // namespace clipper

#endif
2 changes: 1 addition & 1 deletion src/libclipper/include/clipper/datatypes.hpp
@@ -417,5 +417,5 @@ struct hash<clipper::VersionedModelId> {
return seed;
}
};
}
} // namespace std
#endif // CLIPPER_LIB_DATATYPES_H
15 changes: 15 additions & 0 deletions src/libclipper/include/clipper/json_util.hpp
@@ -84,6 +84,18 @@ std::vector<int> get_int_array(rapidjson::Value& d, const char* key_name);
std::vector<std::string> get_string_array(rapidjson::Value& d,
const char* key_name);

std::vector<std::vector<double>> get_double_arrays(rapidjson::Value& d,
const char* key_name);

std::vector<std::vector<float>> get_float_arrays(rapidjson::Value& d,
const char* key_name);

std::vector<std::vector<int>> get_int_arrays(rapidjson::Value& d,
const char* key_name);

std::vector<std::vector<uint8_t>> get_base64_encoded_byte_arrays(
rapidjson::Value& d, const char* key_name);

std::vector<VersionedModelId> get_candidate_models(rapidjson::Value& d,
const char* key_name);

Expand All @@ -93,6 +105,9 @@ void parse_json(const std::string& json_content, rapidjson::Document& d);

std::shared_ptr<Input> parse_input(InputType input_type, rapidjson::Value& d);

std::vector<std::shared_ptr<Input>> parse_input_batch(InputType input_type,
rapidjson::Value& d);

/* Utilities for serialization into JSON */
void add_kv_pair(rapidjson::Document& d, const char* key_name,
rapidjson::Value& value_to_add);
38 changes: 19 additions & 19 deletions src/libclipper/include/clipper/redis.hpp
@@ -41,7 +41,7 @@ bool contains_prohibited_chars_for_group(std::string value);
/**
* Issues a command to Redis and checks return code.
* \return Returns true if the command was successful.
Contributor comment: revert all these whitespace changes.

*/
*/
template <class ReplyT>
bool send_cmd_no_reply(redox::Redox& redis,
const std::vector<std::string>& cmd_vec) {
@@ -319,13 +319,13 @@ std::unordered_map<std::string, std::string> get_application_by_key(
std::vector<std::string> get_all_application_names(redox::Redox& redis);

/**
* Subscribes to changes in the model table. The
* callback is called with the string key of the model
* that changed and the Redis event type. The key can
* be used to look up the new value. The message type identifies
* what type of change was detected. This allows subscribers
* to differentiate between adds, updates, and deletes if necessary.
*/
* Subscribes to changes in the model table. The
* callback is called with the string key of the model
* that changed and the Redis event type. The key can
* be used to look up the new value. The message type identifies
* what type of change was detected. This allows subscribers
* to differentiate between adds, updates, and deletes if necessary.
*/
void subscribe_to_model_changes(
redox::Subscriber& subscriber,
std::function<void(const std::string&, const std::string&)> callback);
Expand All @@ -336,7 +336,7 @@ void subscribe_to_model_changes(
* be used to look up the new value. The message type identifies
* what type of change was detected. This allows subscribers
* to differentiate between adds, updates, and deletes if necessary.
*/
*/
void subscribe_to_container_changes(
redox::Subscriber& subscriber,
std::function<void(const std::string&, const std::string&)> callback);
Expand All @@ -348,7 +348,7 @@ void subscribe_to_container_changes(
* be used to look up the new value. The message type identifies
* what type of change was detected. This allows subscribers
* to differentiate between adds, updates, and deletes if necessary.
*/
*/
void subscribe_to_application_changes(
redox::Subscriber& subscriber,
std::function<void(const std::string&, const std::string&)> callback);
Expand All @@ -360,20 +360,20 @@ void subscribe_to_application_changes(
* be used to look up the new value. The message type identifies
* what type of change was detected. This allows subscribers
* to differentiate between adds and deletes if necessary.
*/
*/
void subscribe_to_model_link_changes(
redox::Subscriber& subscriber,
std::function<void(const std::string&, const std::string&)> callback);

/**
* Subscribes to changes in model versions.
*
* The callback is called with the string key of the model
* that changed and the Redis event type. The key can
* be used to look up the new value. The message type identifies
* what type of change was detected. This allows subscribers
* to differentiate between adds, updates, and deletes if necessary.
*/
* Subscribes to changes in model versions.
*
* The callback is called with the string key of the model
* that changed and the Redis event type. The key can
* be used to look up the new value. The message type identifies
* what type of change was detected. This allows subscribers
* to differentiate between adds, updates, and deletes if necessary.
*/
void subscribe_to_model_version_changes(
redox::Subscriber& subscriber,
std::function<void(const std::string&, const std::string&)> callback);