Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 38b346a

Browse files
authored
Replace or_ignore in simple_insert with simple_upsert (#10442)
Now that we have `simple_upsert` that should be used in preference to trying to insert and looking for an exception. The main benefit is that we ERROR message don't get written to postgres logs. We also have tidy up the return value on `simple_upsert`, rather than having a tri-state of inserted/not-inserted/unknown.
1 parent d8324b8 commit 38b346a

File tree

6 files changed

+44
-99
lines changed

6 files changed

+44
-99
lines changed

changelog.d/10442.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Replace usage of `or_ignore` in `simple_insert` with `simple_upsert` usage, to stop spamming postgres logs with spurious ERROR messages.

synapse/storage/database.py

Lines changed: 20 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -832,31 +832,16 @@ async def simple_insert(
832832
self,
833833
table: str,
834834
values: Dict[str, Any],
835-
or_ignore: bool = False,
836835
desc: str = "simple_insert",
837-
) -> bool:
836+
) -> None:
838837
"""Executes an INSERT query on the named table.
839838
840839
Args:
841840
table: string giving the table name
842841
values: dict of new column names and values for them
843-
or_ignore: bool stating whether an exception should be raised
844-
when a conflicting row already exists. If True, False will be
845-
returned by the function instead
846842
desc: description of the transaction, for logging and metrics
847-
848-
Returns:
849-
Whether the row was inserted or not. Only useful when `or_ignore` is True
850843
"""
851-
try:
852-
await self.runInteraction(desc, self.simple_insert_txn, table, values)
853-
except self.engine.module.IntegrityError:
854-
# We have to do or_ignore flag at this layer, since we can't reuse
855-
# a cursor after we receive an error from the db.
856-
if not or_ignore:
857-
raise
858-
return False
859-
return True
844+
await self.runInteraction(desc, self.simple_insert_txn, table, values)
860845

861846
@staticmethod
862847
def simple_insert_txn(
@@ -930,7 +915,7 @@ async def simple_upsert(
930915
insertion_values: Optional[Dict[str, Any]] = None,
931916
desc: str = "simple_upsert",
932917
lock: bool = True,
933-
) -> Optional[bool]:
918+
) -> bool:
934919
"""
935920
936921
`lock` should generally be set to True (the default), but can be set
@@ -951,8 +936,8 @@ async def simple_upsert(
951936
desc: description of the transaction, for logging and metrics
952937
lock: True to lock the table when doing the upsert.
953938
Returns:
954-
Native upserts always return None. Emulated upserts return True if a
955-
new entry was created, False if an existing one was updated.
939+
Returns True if a row was inserted or updated (i.e. if `values` is
940+
not empty then this always returns True)
956941
"""
957942
insertion_values = insertion_values or {}
958943

@@ -995,7 +980,7 @@ def simple_upsert_txn(
995980
values: Dict[str, Any],
996981
insertion_values: Optional[Dict[str, Any]] = None,
997982
lock: bool = True,
998-
) -> Optional[bool]:
983+
) -> bool:
999984
"""
1000985
Pick the UPSERT method which works best on the platform. Either the
1001986
native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
@@ -1008,16 +993,15 @@ def simple_upsert_txn(
1008993
insertion_values: additional key/values to use only when inserting
1009994
lock: True to lock the table when doing the upsert.
1010995
Returns:
1011-
Native upserts always return None. Emulated upserts return True if a
1012-
new entry was created, False if an existing one was updated.
996+
Returns True if a row was inserted or updated (i.e. if `values` is
997+
not empty then this always returns True)
1013998
"""
1014999
insertion_values = insertion_values or {}
10151000

10161001
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
1017-
self.simple_upsert_txn_native_upsert(
1002+
return self.simple_upsert_txn_native_upsert(
10181003
txn, table, keyvalues, values, insertion_values=insertion_values
10191004
)
1020-
return None
10211005
else:
10221006
return self.simple_upsert_txn_emulated(
10231007
txn,
@@ -1045,8 +1029,8 @@ def simple_upsert_txn_emulated(
10451029
insertion_values: additional key/values to use only when inserting
10461030
lock: True to lock the table when doing the upsert.
10471031
Returns:
1048-
Returns True if a new entry was created, False if an existing
1049-
one was updated.
1032+
Returns True if a row was inserted or updated (i.e. if `values` is
1033+
not empty then this always returns True)
10501034
"""
10511035
insertion_values = insertion_values or {}
10521036

@@ -1086,8 +1070,7 @@ def _getwhere(key):
10861070

10871071
txn.execute(sql, sqlargs)
10881072
if txn.rowcount > 0:
1089-
# successfully updated at least one row.
1090-
return False
1073+
return True
10911074

10921075
# We didn't find any existing rows, so insert a new one
10931076
allvalues: Dict[str, Any] = {}
@@ -1111,15 +1094,19 @@ def simple_upsert_txn_native_upsert(
11111094
keyvalues: Dict[str, Any],
11121095
values: Dict[str, Any],
11131096
insertion_values: Optional[Dict[str, Any]] = None,
1114-
) -> None:
1097+
) -> bool:
11151098
"""
1116-
Use the native UPSERT functionality in recent PostgreSQL versions.
1099+
Use the native UPSERT functionality in PostgreSQL.
11171100
11181101
Args:
11191102
table: The table to upsert into
11201103
keyvalues: The unique key tables and their new values
11211104
values: The nonunique columns and their new values
11221105
insertion_values: additional key/values to use only when inserting
1106+
1107+
Returns:
1108+
Returns True if a row was inserted or updated (i.e. if `values` is
1109+
not empty then this always returns True)
11231110
"""
11241111
allvalues: Dict[str, Any] = {}
11251112
allvalues.update(keyvalues)
@@ -1140,6 +1127,8 @@ def simple_upsert_txn_native_upsert(
11401127
)
11411128
txn.execute(sql, list(allvalues.values()))
11421129

1130+
return bool(txn.rowcount)
1131+
11431132
async def simple_upsert_many(
11441133
self,
11451134
table: str,

synapse/storage/databases/main/devices.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,16 +1078,18 @@ async def store_device(
10781078
return False
10791079

10801080
try:
1081-
inserted = await self.db_pool.simple_insert(
1081+
inserted = await self.db_pool.simple_upsert(
10821082
"devices",
1083-
values={
1083+
keyvalues={
10841084
"user_id": user_id,
10851085
"device_id": device_id,
1086+
},
1087+
values={},
1088+
insertion_values={
10861089
"display_name": initial_device_display_name,
10871090
"hidden": False,
10881091
},
10891092
desc="store_device",
1090-
or_ignore=True,
10911093
)
10921094
if not inserted:
10931095
# if the device already exists, check if it's a real device, or
@@ -1099,6 +1101,7 @@ async def store_device(
10991101
)
11001102
if hidden:
11011103
raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)
1104+
11021105
self.device_id_exists_cache.set(key, True)
11031106
return inserted
11041107
except StoreError:

synapse/storage/databases/main/monthly_active_users.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -297,17 +297,13 @@ def upsert_monthly_active_user_txn(self, txn, user_id):
297297
Args:
298298
txn (cursor):
299299
user_id (str): user to add/update
300-
301-
Returns:
302-
bool: True if a new entry was created, False if an
303-
existing one was updated.
304300
"""
305301

306302
# Am consciously deciding to lock the table on the basis that is ought
307303
# never be a big table and alternative approaches (batching multiple
308304
# upserts into a single txn) introduced a lot of extra complexity.
309305
# See https://github.com/matrix-org/synapse/issues/3854 for more
310-
is_insert = self.db_pool.simple_upsert_txn(
306+
self.db_pool.simple_upsert_txn(
311307
txn,
312308
table="monthly_active_users",
313309
keyvalues={"user_id": user_id},
@@ -322,8 +318,6 @@ def upsert_monthly_active_user_txn(self, txn, user_id):
322318
txn, self.user_last_seen_monthly_active, (user_id,)
323319
)
324320

325-
return is_insert
326-
327321
async def populate_monthly_active_users(self, user_id):
328322
"""Checks on the state of monthly active user limits and optionally
329323
add the user to the monthly active tables

synapse/storage/databases/main/transactions.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,16 +134,18 @@ async def set_received_txn_response(
134134
response_dict: The response, to be encoded into JSON.
135135
"""
136136

137-
await self.db_pool.simple_insert(
137+
await self.db_pool.simple_upsert(
138138
table="received_transactions",
139-
values={
139+
keyvalues={
140140
"transaction_id": transaction_id,
141141
"origin": origin,
142+
},
143+
values={},
144+
insertion_values={
142145
"response_code": code,
143146
"response_json": db_binary_type(encode_canonical_json(response_dict)),
144147
"ts": self._clock.time_msec(),
145148
},
146-
or_ignore=True,
147149
desc="set_received_txn_response",
148150
)
149151

synapse/storage/databases/main/user_directory.py

Lines changed: 11 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ async def update_profile_in_user_dir(
377377
avatar_url = None
378378

379379
def _update_profile_in_user_dir_txn(txn):
380-
new_entry = self.db_pool.simple_upsert_txn(
380+
self.db_pool.simple_upsert_txn(
381381
txn,
382382
table="user_directory",
383383
keyvalues={"user_id": user_id},
@@ -388,67 +388,23 @@ def _update_profile_in_user_dir_txn(txn):
388388
if isinstance(self.database_engine, PostgresEngine):
389389
# We weight the localpart most highly, then display name and finally
390390
# server name
391-
if self.database_engine.can_native_upsert:
392-
sql = """
391+
sql = """
393392
INSERT INTO user_directory_search(user_id, vector)
394393
VALUES (?,
395394
setweight(to_tsvector('simple', ?), 'A')
396395
|| setweight(to_tsvector('simple', ?), 'D')
397396
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
398397
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
399398
"""
400-
txn.execute(
401-
sql,
402-
(
403-
user_id,
404-
get_localpart_from_id(user_id),
405-
get_domain_from_id(user_id),
406-
display_name,
407-
),
408-
)
409-
else:
410-
# TODO: Remove this code after we've bumped the minimum version
411-
# of postgres to always support upserts, so we can get rid of
412-
# `new_entry` usage
413-
if new_entry is True:
414-
sql = """
415-
INSERT INTO user_directory_search(user_id, vector)
416-
VALUES (?,
417-
setweight(to_tsvector('simple', ?), 'A')
418-
|| setweight(to_tsvector('simple', ?), 'D')
419-
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
420-
)
421-
"""
422-
txn.execute(
423-
sql,
424-
(
425-
user_id,
426-
get_localpart_from_id(user_id),
427-
get_domain_from_id(user_id),
428-
display_name,
429-
),
430-
)
431-
elif new_entry is False:
432-
sql = """
433-
UPDATE user_directory_search
434-
SET vector = setweight(to_tsvector('simple', ?), 'A')
435-
|| setweight(to_tsvector('simple', ?), 'D')
436-
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
437-
WHERE user_id = ?
438-
"""
439-
txn.execute(
440-
sql,
441-
(
442-
get_localpart_from_id(user_id),
443-
get_domain_from_id(user_id),
444-
display_name,
445-
user_id,
446-
),
447-
)
448-
else:
449-
raise RuntimeError(
450-
"upsert returned None when 'can_native_upsert' is False"
451-
)
399+
txn.execute(
400+
sql,
401+
(
402+
user_id,
403+
get_localpart_from_id(user_id),
404+
get_domain_from_id(user_id),
405+
display_name,
406+
),
407+
)
452408
elif isinstance(self.database_engine, Sqlite3Engine):
453409
value = "%s %s" % (user_id, display_name) if display_name else user_id
454410
self.db_pool.simple_upsert_txn(

0 commit comments

Comments
 (0)