-
Notifications
You must be signed in to change notification settings - Fork 995
Remove redis connection from top level #3218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |
| from typing import Iterable, Any, Union | ||
sgoggins marked this conversation as resolved.
Show resolved
Hide resolved
sgoggins marked this conversation as resolved.
Show resolved
Hide resolved
sgoggins marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| from collections.abc import MutableSequence | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
| from augur.tasks.init.redis_connection import redis_connection as redis | ||
| from augur.tasks.init.redis_connection import get_redis_connection | ||
| from augur import instance_id | ||
| from redis import exceptions | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
| import numbers | ||
|
|
@@ -17,14 +17,15 @@ def __init__(self, scalar_name: str, default_value: int = 0, override_existing: | |
| self._scalar_name = scalar_name | ||
|
|
||
| self.__value = default_value | ||
| self.redis = get_redis_connection() | ||
|
|
||
| #Check redis to see if key exists in cache | ||
| if 1 != redis.exists(self.redis_scalar_key) or override_existing: | ||
| if 1 != self.redis.exists(self.redis_scalar_key) or override_existing: | ||
| #Set value | ||
| redis.set(self.redis_scalar_key,self.__value) | ||
| self.redis.set(self.redis_scalar_key,self.__value) | ||
| else: | ||
| #else get the value | ||
| self.__value = int(float(redis.get(self.redis_scalar_key))) | ||
| self.__value = int(float(self.redis.get(self.redis_scalar_key))) | ||
|
|
||
| @property | ||
| def value(self): | ||
|
|
@@ -34,4 +35,4 @@ def value(self): | |
| def value(self, otherVal): | ||
| if isinstance(otherVal, numbers.Number): | ||
| self.__value = otherVal | ||
| redis.set(self.redis_scalar_key,self.__value) | ||
| self.redis.set(self.redis_scalar_key,self.__value) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,4 @@ | ||
| from augur.tasks.init.redis_connection import redis_connection as conn | ||
| from augur.tasks.init.redis_connection import get_redis_connection | ||
| from redis.client import PubSub | ||
| from logging import Logger | ||
| from os import getpid | ||
|
|
@@ -29,8 +29,10 @@ def __init__(self, platform: str, logger: Logger): | |
| if not platform: | ||
| raise ValueError("Platform must not be empty") | ||
|
|
||
| self.stdout = conn | ||
| self.stdin: PubSub = conn.pubsub(ignore_subscribe_messages = True) | ||
| self.conn = get_redis_connection() | ||
|
|
||
| self.stdout = self.conn | ||
| self.stdin: PubSub = self.conn.pubsub(ignore_subscribe_messages = True) | ||
| self.stdin.subscribe(f"{self.REQUEST}-{self.id}") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
| self.platform = platform | ||
| self.logger = logger | ||
|
|
@@ -150,14 +152,15 @@ class KeyPublisher: | |
| as the process ID is used for async communication between | ||
| the publisher and the orchestrator. | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| # Load channel names and IDs from the spec | ||
| for channel in spec["channels"]: | ||
| # IE: self.ANNOUNCE = "augur-oauth-announce" | ||
| setattr(self, channel["name"], channel["id"]) | ||
| self.conn = get_redis_connection() | ||
| self.id = getpid() | ||
| self.stdin: PubSub = conn.pubsub(ignore_subscribe_messages = True) | ||
| self.stdin: PubSub = self.conn.pubsub(ignore_subscribe_messages = True) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this needs to be fixed. |
||
| self.stdin.subscribe(f"{self.ANNOUNCE}-{self.id}") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
|
|
||
| def publish(self, key: str, platform: str): | ||
|
|
@@ -172,7 +175,7 @@ def publish(self, key: str, platform: str): | |
| "key_platform": platform | ||
| } | ||
|
|
||
| conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
| self.conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
|
|
||
| def unpublish(self, key: str, platform: str): | ||
| """ Unpublish a key, and remove it from orchestration | ||
|
|
@@ -189,7 +192,7 @@ def unpublish(self, key: str, platform: str): | |
| "key_platform": platform | ||
| } | ||
|
|
||
| conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
| self.conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
|
|
||
| def wait(self, timeout_seconds = 30, republish = False): | ||
| """ Wait for ACK from the orchestrator | ||
|
|
@@ -215,7 +218,7 @@ def wait(self, timeout_seconds = 30, republish = False): | |
| } | ||
|
|
||
| listen_delta = 0.1 | ||
| conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
| self.conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
|
|
||
| # Just wait for and consume the next incoming message | ||
| while timeout_seconds >= 0: | ||
|
|
@@ -227,7 +230,7 @@ def wait(self, timeout_seconds = 30, republish = False): | |
| elif timeout_seconds < listen_delta: | ||
| break | ||
| elif republish: | ||
| conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
| self.conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
|
|
||
| time.sleep(listen_delta) | ||
| timeout_seconds -= listen_delta | ||
|
|
@@ -245,7 +248,7 @@ def list_platforms(self): | |
| "requester_id": self.id | ||
| } | ||
|
|
||
| conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
| self.conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
|
|
||
| reply = next(self.stdin.listen()) | ||
|
|
||
|
|
@@ -272,7 +275,7 @@ def list_keys(self, platform): | |
| "key_platform": platform | ||
| } | ||
|
|
||
| conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
| self.conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
|
|
||
| reply = next(self.stdin.listen()) | ||
|
|
||
|
|
@@ -302,7 +305,7 @@ def list_invalid_keys(self, platform): | |
| "key_platform": platform | ||
| } | ||
|
|
||
| conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
| self.conn.publish(self.ANNOUNCE, json.dumps(message)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶 |
||
|
|
||
| reply = next(self.stdin.listen()) | ||
|
|
||
|
|
@@ -325,4 +328,4 @@ def shutdown(self): | |
| were sent prior to this message, and will then shut down | ||
| immediately upon processing of the shutdown command | ||
| """ | ||
| conn.publish(self.ANNOUNCE, json.dumps({"type": "SHUTDOWN"})) | ||
| self.conn.publish(self.ANNOUNCE, json.dumps({"type": "SHUTDOWN"})) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [pylint] reported by reviewdog 🐶
sgoggins marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Uh oh!
There was an error while loading. Please reload this page.