@@ -11,17 +11,23 @@ mod cmd {
11
11
#[ cfg( feature = "producer-file-io" ) ]
12
12
use std:: path:: PathBuf ;
13
13
14
+ use async_std:: io:: stdin;
14
15
use async_trait:: async_trait;
16
+ use fluvio_future:: io:: StreamExt ;
17
+ use fluvio_sc_schema:: message:: MsgType ;
18
+ use fluvio_sc_schema:: topic:: TopicSpec ;
15
19
#[ cfg( feature = "producer-file-io" ) ]
16
20
use futures:: future:: join_all;
17
21
use clap:: Parser ;
22
+ use tokio:: select;
18
23
use tracing:: { error, warn} ;
19
24
use humantime:: parse_duration;
20
25
use anyhow:: Result ;
21
26
22
27
use fluvio:: {
23
- Compression , Fluvio , FluvioError , TopicProducerPool , TopicProducerConfigBuilder , RecordKey ,
24
- ProduceOutput , DeliverySemantic , SmartModuleContextData , Isolation , SmartModuleInvocation ,
28
+ Compression , DeliverySemantic , Fluvio , FluvioAdmin , FluvioError , Isolation , ProduceOutput ,
29
+ RecordKey , SmartModuleContextData , SmartModuleInvocation , TopicProducerConfigBuilder ,
30
+ TopicProducerPool ,
25
31
} ;
26
32
use fluvio_extension_common:: Terminal ;
27
33
use fluvio_types:: print_cli_ok;
@@ -243,16 +249,18 @@ mod cmd {
243
249
. await ?,
244
250
) ;
245
251
252
+ let admin = fluvio. admin ( ) . await ;
253
+
246
254
#[ cfg( feature = "producer-file-io" ) ]
247
255
if self . raw {
248
256
self . process_raw_file ( & producer) . await ?;
249
257
} else {
250
- self . produce_lines ( producer. clone ( ) ) . await ?;
258
+ self . produce_lines ( producer. clone ( ) , & admin ) . await ?;
251
259
} ;
252
260
253
261
#[ cfg( not( feature = "producer-file-io" ) ) ]
254
262
{
255
- self . produce_lines ( producer. clone ( ) ) . await ?;
263
+ self . produce_lines ( producer. clone ( ) , & admin ) . await ?;
256
264
}
257
265
258
266
producer. flush ( ) . await ?;
@@ -315,7 +323,11 @@ mod cmd {
315
323
}
316
324
}
317
325
318
- async fn produce_lines ( & self , producer : Arc < TopicProducerPool > ) -> Result < ( ) > {
326
+ async fn produce_lines (
327
+ & self ,
328
+ producer : Arc < TopicProducerPool > ,
329
+ admin : & FluvioAdmin ,
330
+ ) -> Result < ( ) > {
319
331
#[ cfg( feature = "producer-file-io" ) ]
320
332
if let Some ( path) = & self . file {
321
333
let reader = BufReader :: new ( File :: open ( path) ?) ;
@@ -340,7 +352,7 @@ mod cmd {
340
352
. collect :: < Result < Vec < _ > , _ > > ( ) ?;
341
353
}
342
354
} else {
343
- self . producer_stdin ( & producer) . await ?
355
+ self . producer_stdin ( & producer, admin ) . await ?
344
356
}
345
357
346
358
#[ cfg( not( feature = "producer-file-io" ) ) ]
@@ -349,27 +361,55 @@ mod cmd {
349
361
Ok ( ( ) )
350
362
}
351
363
352
- async fn producer_stdin ( & self , producer : & Arc < TopicProducerPool > ) -> Result < ( ) > {
353
- let mut lines = BufReader :: new ( std:: io:: stdin ( ) ) . lines ( ) ;
364
+ async fn producer_stdin (
365
+ & self ,
366
+ producer : & Arc < TopicProducerPool > ,
367
+ admin : & FluvioAdmin ,
368
+ ) -> Result < ( ) > {
369
+ use async_std:: io:: prelude:: * ;
370
+ use async_std:: io:: BufReader ;
371
+ let mut lines = BufReader :: new ( stdin ( ) ) . lines ( ) ;
372
+ let mut partition_stream = admin. watch :: < TopicSpec > ( ) . await ?;
373
+
354
374
if self . interactive_mode ( ) {
355
375
eprint ! ( "> " ) ;
356
376
}
357
377
358
- while let Some ( Ok ( line) ) = lines. next ( ) {
359
- let produce_output = self . produce_line ( producer, & line) . await ?;
360
-
361
- if let Some ( produce_output) = produce_output {
362
- if self . delivery_semantic != DeliverySemantic :: AtMostOnce {
363
- // ensure it was properly sent
364
- produce_output. wait ( ) . await ?;
378
+ loop {
379
+ select ! {
380
+ line = lines. next( ) => {
381
+ if let Some ( Ok ( line) ) = line {
382
+ let produce_output = self . produce_line( producer, & line) . await ?;
383
+
384
+ if let Some ( produce_output) = produce_output {
385
+ if self . delivery_semantic != DeliverySemantic :: AtMostOnce {
386
+ // ensure it was properly sent
387
+ produce_output. wait( ) . await ?;
388
+ }
389
+ }
390
+
391
+ if self . interactive_mode( ) {
392
+ print_cli_ok!( ) ;
393
+ eprint!( "> " ) ;
394
+ }
395
+ } else {
396
+ // When stdin is closed, we break the loop
397
+ break ;
398
+ }
399
+ }
400
+ stream = partition_stream. next( ) => {
401
+ if let Some ( stream) = stream {
402
+ let stream = stream?;
403
+ for change in stream. inner( ) . changes {
404
+ if change. header == MsgType :: DELETE && change. content. name == self . topic {
405
+ return Err ( CliError :: TopicDeleted ( self . topic. clone( ) ) . into( ) ) ;
406
+ }
407
+ }
408
+ }
365
409
}
366
- }
367
-
368
- if self . interactive_mode ( ) {
369
- print_cli_ok ! ( ) ;
370
- eprint ! ( "> " ) ;
371
410
}
372
411
}
412
+
373
413
Ok ( ( ) )
374
414
}
375
415
0 commit comments