This repository was archived by the owner on May 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 278
/
Copy pathtracking.py
237 lines (190 loc) · 6.35 KB
/
tracking.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#
# This module contains all the functionality related to the anonymous tracking of data-diff use.
#
import logging
import os
import json
import platform
from time import time
from typing import Any, Dict, Optional
import urllib.request
from uuid import uuid4
import toml
from rich import get_console
from data_diff.version import __version__
# Endpoint that tracking events are POSTed to (see send_event_json).
TRACK_URL = "https://hosted.rudderlabs.com/v1/track"
# Event names for the start and the end of a diff run.
START_EVENT = "os_diff_run_start"
END_EVENT = "os_diff_run_end"
# Value sent as the "token" property of every event payload.
TOKEN = "2HgtM4Hcq9BmeiCqNYhz7O9tkjM"
# Timeout, in seconds, passed to urlopen() when posting an event.
TIMEOUT = 8
# Per-user TOML profile; holds the anonymous id and the "already prompted" flags.
DEFAULT_PROFILE = os.path.expanduser("~/.datadiff.toml")
def _load_profile():
    """Load the user profile from DEFAULT_PROFILE, creating an anonymous id if needed.

    A missing profile file is treated as an empty profile; any other error
    raised while opening or parsing the file propagates to the caller.

    If the profile has no "anonymous_id" entry, a fresh UUID4 string is stored
    under that key and the whole profile is written back to DEFAULT_PROFILE.

    Returns:
        The profile mapping, always containing an "anonymous_id" key.
    """
    try:
        with open(DEFAULT_PROFILE) as profile_file:
            profile = toml.load(profile_file)
    except FileNotFoundError:
        profile = {}

    # Nothing to persist when the id already exists.
    if "anonymous_id" in profile:
        return profile

    profile["anonymous_id"] = str(uuid4())
    with open(DEFAULT_PROFILE, "w") as profile_file:
        toml.dump(profile, profile_file)
    return profile
def bool_ask_for_email() -> bool:
    """Decide whether the user should be prompted for their email.

    The prompt is offered at most once: the first time this returns True, an
    "asked_for_email" key is written to the profile file (DEFAULT_PROFILE), and
    every later call finds that key and returns False.

    Returns:
        bool: True only when tracking is enabled, the console is interactive,
        and the profile does not yet contain "asked_for_email"; False otherwise.
    """
    console = get_console()
    # Never prompt when tracking is off or nobody is at the terminal.
    if not (g_tracking_enabled and console.is_interactive):
        return False

    profile = _load_profile()
    if "asked_for_email" in profile:
        return False

    # Record that the question has been asked before reporting True.
    profile["asked_for_email"] = ""
    with open(DEFAULT_PROFILE, "w") as profile_file:
        toml.dump(profile, profile_file)
    return True
def bool_notify_about_extension() -> bool:
    """Decide whether to show the one-time notification about the extension.

    The profile is loaded first (which may itself create the profile file).
    When the profile has no "notified_about_extension" key and the console is
    interactive, that key is written to DEFAULT_PROFILE and True is returned.

    Returns:
        bool: True the first time the conditions above hold, False otherwise.
    """
    profile = _load_profile()
    console = get_console()

    if "notified_about_extension" in profile or not console.is_interactive:
        return False

    # Persist the marker so the notification is not shown again.
    profile["notified_about_extension"] = ""
    with open(DEFAULT_PROFILE, "w") as profile_file:
        toml.dump(profile, profile_file)
    return True
# Process-wide tracking switch; set to False by disable_tracking().
g_tracking_enabled = True
# Cached anonymous id; filled in on the first get_anonymous_id() call.
g_anonymous_id = None
# Entry point name reported in every event; replaced via set_entrypoint_name().
entrypoint_name = "Python API"
def disable_tracking() -> None:
    """Switch tracking off by setting the module-level flag to False."""
    global g_tracking_enabled
    g_tracking_enabled = False
def is_tracking_enabled() -> bool:
    """Return the current value of the module-level tracking flag."""
    return g_tracking_enabled
def set_entrypoint_name(s) -> None:
    """Store ``s`` as the entry point name reported in subsequent events."""
    global entrypoint_name
    entrypoint_name = s
# dbt-related values attached to event payloads; each stays None until the
# matching set_dbt_* function is called.
dbt_user_id = None
dbt_version = None
dbt_project_id = None
def set_dbt_user_id(s) -> None:
    """Store ``s`` as the dbt user id reported in subsequent events."""
    global dbt_user_id
    dbt_user_id = s
def set_dbt_version(s) -> None:
    """Store ``s`` as the dbt version reported in subsequent events."""
    global dbt_version
    dbt_version = s
def set_dbt_project_id(s) -> None:
    """Store ``s`` as the dbt project id reported in subsequent events."""
    global dbt_project_id
    dbt_project_id = s
def get_anonymous_id() -> str:
    """Return the anonymous id, loading it from the profile on first use.

    The value is cached in the module-level ``g_anonymous_id`` so the profile
    is read at most once per process.
    """
    global g_anonymous_id
    if g_anonymous_id is not None:
        return g_anonymous_id

    g_anonymous_id = _load_profile()["anonymous_id"]
    return g_anonymous_id
def create_start_event_json(diff_options: Dict[str, Any]) -> Dict[str, Any]:
    """Build the payload for the event sent when a diff run starts.

    Args:
        diff_options: Options of the diff run; embedded unchanged under the
            "diff_options" property.

    Returns:
        A dict with the event name (START_EVENT) and a "properties" mapping
        holding the anonymous id, token, current timestamp, OS and Python
        details, the given options, the package version, the entry point name
        and the dbt-related module values.
    """
    return {
        # Use the shared constant rather than repeating the literal.
        "event": START_EVENT,
        "properties": {
            "distinct_id": get_anonymous_id(),
            "token": TOKEN,
            "time": time(),
            "os_type": os.name,
            "os_version": platform.platform(),
            "python_version": f"{platform.python_version()}/{platform.python_implementation()}",
            "diff_options": diff_options,
            # NOTE(review): the trailing ":" in this key looks unintentional, but it
            # is part of the emitted payload, so it is kept as-is -- confirm with the
            # consumers of these events before renaming.
            "data_diff_version:": __version__,
            "entrypoint_name": entrypoint_name,
            "dbt_user_id": dbt_user_id,
            "dbt_version": dbt_version,
            "dbt_project_id": dbt_project_id,
        },
    }
def create_end_event_json(
    is_success: bool,
    runtime_seconds: float,
    data_source_1_type: str,
    data_source_2_type: str,
    table1_count: int,
    table2_count: int,
    diff_count: int,
    error: Optional[str],
    diff_id: Optional[int] = None,
    is_cloud: bool = False,
    org_id: Optional[int] = None,
    org_name: Optional[str] = None,
    user_id: Optional[int] = None,
) -> Dict[str, Any]:
    """Build the payload for the event sent when a diff run ends.

    Args:
        is_success: Sent as "is_success".
        runtime_seconds: Sent as "runtime_seconds".
        data_source_1_type: Sent as "data_source_1_type".
        data_source_2_type: Sent as "data_source_2_type".
        table1_count: Sent as "table_1_rows_cnt".
        table2_count: Sent as "table_2_rows_cnt".
        diff_count: Sent as "diff_rows_cnt".
        error: Sent as "error_message"; may be None.
        diff_id: Sent as "diff_id".
        is_cloud: Sent as "is_cloud".
        org_id: Sent as "org_id".
        org_name: Sent as "org_name".
        user_id: Sent as "user_id".

    Returns:
        A dict with the event name (END_EVENT) and a "properties" mapping that
        combines the arguments above with the anonymous id, token, current
        timestamp, package version, entry point name and the dbt-related
        module values.
    """
    return {
        # Use the shared constant rather than repeating the literal.
        "event": END_EVENT,
        "properties": {
            "distinct_id": get_anonymous_id(),
            "token": TOKEN,
            "time": time(),
            "is_success": is_success,
            "runtime_seconds": runtime_seconds,
            "data_source_1_type": data_source_1_type,
            "data_source_2_type": data_source_2_type,
            "table_1_rows_cnt": table1_count,
            "table_2_rows_cnt": table2_count,
            "diff_rows_cnt": diff_count,
            "error_message": error,
            # NOTE(review): the trailing ":" in this key looks unintentional, but it
            # is part of the emitted payload, so it is kept as-is -- confirm with the
            # consumers of these events before renaming.
            "data_diff_version:": __version__,
            "entrypoint_name": entrypoint_name,
            "is_cloud": is_cloud,
            "diff_id": diff_id,
            "dbt_user_id": dbt_user_id,
            "dbt_version": dbt_version,
            "dbt_project_id": dbt_project_id,
            "org_id": org_id,
            "org_name": org_name,
            "user_id": user_id,
        },
    }
def create_email_signup_event_json(email: str) -> Dict[str, Any]:
    """Build the payload for the email opt-in event.

    Args:
        email: Address supplied by the user; sent as the "email" property.

    Returns:
        A dict with the event name "os_diff_email_opt_in" and a "properties"
        mapping holding the anonymous id, token, current timestamp, package
        version, entry point name, the email, and the dbt user/project ids
        (this event carries no "dbt_version" property).
    """
    properties: Dict[str, Any] = {
        "distinct_id": get_anonymous_id(),
        "token": TOKEN,
        "time": time(),
        "data_diff_version:": __version__,
        "entrypoint_name": entrypoint_name,
        "email": email,
        "dbt_user_id": dbt_user_id,
        "dbt_project_id": dbt_project_id,
    }
    return {"event": "os_diff_email_opt_in", "properties": properties}
def convert_sets_to_lists(obj):
    """Recursively replace sets in ``obj`` with lists so it can be JSON-encoded.

    The walk descends into dicts (values only), lists, tuples, and the elements
    of sets themselves, so a set or frozenset at any depth is converted.

    Args:
        obj: Any value; containers are rebuilt, everything else is returned as-is.

    Returns:
        ``set``/``frozenset`` -> ``list`` of converted elements (in the set's
        iteration order); ``dict`` -> ``dict`` with converted values;
        ``list`` -> ``list``; ``tuple`` -> plain ``tuple``; any other object
        unchanged.
    """
    if isinstance(obj, (set, frozenset)):
        # Set elements can themselves be frozensets or tuples, so recurse.
        return [convert_sets_to_lists(elem) for elem in obj]
    if isinstance(obj, dict):
        return {k: convert_sets_to_lists(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_sets_to_lists(elem) for elem in obj]
    if isinstance(obj, tuple):
        # json encodes tuples as arrays, but a set nested inside one would still fail.
        return tuple(convert_sets_to_lists(elem) for elem in obj)
    return obj
def send_event_json(event_json) -> None:
    """POST a tracking event to TRACK_URL on a best-effort basis.

    Sets inside the payload are converted to lists, the payload is JSON-encoded
    and sent with a timeout of TIMEOUT seconds. Any failure while encoding or
    delivering the event (including a response code other than 200) is logged
    at debug level and otherwise ignored, so tracking cannot break the caller.

    Args:
        event_json: Event payload, e.g. as built by the ``create_*_event_json``
            helpers.

    Raises:
        RuntimeError: If tracking is disabled; callers are expected to check
            before sending.
    """
    if not g_tracking_enabled:
        raise RuntimeError("Won't send; tracking is disabled!")

    headers = {
        "Content-Type": "application/json",
        # HTTP Basic credentials: base64 of TOKEN followed by ":" (empty password).
        "Authorization": "Basic MkhndE00SGNxOUJtZWlDcU5ZaHo3Tzl0a2pNOg==",
    }

    try:
        # Encoding happens inside the try so an unserializable payload is
        # logged like any other delivery failure instead of raising.
        data = json.dumps(convert_sets_to_lists(event_json)).encode()
        req = urllib.request.Request(TRACK_URL, data=data, headers=headers)
        with urllib.request.urlopen(req, timeout=TIMEOUT) as f:
            res = f.read()
            if f.code != 200:
                raise RuntimeError(res)
    except Exception as e:
        logging.debug("Failed to post to Rudderstack: %s", e)