@@ -50,6 +50,20 @@ pub fn build_source_command() -> Command {
50
50
. required ( true ) ,
51
51
] )
52
52
)
53
+ . subcommand (
54
+ Command :: new ( "update" )
55
+ . about ( "Update an existing source." )
56
+ . args ( & [
57
+ arg ! ( --index <INDEX_ID > "ID of the target index" )
58
+ . display_order ( 1 )
59
+ . required ( true ) ,
60
+ arg ! ( --source <SOURCE_ID > "ID of the source" )
61
+ . display_order ( 2 )
62
+ . required ( true ) ,
63
+ arg ! ( --"source-config" <SOURCE_CONFIG > "Path to source config file. Please, refer to the documentation for more details." )
64
+ . required ( true ) ,
65
+ ] )
66
+ )
53
67
. subcommand (
54
68
Command :: new ( "enable" )
55
69
. about ( "Enables a source for an index." )
@@ -147,6 +161,14 @@ pub struct CreateSourceArgs {
147
161
pub source_config_uri : Uri ,
148
162
}
149
163
164
+ #[ derive( Debug , Eq , PartialEq ) ]
165
+ pub struct UpdateSourceArgs {
166
+ pub client_args : ClientArgs ,
167
+ pub index_id : IndexId ,
168
+ pub source_id : SourceId ,
169
+ pub source_config_uri : Uri ,
170
+ }
171
+
150
172
#[ derive( Debug , Eq , PartialEq ) ]
151
173
pub struct ToggleSourceArgs {
152
174
pub client_args : ClientArgs ,
@@ -187,6 +209,7 @@ pub struct ResetCheckpointArgs {
187
209
#[ derive( Debug , Eq , PartialEq ) ]
188
210
pub enum SourceCliCommand {
189
211
CreateSource ( CreateSourceArgs ) ,
212
+ UpdateSource ( UpdateSourceArgs ) ,
190
213
ToggleSource ( ToggleSourceArgs ) ,
191
214
DeleteSource ( DeleteSourceArgs ) ,
192
215
DescribeSource ( DescribeSourceArgs ) ,
@@ -198,6 +221,7 @@ impl SourceCliCommand {
198
221
pub async fn execute ( self ) -> anyhow:: Result < ( ) > {
199
222
match self {
200
223
Self :: CreateSource ( args) => create_source_cli ( args) . await ,
224
+ Self :: UpdateSource ( args) => update_source_cli ( args) . await ,
201
225
Self :: ToggleSource ( args) => toggle_source_cli ( args) . await ,
202
226
Self :: DeleteSource ( args) => delete_source_cli ( args) . await ,
203
227
Self :: DescribeSource ( args) => describe_source_cli ( args) . await ,
@@ -212,6 +236,7 @@ impl SourceCliCommand {
212
236
. context ( "failed to parse source subcommand" ) ?;
213
237
match subcommand. as_str ( ) {
214
238
"create" => Self :: parse_create_args ( submatches) . map ( Self :: CreateSource ) ,
239
+ "update" => Self :: parse_update_args ( submatches) . map ( Self :: UpdateSource ) ,
215
240
"enable" => {
216
241
Self :: parse_toggle_source_args ( & subcommand, submatches) . map ( Self :: ToggleSource )
217
242
}
@@ -244,6 +269,26 @@ impl SourceCliCommand {
244
269
} )
245
270
}
246
271
272
+ fn parse_update_args ( mut matches : ArgMatches ) -> anyhow:: Result < UpdateSourceArgs > {
273
+ let client_args = ClientArgs :: parse ( & mut matches) ?;
274
+ let index_id = matches
275
+ . remove_one :: < String > ( "index" )
276
+ . expect ( "`index` should be a required arg." ) ;
277
+ let source_id = matches
278
+ . remove_one :: < String > ( "source" )
279
+ . expect ( "`source` should be a required arg." ) ;
280
+ let source_config_uri = matches
281
+ . remove_one :: < String > ( "source-config" )
282
+ . map ( |uri_str| Uri :: from_str ( & uri_str) )
283
+ . expect ( "`source-config` should be a required arg." ) ?;
284
+ Ok ( UpdateSourceArgs {
285
+ client_args,
286
+ index_id,
287
+ source_id,
288
+ source_config_uri,
289
+ } )
290
+ }
291
+
247
292
fn parse_toggle_source_args (
248
293
subcommand : & str ,
249
294
mut matches : ArgMatches ,
@@ -342,6 +387,23 @@ async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> {
342
387
Ok ( ( ) )
343
388
}
344
389
390
+ async fn update_source_cli ( args : UpdateSourceArgs ) -> anyhow:: Result < ( ) > {
391
+ debug ! ( args=?args, "update-source" ) ;
392
+ println ! ( "❯ Updating source..." ) ;
393
+ let storage_resolver = StorageResolver :: unconfigured ( ) ;
394
+ let source_config_content = load_file ( & storage_resolver, & args. source_config_uri ) . await ?;
395
+ let source_config_str: & str = std:: str:: from_utf8 ( & source_config_content)
396
+ . with_context ( || format ! ( "source config is not utf-8: {}" , args. source_config_uri) ) ?;
397
+ let config_format = ConfigFormat :: sniff_from_uri ( & args. source_config_uri ) ?;
398
+ let qw_client = args. client_args . client ( ) ;
399
+ qw_client
400
+ . sources ( & args. index_id )
401
+ . update ( & args. source_id , source_config_str, config_format)
402
+ . await ?;
403
+ println ! ( "{} Source successfully updated." , "✔" . color( GREEN_COLOR ) ) ;
404
+ Ok ( ( ) )
405
+ }
406
+
345
407
async fn toggle_source_cli ( args : ToggleSourceArgs ) -> anyhow:: Result < ( ) > {
346
408
debug ! ( args=?args, "toggle-source" ) ;
347
409
println ! ( "❯ Toggling source..." ) ;
@@ -604,6 +666,32 @@ mod tests {
604
666
assert_eq ! ( command, expected_command) ;
605
667
}
606
668
669
+ #[ test]
670
+ fn test_parse_update_source_args ( ) {
671
+ let app = build_cli ( ) . no_binary_name ( true ) ;
672
+ let matches = app
673
+ . try_get_matches_from ( vec ! [
674
+ "source" ,
675
+ "update" ,
676
+ "--index" ,
677
+ "hdfs-logs" ,
678
+ "--source" ,
679
+ "kafka-foo" ,
680
+ "--source-config" ,
681
+ "/source-conf.yaml" ,
682
+ ] )
683
+ . unwrap ( ) ;
684
+ let command = CliCommand :: parse_cli_args ( matches) . unwrap ( ) ;
685
+ let expected_command =
686
+ CliCommand :: Source ( SourceCliCommand :: UpdateSource ( UpdateSourceArgs {
687
+ client_args : ClientArgs :: default ( ) ,
688
+ index_id : "hdfs-logs" . to_string ( ) ,
689
+ source_id : "kafka-foo" . to_string ( ) ,
690
+ source_config_uri : Uri :: from_str ( "file:///source-conf.yaml" ) . unwrap ( ) ,
691
+ } ) ) ;
692
+ assert_eq ! ( command, expected_command) ;
693
+ }
694
+
607
695
#[ test]
608
696
fn test_parse_toggle_source_args ( ) {
609
697
{
0 commit comments