mirror of
https://github.com/element-hq/dendrite.git
synced 2025-09-13 21:02:25 +03:00
History visibility database changes (#2533)
* Add new history_visibility column * Update SQL queries to include history_visibility * Store the history visibilty calculated by the roomserver * Update GMSL * Update migrations * Fix migration * Update GMSL * Fix `go.sum` * Update GMSL to use sql.Scanner & sql.Valuer * Re-order migration/table creation * Update gomatrixserverlib * Add history_visibility column to current_room_state * Fix migrations * Return error instead of Fatal log Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
parent
c0c909d306
commit
a7e92f8cb9
17 changed files with 299 additions and 107 deletions
|
@ -51,6 +51,7 @@ CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
|||
-- The serial ID of the output_room_events table when this event became
|
||||
-- part of the current state of the room.
|
||||
added_at BIGINT,
|
||||
history_visibility SMALLINT NOT NULL DEFAULT 2,
|
||||
-- Clobber based on 3-uple of room_id, type and state_key
|
||||
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
|
||||
);
|
||||
|
@ -63,8 +64,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON sync
|
|||
`
|
||||
|
||||
const upsertRoomStateSQL = "" +
|
||||
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)" +
|
||||
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
|
||||
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
|
||||
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9"
|
||||
|
||||
|
@ -100,11 +101,11 @@ const selectStateEventSQL = "" +
|
|||
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
||||
|
||||
const selectEventsWithEventIDsSQL = "" +
|
||||
// TODO: The session_id and transaction_id blanks are here because otherwise
|
||||
// the rowsToStreamEvents expects there to be exactly six columns. We need to
|
||||
// TODO: The session_id and transaction_id blanks are here because
|
||||
// the rowsToStreamEvents expects there to be exactly seven columns. We need to
|
||||
// figure out if these really need to be in the DB, and if so, we need a
|
||||
// better permanent fix for this. - neilalexander, 2 Jan 2020
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
|
||||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id, history_visibility" +
|
||||
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
const selectSharedUsersSQL = "" +
|
||||
|
@ -336,6 +337,7 @@ func (s *currentRoomStateStatements) UpsertRoomState(
|
|||
headeredJSON,
|
||||
membership,
|
||||
addedAt,
|
||||
event.Visibility,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deltas
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
func LoadAddHistoryVisibilityColumn(m *sqlutil.Migrations) {
|
||||
m.AddMigration(UpAddHistoryVisibilityColumn, DownAddHistoryVisibilityColumn)
|
||||
}
|
||||
|
||||
func UpAddHistoryVisibilityColumn(tx *sql.Tx) error {
|
||||
_, err := tx.Exec(`
|
||||
ALTER TABLE syncapi_output_room_events ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
|
||||
UPDATE syncapi_output_room_events SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
|
||||
ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
|
||||
UPDATE syncapi_current_room_state SET history_visibility = 4 WHERE type IN ('m.room.message', 'm.room.encrypted');
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DownAddHistoryVisibilityColumn(tx *sql.Tx) error {
|
||||
_, err := tx.Exec(`
|
||||
ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility;
|
||||
ALTER TABLE syncapi_current_room_state DROP COLUMN IF EXISTS history_visibility;
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute downgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -67,7 +67,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
|
|||
-- events retrieved through backfilling that have a position in the stream
|
||||
-- that relates to the moment these were retrieved rather than the moment these
|
||||
-- were emitted.
|
||||
exclude_from_sync BOOL DEFAULT FALSE
|
||||
exclude_from_sync BOOL DEFAULT FALSE,
|
||||
-- The history visibility before this event (1 - world_readable; 2 - shared; 3 - invited; 4 - joined)
|
||||
history_visibility SMALLINT NOT NULL DEFAULT 2
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_type_idx ON syncapi_output_room_events (type);
|
||||
|
@ -78,16 +80,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON s
|
|||
|
||||
const insertEventSQL = "" +
|
||||
"INSERT INTO syncapi_output_room_events (" +
|
||||
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
|
||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " +
|
||||
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
|
||||
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
|
||||
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
|
||||
"RETURNING id"
|
||||
|
||||
const selectEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)"
|
||||
|
||||
const selectEventsWithFilterSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events WHERE event_id = ANY($1)" +
|
||||
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
|
||||
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
|
||||
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
|
||||
|
@ -96,7 +98,7 @@ const selectEventsWithFilterSQL = "" +
|
|||
" LIMIT $7"
|
||||
|
||||
const selectRecentEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
|
@ -105,7 +107,7 @@ const selectRecentEventsSQL = "" +
|
|||
" ORDER BY id DESC LIMIT $8"
|
||||
|
||||
const selectRecentEventsForSyncSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
|
@ -114,7 +116,7 @@ const selectRecentEventsForSyncSQL = "" +
|
|||
" ORDER BY id DESC LIMIT $8"
|
||||
|
||||
const selectEarlyEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
|
@ -130,7 +132,7 @@ const updateEventJSONSQL = "" +
|
|||
|
||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||
"SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids, history_visibility" +
|
||||
" FROM syncapi_output_room_events" +
|
||||
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||
" AND room_id = ANY($3)" +
|
||||
|
@ -146,10 +148,10 @@ const deleteEventsForRoomSQL = "" +
|
|||
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||
|
||||
const selectContextEventSQL = "" +
|
||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
|
||||
|
||||
const selectContextBeforeEventSQL = "" +
|
||||
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
|
||||
"SELECT headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||
|
@ -157,7 +159,7 @@ const selectContextBeforeEventSQL = "" +
|
|||
" ORDER BY id DESC LIMIT $3"
|
||||
|
||||
const selectContextAfterEventSQL = "" +
|
||||
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
|
||||
"SELECT id, headered_event_json, history_visibility FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||
|
@ -246,14 +248,15 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
addIDs pq.StringArray
|
||||
delIDs pq.StringArray
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
addIDs pq.StringArray
|
||||
delIDs pq.StringArray
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs, &historyVisibility); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// Sanity check for deleted state and whine if we see it. We don't need to do anything
|
||||
|
@ -283,6 +286,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
|
|||
needSet[id] = true
|
||||
}
|
||||
stateNeeded[ev.RoomID()] = needSet
|
||||
ev.Visibility = historyVisibility
|
||||
|
||||
eventIDToEvent[eventID] = types.StreamEvent{
|
||||
HeaderedEvent: &ev,
|
||||
|
@ -314,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectMaxEventID(
|
|||
func (s *outputRoomEventsStatements) InsertEvent(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
|
||||
transactionID *api.TransactionID, excludeFromSync bool,
|
||||
transactionID *api.TransactionID, excludeFromSync bool, historyVisibility gomatrixserverlib.HistoryVisibility,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
var txnID *string
|
||||
var sessionID *int64
|
||||
|
@ -351,6 +355,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
|||
sessionID,
|
||||
txnID,
|
||||
excludeFromSync,
|
||||
historyVisibility,
|
||||
).Scan(&streamPos)
|
||||
return
|
||||
}
|
||||
|
@ -504,13 +509,15 @@ func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn
|
|||
row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
|
||||
|
||||
var eventAsString string
|
||||
if err = row.Scan(&id, &eventAsString); err != nil {
|
||||
var historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
if err = row.Scan(&id, &eventAsString, &historyVisibility); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
|
||||
if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
|
||||
return 0, evt, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
return id, evt, nil
|
||||
}
|
||||
|
||||
|
@ -532,15 +539,17 @@ func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&eventBytes); err != nil {
|
||||
if err = rows.Scan(&eventBytes, &historyVisibility); err != nil {
|
||||
return evts, err
|
||||
}
|
||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return evts, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
|
||||
|
@ -565,15 +574,17 @@ func (s *outputRoomEventsStatements) SelectContextAfterEvent(
|
|||
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
eventBytes []byte
|
||||
evt *gomatrixserverlib.HeaderedEvent
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err = rows.Scan(&lastID, &eventBytes); err != nil {
|
||||
if err = rows.Scan(&lastID, &eventBytes, &historyVisibility); err != nil {
|
||||
return 0, evts, err
|
||||
}
|
||||
if err = json.Unmarshal(eventBytes, &evt); err != nil {
|
||||
return 0, evts, err
|
||||
}
|
||||
evt.Visibility = historyVisibility
|
||||
evts = append(evts, evt)
|
||||
}
|
||||
|
||||
|
@ -584,15 +595,16 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|||
var result []types.StreamEvent
|
||||
for rows.Next() {
|
||||
var (
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
eventID string
|
||||
streamPos types.StreamPosition
|
||||
eventBytes []byte
|
||||
excludeFromSync bool
|
||||
sessionID *int64
|
||||
txnID *string
|
||||
transactionID *api.TransactionID
|
||||
historyVisibility gomatrixserverlib.HistoryVisibility
|
||||
)
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID); err != nil {
|
||||
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: Handle redacted events
|
||||
|
|
|
@ -42,18 +42,16 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewDummyWriter()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err = d.db.Exec(outputRoomEventsSchema); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err = d.db.Exec(currentRoomStateSchema); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
accountData, err := NewPostgresAccountDataTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events, err := NewPostgresEventsTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currState, err := NewPostgresCurrentRoomStateTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
invites, err := NewPostgresInvitesTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -101,9 +99,19 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
deltas.LoadAddHistoryVisibilityColumn(m)
|
||||
if err = m.RunDeltas(d.db, dbProperties); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// prepare statements after the migrations have run
|
||||
events, err := NewPostgresEventsTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currState, err := NewPostgresCurrentRoomStateTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue