from typing import Sequence, Tuple, Iterator, Optional, Union

from data_diff.abcs.database_types import DbTime, DbPath
from data_diff.databases import Database
from data_diff.tracking import disable_tracking
from data_diff.databases._connect import connect
from data_diff.diff_tables import Algorithm
from data_diff.hashdiff_tables import HashDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
from data_diff.joindiff_tables import JoinDiffer, TABLE_WRITE_LIMIT
from data_diff.table_segment import TableSegment
from data_diff.utils import eval_name_template, Vector


def connect_to_table(
    db_info: Union[str, dict],
    table_name: Union[DbPath, str],
    key_columns: Union[str, Sequence[str]] = ("id",),
    thread_count: Optional[int] = 1,
    **kwargs,
) -> TableSegment:
"""Connects to the given database, and creates a TableSegment instance
Parameters:
db_info: Either a URI string, or a dict of connection options.
table_name: Name of the table as a string, or a tuple that signifies the path.
key_columns: Names of the key columns
thread_count: Number of threads for this connection (only if using a threadpooled db implementation)
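
    Example:
        An illustrative sketch; assumes a local ``postgresql:///`` database with a
        ``Rating`` table, matching the example in :func:`diff_tables`.

        >>> segment = connect_to_table('postgresql:///', 'Rating', 'id')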

    See Also:
        :meth:`connect`
    """
    if isinstance(key_columns, str):
        key_columns = (key_columns,)

    db: Database = connect(db_info, thread_count=thread_count)
    if isinstance(table_name, str):
        table_name = db.dialect.parse_table_name(table_name)

    return TableSegment(db, table_name, key_columns, **kwargs)


def diff_tables(
    table1: TableSegment,
    table2: TableSegment,
    *,
    # Name of the key column, which uniquely identifies each row (usually id)
    key_columns: Optional[Union[str, Sequence[str]]] = None,
    # Name of the updated column, which signals that rows changed (usually updated_at or last_update)
    update_column: Optional[str] = None,
    # Extra columns to compare
    extra_columns: Optional[Tuple[str, ...]] = None,
    # Start/end key_column values, used to restrict the segment
    min_key: Optional[Vector] = None,
    max_key: Optional[Vector] = None,
    # Start/end update_column values, used to restrict the segment
    min_update: Optional[DbTime] = None,
    max_update: Optional[DbTime] = None,
    # Enable/disable threaded diffing. Needed to take advantage of database threads.
    threaded: bool = True,
    # Maximum size of each threadpool. None = auto. Only relevant when threaded is True.
    # There may be many pools, so the number of actual threads can be a lot higher.
    max_threadpool_size: Optional[int] = 1,
    # Which diffing algorithm to use
    algorithm: Algorithm = Algorithm.AUTO,
    # An additional 'where' expression to restrict the search space.
    where: Optional[str] = None,
    # Into how many segments to bisect per iteration (hashdiff only)
    bisection_factor: int = DEFAULT_BISECTION_FACTOR,
    # When to stop bisecting and compare locally, in row count (hashdiff only)
    bisection_threshold: int = DEFAULT_BISECTION_THRESHOLD,
    # Enable/disable validating that the key columns are unique (joindiff only)
    validate_unique_key: bool = True,
    # Enable/disable sampling of exclusive rows. Creates a temporary table. (joindiff only)
    sample_exclusive_rows: bool = False,
    # Path of a new table to write diff results to. Disabled if not provided. (joindiff only)
    materialize_to_table: Optional[Union[str, DbPath]] = None,
    # Materialize every row, not just those that are different (joindiff only)
    materialize_all_rows: bool = False,
    # Maximum number of rows to write when materializing, per thread (joindiff only)
    table_write_limit: int = TABLE_WRITE_LIMIT,
    # Skip diffing any rows with null keys (joindiff only)
    skip_null_keys: bool = False,
) -> Iterator:
"""Finds the diff between table1 and table2.
Parameters:
key_columns (Tuple[str, ...]): Name of the key column, which uniquely identifies each row (usually id)
update_column (str, optional): Name of updated column, which signals that rows changed.
Usually updated_at or last_update. Used by `min_update` and `max_update`.
extra_columns (Tuple[str, ...], optional): Extra columns to compare
min_key (:data:`Vector`, optional): Lowest key value, used to restrict the segment
max_key (:data:`Vector`, optional): Highest key value, used to restrict the segment
min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment
max_update (:data:`DbTime`, optional): Highest update_column value, used to restrict the segment
threaded (bool): Enable/disable threaded diffing. Needed to take advantage of database threads.
max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
Only relevant when `threaded` is ``True``.
There may be many pools, so number of actual threads can be a lot higher.
where (str, optional): An additional 'where' expression to restrict the search space.
algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`. Default=`AUTO`)
bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
and compare locally. (Used when algorithm is `HASHDIFF`).
validate_unique_key (bool): Enable/disable validating that the key columns are unique. (used for `JOINDIFF`. default: True)
Single query, and can't be threaded, so it's very slow on non-cloud dbs.
Future versions will detect UNIQUE constraints in the schema.
sample_exclusive_rows (bool): Enable/disable sampling of exclusive rows. Creates a temporary table. (used for `JOINDIFF`. default: False)
materialize_to_table (Union[str, DbPath], optional): Path of new table to write diff results to. Disabled if not provided. Used for `JOINDIFF`.
materialize_all_rows (bool): Materialize every row, not just those that are different. (used for `JOINDIFF`. default: False)
table_write_limit (int): Maximum number of rows to write when materializing, per thread.
skip_null_keys (bool): Skips diffing any rows with null PKs (displays a warning if any are null) (used for `JOINDIFF`. default: False)
Note:
The following parameters are used to override the corresponding attributes of the given :class:`TableSegment` instances:
`key_columns`, `update_column`, `extra_columns`, `min_key`, `max_key`, `where`.
If different values are needed per table, it's possible to omit them here, and instead set
them directly when creating each :class:`TableSegment`.

    Example:
        >>> table1 = connect_to_table('postgresql:///', 'Rating', 'id')
        >>> list(diff_tables(table1, table1))
        []
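
        A minimal sketch of overriding a segment attribute per call; assumes the
        ``Rating`` table also has an ``updated_at`` column (hypothetical here):

        >>> list(diff_tables(table1, table1, update_column='updated_at'))
        []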

    See Also:
        :class:`TableSegment`
        :class:`HashDiffer`
        :class:`JoinDiffer`
    """
    if isinstance(key_columns, str):
        key_columns = (key_columns,)

    tables = [table1, table2]
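    # Collect only the overrides that were explicitly passed; a None value means
    # "keep whatever the TableSegment was configured with".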
    override_attrs = {
        k: v
        for k, v in dict(
            key_columns=key_columns,
            update_column=update_column,
            extra_columns=extra_columns,
            min_key=min_key,
            max_key=max_key,
            min_update=min_update,
            max_update=max_update,
            where=where,
        ).items()
        if v is not None
    }
    segments = [t.new(**override_attrs) for t in tables] if override_attrs else tables

    algorithm = Algorithm(algorithm)
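    # AUTO resolves to JOINDIFF when both segments share the same database connection
    # (an in-database join is possible), and to HASHDIFF when diffing across databases.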
    if algorithm == Algorithm.AUTO:
        algorithm = Algorithm.JOINDIFF if table1.database is table2.database else Algorithm.HASHDIFF

    if algorithm == Algorithm.HASHDIFF:
        differ = HashDiffer(
            bisection_factor=bisection_factor,
            bisection_threshold=bisection_threshold,
            threaded=threaded,
            max_threadpool_size=max_threadpool_size,
        )
    elif algorithm == Algorithm.JOINDIFF:
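        # When given as a string, the target name may contain a template token
        # (e.g. '%t' for a timestamp) which eval_name_template expands before the
        # dialect parses the result into a table path.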
        if isinstance(materialize_to_table, str):
            table_name = eval_name_template(materialize_to_table)
            materialize_to_table = table1.database.dialect.parse_table_name(table_name)

        differ = JoinDiffer(
            threaded=threaded,
            max_threadpool_size=max_threadpool_size,
            validate_unique_key=validate_unique_key,
            sample_exclusive_rows=sample_exclusive_rows,
            materialize_to_table=materialize_to_table,
            materialize_all_rows=materialize_all_rows,
            table_write_limit=table_write_limit,
            skip_null_keys=skip_null_keys,
        )
    else:
        raise ValueError(f"Unknown algorithm: {algorithm}")

    return differ.diff_tables(*segments)