@@ -80,11 +80,11 @@ def store_search_entries_txn(
80
80
if not self .hs .config .server .enable_search :
81
81
return
82
82
if isinstance (self .database_engine , PostgresEngine ):
83
- sql = (
84
- " INSERT INTO event_search"
85
- " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
86
- " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
87
- )
83
+ sql = """
84
+ INSERT INTO event_search
85
+ (event_id, room_id, key, vector, stream_ordering, origin_server_ts)
86
+ VALUES (?,?,?,to_tsvector('english', ?),?,?)
87
+ """
88
88
89
89
args1 = (
90
90
(
@@ -101,20 +101,20 @@ def store_search_entries_txn(
101
101
txn .execute_batch (sql , args1 )
102
102
103
103
elif isinstance (self .database_engine , Sqlite3Engine ):
104
- sql = (
105
- "INSERT INTO event_search (event_id, room_id, key, value)"
106
- " VALUES (?,?,?,?)"
107
- )
108
- args2 = (
109
- (
110
- entry .event_id ,
111
- entry .room_id ,
112
- entry .key ,
113
- _clean_value_for_search (entry .value ),
114
- )
115
- for entry in entries
104
+ self .db_pool .simple_insert_many_txn (
105
+ txn ,
106
+ table = "event_search" ,
107
+ keys = ("event_id" , "room_id" , "key" , "value" ),
108
+ values = (
109
+ (
110
+ entry .event_id ,
111
+ entry .room_id ,
112
+ entry .key ,
113
+ _clean_value_for_search (entry .value ),
114
+ )
115
+ for entry in entries
116
+ ),
116
117
)
117
- txn .execute_batch (sql , args2 )
118
118
119
119
else :
120
120
# This should be unreachable.
@@ -162,15 +162,17 @@ async def _background_reindex_search(
162
162
TYPES = ["m.room.name" , "m.room.message" , "m.room.topic" ]
163
163
164
164
def reindex_search_txn (txn : LoggingTransaction ) -> int :
165
- sql = (
166
- "SELECT stream_ordering, event_id, room_id, type, json, "
167
- " origin_server_ts FROM events"
168
- " JOIN event_json USING (room_id, event_id)"
169
- " WHERE ? <= stream_ordering AND stream_ordering < ?"
170
- " AND (%s)"
171
- " ORDER BY stream_ordering DESC"
172
- " LIMIT ?"
173
- ) % (" OR " .join ("type = '%s'" % (t ,) for t in TYPES ),)
165
+ sql = """
166
+ SELECT stream_ordering, event_id, room_id, type, json, origin_server_ts
167
+ FROM events
168
+ JOIN event_json USING (room_id, event_id)
169
+ WHERE ? <= stream_ordering AND stream_ordering < ?
170
+ AND (%s)
171
+ ORDER BY stream_ordering DESC
172
+ LIMIT ?
173
+ """ % (
174
+ " OR " .join ("type = '%s'" % (t ,) for t in TYPES ),
175
+ )
174
176
175
177
txn .execute (sql , (target_min_stream_id , max_stream_id , batch_size ))
176
178
@@ -284,8 +286,10 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
284
286
285
287
try :
286
288
c .execute (
287
- "CREATE INDEX CONCURRENTLY event_search_fts_idx"
288
- " ON event_search USING GIN (vector)"
289
+ """
290
+ CREATE INDEX CONCURRENTLY event_search_fts_idx
291
+ ON event_search USING GIN (vector)
292
+ """
289
293
)
290
294
except psycopg2 .ProgrammingError as e :
291
295
logger .warning (
@@ -323,12 +327,16 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
323
327
# We create with NULLS FIRST so that when we search *backwards*
324
328
# we get the ones with non null origin_server_ts *first*
325
329
c .execute (
326
- "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search("
327
- "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
330
+ """
331
+ CREATE INDEX CONCURRENTLY event_search_room_order
332
+ ON event_search(room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
333
+ """
328
334
)
329
335
c .execute (
330
- "CREATE INDEX CONCURRENTLY event_search_order ON event_search("
331
- "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)"
336
+ """
337
+ CREATE INDEX CONCURRENTLY event_search_order
338
+ ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
339
+ """
332
340
)
333
341
conn .set_session (autocommit = False )
334
342
@@ -345,14 +353,14 @@ def create_index(conn: LoggingDatabaseConnection) -> None:
345
353
)
346
354
347
355
def reindex_search_txn (txn : LoggingTransaction ) -> Tuple [int , bool ]:
348
- sql = (
349
- " UPDATE event_search AS es SET stream_ordering = e.stream_ordering,"
350
- " origin_server_ts = e.origin_server_ts"
351
- " FROM events AS e"
352
- " WHERE e.event_id = es.event_id"
353
- " AND ? <= e.stream_ordering AND e.stream_ordering < ?"
354
- " RETURNING es.stream_ordering"
355
- )
356
+ sql = """
357
+ UPDATE event_search AS es
358
+ SET stream_ordering = e.stream_ordering, origin_server_ts = e.origin_server_ts
359
+ FROM events AS e
360
+ WHERE e.event_id = es.event_id
361
+ AND ? <= e.stream_ordering AND e.stream_ordering < ?
362
+ RETURNING es.stream_ordering
363
+ """
356
364
357
365
min_stream_id = max_stream_id - batch_size
358
366
txn .execute (sql , (min_stream_id , max_stream_id ))
@@ -456,33 +464,33 @@ async def search_msgs(
456
464
if isinstance (self .database_engine , PostgresEngine ):
457
465
search_query = search_term
458
466
tsquery_func = self .database_engine .tsquery_func
459
- sql = (
460
- f" SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) AS rank,"
461
- " room_id, event_id"
462
- " FROM event_search"
463
- f" WHERE vector @@ { tsquery_func } ('english', ?)"
464
- )
467
+ sql = f"""
468
+ SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) AS rank,
469
+ room_id, event_id
470
+ FROM event_search
471
+ WHERE vector @@ { tsquery_func } ('english', ?)
472
+ """
465
473
args = [search_query , search_query ] + args
466
474
467
- count_sql = (
468
- " SELECT room_id, count(*) as count FROM event_search"
469
- f" WHERE vector @@ { tsquery_func } ('english', ?)"
470
- )
475
+ count_sql = f"""
476
+ SELECT room_id, count(*) as count FROM event_search
477
+ WHERE vector @@ { tsquery_func } ('english', ?)
478
+ """
471
479
count_args = [search_query ] + count_args
472
480
elif isinstance (self .database_engine , Sqlite3Engine ):
473
481
search_query = _parse_query_for_sqlite (search_term )
474
482
475
- sql = (
476
- " SELECT rank(matchinfo(event_search)) as rank, room_id, event_id"
477
- " FROM event_search"
478
- " WHERE value MATCH ?"
479
- )
483
+ sql = """
484
+ SELECT rank(matchinfo(event_search)) as rank, room_id, event_id
485
+ FROM event_search
486
+ WHERE value MATCH ?
487
+ """
480
488
args = [search_query ] + args
481
489
482
- count_sql = (
483
- " SELECT room_id, count(*) as count FROM event_search"
484
- " WHERE value MATCH ?"
485
- )
490
+ count_sql = """
491
+ SELECT room_id, count(*) as count FROM event_search
492
+ WHERE value MATCH ?
493
+ """
486
494
count_args = [search_query ] + count_args
487
495
else :
488
496
# This should be unreachable.
@@ -588,26 +596,27 @@ async def search_rooms(
588
596
raise SynapseError (400 , "Invalid pagination token" )
589
597
590
598
clauses .append (
591
- "(origin_server_ts < ?"
592
- " OR (origin_server_ts = ? AND stream_ordering < ?))"
599
+ """
600
+ (origin_server_ts < ? OR (origin_server_ts = ? AND stream_ordering < ?))
601
+ """
593
602
)
594
603
args .extend ([origin_server_ts , origin_server_ts , stream ])
595
604
596
605
if isinstance (self .database_engine , PostgresEngine ):
597
606
search_query = search_term
598
607
tsquery_func = self .database_engine .tsquery_func
599
- sql = (
600
- f" SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) as rank,"
601
- " origin_server_ts, stream_ordering, room_id, event_id"
602
- " FROM event_search"
603
- f" WHERE vector @@ { tsquery_func } ('english', ?) AND "
604
- )
608
+ sql = f"""
609
+ SELECT ts_rank_cd(vector, { tsquery_func } ('english', ?)) as rank,
610
+ origin_server_ts, stream_ordering, room_id, event_id
611
+ FROM event_search
612
+ WHERE vector @@ { tsquery_func } ('english', ?) AND
613
+ """
605
614
args = [search_query , search_query ] + args
606
615
607
- count_sql = (
608
- " SELECT room_id, count(*) as count FROM event_search"
609
- f" WHERE vector @@ { tsquery_func } ('english', ?) AND "
610
- )
616
+ count_sql = f"""
617
+ SELECT room_id, count(*) as count FROM event_search
618
+ WHERE vector @@ { tsquery_func } ('english', ?) AND
619
+ """
611
620
count_args = [search_query ] + count_args
612
621
elif isinstance (self .database_engine , Sqlite3Engine ):
613
622
@@ -619,23 +628,24 @@ async def search_rooms(
619
628
# in the events table to get the topological ordering. We need
620
629
# to use the indexes in this order because sqlite refuses to
621
630
# MATCH unless it uses the full text search index
622
- sql = (
623
- "SELECT rank(matchinfo) as rank, room_id, event_id,"
624
- " origin_server_ts, stream_ordering"
625
- " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo"
626
- " FROM event_search"
627
- " WHERE value MATCH ?"
628
- " )"
629
- " CROSS JOIN events USING (event_id)"
630
- " WHERE "
631
+ sql = """
632
+ SELECT
633
+ rank(matchinfo) as rank, room_id, event_id, origin_server_ts, stream_ordering
634
+ FROM (
635
+ SELECT key, event_id, matchinfo(event_search) as matchinfo
636
+ FROM event_search
637
+ WHERE value MATCH ?
631
638
)
639
+ CROSS JOIN events USING (event_id)
640
+ WHERE
641
+ """
632
642
search_query = _parse_query_for_sqlite (search_term )
633
643
args = [search_query ] + args
634
644
635
- count_sql = (
636
- " SELECT room_id, count(*) as count FROM event_search"
637
- " WHERE value MATCH ? AND "
638
- )
645
+ count_sql = """
646
+ SELECT room_id, count(*) as count FROM event_search
647
+ WHERE value MATCH ? AND
648
+ """
639
649
count_args = [search_query ] + count_args
640
650
else :
641
651
# This should be unreachable.
@@ -647,10 +657,10 @@ async def search_rooms(
647
657
# We add an arbitrary limit here to ensure we don't try to pull the
648
658
# entire table from the database.
649
659
if isinstance (self .database_engine , PostgresEngine ):
650
- sql += (
651
- " ORDER BY origin_server_ts DESC NULLS LAST,"
652
- " stream_ordering DESC NULLS LAST LIMIT ?"
653
- )
660
+ sql += """
661
+ ORDER BY origin_server_ts DESC NULLS LAST, stream_ordering DESC NULLS LAST
662
+ LIMIT ?
663
+ """
654
664
elif isinstance (self .database_engine , Sqlite3Engine ):
655
665
sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?"
656
666
else :
0 commit comments