-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathSnowflakeClient.cs
298 lines (244 loc) · 12.7 KB
/
SnowflakeClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
using Snowflake.Client.Helpers;
using Snowflake.Client.Json;
using Snowflake.Client.Model;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
namespace Snowflake.Client
{
public class SnowflakeClient : ISnowflakeClient
{
/// <summary>
/// Current Snowflake session.
/// </summary>
public SnowflakeSession SnowflakeSession { get; private set; }
private readonly RestClient _restClient;
private readonly RequestBuilder _requestBuilder;
private readonly SnowflakeClientSettings _clientSettings;
/// <summary>
/// Creates new Snowflake client.
/// </summary>
/// <param name="user">Username</param>
/// <param name="password">Password</param>
/// <param name="account">Account</param>
/// <param name="region">Region: "us-east-1", etc. Required for all except for US West Oregon (us-west-2).</param>
public SnowflakeClient(string user, string password, string account, string region = null)
: this(new AuthInfo(user, password, account, region))
{
}
/// <summary>
/// Creates new Snowflake client.
/// </summary>
/// <param name="authInfo">Auth information: user, password, account, region</param>
/// <param name="sessionInfo">Session information: role, schema, database, warehouse</param>
/// <param name="urlInfo">URL information: host, protocol and port</param>
/// <param name="jsonMapperOptions">JsonSerializerOptions which will be used to map response to your model</param>
public SnowflakeClient(AuthInfo authInfo, SessionInfo sessionInfo = null, UrlInfo urlInfo = null, JsonSerializerOptions jsonMapperOptions = null)
: this(new SnowflakeClientSettings(authInfo, sessionInfo, urlInfo, jsonMapperOptions))
{
}
/// <summary>
/// Creates new Snowflake client.
/// </summary>
/// <param name="settings">Client settings to initialize new session.</param>
public SnowflakeClient(SnowflakeClientSettings settings)
{
ValidateClientSettings(settings);
_clientSettings = settings;
_restClient = new RestClient();
_requestBuilder = new RequestBuilder(settings.UrlInfo);
SnowflakeDataMapper.SetJsonMapperOptions(settings.JsonMapperOptions);
}
private static void ValidateClientSettings(SnowflakeClientSettings settings)
{
if (settings == null)
throw new ArgumentException("Settings object cannot be null.");
if (string.IsNullOrEmpty(settings.AuthInfo?.User))
throw new ArgumentException("User name is either empty or null.");
if (string.IsNullOrEmpty(settings.AuthInfo?.Password))
throw new ArgumentException("User password is either empty or null.");
if (string.IsNullOrEmpty(settings.AuthInfo?.Account))
throw new ArgumentException("Snowflake account is either empty or null.");
if (settings.UrlInfo?.Protocol != "https" && settings.UrlInfo?.Protocol != "http")
throw new ArgumentException("URL Protocol should be either http or https.");
if (string.IsNullOrEmpty(settings.UrlInfo?.Host))
throw new ArgumentException("URL Host cannot be empty.");
if (!settings.UrlInfo.Host.ToLower().EndsWith(".snowflakecomputing.com"))
throw new ArgumentException("URL Host should end up with '.snowflakecomputing.com'.");
}
/// <summary>
/// Initializes new Snowflake session.
/// </summary>
/// <returns>True if session successfully initialized</returns>
public async Task<bool> InitNewSessionAsync(CancellationToken ct = default)
{
SnowflakeSession = await AuthenticateAsync(_clientSettings.AuthInfo, _clientSettings.SessionInfo, ct).ConfigureAwait(false);
_requestBuilder.SetSessionTokens(SnowflakeSession.SessionToken, SnowflakeSession.MasterToken);
return true;
}
private async Task<SnowflakeSession> AuthenticateAsync(AuthInfo authInfo, SessionInfo sessionInfo, CancellationToken ct)
{
var loginRequest = _requestBuilder.BuildLoginRequest(authInfo, sessionInfo);
var response = await _restClient.SendAsync<LoginResponse>(loginRequest, ct).ConfigureAwait(false);
if (!response.Success)
throw new SnowflakeException($"Authentication failed. Message: {response.Message}", response.Code);
return new SnowflakeSession(response.Data);
}
/// <summary>
/// Renew session
/// </summary>
/// <returns>True if session successfully renewed</returns>
public async Task<bool> RenewSessionAsync(CancellationToken ct = default)
{
if (SnowflakeSession == null)
throw new SnowflakeException("Session is not initialized yet.");
var renewSessionRequest = _requestBuilder.BuildRenewSessionRequest();
var response = await _restClient.SendAsync<RenewSessionResponse>(renewSessionRequest, ct).ConfigureAwait(false);
if (!response.Success)
throw new SnowflakeException($"Renew session failed. Message: {response.Message}", response.Code);
SnowflakeSession.Renew(response.Data);
_requestBuilder.SetSessionTokens(SnowflakeSession.SessionToken, SnowflakeSession.MasterToken);
return true;
}
/// <summary>
/// Execute SQL that selects a single value.
/// </summary>
/// <param name="sql">The SQL to execute.</param>
/// <param name="sqlParams">The parameters to use for this command.</param>
/// <returns>The first cell value returned as string.</returns>
public async Task<string> ExecuteScalarAsync(string sql, object sqlParams = null, CancellationToken ct = default)
{
var response = await QueryInternalAsync(sql, sqlParams, false, ct).ConfigureAwait(false);
var rawResult = response.Data.RowSet.FirstOrDefault()?.FirstOrDefault();
return rawResult;
}
/// <summary>
/// Execute parameterized SQL.
/// </summary>
/// <param name="sql">The SQL to execute for this query.</param>
/// <param name="sqlParams">The parameters to use for this query.</param>
/// <returns>The number of rows affected.</returns>
public async Task<long> ExecuteAsync(string sql, object sqlParams = null, CancellationToken ct = default)
{
var response = await QueryInternalAsync(sql, sqlParams, false, ct).ConfigureAwait(false);
var affectedRows = SnowflakeUtils.GetAffectedRowsCount(response);
return affectedRows;
}
/// <summary>
/// Executes a query, returning the data typed as <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The type of results to return.</typeparam>
/// <param name="sql">The SQL to execute.</param>
/// <param name="sqlParams">The parameters to use for this command.</param>
/// <returns>A sequence of data of the supplied type: one instance per row.</returns>
public async Task<IEnumerable<T>> QueryAsync<T>(string sql, object sqlParams = null, CancellationToken ct = default)
{
var response = await QueryInternalAsync(sql, sqlParams, false, ct).ConfigureAwait(false);
var rowSet = response.Data.RowSet;
if (response.Data.Chunks != null && response.Data.Chunks.Count > 0)
{
rowSet.AddRange(await ChunksDownloader.DownloadAndParseChunksAsync(new ChunksDownloadInfo()
{
ChunkHeaders = response.Data.ChunkHeaders,
Chunks = response.Data.Chunks,
Qrmk = response.Data.Qrmk
}, ct));
}
var result = SnowflakeDataMapper.MapTo<T>(response.Data.RowType, rowSet);
return result;
}
/// <summary>
/// Executes a query, returning the raw data returned by REST API (rows, columns and query information).
/// </summary>
/// <param name="sql">The SQL to execute.</param>
/// <param name="sqlParams">The parameters to use for this command.</param>
/// <param name="describeOnly">Return only columns information.</param>
/// <returns>Rows and columns.</returns>
public async Task<SnowflakeQueryRawResponse> QueryRawResponseAsync(string sql, object sqlParams = null, bool describeOnly = false, CancellationToken ct = default)
{
var response = await QueryInternalAsync(sql, sqlParams, describeOnly, ct).ConfigureAwait(false);
return new SnowflakeQueryRawResponse(response.Data);
}
/// <summary>
/// Cancels running query
/// </summary>
/// <param name="requestId">Request ID to cancel.</param>
public async Task<bool> CancelQueryAsync(string requestId, CancellationToken ct = default)
{
var cancelQueryRequest = _requestBuilder.BuildCancelQueryRequest(requestId);
var response = await _restClient.SendAsync<NullDataResponse>(cancelQueryRequest, ct).ConfigureAwait(false);
if (!response.Success)
throw new SnowflakeException($"Cancelling query failed. Message: {response.Message}", response.Code);
return true;
}
private async Task<QueryExecResponse> QueryInternalAsync(string sql, object sqlParams = null, bool describeOnly = false, CancellationToken ct = default)
{
if (SnowflakeSession == null)
{
await InitNewSessionAsync(ct).ConfigureAwait(false);
}
var queryRequest = _requestBuilder.BuildQueryRequest(sql, sqlParams, describeOnly);
var response = await _restClient.SendAsync<QueryExecResponse>(queryRequest, ct).ConfigureAwait(false);
// Auto renew session, if it's expired
if (response.Code == 390112)
{
await RenewSessionAsync(ct).ConfigureAwait(false);
response = await _restClient.SendAsync<QueryExecResponse>(queryRequest, ct).ConfigureAwait(false);
}
// If query execution takes more than 45 seconds we will get this
if (response.Code == 333334 || response.Code == 333333)
{
response = await RepeatUntilQueryCompleted(response.Data.GetResultUrl, ct);
}
if (!response.Success)
throw new SnowflakeException($"Query execution failed. Message: {response.Message}", response.Code);
return response;
}
private async Task<QueryExecResponse> RepeatUntilQueryCompleted(string getResultUrl, CancellationToken ct = default)
{
var lastResultUrl = getResultUrl;
QueryExecResponse response;
do
{
var getResultRequest = _requestBuilder.BuildGetResultRequest(lastResultUrl);
response = await _restClient.SendAsync<QueryExecResponse>(getResultRequest, ct).ConfigureAwait(false);
if (response.Code == 390112)
{
await RenewSessionAsync(ct).ConfigureAwait(false);
}
else
{
lastResultUrl = response.Data?.GetResultUrl;
}
} while (response.Code == 333334 || response.Code == 333333 || response.Code == 390112);
return response;
}
/// <summary>
/// Closes current Snowflake session.
/// </summary>
/// <returns>True if session was successfully closed.</returns>
public async Task<bool> CloseSessionAsync(CancellationToken ct = default)
{
var closeSessionRequest = _requestBuilder.BuildCloseSessionRequest();
var response = await _restClient.SendAsync<CloseResponse>(closeSessionRequest, ct).ConfigureAwait(false);
SnowflakeSession = null;
_requestBuilder.ClearSessionTokens();
if (!response.Success)
throw new SnowflakeException($"Closing session failed. Message: {response.Message}", response.Code);
return response.Success;
}
/// <summary>
/// Overrides internal HttpClient
/// </summary>
public void SetHttpClient(HttpClient httpClient)
{
if (httpClient == null)
throw new ArgumentException("HttpClient cannot be null.");
_restClient.SetHttpClient(httpClient);
}
}
}