@@ -41,6 +41,7 @@ use common_meta_app::app_error::UnknownDatabaseId;
41
41
use common_meta_app:: app_error:: UnknownIndex ;
42
42
use common_meta_app:: app_error:: UnknownTable ;
43
43
use common_meta_app:: app_error:: UnknownTableId ;
44
+ use common_meta_app:: app_error:: VirtualColumnAlreadyExists ;
44
45
use common_meta_app:: app_error:: WrongShare ;
45
46
use common_meta_app:: app_error:: WrongShareObject ;
46
47
use common_meta_app:: schema:: CountTablesKey ;
@@ -54,6 +55,8 @@ use common_meta_app::schema::CreateTableLockRevReply;
54
55
use common_meta_app:: schema:: CreateTableLockRevReq ;
55
56
use common_meta_app:: schema:: CreateTableReply ;
56
57
use common_meta_app:: schema:: CreateTableReq ;
58
+ use common_meta_app:: schema:: CreateVirtualColumnReply ;
59
+ use common_meta_app:: schema:: CreateVirtualColumnReq ;
57
60
use common_meta_app:: schema:: DBIdTableName ;
58
61
use common_meta_app:: schema:: DatabaseId ;
59
62
use common_meta_app:: schema:: DatabaseIdToName ;
@@ -71,6 +74,8 @@ use common_meta_app::schema::DropIndexReply;
71
74
use common_meta_app:: schema:: DropIndexReq ;
72
75
use common_meta_app:: schema:: DropTableByIdReq ;
73
76
use common_meta_app:: schema:: DropTableReply ;
77
+ use common_meta_app:: schema:: DropVirtualColumnReply ;
78
+ use common_meta_app:: schema:: DropVirtualColumnReq ;
74
79
use common_meta_app:: schema:: EmptyProto ;
75
80
use common_meta_app:: schema:: ExtendTableLockRevReq ;
76
81
use common_meta_app:: schema:: GetDatabaseReq ;
@@ -85,6 +90,7 @@ use common_meta_app::schema::ListDatabaseReq;
85
90
use common_meta_app:: schema:: ListIndexesReq ;
86
91
use common_meta_app:: schema:: ListTableLockRevReq ;
87
92
use common_meta_app:: schema:: ListTableReq ;
93
+ use common_meta_app:: schema:: ListVirtualColumnsReq ;
88
94
use common_meta_app:: schema:: RenameDatabaseReply ;
89
95
use common_meta_app:: schema:: RenameDatabaseReq ;
90
96
use common_meta_app:: schema:: RenameTableReply ;
@@ -108,9 +114,13 @@ use common_meta_app::schema::UndropTableReply;
108
114
use common_meta_app:: schema:: UndropTableReq ;
109
115
use common_meta_app:: schema:: UpdateTableMetaReply ;
110
116
use common_meta_app:: schema:: UpdateTableMetaReq ;
117
+ use common_meta_app:: schema:: UpdateVirtualColumnReply ;
118
+ use common_meta_app:: schema:: UpdateVirtualColumnReq ;
111
119
use common_meta_app:: schema:: UpsertTableCopiedFileReq ;
112
120
use common_meta_app:: schema:: UpsertTableOptionReply ;
113
121
use common_meta_app:: schema:: UpsertTableOptionReq ;
122
+ use common_meta_app:: schema:: VirtualColumnMeta ;
123
+ use common_meta_app:: schema:: VirtualColumnNameIdent ;
114
124
use common_meta_app:: share:: ShareGrantObject ;
115
125
use common_meta_app:: share:: ShareId ;
116
126
use common_meta_app:: share:: ShareNameIdent ;
@@ -163,6 +173,7 @@ use crate::util::deserialize_u64;
163
173
use crate :: util:: get_index_metas_by_ids;
164
174
use crate :: util:: get_table_by_id_or_err;
165
175
use crate :: util:: get_table_names_by_ids;
176
+ use crate :: util:: get_virtual_column_by_id_or_err;
166
177
use crate :: util:: list_tables_from_share_db;
167
178
use crate :: util:: list_tables_from_unshare_db;
168
179
use crate :: util:: mget_pb_values;
@@ -1071,6 +1082,201 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
1071
1082
Ok ( index_metas)
1072
1083
}
1073
1084
1085
+ // virtual column
1086
+
1087
+ async fn create_virtual_column (
1088
+ & self ,
1089
+ req : CreateVirtualColumnReq ,
1090
+ ) -> Result < CreateVirtualColumnReply , KVAppError > {
1091
+ debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
1092
+
1093
+ let ctx = & func_name ! ( ) ;
1094
+ let mut trials = txn_trials ( None , ctx) ;
1095
+ loop {
1096
+ trials. next ( ) . unwrap ( ) ?;
1097
+
1098
+ let ( _, old_virtual_column_opt) : ( _ , Option < VirtualColumnMeta > ) =
1099
+ get_pb_value ( self , & req. name_ident ) . await ?;
1100
+
1101
+ if old_virtual_column_opt. is_some ( ) {
1102
+ return Err ( KVAppError :: AppError ( AppError :: VirtualColumnAlreadyExists (
1103
+ VirtualColumnAlreadyExists :: new (
1104
+ req. name_ident . table_id ,
1105
+ format ! (
1106
+ "create virtual column with tenant: {} table_id: {}" ,
1107
+ req. name_ident. tenant, req. name_ident. table_id
1108
+ ) ,
1109
+ ) ,
1110
+ ) ) ) ;
1111
+ }
1112
+ let virtual_column_meta = VirtualColumnMeta {
1113
+ table_id : req. name_ident . table_id ,
1114
+ virtual_columns : req. virtual_columns . clone ( ) ,
1115
+ created_on : Utc :: now ( ) ,
1116
+ updated_on : None ,
1117
+ } ;
1118
+
1119
+ // Create virtual column by inserting this record:
1120
+ // (tenant, table_id) -> virtual_column_meta
1121
+ {
1122
+ let condition = vec ! [ txn_cond_seq( & req. name_ident, Eq , 0 ) ] ;
1123
+ let if_then = vec ! [ txn_op_put(
1124
+ & req. name_ident,
1125
+ serialize_struct( & virtual_column_meta) ?,
1126
+ ) ] ;
1127
+
1128
+ let txn_req = TxnRequest {
1129
+ condition,
1130
+ if_then,
1131
+ else_then : vec ! [ ] ,
1132
+ } ;
1133
+
1134
+ let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
1135
+
1136
+ debug ! (
1137
+ req. name_ident = debug( & virtual_column_meta) ,
1138
+ succ = display( succ) ,
1139
+ "create_virtual_column"
1140
+ ) ;
1141
+
1142
+ if succ {
1143
+ break ;
1144
+ }
1145
+ }
1146
+ }
1147
+
1148
+ Ok ( CreateVirtualColumnReply { } )
1149
+ }
1150
+
1151
+ async fn update_virtual_column (
1152
+ & self ,
1153
+ req : UpdateVirtualColumnReq ,
1154
+ ) -> Result < UpdateVirtualColumnReply , KVAppError > {
1155
+ debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
1156
+
1157
+ let ctx = & func_name ! ( ) ;
1158
+ let mut trials = txn_trials ( None , ctx) ;
1159
+ loop {
1160
+ trials. next ( ) . unwrap ( ) ?;
1161
+
1162
+ let ( seq, old_virtual_column_meta) =
1163
+ get_virtual_column_by_id_or_err ( self , & req. name_ident , ctx) . await ?;
1164
+
1165
+ let virtual_column_meta = VirtualColumnMeta {
1166
+ table_id : req. name_ident . table_id ,
1167
+ virtual_columns : req. virtual_columns . clone ( ) ,
1168
+ created_on : old_virtual_column_meta. created_on ,
1169
+ updated_on : Some ( Utc :: now ( ) ) ,
1170
+ } ;
1171
+
1172
+ // Update virtual column by inserting this record:
1173
+ // (tenant, table_id) -> virtual_column_meta
1174
+ {
1175
+ let condition = vec ! [ txn_cond_seq( & req. name_ident, Eq , seq) ] ;
1176
+ let if_then = vec ! [ txn_op_put(
1177
+ & req. name_ident,
1178
+ serialize_struct( & virtual_column_meta) ?,
1179
+ ) ] ;
1180
+
1181
+ let txn_req = TxnRequest {
1182
+ condition,
1183
+ if_then,
1184
+ else_then : vec ! [ ] ,
1185
+ } ;
1186
+
1187
+ let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
1188
+
1189
+ debug ! (
1190
+ req. name_ident = debug( & virtual_column_meta) ,
1191
+ succ = display( succ) ,
1192
+ "update_virtual_column"
1193
+ ) ;
1194
+
1195
+ if succ {
1196
+ break ;
1197
+ }
1198
+ }
1199
+ }
1200
+
1201
+ Ok ( UpdateVirtualColumnReply { } )
1202
+ }
1203
+
1204
+ async fn drop_virtual_column (
1205
+ & self ,
1206
+ req : DropVirtualColumnReq ,
1207
+ ) -> Result < DropVirtualColumnReply , KVAppError > {
1208
+ debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
1209
+
1210
+ let ctx = & func_name ! ( ) ;
1211
+ let mut trials = txn_trials ( None , ctx) ;
1212
+ loop {
1213
+ trials. next ( ) . unwrap ( ) ?;
1214
+
1215
+ let ( _, _) = get_virtual_column_by_id_or_err ( self , & req. name_ident , ctx) . await ?;
1216
+
1217
+ // Drop virtual column by deleting this record:
1218
+ // (tenant, table_id) -> virtual_column_meta
1219
+ {
1220
+ let if_then = vec ! [ txn_op_del( & req. name_ident) ] ;
1221
+ let txn_req = TxnRequest {
1222
+ condition : vec ! [ ] ,
1223
+ if_then,
1224
+ else_then : vec ! [ ] ,
1225
+ } ;
1226
+
1227
+ let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
1228
+
1229
+ debug ! (
1230
+ req. name_ident = debug( & req. name_ident) ,
1231
+ succ = display( succ) ,
1232
+ "drop_virtual_column"
1233
+ ) ;
1234
+
1235
+ if succ {
1236
+ break ;
1237
+ }
1238
+ }
1239
+ }
1240
+
1241
+ Ok ( DropVirtualColumnReply { } )
1242
+ }
1243
+
1244
+ async fn list_virtual_columns (
1245
+ & self ,
1246
+ req : ListVirtualColumnsReq ,
1247
+ ) -> Result < Vec < VirtualColumnMeta > , KVAppError > {
1248
+ debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
1249
+
1250
+ if let Some ( table_id) = req. table_id {
1251
+ let name_ident = VirtualColumnNameIdent {
1252
+ tenant : req. tenant . clone ( ) ,
1253
+ table_id,
1254
+ } ;
1255
+ let ( _, virtual_column_opt) : ( _ , Option < VirtualColumnMeta > ) =
1256
+ get_pb_value ( self , & name_ident) . await ?;
1257
+
1258
+ if let Some ( virtual_column) = virtual_column_opt {
1259
+ return Ok ( vec ! [ virtual_column] ) ;
1260
+ } else {
1261
+ return Ok ( vec ! [ ] ) ;
1262
+ }
1263
+ }
1264
+
1265
+ // Get virtual columns list by `prefix_list` "<prefix>/<tenant>"
1266
+ let prefix_key = kvapi:: KeyBuilder :: new_prefixed ( VirtualColumnNameIdent :: PREFIX )
1267
+ . push_str ( & req. tenant )
1268
+ . done ( ) ;
1269
+
1270
+ let list = self . prefix_list_kv ( & prefix_key) . await ?;
1271
+ let mut virtual_column_list = Vec :: with_capacity ( list. len ( ) ) ;
1272
+ for ( _, seq) in list. iter ( ) {
1273
+ let virtual_column_meta: VirtualColumnMeta = deserialize_struct ( & seq. data ) ?;
1274
+ virtual_column_list. push ( virtual_column_meta) ;
1275
+ }
1276
+
1277
+ Ok ( virtual_column_list)
1278
+ }
1279
+
1074
1280
#[ tracing:: instrument( level = "debug" , ret, err, skip_all) ]
1075
1281
async fn create_table ( & self , req : CreateTableReq ) -> Result < CreateTableReply , KVAppError > {
1076
1282
debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
0 commit comments