Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19581.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove `redacted_because` from internal unsigned.
6 changes: 6 additions & 0 deletions rust/src/events/internal_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ pub struct EventInternalMetadata {
#[pyo3(get, set)]
instance_name: Option<String>,

/// The event ID of the redaction event, if this event has been redacted.
/// This is set dynamically at load time and is not persisted to the database.
#[pyo3(get, set)]
redacted_by: Option<String>,

/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
#[pyo3(get, set)]
Expand Down Expand Up @@ -289,6 +294,7 @@ impl EventInternalMetadata {
data,
stream_ordering: None,
instance_name: None,
redacted_by: None,
outlier: false,
})
}
Expand Down
2 changes: 1 addition & 1 deletion synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
"presence_stream": ["currently_active"],
"public_room_list_stream": ["visibility"],
"pushers": ["enabled"],
"redactions": ["have_censored"],
"redactions": ["have_censored", "recheck"],
"remote_media_cache": ["authenticated"],
"room_memberships": ["participant"],
"room_stats_state": ["is_federatable"],
Expand Down
45 changes: 18 additions & 27 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@
from prometheus_client import Counter
from typing_extensions import ParamSpec, TypeGuard

from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
from synapse.api.constants import ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.appservice import (
ApplicationService,
TransactionOneTimeKeysCount,
TransactionUnusedFallbackKeys,
)
from synapse.events import EventBase
from synapse.events.utils import SerializeEventConfig, serialize_event
from synapse.events.utils import SerializeEventConfig
from synapse.http.client import SimpleHttpClient, is_unknown_endpoint
from synapse.logging import opentracing
from synapse.metrics import SERVER_NAME_LABEL
Expand Down Expand Up @@ -128,6 +128,7 @@ def __init__(self, hs: "HomeServer"):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.config = hs.config.appservice
self._event_serializer = hs.get_event_client_serializer()

self.protocol_meta_cache: ResponseCache[tuple[str, str]] = ResponseCache(
clock=hs.get_clock(),
Expand Down Expand Up @@ -343,7 +344,7 @@ async def push_bulk(
# This is required by the configuration.
assert service.hs_token is not None

serialized_events = self._serialize(service, events)
serialized_events = await self._serialize(service, events)

if txn_id is None:
logger.warning(
Expand Down Expand Up @@ -539,30 +540,20 @@ async def query_keys(

return response

def _serialize(
async def _serialize(
self, service: "ApplicationService", events: Iterable[EventBase]
) -> list[JsonDict]:
time_now = self.clock.time_msec()
return [
serialize_event(
e,
time_now,
config=SerializeEventConfig(
as_client_event=True,
# If this is an invite or a knock membership event, and we're interested
# in this user, then include any stripped state alongside the event.
include_stripped_room_state=(
e.type == EventTypes.Member
and (
e.membership == Membership.INVITE
or e.membership == Membership.KNOCK
)
and service.is_interested_in_user(e.state_key)
),
# Appservices are considered 'trusted' by the admin and should have
# applicable metadata on their events.
include_admin_metadata=True,
),
)
for e in events
]
return await self._event_serializer.serialize_events(
list(events),
time_now,
config=SerializeEventConfig(
as_client_event=True,
# If this is an invite or a knock membership event, and we're interested
# in this user, then include any stripped state alongside the event.
include_stripped_room_state=True,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for membership is superfluous as we only include stripped state for invites and knocks anyway, so the change here is that we include stripped state even if the appservice is not interested in the user. Which I think is fine.

The reason to batch things up is so that we more efficiently fetch redactions from the DB.

# Appservices are considered 'trusted' by the admin and should have
# applicable metadata on their events.
include_admin_metadata=True,
),
)
54 changes: 45 additions & 9 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def prune_event(event: EventBase) -> EventBase:
)
pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier
pruned_event.internal_metadata.redacted_by = event.internal_metadata.redacted_by

# Mark the event as redacted
pruned_event.internal_metadata.redacted = True
Expand Down Expand Up @@ -123,6 +124,7 @@ def clone_event(event: EventBase) -> EventBase:
)
new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier
new_event.internal_metadata.redacted_by = event.internal_metadata.redacted_by

return new_event

Expand Down Expand Up @@ -444,7 +446,7 @@ def make_config_for_admin(existing: SerializeEventConfig) -> SerializeEventConfi
return attr.evolve(existing, include_admin_metadata=True)


def serialize_event(
def _serialize_event(
e: JsonDict | EventBase,
time_now_ms: int,
*,
Expand Down Expand Up @@ -476,13 +478,6 @@ def serialize_event(
d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"]
del d["unsigned"]["age_ts"]

if "redacted_because" in e.unsigned:
d["unsigned"]["redacted_because"] = serialize_event(
e.unsigned["redacted_because"],
time_now_ms,
config=config,
)

# If we have applicable fields saved in the internal_metadata, include them in the
# unsigned section of the event if the event was sent by the same session (or when
# appropriate, just the same sender) as the one requesting the event.
Expand Down Expand Up @@ -591,6 +586,7 @@ async def serialize_event(
*,
config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
bundle_aggregations: dict[str, "BundledAggregations"] | None = None,
redaction_map: dict[str, "EventBase"] | None = None,
) -> JsonDict:
"""Serializes a single event.

Expand All @@ -600,6 +596,8 @@ async def serialize_event(
config: Event serialization config
bundle_aggregations: A map from event_id to the aggregations to be bundled
into the event.
redaction_map: Optional pre-fetched map from redaction event_id to event,
used to avoid per-event DB lookups when serializing many events.

Returns:
The serialized event
Expand All @@ -617,7 +615,34 @@ async def serialize_event(
):
config = make_config_for_admin(config)

serialized_event = serialize_event(event, time_now, config=config)
serialized_event = _serialize_event(event, time_now, config=config)

# If the event was redacted, fetch the redaction event from the database
# and include it in the serialized event's unsigned section.
redacted_by: str | None = event.internal_metadata.redacted_by
if redacted_by is not None:
serialized_event.setdefault("unsigned", {})["redacted_by"] = redacted_by
if redaction_map is not None:
redaction_event: EventBase | None = redaction_map.get(redacted_by)
else:
redaction_event = await self._store.get_event(
redacted_by,
allow_none=True,
)
if redaction_event is not None:
serialized_redaction = _serialize_event(
redaction_event, time_now, config=config
)
serialized_event.setdefault("unsigned", {})["redacted_because"] = (
serialized_redaction
)
# format_event_for_client_v1 copies redacted_because to the
# top level, but since we add it after that runs, do it here.
if (
config.as_client_event
and config.event_format is format_event_for_client_v1
):
serialized_event["redacted_because"] = serialized_redaction

new_unsigned = {}
for callback in self._add_extra_fields_to_unsigned_client_event_callbacks:
Expand Down Expand Up @@ -745,12 +770,23 @@ async def serialize_events(
str(len(events)),
)

# Batch-fetch all redaction events in one go rather than one per event.
redaction_ids = {
e.internal_metadata.redacted_by
for e in events
if isinstance(e, EventBase) and e.internal_metadata.redacted_by is not None
}
redaction_map = (
await self._store.get_events(redaction_ids) if redaction_ids else {}
)

return [
await self.serialize_event(
event,
time_now,
config=config,
bundle_aggregations=bundle_aggregations,
redaction_map=redaction_map,
)
for event in events
]
Expand Down
8 changes: 6 additions & 2 deletions synapse/rest/admin/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from synapse.events.utils import (
SerializeEventConfig,
format_event_raw,
serialize_event,
)
from synapse.http.servlet import RestServlet
from synapse.http.site import SynapseRequest
Expand Down Expand Up @@ -40,6 +39,7 @@ def __init__(self, hs: "HomeServer"):
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
self._clock = hs.get_clock()
self._event_serializer = hs.get_event_client_serializer()

async def on_GET(
self, request: SynapseRequest, event_id: str
Expand All @@ -64,6 +64,10 @@ async def on_GET(
include_stripped_room_state=True,
include_admin_metadata=True,
)
res = {"event": serialize_event(event, self._clock.time_msec(), config=config)}
res = {
"event": await self._event_serializer.serialize_event(
event, self._clock.time_msec(), config=config
)
}

return HTTPStatus.OK, res
4 changes: 2 additions & 2 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
EventClientSerializer,
SerializeEventConfig,
format_event_for_client_v2,
serialize_event,
)
from synapse.handlers.pagination import GetMessagesResult
from synapse.http.server import HttpServer
Expand Down Expand Up @@ -214,6 +213,7 @@ def __init__(self, hs: "HomeServer"):
self.delayed_events_handler = hs.get_delayed_events_handler()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self._event_serializer = hs.get_event_client_serializer()
self._max_event_delay_ms = hs.config.server.max_event_delay_ms
self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker
self._msc4354_enabled = hs.config.experimental.msc4354_enabled
Expand Down Expand Up @@ -285,7 +285,7 @@ async def on_GET(
raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)

if format == "event":
event = serialize_event(
event = await self._event_serializer.serialize_event(
data,
self.clock.time_msec(),
config=SerializeEventConfig(
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2944,6 +2944,7 @@ def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
values={
"redacts": event.redacts,
"received_ts": self._clock.time_msec(),
"recheck": event.internal_metadata.need_to_check_redaction(),
},
)

Expand Down
62 changes: 62 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ def __init__(
"redactions_received_ts", self._redactions_received_ts
)

self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.REDACTIONS_RECHECK_BG_UPDATE,
self._redactions_recheck_bg_update,
)

# This index gets deleted in `event_fix_redactions_bytes` update
self.db_pool.updates.register_background_index_update(
"event_fix_redactions_bytes_create_index",
Expand Down Expand Up @@ -747,6 +752,63 @@ def _redactions_received_ts_txn(txn: LoggingTransaction) -> int:

return count

async def _redactions_recheck_bg_update(
    self, progress: JsonDict, batch_size: int
) -> int:
    """Backfill the `recheck` column of the `redactions` table.

    The value is derived from the `recheck_redaction` flag stored in each
    redaction event's JSON internal metadata; rows whose event JSON is
    missing (or lacks the flag) are marked as not needing a recheck.

    Args:
        progress: Persisted progress for this update. The `last_event_id`
            key holds the highest redaction event ID already processed.
        batch_size: Maximum number of redaction rows to handle per run.

    Returns:
        The number of rows processed in this batch; 0 once complete.
    """
    lower_bound = progress.get("last_event_id", "")

    def _process_batch(txn: LoggingTransaction) -> int:
        # Page through `redactions` in event_id order, joining in the
        # stored event JSON (if any) so the metadata flag can be read.
        txn.execute(
            """
            SELECT r.event_id, ej.internal_metadata
            FROM redactions AS r
            LEFT JOIN event_json AS ej USING (event_id)
            WHERE r.event_id > ?
            ORDER BY r.event_id ASC
            LIMIT ?
            """,
            (lower_bound, batch_size),
        )
        batch = txn.fetchall()
        if not batch:
            return 0

        key_values = []
        value_values = []
        for event_id, metadata_json in batch:
            recheck = False
            if metadata_json is not None:
                # LEFT JOIN may yield NULL metadata; default to False then.
                recheck = bool(
                    db_to_json(metadata_json).get("recheck_redaction", False)
                )
            key_values.append((event_id,))
            value_values.append((recheck,))

        self.db_pool.simple_update_many_txn(
            txn,
            table="redactions",
            key_names=("event_id",),
            key_values=key_values,
            value_names=("recheck",),
            value_values=value_values,
        )

        # Record the highest event ID handled so the next batch resumes
        # where this one left off.
        self.db_pool.updates._background_update_progress_txn(
            txn,
            _BackgroundUpdates.REDACTIONS_RECHECK_BG_UPDATE,
            {"last_event_id": batch[-1][0]},
        )

        return len(batch)

    num_processed = await self.db_pool.runInteraction(
        "_redactions_recheck_bg_update", _process_batch
    )

    if not num_processed:
        # Nothing left to do; mark the background update as finished.
        await self.db_pool.updates._end_background_update(
            _BackgroundUpdates.REDACTIONS_RECHECK_BG_UPDATE
        )

    return num_processed

async def _event_fix_redactions_bytes(
self, progress: JsonDict, batch_size: int
) -> int:
Expand Down
Loading
Loading