Hello! 欢迎来到小浪资源网!

RxJava中如何优雅地在onComplete调用时取消订阅?


RxJava中如何优雅地在onComplete调用时取消订阅?

如何在 rxjava 中在 oncomplete 调用时取消订阅

rxjava 中,通常希望在 observable 的 oncomplete 事件上取消订阅,以防止进一步的事件发射。这可以通过设置 completablefuture 来实现。

android 环境中使用 rxjava 2.x 的示例

flowable 由持久层方法返回,作为调用方中的 result.getall(dbname.get(), strategy) 和 result.gettablecolumn(table)。

立即学习Java免费学习笔记(深入)”;

public class consoleschemaflowableoutput implements schemaflowableoutput {     private static final logger logger = loggerfactory.getlogger(consoleschemaflowableoutput.class);     private volatile completablefuture<string> future = new completablefuture<>();     private atomicinteger count = new atomicinteger(0);      @override     public disposable flush(information information, flowable<table> table) throws schemaexportexception {         disposable export_flush_complete = table.subscribe(tableins -> {             system.out.println(printasciitable(tableins));             system.out.println(printasciicolumns(tableins.getcolumns()));             system.out.println("  ");             count.addandget(1);         }, throwable -> {             logger.debug("export break, reason: " + throwable.getmessage());             future.cancel(true);             throw new schemaexportexception(throwable);         }, new action() {             @override             public void run() throws exception {                 logger.debug("export complete, affect size:" + count.get());                 future.complete("ok");             }         });         return export_flush_complete;     }      @override     public completablefuture<string> getfuture() {         return future;     }     ... }

调用方

public void export(Information info, SchemaFlowableOutput out) throws SchemaExportException {     long startStamp = System.currentTimeMillis();     // Flowable     Flowable<Table> tableFlowable = result.getAll(dbName.get(), strategy).flatMap(new Function<Table, Publisher<Table>>() {         @Override         public Publisher<Table> apply(@NonNull Table table) throws Exception {             return result.getTableColumn(table).flatMap(new Function<List<Column>, SingleSource<Table>>() {                 @Override                 public SingleSource<Table> apply(@NonNull List<Column> columns) throws Exception {                     return Single.just(table.fillColumn(columns));                 }             }).flatMapPublisher(new Function<Table, Publisher<? extends Table>>() {                 @Override                 public Publisher<? extends Table> apply(@NonNull Table table) throws Exception {                     return Flowable.just(table);                 }             });         }     });     Disposable disposable = null;     try {         disposable = out.flush(info, tableFlowable);         CompletableFuture<String> future = out.getFuture();         while (!future.isDone()) {             logger.info("[ERE-Flowable]未完成,线程休眠1秒");             Thread.currentThread().sleep(1000, 0);         }         String result = future.get();         logger.info("[ERE-Flowable]完成, 结果:" + result);         if (result.equals("OK")) {             long finishStamp = System.currentTimeMillis();             clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete, WithTime: " + (finishStamp - startStamp));         }     } catch (Exception e) {         clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: " + e.getMessage());     } }  private void clearHander(Disposable disposable, String reason) {     logger.info(reason);     if (null != disposable && !disposable.isDisposed()) {         disposable.dispose();     } else {         if (null != disposable) {             logger.info("[CH]disposable status:" + disposable.isDisposed());         } else {             logger.info("[CH]disposable is null:");         }     }     // 结束后的回调,执行一些清理工作     completeHandler.apply(); }

通过将 completablefuture 设置为 “ok” 来通知调用方 observable 已结束,这将取消订阅并执行后续处理。

相关阅读