Skip to content
Closed
188 changes: 188 additions & 0 deletions posthog/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ def __init__(
capture_exception_code_variables=False,
code_variables_mask_patterns=None,
code_variables_ignore_patterns=None,
realtime_flags=False,
on_feature_flags_update=None,
):
"""
Initialize a new PostHog client instance.
Expand Down Expand Up @@ -248,6 +250,10 @@ def __init__(
self.exception_capture = None
self.privacy_mode = privacy_mode
self.enable_local_evaluation = enable_local_evaluation
self.realtime_flags = realtime_flags
self.on_feature_flags_update = on_feature_flags_update
self.sse_connection = None # type: Optional[Any]
self.sse_connected = False

self.capture_exception_code_variables = capture_exception_code_variables
self.code_variables_mask_patterns = (
Expand Down Expand Up @@ -1190,6 +1196,10 @@ def join(self):
except Exception as e:
self.log.error(f"[FEATURE FLAGS] Cache provider shutdown error: {e}")

# Close SSE connection
if self.sse_connection:
self._close_sse_connection()

def shutdown(self):
"""
Flush all messages and cleanly shutdown the client. Call this before the process ends in serverless environments to avoid data loss.
Expand Down Expand Up @@ -1315,6 +1325,10 @@ def _fetch_feature_flags_from_api(self):
self.log.error(f"[FEATURE FLAGS] Cache provider store error: {e}")
# Flags are already in memory, so continue normally

# Setup SSE connection if realtime_flags is enabled
if self.realtime_flags and not self.sse_connected:
self._setup_sse_connection()

except APIError as e:
if e.status == 401:
self.log.error(
Expand Down Expand Up @@ -2220,6 +2234,180 @@ def _add_local_person_and_group_properties(

return all_person_properties, all_group_properties

def _setup_sse_connection(self):
"""
Establish a real-time connection using Server-Sent Events to receive feature flag updates.
"""
if not self.personal_api_key:
self.log.warning(
"[FEATURE FLAGS] Cannot establish real-time connection without personal_api_key"
)
return

if self.sse_connected:
self.log.debug("[FEATURE FLAGS] SSE connection already established")
return

try:
import threading
import json

# Use requests with stream=True for SSE
import requests

url = f"{self.host}/flags/definitions/stream?api_key={self.api_key}"
headers = {
"Authorization": f"Bearer {self.personal_api_key}",
"Accept": "text/event-stream",
}

def sse_listener():
"""Background thread to listen for SSE messages"""
try:
with requests.get(
url, headers=headers, stream=True, timeout=None
) as response:
if response.status_code != 200:
self.log.warning(
f"[FEATURE FLAGS] SSE connection failed with status {response.status_code}"
)
self.sse_connected = False
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the SSE connection returns a non-200 status code, the response object is not properly closed before returning. The response is assigned at line 2286 but only the sse_connected flag is updated before the early return at line 2294. This could lead to a resource leak as the HTTP connection remains open.

Consider closing the response before returning, or ensure it's closed in the finally block which currently only checks self.sse_response after it's been set.

Suggested change
self.sse_connected = False
self.sse_connected = False
response.close()

Copilot uses AI. Check for mistakes.
return

self.sse_connected = True
Comment thread
gustavohstrassburger marked this conversation as resolved.
Outdated
self.log.debug("[FEATURE FLAGS] SSE connection established")

# Process the stream line by line
for line in response.iter_lines():
if not line:
continue

line = line.decode("utf-8")

# SSE format: "data: {...}"
if line.startswith("data: "):
data_str = line[6:] # Remove "data: " prefix
try:
flag_data = json.loads(data_str)
self._process_flag_update(flag_data)
except json.JSONDecodeError as e:
self.log.warning(
f"[FEATURE FLAGS] Failed to parse SSE message: {e}"
)
except Exception as e:
self.log.warning(
f"[FEATURE FLAGS] SSE connection error: {e}. Reconnecting in 5 seconds..."
)
self.sse_connected = False

# Attempt to reconnect after 5 seconds if realtime_flags is still enabled
if self.realtime_flags:
import time

time.sleep(5)
self._setup_sse_connection()
Comment thread
gustavohstrassburger marked this conversation as resolved.
Comment on lines +2327 to +2331
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reconnection logic checks self.realtime_flags without holding any lock. This creates a race condition: if _close_sse_connection is called from another thread between the check at line 2327 and the call to _setup_sse_connection at line 2331, the reconnection could proceed even though shutdown was requested.

This could cause the SSE connection to be re-established after shutdown has been initiated, leading to resource leaks or unexpected behavior.

Consider checking self.realtime_flags with proper synchronization, or checking sse_connected with the _sse_lock to determine if reconnection should proceed.

Suggested change
if self.realtime_flags:
import time
time.sleep(5)
self._setup_sse_connection()
import time
time.sleep(5)
with self._sse_lock:
if self.realtime_flags and not self.sse_connected:
self._setup_sse_connection()

Copilot uses AI. Check for mistakes.

# Start the SSE listener in a daemon thread
sse_thread = threading.Thread(target=sse_listener, daemon=True)
sse_thread.start()
self.sse_connection = sse_thread
Comment thread
gustavohstrassburger marked this conversation as resolved.
Outdated

except ImportError:
self.log.warning(
"[FEATURE FLAGS] requests library required for real-time flags"
)
except Exception as e:
self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}")

Comment on lines +2346 to +2351
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an ImportError or other exception occurs during SSE setup (lines 2345-2350), the sse_connected flag is never reset to False. The flag was set to True at line 2266 before the connection attempt, but if the thread creation or initialization fails with an exception, the flag remains True even though no connection was established.

This prevents future reconnection attempts because _setup_sse_connection checks sse_connected at line 2263 and returns early if it's already True. Consider resetting sse_connected to False in the exception handlers at lines 2345-2350.

Suggested change
self.log.warning(
"[FEATURE FLAGS] requests library required for real-time flags"
)
except Exception as e:
self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}")
with self._sse_lock:
self.sse_connected = False
self.log.warning(
"[FEATURE FLAGS] requests library required for real-time flags"
)
except Exception as e:
with self._sse_lock:
self.sse_connected = False
self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}")

Copilot uses AI. Check for mistakes.
def _close_sse_connection(self):
"""
Close the active SSE connection.
"""
if self.sse_connection:
self.log.debug("[FEATURE FLAGS] Closing SSE connection")
# Note: We can't directly stop the thread, but setting sse_connected to False
# will prevent reconnection attempts
self.sse_connected = False
self.sse_connection = None

def _process_flag_update(self, flag_data):
"""
Process incoming flag updates from SSE messages.

Args:
flag_data: The flag data from the SSE message
"""
try:
flag_key = flag_data.get("key")
if not flag_key:
self.log.warning("[FEATURE FLAGS] Received flag update without key")
return

is_deleted = flag_data.get("deleted", False)

# Handle flag deletion
if is_deleted:
self.log.debug(f"[FEATURE FLAGS] Deleting flag: {flag_key}")
if self.feature_flags_by_key and flag_key in self.feature_flags_by_key:
del self.feature_flags_by_key[flag_key]

# Also remove from the array
if self.feature_flags:
self.feature_flags = [
f for f in self.feature_flags if f.get("key") != flag_key
]
Comment thread
gustavohstrassburger marked this conversation as resolved.
Outdated

# Invalidate cache for this flag
if self.flag_cache:
old_version = self.flag_definition_version
self.flag_definition_version += 1
self.flag_cache.invalidate_version(old_version)

else:
# Update or add flag
self.log.debug(f"[FEATURE FLAGS] Updating flag: {flag_key}")

if self.feature_flags_by_key is None:
self.feature_flags_by_key = {}

if self.feature_flags is None:
self.feature_flags = []

# Update the lookup table
self.feature_flags_by_key[flag_key] = flag_data

# Update or add to the array
flag_exists = False
for i, f in enumerate(self.feature_flags):
if f.get("key") == flag_key:
self.feature_flags[i] = flag_data
flag_exists = True
break

if not flag_exists:
self.feature_flags.append(flag_data)

# Invalidate cache when flag definitions change
if self.flag_cache:
old_version = self.flag_definition_version
self.flag_definition_version += 1
self.flag_cache.invalidate_version(old_version)

# Call the user's callback if provided
if self.on_feature_flags_update:
try:
self.on_feature_flags_update(
flag_key=flag_key,
flag_data=flag_data,
)
except Exception as e:
self.log.exception(
f"[FEATURE FLAGS] Error in on_feature_flags_update callback: {e}"
)

except Exception as e:
self.log.exception(f"[FEATURE FLAGS] Error processing flag update: {e}")


def stringify_id(val):
if val is None:
Expand Down
Loading
Loading