博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
reactor-rabbitmq小试牛刀
阅读量:6721 次
发布时间:2019-06-25

本文共 3982 字,大约阅读时间需要 13 分钟。

本文主要研究一下如何使用reactor-rabbitmq

maven

io.projectreactor.rabbitmq
reactor-rabbitmq
1.0.0.M2
复制代码

rabbitmq

  • 参考
  • 当前使用的镜像是bijukunjummen/rabbitmq-server:3.7.0,docker-compose文件配置的账号密码为myuser/mypass
  • 访问http://192.168.99.100:15672可以查看界面

实例

@Test    public void testProducer() throws InterruptedException {        int count = 100;        ConnectionFactory connectionFactory = new ConnectionFactory();        connectionFactory.useNio();        connectionFactory.setUsername("myuser");        connectionFactory.setPassword("mypass");        SenderOptions senderOptions =  new SenderOptions()                .connectionFactory(connectionFactory)                .connectionSupplier(cf -> cf.newConnection(                        new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)},                        "reactive-sender"))                .resourceCreationScheduler(Schedulers.elastic());        Sender sender = ReactorRabbitMq.createSender(senderOptions);        Flux
confirmations = sender.sendWithPublishConfirms(Flux.range(1, count) .map(i -> new OutboundMessage("", QUEUE, ("Message_" + i).getBytes()))); CountDownLatch latch = new CountDownLatch(count); sender.declareQueue(QueueSpecification.queue(QUEUE)) .thenMany(confirmations) .doOnError(e -> LOGGER.error("Send failed", e)) .subscribe(r -> { if (r.isAck()) { LOGGER.info("Message {} sent successfully", new String(r.getOutboundMessage().getBody())); latch.countDown(); } }); latch.await(10, TimeUnit.SECONDS); sender.close(); } @Test public void testConsumer() throws InterruptedException { int count = 100; CountDownLatch latch = new CountDownLatch(count); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.useNio(); connectionFactory.setUsername("myuser"); connectionFactory.setPassword("mypass"); SenderOptions senderOptions = new SenderOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)}, "reactive-sender")) .resourceCreationScheduler(Schedulers.elastic()); Sender sender = ReactorRabbitMq.createSender(senderOptions); Mono
queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE)); ReceiverOptions receiverOptions = new ReceiverOptions() .connectionFactory(connectionFactory) .connectionSupplier(cf -> cf.newConnection( new Address[] {new Address("192.168.99.100",5672), new Address("192.168.99.100",5673), new Address("192.168.99.100",5674)}, "reactive-receiver")) .connectionSubscriptionScheduler(Schedulers.elastic()); Receiver receiver = ReactorRabbitMq.createReceiver(receiverOptions); Flux
messages = receiver.consumeAutoAck(QUEUE); Disposable disposable = queueDeclaration.thenMany(messages).subscribe(m -> { LOGGER.info("Received message {}", new String(m.getBody())); latch.countDown(); }); latch.await(10, TimeUnit.SECONDS); disposable.dispose(); sender.close(); receiver.close(); }复制代码
  • 由于设置了账号密码,因而需要在ConnectionFactory那里指定账号密码
  • 另外由于使用了rabbitmq集群,因而通过connectionSupplier指定要连接的多个rabbitmq地址
  • 这里不管是producer还是consumer,都通过queueDeclaration进行操作

小结

reactor-rabbitmq对rabbitmq的api进行封装,改造为reactive streams模式,提供了Non-blocking Back-pressure以及End-to-end Reactive Pipeline特性。

doc

转载地址:http://yyjmo.baihongyu.com/

你可能感兴趣的文章
php设计模式——适配器模式
查看>>
C#文件、文件夹操作
查看>>
MySQL编译安装加入service
查看>>
以rsync进行同步镜像备份
查看>>
热烈祝贺VMware View4.5荣获“2010年度最佳产品”大奖
查看>>
ORACLE 11G 中表空间传输 TransportableTablespace
查看>>
自动化1
查看>>
Jenkins 2.32.3参数化构建maven项目
查看>>
使用Oracle存储过程批量生成测试数据
查看>>
正则表达式 - ×××
查看>>
Target runtime Apache Tomcat v6.0 is not defined
查看>>
.net密码找回
查看>>
安装mysql遇到的问题
查看>>
我的友情链接
查看>>
大道至简--GoEasy推送
查看>>
免费邮箱服务器(收藏)
查看>>
org.aspectj.lang.JoinPoint-中文简要API
查看>>
数据库内存使用
查看>>
shell-9-函数(tc与限速实例)
查看>>
[战略]Fans未来战略--第4篇--2012年的IT技术学习规划
查看>>