网站建设 响应式,aspnet东莞网站建设多少钱,云seo关键词排名优化软件,电脑网站打不开怎么解决

一、介绍
（1）提供统一接口操作不同厂商的消息队列组件，降低学习成本。
（2）生产者和消费者只需操作 binder 对象即可与消息队列交互：生产者使用 output，消费者使用 input。
（3）核心概念：发布订阅、消费组、分区。
（4）使用 topic 模式。
二、项目搭建
（1）生产者
a、编写pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the message producer: RabbitMQ stream binder, Eureka client, web and actuator starters. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>demo20220821</artifactId>
        <groupId>com.wsh.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>com.wsh.springcloud</groupId>
            <artifactId>cloud-api-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

b、编写application.yml
# Producer configuration: binds the "output" channel to the testExchange exchange on RabbitMQ.
server:
  port: 8801

spring:
  application:
    name: cloud-stream-rabbit-provider
  cloud:
    stream:
      binders:
        # Binder definition referenced by bindings.output.binder below.
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.0.166
                port: 5672
                username: guest
                password: guest
      bindings:
        output:
          destination: testExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client:
    # Clients set this to true
    register-with-eureka: true
    # Clients set this to true
    fetch-registry: true
    service-url:
      # defaultZone: http://localhost:7001/eureka
      defaultZone: http://eureka1.com:7001/eureka, http://eureka2.com:7002/eureka
  instance:
    instance-id: cloudSreamRabbitProvider8801
    prefer-ip-address: true

management:
  endpoints:
    web:
      exposure:
        # Must be quoted: a bare * is parsed by YAML as an alias indicator.
        include: "*"

c、编写启动类
package com.wsh.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;/*** ClassName ConfigMain3344* Description: TODO* Author wshaha* Date 2023/10/15* Version V1.0**/
SpringBootApplication
EnableEurekaClient
public class StreamRabbitMqProvider8801 {public static void main(String[] args) {SpringApplication.run(StreamRabbitMqProvider8801.class, args);}
}
d、编写接口及实现类
package com.wsh.springcloud.service;

/**
 * Contract for components that publish a message.
 */
public interface IMessageProvider {

    /**
     * Publishes a message.
     *
     * @return an implementation-defined string; may be {@code null}
     */
    String send();
}
package com.wsh.springcloud.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;/*** ClassName MessageProviderImpl* Description: TODO* Author wshaha* Date 2023/10/15* Version V1.0**/
EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {AutowiredQualifier(output)private MessageChannel messageChannel;Overridepublic String send() {messageChannel.send(MessageBuilder.withPayload(hello).build());return null;}
}
e、编写Controller
package com.wsh.springcloud.controller;import com.wsh.springcloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;/*** ClassName MessageController* Description: TODO* Author wshaha* Date 2023/10/15* Version V1.0**/
RestController
public class MessageController {Autowiredprivate IMessageProvider messageProvider;GetMapping(/sendMessage)public void sendMessage(){messageProvider.send();}
}
（2）编写消费者
a、编写pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the message consumer: RabbitMQ stream binder, Eureka client, web and actuator starters. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>demo20220821</artifactId>
        <groupId>com.wsh.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>com.wsh.springcloud</groupId>
            <artifactId>cloud-api-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

b、编写application.yml
# Consumer configuration: binds the "input" channel to the testExchange exchange on RabbitMQ.
server:
  port: 8802

spring:
  application:
    # Distinct from the producer's name so the two services are separate applications in Eureka.
    name: cloud-stream-rabbit-consumer
  cloud:
    stream:
      binders:
        # Binder definition referenced by bindings.input.binder below.
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.0.166
                port: 5672
                username: guest
                password: guest
      bindings:
        input:
          destination: testExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client:
    # Clients set this to true
    register-with-eureka: true
    # Clients set this to true
    fetch-registry: true
    service-url:
      # defaultZone: http://localhost:7001/eureka
      defaultZone: http://eureka1.com:7001/eureka, http://eureka2.com:7002/eureka
  instance:
    # Must differ from the producer's instance-id; the original reused the producer's value.
    instance-id: cloudStreamRabbitConsumer8802
    prefer-ip-address: true

management:
  endpoints:
    web:
      exposure:
        # Must be quoted: a bare * is parsed by YAML as an alias indicator.
        include: "*"

c、编写启动类
package com.wsh.springcloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;/*** ClassName ConfigMain3344* Description: TODO* Author wshaha* Date 2023/10/15* Version V1.0**/
SpringBootApplication
EnableEurekaClient
public class StreamRabbitMqConsumer8802 {public static void main(String[] args) {SpringApplication.run(StreamRabbitMqConsumer8802.class, args);}
}
d、编写Controller
package com.wsh.springcloud.controller;import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;/*** ClassName ConsumerController* Description: TODO* Author wshaha* Date 2023/10/15* Version V1.0**/
RestController
EnableBinding(Sink.class)
public class ConsumerController {StreamListener(Sink.INPUT)public void receiveMessage(MessageString message){System.out.println(message.getPayload());}
}
3运行
三、解决消息重复消费
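（1）绑定同一交换机且不同组的消费者会收到相同的消息（即重复消费）。
（2）解决方式：同一组的消费者中只有一个消费者会收到消息，故把这群消费者配置为同一个组即可。
（3）配置：在每个消费者的 application.yml 中，为 input 绑定指定相同的 group，例如：
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: testExchange
          group: groupA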
四、消息持久化
（1）定义分组后会实现消息持久化。原理：没定义分组时，服务对应的队列是 auto-delete 的，服务停止后队列就被删除，后续发送的消息无法收到；定义分组后，队列不会随服务停止而删除，服务重启后仍能收到停机期间发送的消息。