// Copyright 2024 New Vector Ltd. // Copyright 2019, 2020 The Matrix.org Foundation C.I.C. // Copyright 2017, 2018 New Vector Ltd // // SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial // Please see LICENSE files in the repository root for full details. package postgres import ( "context" "database/sql" "github.com/element-hq/dendrite/internal" "github.com/element-hq/dendrite/internal/sqlutil" "github.com/element-hq/dendrite/roomserver/storage/tables" "github.com/element-hq/dendrite/roomserver/types" ) const eventJSONSchema = ` -- Stores the JSON for each event. This kept separate from the main events -- table to keep the rows in the main events table small. CREATE TABLE IF NOT EXISTS roomserver_event_json ( -- Local numeric ID for the event. event_nid BIGINT NOT NULL PRIMARY KEY, -- The JSON for the event. -- Stored as TEXT because this should be valid UTF-8. -- Not stored as a JSONB because we always just pull the entire event -- so there is no point in postgres parsing it. -- Not stored as JSON because we already validate the JSON in the server -- so there is no point in postgres validating it. -- TODO: Should we be compressing the events with Snappy or DEFLATE? event_json TEXT NOT NULL ); ` const insertEventJSONSQL = "" + "INSERT INTO roomserver_event_json (event_nid, event_json) VALUES ($1, $2)" + " ON CONFLICT (event_nid) DO UPDATE SET event_json=$2" // Bulk event JSON lookup by numeric event ID. // Sort by the numeric event ID. // This means that we can use binary search to lookup by numeric event ID. const bulkSelectEventJSONSQL = "" + "SELECT event_nid, event_json FROM roomserver_event_json" + " WHERE event_nid = ANY($1)" + " ORDER BY event_nid ASC" type eventJSONStatements struct { insertEventJSONStmt *sql.Stmt bulkSelectEventJSONStmt *sql.Stmt } func CreateEventJSONTable(db *sql.DB) error { _, err := db.Exec(eventJSONSchema) return err } func PrepareEventJSONTable(db *sql.DB) (tables.EventJSON, error) { s := &eventJSONStatements{} return s, sqlutil.StatementList{ {&s.insertEventJSONStmt, insertEventJSONSQL}, {&s.bulkSelectEventJSONStmt, bulkSelectEventJSONSQL}, }.Prepare(db) } func (s *eventJSONStatements) InsertEventJSON( ctx context.Context, txn *sql.Tx, eventNID types.EventNID, eventJSON []byte, ) error { stmt := sqlutil.TxStmt(txn, s.insertEventJSONStmt) _, err := stmt.ExecContext(ctx, int64(eventNID), eventJSON) return err } func (s *eventJSONStatements) BulkSelectEventJSON( ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID, ) ([]tables.EventJSONPair, error) { stmt := sqlutil.TxStmt(txn, s.bulkSelectEventJSONStmt) rows, err := stmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs)) if err != nil { return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventJSON: rows.close() failed") // We know that we will only get as many results as event NIDs // because of the unique constraint on event NIDs. // So we can allocate an array of the correct size now. // We might get fewer results than NIDs so we adjust the length of the slice before returning it. results := make([]tables.EventJSONPair, len(eventNIDs)) i := 0 var eventNID int64 for ; rows.Next(); i++ { result := &results[i] if err := rows.Scan(&eventNID, &result.EventJSON); err != nil { return nil, err } result.EventNID = types.EventNID(eventNID) } return results[:i], rows.Err() }