Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19556.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement experimental support for [MSC4429: Profile Updates for Legacy Sync](https://github.com/matrix-org/matrix-spec-proposals/pull/4429).
3 changes: 2 additions & 1 deletion docker/complement/conf/start_for_complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
federation_inbound, \
federation_reader, \
federation_sender, \
profile_updates, \
synchrotron, \
client_reader, \
appservice, \
pusher, \
device_lists:2, \
stream_writers=account_data+presence+receipts+to_device+typing"
stream_writers=account_data+presence+profile_updates+receipts+to_device+typing"

fi
log "Workers requested: $SYNAPSE_WORKER_TYPES"
Expand Down
2 changes: 2 additions & 0 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ experimental_features:
msc4354_enabled: true
# `/sync` `state_after`
msc4222_enabled: true
# Profile updates down legacy /sync
msc4429_enabled: true

server_notices:
system_mxid_localpart: _server
Expand Down
15 changes: 14 additions & 1 deletion docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,10 @@
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
# The [^/] differentiates this endpoint from
# `ProfileRestFieldsServlet`, which we want to instead go to the
# `profile_updates` worker below.
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/[^/]+",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
Expand Down Expand Up @@ -308,6 +311,15 @@
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"profile_updates": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
"endpoint_patterns": [
"^/_matrix/client/(unstable/uk.tcpip.msc4133|api/v1|r0|v3|unstable)/profile/.+/"
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"device_lists": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"],
Expand Down Expand Up @@ -517,6 +529,7 @@ def add_worker_roles_to_shared_config(
"typing",
"push_rules",
"thread_subscriptions",
"profile_updates",
}

# Worker-type specific sharding config. Now a single worker can fulfill multiple
Expand Down
28 changes: 28 additions & 0 deletions docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,34 @@ each upgrade are complete before moving on to the next upgrade, to avoid
stacking them up. You can monitor the currently running background updates with
[the Admin API](usage/administration/admin_api/background_updates.html#status).

# Upgrading to v1.151.0

## Profile Updates Stream Writer Workers

This version of Synapse adds a new `profile_updates` writer stream. The
following endpoints may now only be handled by either the main process, or a
worker that is designed a "profile_updates" writer. If you are already routing
the following endpoints to a worker:

```
/_matrix/client/(api/v1|r0|v3)/profile/<user_id>/(<field_name>?)
/_matrix/client/unstable/uk.tcpip.msc4133/profile/<user_id>/(<field_name>?)
```

those worker(s) need to be marked as a stream writer for the `profile_updates`
stream in the shared config, using the
[`stream_writers`](https://element-hq.github.io/synapse/v1.151/usage/configuration/config_documentation.html#stream_writers)
config option:

```yaml
stream_writers:
profile_updates: worker1
```

as well as included in the
[`instance_map`](https://element-hq.github.io/synapse/v1.151/usage/configuration/config_documentation.html#instance_map)
config option.

# Upgrading to v1.150.0

## Removal of the `systemd` pip extra
Expand Down
2 changes: 2 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -4475,6 +4475,8 @@ This setting has the following sub-options:

* `device_lists` (string): Name of a worker assigned to the `device_lists` stream.

* `profile_updates` (string): Name of a worker assigned to the `profile_updates` stream.

Example configuration:
```yaml
stream_writers:
Expand Down
3 changes: 3 additions & 0 deletions schema/synapse-config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5522,6 +5522,9 @@ properties:
device_lists:
type: string
description: Name of a worker assigned to the `device_lists` stream.
profile_updates:
type: string
description: Name of a worker assigned to the `profile_updates` stream.
default: {}
examples:
- events: worker1
Expand Down
25 changes: 25 additions & 0 deletions synapse/api/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,24 @@
"filter": FILTER_SCHEMA,
"room_filter": ROOM_FILTER_SCHEMA,
"room_event_filter": ROOM_EVENT_FILTER_SCHEMA,
"profile_fields_filter": {
"type": "object",
"properties": {
"ids": {"type": "array", "items": {"type": "string"}},
},
"additionalProperties": True,
},
},
"properties": {
"presence": {"$ref": "#/definitions/filter"},
"account_data": {"$ref": "#/definitions/filter"},
"room": {"$ref": "#/definitions/room_filter"},
"event_format": {"type": "string", "enum": ["client", "federation"]},
"event_fields": {"type": "array", "items": {"type": "string"}},
"profile_fields": {"$ref": "#/definitions/profile_fields_filter"},
"org.matrix.msc4429.profile_fields": {
"$ref": "#/definitions/profile_fields_filter"
},
},
"additionalProperties": True, # Allow new fields for forward compatibility
}
Expand Down Expand Up @@ -217,6 +228,20 @@ def __init__(self, hs: "HomeServer", filter_json: JsonMapping):
self.event_fields = filter_json.get("event_fields", [])
self.event_format = filter_json.get("event_format", "client")

self.profile_fields: list[str] = []
if hs.config.experimental.msc4429_enabled:
profile_fields_filter = filter_json.get("profile_fields")
if profile_fields_filter is None:
profile_fields_filter = filter_json.get(
"org.matrix.msc4429.profile_fields"
)

if isinstance(profile_fields_filter, Mapping):
ids = profile_fields_filter.get("ids", [])
if ids is None:
ids = []
self.profile_fields = list(ids)

def __repr__(self) -> str:
return "<FilterCollection %s>" % (json.dumps(self._filter_json),)

Expand Down
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,9 @@ def read_config(
# MSC4133: Custom profile fields
self.msc4133_enabled: bool = experimental.get("msc4133_enabled", False)

# MSC4429: Profile updates for legacy /sync
self.msc4429_enabled: bool = experimental.get("msc4429_enabled", False)

# MSC4143: Matrix RTC Transport using Livekit Backend
self.msc4143_enabled: bool = experimental.get("msc4143_enabled", False)

Expand Down
25 changes: 22 additions & 3 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ class WriterLocations:
push_rules: The instances that write to the push stream. Currently
can only be a single instance.
device_lists: The instances that write to the device list stream.
thread_subscriptions: The instances that write to the thread subscriptions
stream.
profile_updates: The instances that write to the profile updates stream.
"""

events: list[str] = attr.ib(
Expand Down Expand Up @@ -177,7 +180,11 @@ class WriterLocations:
converter=_instance_to_list_converter,
)
thread_subscriptions: list[str] = attr.ib(
default=["master"],
default=[MAIN_PROCESS_INSTANCE_NAME],
converter=_instance_to_list_converter,
)
profile_updates: list[str] = attr.ib(
default=[MAIN_PROCESS_INSTANCE_NAME],
converter=_instance_to_list_converter,
)

Expand Down Expand Up @@ -355,8 +362,7 @@ def read_config(
writers = config.get("stream_writers") or {}
self.writers = WriterLocations(**writers)

# Check that the configured writers for events and typing also appears in
# `instance_map`.
# Check that the configured writers also appear in `instance_map`.
for stream in (
"events",
"typing",
Expand All @@ -365,6 +371,9 @@ def read_config(
"receipts",
"presence",
"push_rules",
"device_lists",
"thread_subscriptions",
"profile_updates",
):
instances = _instance_to_list_converter(getattr(self.writers, stream))
for instance in instances:
Expand Down Expand Up @@ -415,6 +424,16 @@ def read_config(
"Must specify at least one instance to handle `device_lists` messages."
)

if len(self.writers.thread_subscriptions) == 0:
raise ConfigError(
"Must specify at least one instance to handle `thread_subscriptions` messages."
)

if len(self.writers.profile_updates) == 0:
raise ConfigError(
"Must specify at least one instance to handle `profile_updates` messages."
)

self.events_shard_config = RoutableShardedWorkerHandlingConfig(
self.writers.events
)
Expand Down
45 changes: 45 additions & 0 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
JsonValue,
Requester,
ScheduledTask,
StreamKeyType,
TaskStatus,
UserID,
create_requester,
Expand Down Expand Up @@ -75,6 +76,8 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock() # nb must be called this for @cached
self.store = hs.get_datastores().main
self.hs = hs
self._notifier = hs.get_notifier()
self._msc4429_enabled = hs.config.experimental.msc4429_enabled

self.federation = hs.get_federation_client()
hs.get_federation_registry().register_query_handler(
Expand All @@ -99,6 +102,24 @@ def __init__(self, hs: "HomeServer"):
)
self._worker_locks = hs.get_worker_locks_handler()

async def _notify_profile_update(self, user_id: UserID, stream_id: int) -> None:
room_ids = await self.store.get_rooms_for_user(user_id.to_string())
if not room_ids:
return

self._notifier.on_new_event(
StreamKeyType.PROFILE_UPDATES, stream_id, rooms=room_ids
)

async def _record_profile_updates(
self, user_id: UserID, updates: list[tuple[str, JsonValue | None]]
) -> None:
if not self._msc4429_enabled or not updates:
return

stream_id = await self.store.add_profile_updates(user_id, updates)
await self._notify_profile_update(user_id, stream_id)

async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
"""
Get a user's profile as a JSON dictionary.
Expand Down Expand Up @@ -253,6 +274,9 @@ async def set_displayname(
)

await self.store.set_profile_displayname(target_user, displayname_to_set)
await self._record_profile_updates(
target_user, [(ProfileFields.DISPLAYNAME, displayname_to_set)]
)

profile = await self.store.get_profileinfo(target_user)

Expand Down Expand Up @@ -362,6 +386,9 @@ async def set_avatar_url(
)

await self.store.set_profile_avatar_url(target_user, avatar_url_to_set)
await self._record_profile_updates(
target_user, [(ProfileFields.AVATAR_URL, avatar_url_to_set)]
)

profile = await self.store.get_profileinfo(target_user)

Expand Down Expand Up @@ -406,6 +433,8 @@ async def delete_profile_upon_deactivation(
# have it.
raise AuthError(400, "Cannot remove another user's profile")

profile_updates: list[tuple[str, JsonValue | None]] = []
current_profile: ProfileInfo | None = None
if not by_admin:
current_profile = await self.store.get_profileinfo(target_user)
if not self.hs.config.registration.enable_set_displayname:
Expand All @@ -428,7 +457,21 @@ async def delete_profile_upon_deactivation(
Codes.FORBIDDEN,
)

if self._msc4429_enabled:
if current_profile is None:
current_profile = await self.store.get_profileinfo(target_user)

if current_profile.display_name is not None:
profile_updates.append((ProfileFields.DISPLAYNAME, None))
if current_profile.avatar_url is not None:
profile_updates.append((ProfileFields.AVATAR_URL, None))

custom_fields = await self.store.get_profile_fields(target_user)
for field_name in custom_fields.keys():
profile_updates.append((field_name, None))

await self.store.delete_profile(target_user)
await self._record_profile_updates(target_user, profile_updates)

await self._third_party_rules.on_profile_update(
target_user.to_string(),
Expand Down Expand Up @@ -582,6 +625,7 @@ async def set_profile_field(
raise AuthError(403, "Cannot set another user's profile")

await self.store.set_profile_field(target_user, field_name, new_value)
await self._record_profile_updates(target_user, [(field_name, new_value)])

# Custom fields do not propagate into the user directory *or* rooms.
profile = await self.store.get_profileinfo(target_user)
Expand Down Expand Up @@ -617,6 +661,7 @@ async def delete_profile_field(
raise AuthError(400, "Cannot set another user's profile")

await self.store.delete_profile_field(target_user, field_name)
await self._record_profile_updates(target_user, [(field_name, None)])

# Custom fields do not propagate into the user directory *or* rooms.
profile = await self.store.get_profileinfo(target_user)
Expand Down
Loading
Loading