-
Notifications
You must be signed in to change notification settings - Fork 244
/
Copy pathRawDb.cs
351 lines (280 loc) · 11.9 KB
/
RawDb.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Memory;
using Npgsql;
// ReSharper disable UseAwaitUsing
namespace PlatformBenchmarks
{
public class RawDb
{
// Source of the random row ids used by the query methods of this class.
private readonly ConcurrentRandom _random;
// In-memory cache used by LoadCachedQueries and PopulateCache; its expiration scan frequency is set to 60 minutes.
private readonly MemoryCache _cache
= new(new MemoryCacheOptions { ExpirationScanFrequency = TimeSpan.FromMinutes(60) });
#if NET7_0_OR_GREATER
// Data sources created in the constructor: a general-purpose one, plus dedicated ones for the
// write (_updateDataSource) and read (_readOnlyDataSource) halves of the update test.
private readonly NpgsqlDataSource _dataSource, _updateDataSource, _readOnlyDataSource;
#else
// Connection string from which CreateConnection builds a new NpgsqlConnection.
private readonly string _connectionString;
#endif
/// <summary>
/// Creates the database helper from the shared random source and the application settings.
/// </summary>
/// <param name="random">Random source used to pick row ids and new random numbers.</param>
/// <param name="appSettings">Settings whose <c>ConnectionString</c> identifies the database.</param>
public RawDb(ConcurrentRandom random, AppSettings appSettings)
{
#if NET7_0_OR_GREATER
    _dataSource = NpgsqlDataSource.Create(appSettings.ConnectionString);

    // The update benchmark reads and writes through two separate connection pools to avoid
    // head-of-line perf issues. Both are built from the same builder; only the maximum pool
    // size differs (18 for reads, 9 for updates).
    var poolBuilder = new NpgsqlDataSourceBuilder(appSettings.ConnectionString);

    poolBuilder.ConnectionStringBuilder.MaxPoolSize = 18;
    _readOnlyDataSource = poolBuilder.Build();

    poolBuilder.ConnectionStringBuilder.MaxPoolSize = 9;
    _updateDataSource = poolBuilder.Build();
#else
    _connectionString = appSettings.ConnectionString;
#endif
    _random = random;
}
/// <summary>
/// Opens a connection and reads a single row of the world table, using the id that
/// <c>CreateReadCommand</c> binds when it builds the command.
/// </summary>
/// <returns>The row that was read.</returns>
public async Task<World> LoadSingleQueryRow()
{
    using (var connection = CreateConnection())
    {
        await connection.OpenAsync();

        // The id parameter is already populated by CreateReadCommand, so it is discarded here.
        var (readCmd, _) = CreateReadCommand(connection);
        using (readCmd)
        {
            return await ReadSingleRow(readCmd);
        }
    }
}
/// <summary>
/// Returns <paramref name="count"/> cached worlds, each looked up by an id drawn from
/// <c>_random.Next(1, 10001)</c>.
/// </summary>
/// <remarks>
/// As long as every lookup hits the cache the method completes synchronously. On the first
/// miss, the remaining slots (starting with the missed one) are filled by an asynchronous
/// local helper that goes through the cache's get-or-create path backed by the database.
/// </remarks>
/// <param name="count">Number of entries to return.</param>
/// <returns>A task producing an array with <paramref name="count"/> entries.</returns>
public Task<CachedWorld[]> LoadCachedQueries(int count)
{
    var worlds = new CachedWorld[count];
    var keys = _cacheKeys;
    var memoryCache = _cache;
    var rng = _random;

    for (var slot = 0; slot < worlds.Length; slot++)
    {
        var id = rng.Next(1, 10001);
        if (!memoryCache.TryGetValue(keys[id], out var hit))
        {
            // Cache miss: hand this slot and all remaining ones over to the async path.
            return LoadUncachedQueries(id, slot, this, worlds);
        }

        worlds[slot] = (CachedWorld)hit;
    }

    return Task.FromResult(worlds);

    // Fills worlds[slot..] using the cache's get-or-create API; `id` is the id that missed.
    static async Task<CachedWorld[]> LoadUncachedQueries(int id, int slot, RawDb rawdb, CachedWorld[] worlds)
    {
        using var connection = rawdb.CreateConnection();
        await connection.OpenAsync();

        var (readCmd, idParameter) = rawdb.CreateReadCommand(connection);
        using var ownedCmd = readCmd;

        // Cache factory: executes the read command with whatever id is currently bound.
        async Task<CachedWorld> ReadFromDatabase(ICacheEntry _) => await ReadSingleRow(readCmd);

        var keys = _cacheKeys;
        idParameter.TypedValue = id;
        var key = keys[id];

        while (slot < worlds.Length)
        {
            worlds[slot] = await rawdb._cache.GetOrCreateAsync(key, ReadFromDatabase);
            slot++;

            // Draw and bind the id for the next slot (also done once after the last slot).
            id = rawdb._random.Next(1, 10001);
            idParameter.TypedValue = id;
            key = keys[id];
        }

        return worlds;
    }
}
/// <summary>
/// Fills the cache by reading ids 1 through 10000 one at a time over a single connection and
/// storing each row under its pre-built cache key, then writes a completion message to the console.
/// </summary>
public async Task PopulateCache()
{
    using (var connection = CreateConnection())
    {
        await connection.OpenAsync();

        var (readCmd, idParameter) = CreateReadCommand(connection);
        using (readCmd)
        {
            var keys = _cacheKeys;
            var memoryCache = _cache;

            for (var id = 1; id <= 10000; id++)
            {
                // Rebind the id, then read the row and store it under the matching key.
                idParameter.TypedValue = id;
                memoryCache.Set<CachedWorld>(keys[id], await ReadSingleRow(readCmd));
            }

            Console.WriteLine("Caching Populated");
        }
    }
}
#if NET7_0_OR_GREATER
/// <summary>
/// Reads <paramref name="count"/> rows of the world table in one batch, one single-row SELECT
/// per id drawn from <c>_random.Next(1, 10001)</c>.
/// </summary>
/// <param name="count">Number of rows to read.</param>
/// <returns>The rows, in the order their statements were added to the batch.</returns>
public async Task<World[]> LoadMultipleQueriesRows(int count)
{
    const string selectWorldById = "SELECT id, randomnumber FROM world WHERE id = $1";

    var worlds = new World[count];

    using (var connection = await _dataSource.OpenConnectionAsync())
    using (var batch = new NpgsqlBatch(connection))
    {
        // Inserts a PG Sync message between each statement in the batch, required for compliance with
        // TechEmpower general test requirement 7
        // https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview
        batch.EnableErrorBarriers = true;

        for (var n = 0; n < count; n++)
        {
            var lookup = new NpgsqlBatchCommand { CommandText = selectWorldById };
            lookup.Parameters.Add(new NpgsqlParameter<int> { TypedValue = _random.Next(1, 10001) });
            batch.BatchCommands.Add(lookup);
        }

        using (var reader = await batch.ExecuteReaderAsync())
        {
            // Each statement yields its own result set: read its row, then advance to the next set.
            for (var slot = 0; slot < count; slot++)
            {
                await reader.ReadAsync();

                var id = reader.GetInt32(0);
                var randomNumber = reader.GetInt32(1);
                worlds[slot] = new World { Id = id, RandomNumber = randomNumber };

                await reader.NextResultAsync();
            }
        }
    }

    return worlds;
}
#else
/// <summary>
/// Reads <paramref name="count"/> rows of the world table one query at a time over a single
/// connection, rebinding the id parameter after each read.
/// </summary>
/// <param name="count">Number of rows to read.</param>
/// <returns>The rows in the order they were read.</returns>
public async Task<World[]> LoadMultipleQueriesRows(int count)
{
    var worlds = new World[count];

    using (var connection = CreateConnection())
    {
        await connection.OpenAsync();

        // CreateReadCommand already binds the id used by the first read.
        var (readCmd, idParameter) = CreateReadCommand(connection);
        using (readCmd)
        {
            var slot = 0;
            while (slot < worlds.Length)
            {
                worlds[slot] = await ReadSingleRow(readCmd);
                slot++;

                // Bind the id for the following read.
                idParameter.TypedValue = _random.Next(1, 10001);
            }
        }
    }

    return worlds;
}
#endif
#if NET7_0_OR_GREATER
/// <summary>
/// Reads <paramref name="count"/> rows in one batch on a connection from the read-only data
/// source, then assigns each row a new random number and writes all of them back with a single
/// update command on a connection from the update data source.
/// </summary>
/// <param name="count">Number of rows to read and update.</param>
/// <returns>The rows, carrying the random numbers that were sent in the update.</returns>
public async Task<World[]> LoadMultipleUpdatesRows(int count)
{
    const string selectWorldById = "SELECT id, randomnumber FROM world WHERE id = $1";

    var worlds = new World[count];

    // Read phase: scoped so the read connection is disposed before the update connection is opened.
    {
        using var readConnection = await _readOnlyDataSource.OpenConnectionAsync();
        using var batch = new NpgsqlBatch(readConnection);

        // Inserts a PG Sync message between each statement in the batch, required for compliance with
        // TechEmpower general test requirement 7
        // https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview
        batch.EnableErrorBarriers = true;

        for (var n = 0; n < count; n++)
        {
            var lookup = new NpgsqlBatchCommand { CommandText = selectWorldById };
            lookup.Parameters.Add(new NpgsqlParameter<int> { TypedValue = _random.Next(1, 10001) });
            batch.BatchCommands.Add(lookup);
        }

        using var reader = await batch.ExecuteReaderAsync();
        for (var slot = 0; slot < count; slot++)
        {
            await reader.ReadAsync();

            var id = reader.GetInt32(0);
            var randomNumber = reader.GetInt32(1);
            worlds[slot] = new World { Id = id, RandomNumber = randomNumber };

            await reader.NextResultAsync();
        }
    }

    // Write phase: two parameters per row (id, then new random number), one command for all rows.
    {
        using var writeConnection = await _updateDataSource.OpenConnectionAsync();
        using var updateCmd = new NpgsqlCommand(BatchUpdateString.Query(count), writeConnection);

        for (var slot = 0; slot < worlds.Length; slot++)
        {
            var newValue = _random.Next(1, 10001);

            updateCmd.Parameters.Add(new NpgsqlParameter<int> { TypedValue = worlds[slot].Id });
            updateCmd.Parameters.Add(new NpgsqlParameter<int> { TypedValue = newValue });

            // Written through the array element so the returned rows reflect the update.
            worlds[slot].RandomNumber = newValue;
        }

        await updateCmd.ExecuteNonQueryAsync();
    }

    return worlds;
}
#else
/// <summary>
/// Reads <paramref name="count"/> rows one query at a time, then assigns each row a new random
/// number and writes all of them back with a single update command on the same connection.
/// </summary>
/// <param name="count">Number of rows to read and update.</param>
/// <returns>The rows, carrying the random numbers that were sent in the update.</returns>
public async Task<World[]> LoadMultipleUpdatesRows(int count)
{
    var worlds = new World[count];

    using (var connection = CreateConnection())
    {
        await connection.OpenAsync();

        // Read phase: the read command is disposed before the update command is created.
        {
            var (readCmd, idParameter) = CreateReadCommand(connection);
            using var ownedReadCmd = readCmd;

            var slot = 0;
            while (slot < worlds.Length)
            {
                worlds[slot] = await ReadSingleRow(readCmd);
                slot++;

                // Bind the id for the following read.
                idParameter.TypedValue = _random.Next(1, 10001);
            }
        }

        // Write phase: two parameters per row (id, then new random number), one command for all rows.
        {
            using var updateCmd = new NpgsqlCommand(BatchUpdateString.Query(count), connection);

            for (var slot = 0; slot < worlds.Length; slot++)
            {
                var newValue = _random.Next(1, 10001);

                updateCmd.Parameters.Add(new NpgsqlParameter<int> { TypedValue = worlds[slot].Id });
                updateCmd.Parameters.Add(new NpgsqlParameter<int> { TypedValue = newValue });

                // Written through the array element so the returned rows reflect the update.
                worlds[slot].RandomNumber = newValue;
            }

            await updateCmd.ExecuteNonQueryAsync();
        }
    }

    return worlds;
}
#endif
/// <summary>
/// Loads every row of the fortune table, appends the additional request-time fortune with
/// id 0, and sorts the list.
/// </summary>
/// <returns>The sorted list of fortunes, including the additional one.</returns>
public async Task<List<Fortune>> LoadFortunesRows()
{
    // Benchmark requirements explicitly prohibit pre-initializing the list size
    var fortunes = new List<Fortune>();

    // Scoped so the reader, command and connection are disposed before the list is extended and sorted.
    {
        using var connection = CreateConnection();
        await connection.OpenAsync();

        using var cmd = new NpgsqlCommand("SELECT id, message FROM fortune", connection);
        using var reader = await cmd.ExecuteReaderAsync();

        while (await reader.ReadAsync())
        {
            var fortune = new Fortune
            (
                id: reader.GetInt32(0),
                message: reader.GetFieldValue<byte[]>(1)
            );
            fortunes.Add(fortune);
        }
    }

    var extra = new Fortune(id: 0, AdditionalFortune);
    fortunes.Add(extra);
    fortunes.Sort();

    return fortunes;
}
// UTF-8 bytes of the extra fortune message that LoadFortunesRows appends (with id 0) before sorting.
private readonly byte[] AdditionalFortune = "Additional fortune added at request time."u8.ToArray();
/// <summary>
/// Builds the single-row SELECT against the world table on <paramref name="connection"/>, with
/// its one id parameter pre-set to a value from <c>_random.Next(1, 10001)</c>.
/// </summary>
/// <param name="connection">Connection the command is attached to.</param>
/// <returns>
/// The command together with its id parameter, so callers can rebind the id between executions.
/// The caller is responsible for disposing the command.
/// </returns>
private (NpgsqlCommand readCmd, NpgsqlParameter<int> idParameter) CreateReadCommand(NpgsqlConnection connection)
{
    var idParameter = new NpgsqlParameter<int> { TypedValue = _random.Next(1, 10001) };
    var readCmd = new NpgsqlCommand("SELECT id, randomnumber FROM world WHERE id = $1", connection)
    {
        Parameters = { idParameter }
    };

    return (readCmd, idParameter);
}
/// <summary>
/// Executes <paramref name="cmd"/> with single-row behavior and maps the first two integer
/// columns of the row to a <see cref="World"/>.
/// </summary>
/// <param name="cmd">Prepared read command whose parameters are already bound.</param>
/// <returns>The world built from column 0 (id) and column 1 (random number).</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static async Task<World> ReadSingleRow(NpgsqlCommand cmd)
{
    using (var reader = await cmd.ExecuteReaderAsync(System.Data.CommandBehavior.SingleRow))
    {
        // The result of ReadAsync is not checked; the column reads below assume a row is present.
        await reader.ReadAsync();

        var id = reader.GetInt32(0);
        var randomNumber = reader.GetInt32(1);
        return new World { Id = id, RandomNumber = randomNumber };
    }
}
/// <summary>
/// Creates a new, unopened connection: from the general-purpose data source on .NET 7 or later,
/// otherwise directly from the stored connection string.
/// </summary>
/// <returns>A connection the caller must open and dispose.</returns>
private NpgsqlConnection CreateConnection()
{
#if NET7_0_OR_GREATER
    return _dataSource.CreateConnection();
#else
    return new NpgsqlConnection(_connectionString);
#endif
}
// Pre-built cache keys for values 0 through 10000, so a key can be fetched by indexing with the row id
// instead of allocating a new CacheKey per lookup.
private static readonly object[] _cacheKeys = Enumerable.Range(0, 10001).Select((i) => new CacheKey(i)).ToArray();
/// <summary>
/// Immutable cache key wrapping an integer id.
/// </summary>
/// <remarks>
/// Equality is value-based and consistent across <see cref="Equals(CacheKey)"/>,
/// <see cref="Equals(object)"/> and <see cref="GetHashCode"/>, as the
/// <see cref="IEquatable{T}"/> contract expects. A reference check runs first so that lookups
/// using the same key instance return without any further comparison.
/// </remarks>
public sealed class CacheKey : IEquatable<CacheKey>
{
    private readonly int _value;

    /// <summary>Creates a key for the given id.</summary>
    /// <param name="value">The id this key represents.</param>
    public CacheKey(int value)
        => _value = value;

    /// <summary>Returns true when <paramref name="key"/> is non-null and wraps the same id.</summary>
    /// <param name="key">The key to compare with; may be null.</param>
    public bool Equals(CacheKey key)
        => key is not null && key._value == _value;

    /// <summary>
    /// Returns true when <paramref name="obj"/> is this instance or another
    /// <see cref="CacheKey"/> wrapping the same id.
    /// </summary>
    /// <param name="obj">The object to compare with; may be null or of another type.</param>
    public override bool Equals(object obj)
        // Reference check first: the common case is the very same key instance.
        => ReferenceEquals(obj, this) || (obj is CacheKey other && other._value == _value);

    /// <summary>Returns the wrapped id, so keys with equal ids hash identically.</summary>
    public override int GetHashCode()
        => _value;

    /// <summary>Returns the wrapped id formatted as a string.</summary>
    public override string ToString()
        => _value.ToString();
}
}
}