博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊reactive streams的tranform操作
阅读量:6275 次
发布时间:2019-06-22

本文共 9233 字,大约阅读时间需要 30 分钟。

本文主要展示一下reactive streams的一些transform操作

mergeWith

@Test    public void testMerge(){        Flux
flux1 = Flux.interval(Duration.ofSeconds(1)) .take(3) .map(e -> "[flux1]:"+e); Flux
mergeFlux = Flux.interval(Duration.ofSeconds(1)) .delayElements(Duration.ofSeconds(1)) .take(3) .map(e -> "[flux2]:"+e) .mergeWith(flux1); mergeFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); mergeFlux.blockLast(); }

输出实例

21:18:07.583 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework21:18:08.618 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:021:18:09.619 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:121:18:09.645 [parallel-6] INFO com.example.demo.TransformTest - subscribe:[flux2]:021:18:10.619 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:221:18:10.649 [parallel-8] INFO com.example.demo.TransformTest - subscribe:[flux2]:121:18:11.654 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux2]:2
可以发现,他们是交叉合并的。

concatWith

@Test    public void testConcat(){        Flux
flux1 = Flux.interval(Duration.ofSeconds(1)) .take(3) .map(e -> "[flux1]:"+e); Flux
concatFlux = Flux.interval(Duration.ofSeconds(1)) .delayElements(Duration.ofSeconds(1)) .take(3) .map(e -> "[flux2]:"+e) .concatWith(flux1); concatFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); concatFlux.blockLast(); }

输出

21:19:00.779 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework21:19:02.832 [parallel-4] INFO com.example.demo.TransformTest - subscribe:[flux2]:021:19:03.836 [parallel-6] INFO com.example.demo.TransformTest - subscribe:[flux2]:121:19:04.840 [parallel-8] INFO com.example.demo.TransformTest - subscribe:[flux2]:221:19:05.845 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:021:19:06.845 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:121:19:07.844 [parallel-2] INFO com.example.demo.TransformTest - subscribe:[flux1]:2
可以发现concatWith只是连接两个flux的数据,并不是按emit的顺序交叉来

zipWith

@Test    public void testZip(){        List
firstList = Lists.newArrayList("a","b","c","d","e","a","b"); List
secondList = Lists.newArrayList("1","2","3","4","5"); Flux
> zipFlux = Flux.fromIterable(firstList) .zipWith(Flux.fromIterable(secondList)); zipFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); }

输出如下

21:20:59.506 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework21:20:59.516 [main] INFO com.example.demo.TransformTest - subscribe:[a,1]21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[b,2]21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[c,3]21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[d,4]21:20:59.517 [main] INFO com.example.demo.TransformTest - subscribe:[e,5]
可以发现flux1相比flux2多余的数据没有被zip

flatMap

@Test    public void testFlatMap(){        List
secondList = Lists.newArrayList("1","2","3","4","5"); Flux
flatMapFlux = Flux.fromIterable(secondList) .flatMap((str) ->{ return Mono.just(str).repeat(2).map(String::toUpperCase).delayElements(Duration.ofMillis(1)); }); flatMapFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); flatMapFlux.blockLast(); Flux
mapFlux = Flux.fromIterable(secondList) .repeat(2) .map(String::toUpperCase); mapFlux.subscribe(e -> { LOGGER.info("map subscribe:{}",e); }); mapFlux.blockLast(); }

输出

21:33:46.904 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework21:33:46.958 [parallel-1] INFO com.example.demo.TransformTest - subscribe:121:33:46.959 [parallel-1] INFO com.example.demo.TransformTest - subscribe:221:33:46.959 [parallel-1] INFO com.example.demo.TransformTest - subscribe:321:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:421:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:521:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:221:33:46.960 [parallel-7] INFO com.example.demo.TransformTest - subscribe:321:33:46.960 [parallel-8] INFO com.example.demo.TransformTest - subscribe:421:33:46.960 [parallel-1] INFO com.example.demo.TransformTest - subscribe:521:33:46.961 [parallel-6] INFO com.example.demo.TransformTest - subscribe:121:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:121:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:221:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:321:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:421:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:521:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:121:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:221:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:321:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:421:33:46.963 [main] INFO com.example.demo.TransformTest - map subscribe:5
flatMap是异步的

reduce

@Test    public void testReduce(){        List
secondList = Lists.newArrayList("1","2","3","4","5"); Mono
reduceMono = Flux.fromIterable(secondList) .flatMap(e -> Mono.just(e).map(item -> Integer.valueOf(item))) .reduce((total, e) -> total + e); reduceMono.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); }

输出

21:36:29.978 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework21:36:30.014 [main] INFO com.example.demo.TransformTest - subscribe:15

groupBy

@Test    public void testGroup(){        List
firstList = Lists.newArrayList("a","b","c","d","e","a","b"); Flux
> groupFlux = Flux.fromIterable(firstList) .map(String::toUpperCase) .groupBy(key -> key); groupFlux.subscribe(e -> { LOGGER.info("subscribe:{}",e.collectList().subscribe(item -> { LOGGER.info("item:{}",item); })); }); }

输出

21:37:00.912 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework21:37:00.949 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@5faeada121:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@1563da521:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@2bbf4b8b21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@30a3107a21:37:00.951 [main] INFO com.example.demo.TransformTest - subscribe:reactor.core.publisher.LambdaMonoSubscriber@33c7e1bb21:37:00.951 [main] INFO com.example.demo.TransformTest - item:[A, A]21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[B, B]21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[C]21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[D]21:37:00.952 [main] INFO com.example.demo.TransformTest - item:[E]

first

@Test    public void testFirst(){        List
firstList = Lists.newArrayList("a","b","c","d","e","a","b"); List
secondList = Lists.newArrayList("1","2","3","4","5"); Flux
firstFlux = Flux.fromIterable(firstList) .delayElements(Duration.ofMillis(200)); Flux
secondFlux = Flux.fromIterable(secondList) .take(2); Flux
result = Flux.first(firstFlux, secondFlux); result.subscribe(e -> { LOGGER.info("subscribe:{}",e); }); }

toIterable

@Test    public void testToIterable(){        List
firstList = Lists.newArrayList("a","b","c","d","e","a","b"); Iterable
itr = Flux.fromIterable(firstList) .map(String::toUpperCase) .toIterable(); itr.forEach(e -> LOGGER.info(e)); }

输出

21:39:35.031 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework21:39:35.045 [main] INFO com.example.demo.TransformTest - A21:39:35.045 [main] INFO com.example.demo.TransformTest - B21:39:35.045 [main] INFO com.example.demo.TransformTest - C21:39:35.045 [main] INFO com.example.demo.TransformTest - D21:39:35.045 [main] INFO com.example.demo.TransformTest - E21:39:35.045 [main] INFO com.example.demo.TransformTest - A21:39:35.045 [main] INFO com.example.demo.TransformTest - B

小结

reactive streams的操作相当于在jdk的streams的基础上实现了reactive化,可以参照着了解。

doc

转载地址:http://mogpa.baihongyu.com/

你可能感兴趣的文章
Mellanox公司计划利用系统芯片提升存储产品速度
查看>>
白帽子守护网络安全,高薪酬成大学生就业首选!
查看>>
ARM想将芯片装进人类大脑 降低能耗是一大挑战
查看>>
Oracle数据库的备份方法
查看>>
Selenium 自动登录考勤系统
查看>>
关于如何以编程的方式执行TestNG
查看>>
智能照明造福千家万户 家居智能不再是梦
查看>>
物联网如何跳出“看起来很美”?
查看>>
浅谈MySQL 数据库性能优化
查看>>
《UNIX/Linux 系统管理技术手册(第四版)》——1.10 其他的权威文档
查看>>
灵动空间 创享生活
查看>>
《UNIX网络编程 卷1:套接字联网API(第3版)》——8.6 UDP回射客户程序:dg_cli函数...
查看>>
不要将时间浪费到编写完美代码上
查看>>
《第一桶金怎么赚——淘宝开店创业致富一册通》一一第1章 创业梦想,怎样起步...
查看>>
基于容器服务的持续集成与云端交付(三)- 从零搭建持续交付系统
查看>>
《算法基础:打开算法之门》一3.4 归并排序
查看>>
高德开放平台开放源代码 鼓励开发者创新
查看>>
《高并发Oracle数据库系统的架构与设计》一2.5 索引维护
查看>>
《Exchange Server 2010 SP1/SP2管理实践》——2.4 部署外部网络环境
查看>>
Firefox 是 Pwn2own 2014 上攻陷次数最多的浏览器
查看>>