@@ -30,17 +30,19 @@ use databend_common_exception::ErrorCode;
30
30
use databend_common_exception:: Result ;
31
31
use databend_common_license:: license:: Feature ;
32
32
use databend_common_license:: license_manager:: LicenseManagerSwitch ;
33
- use databend_common_meta_client:: ClientHandle ;
34
33
use databend_common_meta_client:: MetaGrpcClient ;
35
34
use databend_common_meta_kvapi:: kvapi:: KVApi ;
36
35
use databend_common_meta_semaphore:: acquirer:: Permit ;
37
36
use databend_common_meta_semaphore:: Semaphore ;
37
+ use databend_common_meta_store:: MetaStore ;
38
+ use databend_common_meta_store:: MetaStoreProvider ;
38
39
use databend_common_meta_types:: MatchSeq ;
39
40
use databend_common_meta_types:: Operation ;
40
41
use databend_common_meta_types:: UpsertKV ;
41
42
use databend_common_sql:: Planner ;
42
43
use databend_common_storage:: DataOperator ;
43
44
use databend_common_tracing:: GlobalLogger ;
45
+ use databend_common_tracing:: PERSISTENT_LOG_SCHEMA_VERSION ;
44
46
use log:: error;
45
47
use log:: info;
46
48
use rand:: random;
@@ -55,14 +57,13 @@ use crate::persistent_log::table_schemas::QueryProfileTable;
55
57
use crate :: sessions:: QueryContext ;
56
58
57
59
/// State shared by the persistent-log routines of this impl (`work`, `prepare`,
/// `acquire`, `finish_hook`, and the schema-version helpers).
pub struct GlobalPersistentLog {
58
- meta_client : Option < Arc < ClientHandle > > ,
60
// Meta-service access used for KV reads/writes; `acquire` additionally requires the
// `MetaStore::R` variant and treats any other variant as unreachable.
+ meta_store : MetaStore ,
59
61
// Taken from `cfg.log.persistentlog.interval`; `work` passes it to `acquire` as the
// `lease` and `interval` arguments. NOTE(review): unit is not visible here — confirm.
interval : u64 ,
60
62
// Tenant name; used as the prefix of the meta keys (`{tenant_id}/persistent_log_work...`)
// and when creating sessions.
tenant_id : String ,
61
63
// Passed to `Semaphore::new_acquired` in `acquire`.
node_id : String ,
62
64
// Used together with `tenant_id` when creating sessions.
cluster_id : String ,
63
65
// Stage name from config. The constructor that builds the `Arc<Self>` instance appends
// `_v{PERSISTENT_LOG_SCHEMA_VERSION}`; `create_dummy` stores the configured name as-is.
stage_name : String ,
64
66
// Starts as `false`; `work` checks it before entering its main loop.
initialized : AtomicBool ,
65
- stopped : AtomicBool ,
66
67
// Table definitions handled by this service; `prepare` runs each one's `create_table_sql()`.
tables : Vec < Box < dyn PersistentLogTable > > ,
67
68
// Taken from `cfg.log.persistentlog.retention`; its use lies outside the visible hunks.
retention : usize ,
68
69
}
@@ -79,7 +80,7 @@ impl GlobalPersistentLog {
79
80
let mut tables: Vec < Box < dyn PersistentLogTable > > = vec ! [ ] ;
80
81
81
82
if cfg. log . query . on {
82
- let query_details = QueryDetailsTable ;
83
+ let query_details = QueryDetailsTable :: new ( ) ;
83
84
info ! (
84
85
"Persistent query details table is enabled, persistent_system.{}" ,
85
86
query_details. table_name( )
@@ -88,30 +89,35 @@ impl GlobalPersistentLog {
88
89
}
89
90
90
91
if cfg. log . profile . on {
91
- let profile = QueryProfileTable ;
92
+ let profile = QueryProfileTable :: new ( ) ;
92
93
info ! (
93
94
"Persistent query profile table is enabled, persistent_system.{}" ,
94
95
profile. table_name( )
95
96
) ;
96
97
tables. push ( Box :: new ( profile) ) ;
97
98
}
98
99
99
- let query_log = QueryLogTable ;
100
+ let query_log = QueryLogTable :: new ( ) ;
100
101
info ! (
101
102
"Persistent query log table is enabled, persistent_system.{}" ,
102
103
query_log. table_name( )
103
104
) ;
104
105
tables. push ( Box :: new ( query_log) ) ;
105
106
107
+ let stage_name = format ! (
108
+ "{}_v{}" ,
109
+ cfg. log. persistentlog. stage_name. clone( ) ,
110
+ PERSISTENT_LOG_SCHEMA_VERSION
111
+ ) ;
112
+
106
113
let instance = Arc :: new ( Self {
107
- meta_client : Some ( meta_client) ,
114
+ meta_store : MetaStore :: R ( meta_client) ,
108
115
interval : cfg. log . persistentlog . interval as u64 ,
109
116
tenant_id : cfg. query . tenant_id . tenant_name ( ) . to_string ( ) ,
110
117
node_id : cfg. query . node_id . clone ( ) ,
111
118
cluster_id : cfg. query . cluster_id . clone ( ) ,
112
- stage_name : cfg . log . persistentlog . stage_name . clone ( ) ,
119
+ stage_name,
113
120
initialized : AtomicBool :: new ( false ) ,
114
- stopped : AtomicBool :: new ( false ) ,
115
121
tables,
116
122
retention : cfg. log . persistentlog . retention ,
117
123
} ) ;
@@ -127,21 +133,24 @@ impl GlobalPersistentLog {
127
133
Ok ( ( ) )
128
134
}
129
135
136
+ /// Create a dummy instance of GlobalPersistentLog for testing purposes.
130
137
pub async fn create_dummy ( cfg : & InnerConfig ) -> Result < Self > {
131
138
setup_operator ( ) . await ?;
139
+ let meta_store = MetaStoreProvider :: new ( cfg. meta . to_meta_grpc_client_conf ( ) )
140
+ . create_meta_store ( )
141
+ . await ?;
132
142
Ok ( Self {
133
- meta_client : None ,
143
+ meta_store ,
134
144
interval : cfg. log . persistentlog . interval as u64 ,
135
145
tenant_id : cfg. query . tenant_id . tenant_name ( ) . to_string ( ) ,
136
146
node_id : cfg. query . node_id . clone ( ) ,
137
147
cluster_id : cfg. query . cluster_id . clone ( ) ,
138
148
stage_name : cfg. log . persistentlog . stage_name . clone ( ) ,
139
149
initialized : AtomicBool :: new ( false ) ,
140
- stopped : AtomicBool :: new ( false ) ,
141
150
tables : vec ! [
142
- Box :: new( QueryDetailsTable ) ,
143
- Box :: new( QueryProfileTable ) ,
144
- Box :: new( QueryLogTable ) ,
151
+ Box :: new( QueryDetailsTable :: new ( ) ) ,
152
+ Box :: new( QueryProfileTable :: new ( ) ) ,
153
+ Box :: new( QueryLogTable :: new ( ) ) ,
145
154
] ,
146
155
retention : cfg. log . persistentlog . retention ,
147
156
} )
@@ -157,6 +166,7 @@ impl GlobalPersistentLog {
157
166
158
167
pub async fn work ( & self ) -> Result < ( ) > {
159
168
let mut prepared = false ;
169
+ let meta_key = format ! ( "{}/persistent_log_work" , self . tenant_id) ;
160
170
// Wait for all services to be initialized
161
171
loop {
162
172
if !self . initialized . load ( Ordering :: SeqCst ) {
@@ -171,19 +181,10 @@ impl GlobalPersistentLog {
171
181
}
172
182
} ) ;
173
183
loop {
174
- if self . stopped . load ( Ordering :: SeqCst ) {
175
- return Ok ( ( ) ) ;
176
- }
177
184
// create the stage, database and table if they do not exist
178
185
// alter the table if schema is changed
179
186
if !prepared {
180
- let prepare_guard = self
181
- . acquire (
182
- & format ! ( "{}/persistent_log_prepare" , self . tenant_id) ,
183
- self . interval ,
184
- 0 ,
185
- )
186
- . await ?;
187
+ let prepare_guard = self . acquire ( & meta_key, self . interval , 0 ) . await ?;
187
188
match self . prepare ( ) . await {
188
189
Ok ( _) => {
189
190
info ! ( "Persistent log prepared successfully" ) ;
@@ -195,13 +196,21 @@ impl GlobalPersistentLog {
195
196
}
196
197
drop ( prepare_guard) ;
197
198
}
198
- let meta_key = format ! ( "{}/persistent_log_work" , self . tenant_id) ;
199
199
let may_permit = self
200
200
. acquire ( & meta_key, self . interval , self . interval )
201
201
. await ?;
202
202
if let Some ( guard) = may_permit {
203
203
if let Err ( e) = self . do_copy_into ( ) . await {
204
204
error ! ( "Persistent log copy into failed: {:?}" , e) ;
205
+ let latest_version = self . get_version_from_meta ( ) . await ?;
206
+ if let Some ( version) = latest_version {
207
+ if version > PERSISTENT_LOG_SCHEMA_VERSION as u64 {
208
+ info ! ( "Persistent log tables enable version suffix" ) ;
209
+ for table in & self . tables {
210
+ table. enable_version_suffix ( ) ;
211
+ }
212
+ }
213
+ }
205
214
}
206
215
self . finish_hook ( & meta_key) . await ?;
207
216
drop ( guard) ;
@@ -222,8 +231,12 @@ impl GlobalPersistentLog {
222
231
lease : u64 ,
223
232
interval : u64 ,
224
233
) -> Result < Option < Permit > > {
234
+ let meta_client = match & self . meta_store {
235
+ MetaStore :: R ( handle) => handle. clone ( ) ,
236
+ _ => unreachable ! ( "Metastore::L should only used for testing" ) ,
237
+ } ;
225
238
let acquired_guard = Semaphore :: new_acquired (
226
- self . meta_client . clone ( ) . unwrap ( ) ,
239
+ meta_client,
227
240
meta_key,
228
241
1 ,
229
242
self . node_id . clone ( ) ,
@@ -235,9 +248,7 @@ impl GlobalPersistentLog {
235
248
return Ok ( Some ( acquired_guard) ) ;
236
249
}
237
250
if match self
238
- . meta_client
239
- . clone ( )
240
- . unwrap ( )
251
+ . meta_store
241
252
. get_kv ( & format ! ( "{}/last_timestamp" , meta_key) )
242
253
. await ?
243
254
{
@@ -257,9 +268,7 @@ impl GlobalPersistentLog {
257
268
}
258
269
259
270
pub async fn finish_hook ( & self , meta_key : & str ) -> Result < ( ) > {
260
- self . meta_client
261
- . clone ( )
262
- . unwrap ( )
271
+ self . meta_store
263
272
. upsert_kv ( UpsertKV :: new (
264
273
format ! ( "{}/last_timestamp" , meta_key) ,
265
274
MatchSeq :: Any ,
@@ -299,31 +308,47 @@ impl GlobalPersistentLog {
299
308
let create_db = "CREATE DATABASE IF NOT EXISTS persistent_system" ;
300
309
self . execute_sql ( create_db) . await ?;
301
310
302
- for table in & self . tables {
303
- let session = create_session ( & self . tenant_id , & self . cluster_id ) . await ?;
304
- let context = session. create_query_context ( ) . await ?;
305
- let table_name = table. table_name ( ) ;
306
- let old_table = context
307
- . get_table ( CATALOG_DEFAULT , "persistent_system" , table_name)
308
- . await ;
309
- if old_table. is_ok ( ) {
310
- let old_schema = old_table?. schema ( ) ;
311
- if !table. schema_equal ( old_schema) {
312
- let rename_target =
313
- format ! ( "`{}_old_{}`" , table_name, chrono:: Local :: now( ) . timestamp( ) ) ;
314
- let rename = format ! (
315
- "ALTER TABLE persistent_system.{} RENAME TO {}" ,
316
- table_name, rename_target
311
+ let session = create_session ( & self . tenant_id , & self . cluster_id ) . await ?;
312
+ let context = session. create_query_context ( ) . await ?;
313
+ if let Some ( version) = self . get_version_from_meta ( ) . await ? {
314
+ if version > PERSISTENT_LOG_SCHEMA_VERSION as u64 {
315
+ // an older-version node must write its logs into the tables that carry the version suffix
316
+ for table in & self . tables {
317
+ table. enable_version_suffix ( ) ;
318
+ }
319
+ return Ok ( ( ) ) ;
320
+ }
321
+ let mut need_rename = false ;
322
+ for table in & self . tables {
323
+ let old_table = context
324
+ . get_table ( CATALOG_DEFAULT , "persistent_system" , & table. table_name ( ) )
325
+ . await ;
326
+ if old_table. is_ok ( ) {
327
+ let old_schema = old_table?. schema ( ) ;
328
+ if !table. schema_equal ( old_schema) {
329
+ need_rename = true ;
330
+ }
331
+ }
332
+ }
333
+ if need_rename {
334
+ for table in & self . tables {
335
+ let old_table_name = format ! ( "`{}_v{}`" , table. table_name( ) , version) ;
336
+ let rename_sql = format ! (
337
+ "ALTER TABLE IF EXISTS persistent_system.{} RENAME TO {}" ,
338
+ table. table_name( ) ,
339
+ old_table_name
317
340
) ;
318
- info ! ( "Persistent log table already exists, but schema is different, renaming to {}" , rename_target) ;
319
- self . execute_sql ( & rename) . await ?;
341
+ self . execute_sql ( & rename_sql) . await ?;
320
342
}
321
- } else {
322
- info ! ( "Persistent log table {} not exists, creating" , table_name) ;
323
343
}
344
+ }
345
+ self . set_version_to_meta ( PERSISTENT_LOG_SCHEMA_VERSION )
346
+ . await ?;
347
+ for table in & self . tables {
324
348
let create_table = table. create_table_sql ( ) ;
325
349
self . execute_sql ( & create_table) . await ?;
326
350
}
351
+
327
352
Ok ( ( ) )
328
353
}
329
354
@@ -392,8 +417,30 @@ impl GlobalPersistentLog {
392
417
Ok ( ( ) )
393
418
}
394
419
395
- pub fn stop ( & self ) {
396
- self . stopped . store ( true , Ordering :: SeqCst ) ;
420
/// Reads the persistent-log schema version stored in the meta store under
/// `{tenant_id}/persistent_log_work/version`.
///
/// Returns `Ok(None)` when the key is absent, otherwise the stored bytes decoded
/// from JSON as a `u64`.
///
/// # Errors
/// Propagates failures from `get_kv` and from JSON decoding of the stored value.
+ pub async fn get_version_from_meta ( & self ) -> Result < Option < u64 > > {
421
+ match self
422
+ . meta_store
423
+ . get_kv ( & format ! ( "{}/persistent_log_work/version" , self . tenant_id) )
424
+ . await ?
425
+ {
426
+ Some ( v) => {
427
// The value is stored JSON-encoded (see `set_version_to_meta`, which writes it with `serde_json::to_vec`).
+ let version: u64 = serde_json:: from_slice ( & v. data ) ?;
428
+ Ok ( Some ( version) )
429
+ }
430
+ None => Ok ( None ) ,
431
+ }
432
+ }
433
+
434
/// Stores `version`, JSON-encoded, in the meta store under
/// `{tenant_id}/persistent_log_work/version`.
///
/// # Errors
/// Propagates failures from JSON encoding and from `upsert_kv`.
+ pub async fn set_version_to_meta ( & self , version : usize ) -> Result < ( ) > {
435
+ self . meta_store
436
+ . upsert_kv ( UpsertKV :: new (
437
+ & format ! ( "{}/persistent_log_work/version" , self . tenant_id) ,
438
// `MatchSeq::Any`: the write is not conditioned on the key's current sequence number.
+ MatchSeq :: Any ,
439
+ Operation :: Update ( serde_json:: to_vec ( & version) ?) ,
440
// NOTE(review): the trailing `None` is presumably "no extra value metadata/TTL" — confirm against `UpsertKV::new`.
+ None ,
441
+ ) )
442
+ . await ?;
443
+ Ok ( ( ) )
397
444
}
398
445
}
399
446
0 commit comments