-
Notifications
You must be signed in to change notification settings - Fork 65
feat: Add real-time feature flags support via SSE #389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
09b9e46
1fb9990
6030a57
c6d2b20
e0b040a
2cadaa8
f7e693a
7428cae
ded7305
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -192,6 +192,8 @@ def __init__( | |||||||||||||||||||||||||||||
| capture_exception_code_variables=False, | ||||||||||||||||||||||||||||||
| code_variables_mask_patterns=None, | ||||||||||||||||||||||||||||||
| code_variables_ignore_patterns=None, | ||||||||||||||||||||||||||||||
| realtime_flags=False, | ||||||||||||||||||||||||||||||
| on_feature_flags_update=None, | ||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| Initialize a new PostHog client instance. | ||||||||||||||||||||||||||||||
|
|
@@ -248,6 +250,10 @@ def __init__( | |||||||||||||||||||||||||||||
| self.exception_capture = None | ||||||||||||||||||||||||||||||
| self.privacy_mode = privacy_mode | ||||||||||||||||||||||||||||||
| self.enable_local_evaluation = enable_local_evaluation | ||||||||||||||||||||||||||||||
| self.realtime_flags = realtime_flags | ||||||||||||||||||||||||||||||
| self.on_feature_flags_update = on_feature_flags_update | ||||||||||||||||||||||||||||||
| self.sse_connection = None # type: Optional[Any] | ||||||||||||||||||||||||||||||
| self.sse_connected = False | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| self.capture_exception_code_variables = capture_exception_code_variables | ||||||||||||||||||||||||||||||
| self.code_variables_mask_patterns = ( | ||||||||||||||||||||||||||||||
|
|
@@ -1190,6 +1196,10 @@ def join(self): | |||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||
| self.log.error(f"[FEATURE FLAGS] Cache provider shutdown error: {e}") | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Close SSE connection | ||||||||||||||||||||||||||||||
| if self.sse_connection: | ||||||||||||||||||||||||||||||
| self._close_sse_connection() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def shutdown(self): | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| Flush all messages and cleanly shutdown the client. Call this before the process ends in serverless environments to avoid data loss. | ||||||||||||||||||||||||||||||
|
|
@@ -1315,6 +1325,10 @@ def _fetch_feature_flags_from_api(self): | |||||||||||||||||||||||||||||
| self.log.error(f"[FEATURE FLAGS] Cache provider store error: {e}") | ||||||||||||||||||||||||||||||
| # Flags are already in memory, so continue normally | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Setup SSE connection if realtime_flags is enabled | ||||||||||||||||||||||||||||||
| if self.realtime_flags and not self.sse_connected: | ||||||||||||||||||||||||||||||
| self._setup_sse_connection() | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| except APIError as e: | ||||||||||||||||||||||||||||||
| if e.status == 401: | ||||||||||||||||||||||||||||||
| self.log.error( | ||||||||||||||||||||||||||||||
|
|
@@ -2220,6 +2234,180 @@ def _add_local_person_and_group_properties( | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| return all_person_properties, all_group_properties | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def _setup_sse_connection(self): | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| Establish a real-time connection using Server-Sent Events to receive feature flag updates. | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| if not self.personal_api_key: | ||||||||||||||||||||||||||||||
| self.log.warning( | ||||||||||||||||||||||||||||||
| "[FEATURE FLAGS] Cannot establish real-time connection without personal_api_key" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if self.sse_connected: | ||||||||||||||||||||||||||||||
| self.log.debug("[FEATURE FLAGS] SSE connection already established") | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| import threading | ||||||||||||||||||||||||||||||
| import json | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Use requests with stream=True for SSE | ||||||||||||||||||||||||||||||
| import requests | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| url = f"{self.host}/flags/definitions/stream?api_key={self.api_key}" | ||||||||||||||||||||||||||||||
| headers = { | ||||||||||||||||||||||||||||||
| "Authorization": f"Bearer {self.personal_api_key}", | ||||||||||||||||||||||||||||||
| "Accept": "text/event-stream", | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| def sse_listener(): | ||||||||||||||||||||||||||||||
| """Background thread to listen for SSE messages""" | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| with requests.get( | ||||||||||||||||||||||||||||||
| url, headers=headers, stream=True, timeout=None | ||||||||||||||||||||||||||||||
| ) as response: | ||||||||||||||||||||||||||||||
| if response.status_code != 200: | ||||||||||||||||||||||||||||||
| self.log.warning( | ||||||||||||||||||||||||||||||
| f"[FEATURE FLAGS] SSE connection failed with status {response.status_code}" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| self.sse_connected = False | ||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| self.sse_connected = True | ||||||||||||||||||||||||||||||
|
gustavohstrassburger marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||
| self.log.debug("[FEATURE FLAGS] SSE connection established") | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Process the stream line by line | ||||||||||||||||||||||||||||||
| for line in response.iter_lines(): | ||||||||||||||||||||||||||||||
| if not line: | ||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| line = line.decode("utf-8") | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # SSE format: "data: {...}" | ||||||||||||||||||||||||||||||
| if line.startswith("data: "): | ||||||||||||||||||||||||||||||
| data_str = line[6:] # Remove "data: " prefix | ||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||
| flag_data = json.loads(data_str) | ||||||||||||||||||||||||||||||
| self._process_flag_update(flag_data) | ||||||||||||||||||||||||||||||
| except json.JSONDecodeError as e: | ||||||||||||||||||||||||||||||
| self.log.warning( | ||||||||||||||||||||||||||||||
| f"[FEATURE FLAGS] Failed to parse SSE message: {e}" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||
| self.log.warning( | ||||||||||||||||||||||||||||||
| f"[FEATURE FLAGS] SSE connection error: {e}. Reconnecting in 5 seconds..." | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
| self.sse_connected = False | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Attempt to reconnect after 5 seconds if realtime_flags is still enabled | ||||||||||||||||||||||||||||||
| if self.realtime_flags: | ||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| time.sleep(5) | ||||||||||||||||||||||||||||||
| self._setup_sse_connection() | ||||||||||||||||||||||||||||||
|
gustavohstrassburger marked this conversation as resolved.
Comment on lines
+2327
to
+2331
|
||||||||||||||||||||||||||||||
| if self.realtime_flags: | |
| import time | |
| time.sleep(5) | |
| self._setup_sse_connection() | |
| import time | |
| time.sleep(5) | |
| with self._sse_lock: | |
| if self.realtime_flags and not self.sse_connected: | |
| self._setup_sse_connection() |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If an ImportError or other exception occurs during SSE setup (lines 2345-2350), the sse_connected flag is never reset to False. The flag was set to True at line 2266 before the connection attempt, but if the thread creation or initialization fails with an exception, the flag remains True even though no connection was established.
This prevents future reconnection attempts because _setup_sse_connection checks sse_connected at line 2263 and returns early if it's already True. Consider resetting sse_connected to False in the exception handlers at lines 2345-2350.
| self.log.warning( | |
| "[FEATURE FLAGS] requests library required for real-time flags" | |
| ) | |
| except Exception as e: | |
| self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}") | |
| with self._sse_lock: | |
| self.sse_connected = False | |
| self.log.warning( | |
| "[FEATURE FLAGS] requests library required for real-time flags" | |
| ) | |
| except Exception as e: | |
| with self._sse_lock: | |
| self.sse_connected = False | |
| self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the SSE connection returns a non-200 status code, the response object is not properly closed before returning. The response is assigned at line 2286 but only the
sse_connectedflag is updated before the early return at line 2294. This could lead to a resource leak as the HTTP connection remains open.Consider closing the response before returning, or ensure it's closed in the finally block which currently only checks
self.sse_responseafter it's been set.