Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions stubs/txredisapi.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,28 @@ class RedisFactory(protocol.ReconnectingClientFactory):

class SubscriberFactory(RedisFactory):
def __init__(self) -> None: ...

class Sentinel:
def __init__(
self,
sentinels: list[tuple[str, int]],
min_other_sentinels: int = ...,
sentinel_kwargs: dict[str, Any] | None = ...,
): ...
def master_for(
self,
service_name: str,
dbid: int | None = ...,
password: str | None = ...,
poolsize: int = ...,
**kwargs: Any,
) -> ConnectionHandler: ...
def slave_for(
self,
service_name: str,
dbid: int | None = ...,
password: str | None = ...,
poolsize: int = ...,
**kwargs: Any,
) -> ConnectionHandler: ...
def disconnect(self) -> "Deferred[None]": ...
39 changes: 39 additions & 0 deletions synapse/config/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
These are mutually incompatible.
"""

VALID_REDIS_TYPES = {"standalone", "sentinel", "cluster"}


class RedisConfig(Config):
section = "redis"
Expand Down Expand Up @@ -67,6 +69,43 @@ def read_config(
),
).strip()

self.redis_type = redis_config.get("type", "standalone")
if self.redis_type not in VALID_REDIS_TYPES:
raise ConfigError(
"Invalid value for redis.type: expected one of "
"'standalone', 'sentinel', or 'cluster'",
("redis", "type"),
)

self.redis_sentinel_master = redis_config.get("sentinel_master")
self.redis_sentinel_hosts = redis_config.get("sentinel_hosts", [])
self.redis_sentinel_password = redis_config.get("sentinel_password")
if self.redis_sentinel_password and not allow_secrets_in_config:
raise ConfigError(
"Config options that expect an in-line secret as value are disabled",
("redis", "sentinel_password"),
)

# Validate sentinel configuration
if self.redis_type == "sentinel":
if not self.redis_sentinel_master:
raise ConfigError(
"redis.sentinel_master is required when redis.type is 'sentinel'",
("redis", "sentinel_master"),
)
if not self.redis_sentinel_hosts:
raise ConfigError(
"redis.sentinel_hosts is required when redis.type is 'sentinel'",
("redis", "sentinel_hosts"),
)
# Validate sentinel_hosts format - should be list of tuples or strings
for idx, host_entry in enumerate(self.redis_sentinel_hosts):
if not isinstance(host_entry, (list, tuple)) or len(host_entry) != 2:
raise ConfigError(
f"Each entry in redis.sentinel_hosts must be a list/tuple of [host, port], got: {host_entry}",
("redis", "sentinel_hosts", idx),
)

self.redis_use_tls = redis_config.get("use_tls", False)
self.redis_certificate = redis_config.get("certificate_file", None)
self.redis_private_key = redis_config.get("private_key_file", None)
Expand Down
48 changes: 48 additions & 0 deletions synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,51 @@ def lazyUnixConnection(
)

return factory.handler


def lazySentinelConnection(
hs: "HomeServer",
sentinel_hosts: list[tuple[str, int]],
master_name: str,
dbid: int | None = None,
password: str | None = None,
sentinel_password: str | None = None,
replyTimeout: int = 30,
) -> ConnectionHandler:
"""Creates a connection to Redis via Sentinel that is lazily set up and
automatically handles failover.

Args:
hs: The HomeServer instance
sentinel_hosts: List of (host, port) tuples for sentinel servers
master_name: The name of the master service in sentinel
dbid: The redis database ID to use
password: Password for the redis master/slave
sentinel_password: Password for the sentinel servers (if different from redis password)
replyTimeout: Timeout for redis operations

Returns:
A ConnectionHandler that will automatically connect to the current master
"""
from txredisapi import Sentinel

# Build sentinel kwargs if sentinel has its own password
sentinel_kwargs = {}
if sentinel_password:
sentinel_kwargs["password"] = sentinel_password

# Create the sentinel instance
sentinel = Sentinel(
sentinel_addresses=sentinel_hosts,
**sentinel_kwargs,
)

# Get a connection handler to the master
connection = sentinel.master_for(
service_name=master_name,
dbid=dbid,
password=password,
poolsize=1,
)

return connection
43 changes: 33 additions & 10 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,24 +1201,31 @@ def get_outbound_redis_connection(self) -> "ConnectionHandler":

# We only want to import redis module if we're using it, as we have
# `txredisapi` as an optional dependency.
from synapse.replication.tcp.redis import lazyConnection, lazyUnixConnection
from synapse.replication.tcp.redis import (
lazyConnection,
lazySentinelConnection,
lazyUnixConnection,
)

if self.config.redis.redis_path is None:
# Handle sentinel mode
if self.config.redis.redis_type == "sentinel":
logger.info(
"Connecting to redis (host=%r port=%r) for external cache",
self.config.redis.redis_host,
self.config.redis.redis_port,
"Connecting to redis via sentinel (master=%r sentinels=%r) for external cache",
self.config.redis.redis_sentinel_master,
self.config.redis.redis_sentinel_hosts,
)

return lazyConnection(
return lazySentinelConnection(
hs=self,
host=self.config.redis.redis_host,
port=self.config.redis.redis_port,
sentinel_hosts=self.config.redis.redis_sentinel_hosts,
master_name=self.config.redis.redis_sentinel_master,
dbid=self.config.redis.redis_dbid,
password=self.config.redis.redis_password,
reconnect=True,
sentinel_password=self.config.redis.redis_sentinel_password,
)
else:

# Handle Unix socket connection
if self.config.redis.redis_path is not None:
logger.info(
"Connecting to redis (path=%r) for external cache",
self.config.redis.redis_path,
Expand All @@ -1232,6 +1239,22 @@ def get_outbound_redis_connection(self) -> "ConnectionHandler":
reconnect=True,
)

# Handle standalone mode (TCP connection)
logger.info(
"Connecting to redis (host=%r port=%r) for external cache",
self.config.redis.redis_host,
self.config.redis.redis_port,
)

return lazyConnection(
hs=self,
host=self.config.redis.redis_host,
port=self.config.redis.redis_port,
dbid=self.config.redis.redis_dbid,
password=self.config.redis.redis_password,
reconnect=True,
)

def should_send_federation(self) -> bool:
"Should this server be sending federation traffic directly?"
return self.config.worker.send_federation
Expand Down