-
Notifications
You must be signed in to change notification settings - Fork 280
Fluentd Logging System Part 1 #652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 23 commits
aa6e1db
f43fa27
c43e313
a05443a
4525f96
d7bd73a
f0dae98
00c2d91
3b5b1ca
b255c4b
90d480a
bb6a990
09fc089
0418be7
dff44da
7bf1f1b
132baa7
a0c8501
0f5af00
43cfb4b
80db695
3500eff
b546b16
18bef18
2d3657a
d9cd323
008d0ac
9aec964
5c9f25f
2ea53ce
6f5673a
0977ad5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,30 +5,35 @@ | |
| import docker | ||
| import logging | ||
| import os | ||
| import sys | ||
| import random | ||
| import time | ||
| import json | ||
| import tempfile | ||
| from ..container_manager import ( | ||
| create_model_container_label, parse_model_container_label, | ||
| ContainerManager, CLIPPER_DOCKER_LABEL, CLIPPER_MODEL_CONTAINER_LABEL, | ||
| CLIPPER_QUERY_FRONTEND_CONTAINER_LABEL, | ||
| CLIPPER_MGMT_FRONTEND_CONTAINER_LABEL, CLIPPER_INTERNAL_RPC_PORT, | ||
| CLIPPER_INTERNAL_QUERY_PORT, CLIPPER_INTERNAL_MANAGEMENT_PORT, | ||
| CLIPPER_INTERNAL_MANAGEMENT_PORT, | ||
| CLIPPER_INTERNAL_METRIC_PORT, CLIPPER_INTERNAL_REDIS_PORT, | ||
| CLIPPER_DOCKER_PORT_LABELS, CLIPPER_METRIC_CONFIG_LABEL, ClusterAdapter) | ||
| from ..exceptions import ClipperException | ||
| CLIPPER_DOCKER_PORT_LABELS, CLIPPER_METRIC_CONFIG_LABEL, ClusterAdapter, | ||
| CLIPPER_FLUENTD_CONFIG_LABEL) | ||
| from requests.exceptions import ConnectionError | ||
| from .docker_metric_utils import * | ||
| from clipper_admin.docker.logging.docker_logging_utils import ( | ||
| get_logs_from_containers, | ||
| get_default_log_config | ||
| ) | ||
| from clipper_admin.docker.logging.fluentd.fluentd import Fluentd | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class DockerContainerManager(ContainerManager): | ||
| # Logging-TODO Add SQLITE support | ||
| def __init__(self, | ||
| cluster_name="default-cluster", | ||
| docker_ip_address="localhost", | ||
| use_centralized_log=False, | ||
| fluentd_port=24224, | ||
|
rkooo567 marked this conversation as resolved.
Outdated
|
||
| clipper_query_port=1337, | ||
| clipper_management_port=1338, | ||
| clipper_rpc_port=7000, | ||
|
|
@@ -47,6 +52,10 @@ def __init__(self, | |
| The public hostname or IP address at which the Clipper Docker | ||
| containers can be accessed via their exposed ports. This should almost always | ||
| be "localhost". Only change if you know what you're doing! | ||
| use_centralized_log: bool, optional | ||
| If it is True, Clipper sets up Fluentd and DB (Currently SQlite) to centralize logs | ||
| fluentd_port : int, optional | ||
| The port on which the fluentd logging driver should listen to centralize logs. | ||
| clipper_query_port : int, optional | ||
| The port on which the query frontend should listen for incoming prediction requests. | ||
| clipper_management_port : int, optional | ||
|
|
@@ -80,6 +89,7 @@ def __init__(self, | |
| self.external_redis = True | ||
| self.redis_port = redis_port | ||
| self.prometheus_port = prometheus_port | ||
| self.centralize_log = use_centralized_log | ||
| if docker_network is "host": | ||
| raise ClipperException( | ||
| "DockerContainerManager does not support running Clipper on the " | ||
|
|
@@ -113,6 +123,21 @@ def __init__(self, | |
| 'cluster_name': self.cluster_identifier | ||
| }) | ||
|
|
||
| # Setting Docker cluster logging. | ||
| # Logging-TODO Add SQLITE support | ||
|
rkooo567 marked this conversation as resolved.
Outdated
|
||
| self.logging_system = Fluentd | ||
| self.log_config = get_default_log_config() | ||
| self.logging_system_instance = None | ||
|
|
||
| if self.centralize_log: | ||
| self.logging_system_instance = self.logging_system( | ||
| self.logger, | ||
| self.cluster_name, | ||
| self.docker_client, | ||
| port=find_unbound_port(fluentd_port) | ||
| ) | ||
| self.log_config = self.logging_system_instance.get_log_config() | ||
|
|
||
| def start_clipper(self, | ||
| query_frontend_image, | ||
| mgmt_frontend_image, | ||
|
|
@@ -152,6 +177,11 @@ def start_clipper(self, | |
| "Please use ClipperConnection.connect() to connect to it.". | ||
| format(self.cluster_name)) | ||
|
|
||
| if self.centralize_log: | ||
| # Logging-TODO Initialize SQLite Logging DB | ||
| self.logging_system_instance.start(self.common_labels, self.extra_container_kwargs) | ||
|
|
||
| # Redis for cluster configuration | ||
| if not self.external_redis: | ||
| self.logger.info("Starting managed Redis instance in Docker") | ||
| self.redis_port = find_unbound_port(self.redis_port) | ||
|
|
@@ -161,6 +191,7 @@ def start_clipper(self, | |
| redis_container = self.docker_client.containers.run( | ||
| 'redis:alpine', | ||
| "redis-server --port %s" % CLIPPER_INTERNAL_REDIS_PORT, | ||
| log_config=self.log_config, | ||
| name="redis-{}".format(random.randint( | ||
| 0, 100000)), # generate a random name | ||
| ports={ | ||
|
|
@@ -170,6 +201,7 @@ def start_clipper(self, | |
| **self.extra_container_kwargs) | ||
| self.redis_ip = redis_container.name | ||
|
|
||
| # frontend management | ||
| mgmt_cmd = "--redis_ip={redis_ip} --redis_port={redis_port}".format( | ||
| redis_ip=self.redis_ip, redis_port=CLIPPER_INTERNAL_REDIS_PORT) | ||
| self.clipper_management_port = find_unbound_port( | ||
|
|
@@ -181,6 +213,7 @@ def start_clipper(self, | |
| self.docker_client.containers.run( | ||
| mgmt_frontend_image, | ||
| mgmt_cmd, | ||
| log_config=self.log_config, | ||
| name="mgmt_frontend-{}".format(random.randint( | ||
| 0, 100000)), # generate a random name | ||
| ports={ | ||
|
|
@@ -190,6 +223,7 @@ def start_clipper(self, | |
| labels=mgmt_labels, | ||
| **self.extra_container_kwargs) | ||
|
|
||
| # query frontend | ||
| query_cmd = ("--redis_ip={redis_ip} --redis_port={redis_port} " | ||
| "--prediction_cache_size={cache_size} " | ||
| "--thread_pool_size={thread_pool_size} " | ||
|
|
@@ -214,6 +248,7 @@ def start_clipper(self, | |
| self.docker_client.containers.run( | ||
| query_frontend_image, | ||
| query_cmd, | ||
| log_config=self.log_config, | ||
| name=query_name, | ||
| ports={ | ||
| '%s/tcp' % CLIPPER_INTERNAL_QUERY_PORT: | ||
|
|
@@ -229,7 +264,7 @@ def start_clipper(self, | |
| run_query_frontend_metric_image( | ||
| query_frontend_metric_name, self.docker_client, query_name, | ||
| frontend_exporter_image, self.common_labels, | ||
| self.extra_container_kwargs) | ||
| self.log_config, self.extra_container_kwargs) | ||
|
|
||
| self.prom_config_path = tempfile.NamedTemporaryFile( | ||
| 'w', suffix='.yml', delete=False).name | ||
|
|
@@ -247,7 +282,7 @@ def start_clipper(self, | |
| metric_labels[CLIPPER_METRIC_CONFIG_LABEL] = self.prom_config_path | ||
| run_metric_image(self.docker_client, metric_labels, | ||
| self.prometheus_port, self.prom_config_path, | ||
| self.extra_container_kwargs) | ||
| self.log_config, self.extra_container_kwargs) | ||
|
|
||
| self.connect() | ||
|
|
||
|
|
@@ -278,6 +313,19 @@ def connect(self): | |
| self.prometheus_port = all_labels[CLIPPER_DOCKER_PORT_LABELS['metric']] | ||
| self.prom_config_path = all_labels[CLIPPER_METRIC_CONFIG_LABEL] | ||
|
|
||
| if self._is_valid_logging_state_to_connect(all_labels): | ||
| self.centralize_log= True | ||
| self.logging_system_instance = \ | ||
| self.logging_system( | ||
| self.logger, | ||
| self.cluster_name, | ||
| self.docker_client, | ||
| port=all_labels[CLIPPER_DOCKER_PORT_LABELS['fluentd']], | ||
| conf_path=all_labels[CLIPPER_FLUENTD_CONFIG_LABEL] | ||
| ) | ||
| self.log_config = self.logging_system_instance.get_log_config() | ||
| # Logging-TODO Add a Sqlite support | ||
|
|
||
| def deploy_model(self, name, version, input_type, image, num_replicas=1): | ||
| # Parameters | ||
| # ---------- | ||
|
|
@@ -335,11 +383,13 @@ def _add_replica(self, name, version, input_type, image): | |
|
|
||
| model_container_name = model_container_label + '-{}'.format( | ||
| random.randint(0, 100000)) | ||
|
|
||
| self.docker_client.containers.run( | ||
| image, | ||
| name=model_container_name, | ||
| environment=env_vars, | ||
| labels=labels, | ||
| log_config=self.log_config, | ||
| **self.extra_container_kwargs) | ||
|
|
||
| # Metric Section | ||
|
|
@@ -396,30 +446,10 @@ def set_num_replicas(self, name, version, input_type, image, num_replicas): | |
| self.prometheus_port) | ||
|
|
||
| def get_logs(self, logging_dir): | ||
| containers = self.docker_client.containers.list( | ||
| filters={ | ||
| "label": | ||
| "{key}={val}".format( | ||
| key=CLIPPER_DOCKER_LABEL, val=self.cluster_name) | ||
| }) | ||
| logging_dir = os.path.abspath(os.path.expanduser(logging_dir)) | ||
|
|
||
| log_files = [] | ||
| if not os.path.exists(logging_dir): | ||
| os.makedirs(logging_dir) | ||
| self.logger.info("Created logging directory: %s" % logging_dir) | ||
| for c in containers: | ||
| log_file_name = "image_{image}:container_{id}.log".format( | ||
| image=c.image.short_id, id=c.short_id) | ||
| log_file = os.path.join(logging_dir, log_file_name) | ||
| if sys.version_info < (3, 0): | ||
| with open(log_file, "w") as lf: | ||
| lf.write(c.logs(stdout=True, stderr=True)) | ||
| else: | ||
| with open(log_file, "wb") as lf: | ||
| lf.write(c.logs(stdout=True, stderr=True)) | ||
| log_files.append(log_file) | ||
| return log_files | ||
| if self.centralize_log: | ||
| return self.logging_system_instance.get_logs(logging_dir) | ||
| else: | ||
| return get_logs_from_containers(self, logging_dir) | ||
|
|
||
| def stop_models(self, models): | ||
| containers = self.docker_client.containers.list( | ||
|
|
@@ -459,6 +489,26 @@ def stop_all(self, graceful=True): | |
| else: | ||
| c.kill() | ||
|
|
||
| def _is_valid_logging_state_to_connect(self, all_labels): | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will change the logic of this part. I will make new clipper connection to turn on
|
||
| if self.centralize_log and not self.logging_system.container_is_running(all_labels): | ||
| raise ConnectionError( | ||
| "Invalid state detected. " | ||
| "log centralization is {log_centralization_state}, " | ||
| "but cannot find fluentd instance running. " | ||
| "Please change your use_centralized_log parameter of DockerContainermanager" | ||
| .format(log_centralization_state=self.centralize_log) | ||
| ) | ||
| elif self.logging_system.container_is_running(all_labels) and not self.centralize_log: | ||
| raise ConnectionError( | ||
| "Invalid state detected. " | ||
| "Fluentd instance is running, " | ||
| "but log centralization state is {log_centralization_state}. " | ||
| "Please change your use_centralized_log parameter of DockerContainerManager to True" | ||
| .format(log_centralization_state=self.centralize_log) | ||
| ) | ||
| else: | ||
| return self.logging_system.container_is_running(all_labels) | ||
|
|
||
| def get_admin_addr(self): | ||
| return "{host}:{port}".format( | ||
| host=self.public_hostname, port=self.clipper_management_port) | ||
|
|
@@ -514,3 +564,5 @@ def find_unbound_port(start=None, | |
| start += 1 | ||
| else: | ||
| start = random.randint(*port_range) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| # Clipper Logging with Fluentd | ||
|
|
||
| ## Log Centralization (Beta) | ||
| Clipper uses Fluentd (https://www.fluentd.org/) for centralizing logs from Docker containers within Clipper cluster. | ||
| It is currently a beta version. It only supports centralizing logs into Fluentd instance for now, but we will add various functinoalities | ||
| like monitoring and debugging on the top of it. Please create an issue if you want any functionality. | ||
| Also, please don't hesistate to contribute if you add any features. | ||
|
|
||
| ## How to guide | ||
| Firstly, when you define `DockerContainerManager`, you should set `use_centralized_log` parameter to be `True` | ||
|
|
||
| ```python | ||
| clipper_conn = ClipperConnection(DockerContainerManager(use_centralized_log=True)) | ||
| clipper_conn.start_clipper() | ||
| ``` | ||
|
|
||
| Once you start up the clipper cluster, you can check fluentd Docker container running. | ||
|
|
||
| ```bash | ||
| $docker ps | ||
| CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES | ||
| 170000ec75d7 default-cluster-simple-example:1 "/container/containe…" 11 seconds ago Up 10 seconds (healthy) simple-example_1-71538 | ||
| 5b533ff2fd3a prom/prometheus:v2.1.0 "/bin/prometheus --c…" 13 seconds ago Up 12 seconds 0.0.0.0:9090->9090/tcp metric_frontend-7206 | ||
| b71b557a0001 clipper/frontend-exporter:develop "python /usr/src/app…" 14 seconds ago Up 13 seconds query_frontend_exporter-55488 | ||
| bc8a7cc31754 clipper/query_frontend:develop "/clipper/release/sr…" 15 seconds ago Up 14 seconds 0.0.0.0:1337->1337/tcp, 0.0.0.0:7000->7000/tcp query_frontend-55488 | ||
| d04f33c654fd clipper/management_frontend:develop "/clipper/release/sr…" 15 seconds ago Up 15 seconds 0.0.0.0:1338->1338/tcp mgmt_frontend-60461 | ||
| 30103e84e2a1 redis:alpine "docker-entrypoint.s…" 16 seconds ago Up 15 seconds 0.0.0.0:30356->6379/tcp redis-82152 | ||
| b78c3242c3e7 fluent/fluentd:v1.3-debian-1 "tini -- /bin/entryp…" 17 seconds ago Up 16 seconds 5140/tcp, 0.0.0.0:24224->24224/tcp, 0.0.0.0:24224->24224/udp fluentd-51374 | ||
| ``` | ||
|
|
||
| You can see centralized logs from fluentd container's stdout. Type | ||
|
|
||
| ```bash | ||
| $docker logs <fluentd_container_id> | ||
| ``` | ||
|
|
||
| Currently, it just prints out huge amount of logs centralized. It is because this feature is in the beggining phase. | ||
| We will add persistent storage for logs and query feature in the upcoming version. | ||
|
|
||
| ## How to customize | ||
| Currently, we don't recommend customizing a logging feature or using it for production. It is immature and unstable. Some APIs can be drastically changed. | ||
| If you still want to use it, you can directly modify fluentd conf file. It is mounted in a temp folder which you can easily find through python interactive shell. | ||
|
|
||
| ```python | ||
| >>> # Make sure you already ran clipper_conn.clipper_conn.start_clipper() with DockerContainerManager(use_centralized_log=True). Also, it is the python shell. | ||
| >>> clipper_conn = ClipperConnection(DockerContainerManager(use_centralized_log=True)) | ||
| >>> clipper_conn.connect() | ||
| 19-03-21:10:36:58 INFO [clipper_admin.py:157] [default-cluster] Successfully connected to Clipper cluster at localhost:1337 | ||
| >>> cm = clipper_conn.cm | ||
| >>> cm.logging_system_instance.conf_path | ||
| # It will show you conf file path mounted on your local machine. | ||
| ``` |
Uh oh!
There was an error while loading. Please reload this page.