-
Notifications
You must be signed in to change notification settings - Fork 280
Batch predict #321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Batch predict #321
Changes from 16 commits
1292ae6
b2cc750
0e5f5b5
9e37507
a0fd8d2
5a4a287
0d42e43
1b144d7
8c02958
f43114c
7a9cfd3
94e2e5a
b3ec5c2
51ecf46
4404547
3f5a431
fe5fb5d
0d9ca9f
efd2d49
ddb9c9f
dbfac94
08191c8
6c923be
6eaa7f4
d59f0fd
3461df5
9d23155
bed6827
3f06542
9a29ef1
c35f793
d31c9fa
7aa122c
0db4c2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -155,9 +155,9 @@ class RPC { | |
| std::shared_ptr<boost::circular_buffer<RPCLogItem>> event_log_; | ||
|
|
||
| /** | ||
| * @return `true` if the received heartbeat is a request for container metadata. | ||
| * `false` otherwise. | ||
| */ | ||
| * @return `true` if the received heartbeat is a request for container | ||
| * metadata. `false` otherwise. | ||
| */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Revert |
||
| bool handle_heartbeat(zmq::socket_t& socket) const; | ||
|
|
||
| void send_heartbeat(zmq::socket_t& socket) const; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -318,25 +318,35 @@ class RequestHandler { | |
| } | ||
| } | ||
|
|
||
| folly::Future<Response> prediction = decode_and_handle_predict( | ||
| request->content.string(), name, versioned_models, policy, | ||
| latency_slo_micros, input_type); | ||
|
|
||
| prediction | ||
| .then([response, app_metrics](Response r) { | ||
| // Update metrics | ||
| if (r.output_is_default_) { | ||
| app_metrics.default_pred_ratio_->increment(1, 1); | ||
| } else { | ||
| app_metrics.default_pred_ratio_->increment(0, 1); | ||
| folly::Future<std::vector<folly::Try<Response>>> predictions = | ||
| decode_and_handle_predict(request->content.string(), name, | ||
| versioned_models, policy, | ||
| latency_slo_micros, input_type); | ||
|
|
||
| predictions | ||
| .then([response, | ||
| app_metrics](std::vector<folly::Try<Response>> tries) { | ||
| std::string final_content; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use a |
||
| for (auto t : tries) { | ||
| try { | ||
| Response r = t.value(); | ||
| if (r.output_is_default_) { | ||
| app_metrics.default_pred_ratio_->increment(1, 1); | ||
| } else { | ||
| app_metrics.default_pred_ratio_->increment(0, 1); | ||
| } | ||
| app_metrics.latency_->insert(r.duration_micros_); | ||
| app_metrics.num_predictions_->increment(1); | ||
| app_metrics.throughput_->mark(1); | ||
|
|
||
| std::string content = get_prediction_response_content(r); | ||
| final_content += content + "\n"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This will turn into a stringstream appends (e.g. |
||
| } catch (const std::exception& e) { | ||
| // case: returned a response before all predictions in the | ||
| // batch were ready | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Log this error |
||
| } | ||
| } | ||
| app_metrics.latency_->insert(r.duration_micros_); | ||
| app_metrics.num_predictions_->increment(1); | ||
| app_metrics.throughput_->mark(1); | ||
|
|
||
| std::string content = get_prediction_response_content(r); | ||
| respond_http(content, "200 OK", response); | ||
|
|
||
| respond_http(final_content, "200 OK", response); | ||
| }) | ||
| .onError([response](const std::exception& e) { | ||
| clipper::log_error_formatted(clipper::LOGGING_TAG_CLIPPER, | ||
|
|
@@ -479,22 +489,33 @@ class RequestHandler { | |
| * JSON format for prediction query request: | ||
| * { | ||
| * "input" := [double] | [int] | [string] | [byte] | [float] | ||
| * "input_batch" := [[double] | [int] | [byte] | [float] | string] | ||
| * } | ||
| */ | ||
| folly::Future<Response> decode_and_handle_predict( | ||
| folly::Future<std::vector<folly::Try<Response>>> decode_and_handle_predict( | ||
| std::string json_content, std::string name, | ||
| std::vector<VersionedModelId> models, std::string policy, | ||
| long latency_slo_micros, InputType input_type) { | ||
| rapidjson::Document d; | ||
| clipper::json::parse_json(json_content, d); | ||
| long uid = 0; | ||
| // NOTE: We will eventually support personalization again so this commented | ||
| // out code is intentionally left in as a placeholder. | ||
| // long uid = clipper::json::get_long(d, "uid"); | ||
| std::shared_ptr<Input> input = clipper::json::parse_input(input_type, d); | ||
| auto prediction = query_processor_.predict( | ||
| Query{name, uid, input, latency_slo_micros, policy, models}); | ||
| return prediction; | ||
| std::vector<folly::Future<Response>> predictions; | ||
|
|
||
| if (d.HasMember("input")) { | ||
| std::shared_ptr<Input> input = clipper::json::parse_input(input_type, d); | ||
| auto prediction = query_processor_.predict( | ||
| Query{name, uid, input, latency_slo_micros, policy, models}); | ||
| predictions.push_back(std::move(prediction)); | ||
| } else { // d.HasMember("input_batch") instead | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does it make sense to explicitly check for |
||
| std::vector<std::shared_ptr<Input>> input_batch = | ||
| clipper::json::parse_input_batch(input_type, d); | ||
| for (auto input : input_batch) { | ||
| auto prediction = query_processor_.predict( | ||
| Query{name, uid, input, latency_slo_micros, policy, models}); | ||
| predictions.push_back(std::move(prediction)); | ||
| } | ||
| } | ||
| return folly::collectAll(predictions); | ||
| } | ||
|
|
||
| /* | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,7 +41,7 @@ bool contains_prohibited_chars_for_group(std::string value); | |
| /** | ||
| * Issues a command to Redis and checks return code. | ||
| * \return Returns true if the command was successful. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. revert all these whitespace changes |
||
| */ | ||
| */ | ||
| template <class ReplyT> | ||
| bool send_cmd_no_reply(redox::Redox& redis, | ||
| const std::vector<std::string>& cmd_vec) { | ||
|
|
@@ -319,13 +319,13 @@ std::unordered_map<std::string, std::string> get_application_by_key( | |
| std::vector<std::string> get_all_application_names(redox::Redox& redis); | ||
|
|
||
| /** | ||
| * Subscribes to changes in the model table. The | ||
| * callback is called with the string key of the model | ||
| * that changed and the Redis event type. The key can | ||
| * be used to look up the new value. The message type identifies | ||
| * what type of change was detected. This allows subscribers | ||
| * to differentiate between adds, updates, and deletes if necessary. | ||
| */ | ||
| * Subscribes to changes in the model table. The | ||
| * callback is called with the string key of the model | ||
| * that changed and the Redis event type. The key can | ||
| * be used to look up the new value. The message type identifies | ||
| * what type of change was detected. This allows subscribers | ||
| * to differentiate between adds, updates, and deletes if necessary. | ||
| */ | ||
| void subscribe_to_model_changes( | ||
| redox::Subscriber& subscriber, | ||
| std::function<void(const std::string&, const std::string&)> callback); | ||
|
|
@@ -336,7 +336,7 @@ void subscribe_to_model_changes( | |
| * be used to look up the new value. The message type identifies | ||
| * what type of change was detected. This allows subscribers | ||
| * to differentiate between adds, updates, and deletes if necessary. | ||
| */ | ||
| */ | ||
| void subscribe_to_container_changes( | ||
| redox::Subscriber& subscriber, | ||
| std::function<void(const std::string&, const std::string&)> callback); | ||
|
|
@@ -348,7 +348,7 @@ void subscribe_to_container_changes( | |
| * be used to look up the new value. The message type identifies | ||
| * what type of change was detected. This allows subscribers | ||
| * to differentiate between adds, updates, and deletes if necessary. | ||
| */ | ||
| */ | ||
| void subscribe_to_application_changes( | ||
| redox::Subscriber& subscriber, | ||
| std::function<void(const std::string&, const std::string&)> callback); | ||
|
|
@@ -360,20 +360,20 @@ void subscribe_to_application_changes( | |
| * be used to look up the new value. The message type identifies | ||
| * what type of change was detected. This allows subscribers | ||
| * to differentiate between adds and deletes if necessary. | ||
| */ | ||
| */ | ||
| void subscribe_to_model_link_changes( | ||
| redox::Subscriber& subscriber, | ||
| std::function<void(const std::string&, const std::string&)> callback); | ||
|
|
||
| /** | ||
| * Subscribes to changes in model versions. | ||
| * | ||
| * The callback is called with the string key of the model | ||
| * that changed and the Redis event type. The key can | ||
| * be used to look up the new value. The message type identifies | ||
| * what type of change was detected. This allows subscribers | ||
| * to differentiate between adds, updates, and deletes if necessary. | ||
| */ | ||
| * Subscribes to changes in model versions. | ||
| * | ||
| * The callback is called with the string key of the model | ||
| * that changed and the Redis event type. The key can | ||
| * be used to look up the new value. The message type identifies | ||
| * what type of change was detected. This allows subscribers | ||
| * to differentiate between adds, updates, and deletes if necessary. | ||
| */ | ||
| void subscribe_to_model_version_changes( | ||
| redox::Subscriber& subscriber, | ||
| std::function<void(const std::string&, const std::string&)> callback); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
Revert these changes (to keep the commit history clean).