13
13
// limitations under the License.
14
14
15
15
use std:: collections:: BTreeMap ;
16
+ use std:: collections:: HashSet ;
16
17
use std:: fmt:: Display ;
17
18
use std:: sync:: Arc ;
18
19
@@ -41,6 +42,7 @@ use common_meta_app::app_error::UnknownDatabaseId;
41
42
use common_meta_app:: app_error:: UnknownIndex ;
42
43
use common_meta_app:: app_error:: UnknownTable ;
43
44
use common_meta_app:: app_error:: UnknownTableId ;
45
+ use common_meta_app:: app_error:: VirtualColumnNotExist ;
44
46
use common_meta_app:: app_error:: WrongShare ;
45
47
use common_meta_app:: app_error:: WrongShareObject ;
46
48
use common_meta_app:: schema:: CountTablesKey ;
@@ -54,6 +56,8 @@ use common_meta_app::schema::CreateTableLockRevReply;
54
56
use common_meta_app:: schema:: CreateTableLockRevReq ;
55
57
use common_meta_app:: schema:: CreateTableReply ;
56
58
use common_meta_app:: schema:: CreateTableReq ;
59
+ use common_meta_app:: schema:: CreateVirtualColumnReply ;
60
+ use common_meta_app:: schema:: CreateVirtualColumnReq ;
57
61
use common_meta_app:: schema:: DBIdTableName ;
58
62
use common_meta_app:: schema:: DatabaseId ;
59
63
use common_meta_app:: schema:: DatabaseIdToName ;
@@ -71,6 +75,8 @@ use common_meta_app::schema::DropIndexReply;
71
75
use common_meta_app:: schema:: DropIndexReq ;
72
76
use common_meta_app:: schema:: DropTableByIdReq ;
73
77
use common_meta_app:: schema:: DropTableReply ;
78
+ use common_meta_app:: schema:: DropVirtualColumnReply ;
79
+ use common_meta_app:: schema:: DropVirtualColumnReq ;
74
80
use common_meta_app:: schema:: EmptyProto ;
75
81
use common_meta_app:: schema:: ExtendTableLockRevReq ;
76
82
use common_meta_app:: schema:: GetDatabaseReq ;
@@ -85,6 +91,7 @@ use common_meta_app::schema::ListDatabaseReq;
85
91
use common_meta_app:: schema:: ListIndexesReq ;
86
92
use common_meta_app:: schema:: ListTableLockRevReq ;
87
93
use common_meta_app:: schema:: ListTableReq ;
94
+ use common_meta_app:: schema:: ListVirtualColumnsReq ;
88
95
use common_meta_app:: schema:: RenameDatabaseReply ;
89
96
use common_meta_app:: schema:: RenameDatabaseReq ;
90
97
use common_meta_app:: schema:: RenameTableReply ;
@@ -111,6 +118,8 @@ use common_meta_app::schema::UpdateTableMetaReq;
111
118
use common_meta_app:: schema:: UpsertTableCopiedFileReq ;
112
119
use common_meta_app:: schema:: UpsertTableOptionReply ;
113
120
use common_meta_app:: schema:: UpsertTableOptionReq ;
121
+ use common_meta_app:: schema:: VirtualColumnMeta ;
122
+ use common_meta_app:: schema:: VirtualColumnNameIdent ;
114
123
use common_meta_app:: share:: ShareGrantObject ;
115
124
use common_meta_app:: share:: ShareId ;
116
125
use common_meta_app:: share:: ShareNameIdent ;
@@ -1071,6 +1080,195 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
1071
1080
Ok ( index_metas)
1072
1081
}
1073
1082
1083
+ // virtual column
1084
+
1085
+ async fn create_virtual_column (
1086
+ & self ,
1087
+ req : CreateVirtualColumnReq ,
1088
+ ) -> Result < CreateVirtualColumnReply , KVAppError > {
1089
+ debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
1090
+
1091
+ let mut retry = 0 ;
1092
+ while retry < TXN_MAX_RETRY_TIMES {
1093
+ retry += 1 ;
1094
+
1095
+ let ( _, old_virtual_column_opt) : ( _ , Option < VirtualColumnMeta > ) =
1096
+ get_pb_value ( self , & req. name_ident ) . await ?;
1097
+
1098
+ // If virtual columns already exist, merge them together
1099
+ let virtual_column_meta = if let Some ( old_virtual_column) = old_virtual_column_opt {
1100
+ let mut virtual_columns = req. virtual_columns . clone ( ) ;
1101
+ virtual_columns. extend ( old_virtual_column. virtual_columns . clone ( ) ) ;
1102
+
1103
+ let virtual_columns_set: HashSet < String > =
1104
+ HashSet :: from_iter ( virtual_columns. into_iter ( ) ) ;
1105
+ let virtual_columns = Vec :: from_iter ( virtual_columns_set. into_iter ( ) ) ;
1106
+
1107
+ VirtualColumnMeta {
1108
+ table_id : req. name_ident . table_id ,
1109
+ virtual_columns,
1110
+ created_on : old_virtual_column. created_on . clone ( ) ,
1111
+ updated_on : Some ( Utc :: now ( ) ) ,
1112
+ drop_on : old_virtual_column. drop_on . clone ( ) ,
1113
+ }
1114
+ } else {
1115
+ VirtualColumnMeta {
1116
+ table_id : req. name_ident . table_id ,
1117
+ virtual_columns : req. virtual_columns . clone ( ) ,
1118
+ created_on : Utc :: now ( ) ,
1119
+ updated_on : None ,
1120
+ drop_on : None ,
1121
+ }
1122
+ } ;
1123
+
1124
+ // Create virtual column by inserting these record:
1125
+ // (tenant, table_id) -> virtual_column_meta
1126
+ {
1127
+ let condition = vec ! [ ] ;
1128
+ let if_then = vec ! [
1129
+ txn_op_put( & req. name_ident, serialize_struct( & virtual_column_meta) ?) , /* (tenant, table_id) -> virtual_column_meta */
1130
+ ] ;
1131
+
1132
+ let txn_req = TxnRequest {
1133
+ condition,
1134
+ if_then,
1135
+ else_then : vec ! [ ] ,
1136
+ } ;
1137
+
1138
+ let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
1139
+
1140
+ debug ! (
1141
+ req. name_ident = debug( & virtual_column_meta) ,
1142
+ succ = display( succ) ,
1143
+ "create_virtual_column"
1144
+ ) ;
1145
+
1146
+ if succ {
1147
+ return Ok ( CreateVirtualColumnReply { } ) ;
1148
+ }
1149
+ }
1150
+ }
1151
+
1152
+ Err ( KVAppError :: AppError ( AppError :: TxnRetryMaxTimes (
1153
+ TxnRetryMaxTimes :: new ( "create_virtual_column" , TXN_MAX_RETRY_TIMES ) ,
1154
+ ) ) )
1155
+ }
1156
+
1157
+ async fn drop_virtual_column (
1158
+ & self ,
1159
+ req : DropVirtualColumnReq ,
1160
+ ) -> Result < DropVirtualColumnReply , KVAppError > {
1161
+ debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
1162
+
1163
+ let mut retry = 0 ;
1164
+ while retry < TXN_MAX_RETRY_TIMES {
1165
+ retry += 1 ;
1166
+
1167
+ let ( _, old_virtual_column_opt) : ( _ , Option < VirtualColumnMeta > ) =
1168
+ get_pb_value ( self , & req. name_ident ) . await ?;
1169
+
1170
+ // If virtual columns already exist, merge them together
1171
+ let virtual_column_meta = if let Some ( old_virtual_column) = old_virtual_column_opt {
1172
+ let mut virtual_columns_set: HashSet < String > = HashSet :: from_iter (
1173
+ old_virtual_column
1174
+ . virtual_columns
1175
+ . iter ( )
1176
+ . cloned ( )
1177
+ . collect :: < Vec < _ > > ( ) ,
1178
+ ) ;
1179
+ for remove_virtual_column in & req. virtual_columns {
1180
+ virtual_columns_set. remove ( remove_virtual_column) ;
1181
+ }
1182
+ let virtual_columns = Vec :: from_iter ( virtual_columns_set. into_iter ( ) ) ;
1183
+
1184
+ VirtualColumnMeta {
1185
+ table_id : req. name_ident . table_id ,
1186
+ virtual_columns,
1187
+ created_on : old_virtual_column. created_on . clone ( ) ,
1188
+ updated_on : old_virtual_column. updated_on . clone ( ) ,
1189
+ drop_on : Some ( Utc :: now ( ) ) ,
1190
+ }
1191
+ } else {
1192
+ return Err ( KVAppError :: AppError ( AppError :: VirtualColumnNotExist (
1193
+ VirtualColumnNotExist :: new (
1194
+ * & req. name_ident . table_id ,
1195
+ format ! (
1196
+ "drop virtual column with tenant: {} table_id: {}" ,
1197
+ req. name_ident. tenant, req. name_ident. table_id
1198
+ ) ,
1199
+ ) ,
1200
+ ) ) ) ;
1201
+ } ;
1202
+
1203
+ // Create virtual column by inserting these record:
1204
+ // (tenant, table_id) -> virtual_column_meta
1205
+ {
1206
+ let condition = vec ! [ ] ;
1207
+ let if_then = vec ! [
1208
+ txn_op_put( & req. name_ident, serialize_struct( & virtual_column_meta) ?) , /* (tenant, table_id) -> virtual_column_meta */
1209
+ ] ;
1210
+
1211
+ let txn_req = TxnRequest {
1212
+ condition,
1213
+ if_then,
1214
+ else_then : vec ! [ ] ,
1215
+ } ;
1216
+
1217
+ let ( succ, _responses) = send_txn ( self , txn_req) . await ?;
1218
+
1219
+ debug ! (
1220
+ req. name_ident = debug( & virtual_column_meta) ,
1221
+ succ = display( succ) ,
1222
+ "drop_virtual_column"
1223
+ ) ;
1224
+
1225
+ if succ {
1226
+ return Ok ( DropVirtualColumnReply { } ) ;
1227
+ }
1228
+ }
1229
+ }
1230
+
1231
+ Err ( KVAppError :: AppError ( AppError :: TxnRetryMaxTimes (
1232
+ TxnRetryMaxTimes :: new ( "drop_virtual_column" , TXN_MAX_RETRY_TIMES ) ,
1233
+ ) ) )
1234
+ }
1235
+
1236
+ async fn list_virtual_columns (
1237
+ & self ,
1238
+ req : ListVirtualColumnsReq ,
1239
+ ) -> Result < Vec < VirtualColumnMeta > , KVAppError > {
1240
+ debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
1241
+
1242
+ if let Some ( table_id) = req. table_id {
1243
+ let name_ident = VirtualColumnNameIdent {
1244
+ tenant : req. tenant . clone ( ) ,
1245
+ table_id,
1246
+ } ;
1247
+ let ( _, virtual_column_opt) : ( _ , Option < VirtualColumnMeta > ) =
1248
+ get_pb_value ( self , & name_ident) . await ?;
1249
+
1250
+ if let Some ( virtual_column) = virtual_column_opt {
1251
+ return Ok ( vec ! [ virtual_column] ) ;
1252
+ } else {
1253
+ return Ok ( vec ! [ ] ) ;
1254
+ }
1255
+ }
1256
+
1257
+ // Get virtual columns list by `prefix_list` "<prefix>/<tenant>"
1258
+ let prefix_key = kvapi:: KeyBuilder :: new_prefixed ( VirtualColumnNameIdent :: PREFIX )
1259
+ . push_str ( & req. tenant )
1260
+ . done ( ) ;
1261
+
1262
+ let list = self . prefix_list_kv ( & prefix_key) . await ?;
1263
+ let mut virtual_column_list = Vec :: with_capacity ( list. len ( ) ) ;
1264
+ for ( _, seq) in list. iter ( ) {
1265
+ let virtual_column_meta: VirtualColumnMeta = deserialize_struct ( & seq. data ) ?;
1266
+ virtual_column_list. push ( virtual_column_meta) ;
1267
+ }
1268
+
1269
+ Ok ( virtual_column_list)
1270
+ }
1271
+
1074
1272
#[ tracing:: instrument( level = "debug" , ret, err, skip_all) ]
1075
1273
async fn create_table ( & self , req : CreateTableReq ) -> Result < CreateTableReply , KVAppError > {
1076
1274
debug ! ( req = debug( & req) , "SchemaApi: {}" , func_name!( ) ) ;
0 commit comments