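Spring Cloud Series: Message-Driven Microservices
RabbitMQ:
producer -----> Exchange ---binding---> queue
There are four exchange types:
- direct: delivers the message to a queue when the routing key sent by the producer equals the queue's binding key
- topic: delivers the message to one or more queues whose binding pattern matches the routing key sent by the producer
- fanout: delivers the message to every queue bound to the exchange; this exchange type ignores the routing key
- headers: chooses the target queue based on the message's header attributes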
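The routing rules above can be sketched in plain Java without a broker. This is only an illustrative model: the class `ExchangeRoutingSketch`, the `Binding` type and the queue names are made up for the example and are not part of the RabbitMQ client API. It assumes the usual topic wildcards, where `*` stands for exactly one dot-separated word and `#` for zero or more words. The headers exchange is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of how direct, topic and fanout exchanges pick queues (not the RabbitMQ API). */
final class ExchangeRoutingSketch {

    /** A binding links a queue to an exchange under a binding key. */
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    /** direct: the routing key must equal the binding key. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** topic: '*' matches exactly one word, '#' matches zero or more words. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        return matchWords(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matchWords(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more of the remaining words.
            for (int next = w; next <= words.length; next++) {
                if (matchWords(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matchWords(pattern, p + 1, words, w + 1);
    }

    /** Returns the queues that would receive a message for the given exchange type. */
    static List<String> route(String exchangeType, List<Binding> bindings, String routingKey) {
        List<String> queues = new ArrayList<>();
        for (Binding b : bindings) {
            boolean deliver;
            switch (exchangeType) {
                case "direct":
                    deliver = directMatches(b.bindingKey, routingKey);
                    break;
                case "topic":
                    deliver = topicMatches(b.bindingKey, routingKey);
                    break;
                case "fanout":
                    deliver = true; // the routing key is ignored
                    break;
                default:
                    throw new IllegalArgumentException("unsupported type: " + exchangeType);
            }
            if (deliver) {
                queues.add(b.queue);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        List<Binding> bindings = new ArrayList<>();
        bindings.add(new Binding("orders", "order.created"));
        bindings.add(new Binding("audit", "order.#"));
        bindings.add(new Binding("eu", "*.created"));
        System.out.println("direct: " + route("direct", bindings, "order.created"));
        System.out.println("topic:  " + route("topic", bindings, "order.created"));
        System.out.println("fanout: " + route("fanout", bindings, "anything"));
    }
}
```

With these hypothetical bindings, the direct exchange reaches only the queue whose key is identical, the topic exchange reaches every queue whose pattern matches, and the fanout exchange reaches all three.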
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.crazyit.jms</groupId>
<artifactId>rabbit-test</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.9</version>
</dependency>
</dependencies>
</project>
Sender:
package org.crazyit.jms;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Message producer
 * @author 杨恩雄
 */
public class Send {
    public static void main(String[] args) throws Exception {
        // Create the connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxxxxxx");
        factory.setVirtualHost("xxxx");
        factory.setUsername("xxxx");
        factory.setPassword("xxxx");
        Connection connection = factory.newConnection();
        // Open a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        String queueName = "hello";
        channel.queueDeclare(queueName, false, false, false, null);
        String message = "Hello World!555";
        // Publish to the default exchange (""), which routes by queue name
        channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
Receiver:
package org.crazyit.jms;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Message consumer
 * @author 杨恩雄
 */
public class Receive {
    public static void main(String[] argv) throws Exception {
        // Create the connection (use the same broker settings as the sender)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxxxxxx");
        factory.setVirtualHost("xxxx");
        factory.setUsername("xxxx");
        factory.setPassword("xxxx");
        Connection connection = factory.newConnection();
        // Open a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        String queueName = "hello";
        channel.queueDeclare(queueName, false, false, false, null);
        // Create the consumer
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        };
        // Start consuming with automatic acknowledgement
        channel.basicConsume(queueName, true, consumer);
    }
}
The receiver keeps running and waits for deliveries; every message that arrives in the queue is printed.
Kafka
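The overall structure is similar to RabbitMQ: producers send messages to the Kafka server, and consumers then receive them from Kafka. Messages produced by a producer are published to a topic. A topic holds records, and each record is stored as a key/value pair. Every topic consists of one or more physical partitions, which maintain the message content and indexes and may be stored on different servers. Clients do not need to care how the data is stored; they only need to know which topic to send a message to.
The text above is excerpted from this article.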
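To make the relationship between record keys and partitions more concrete, here is a minimal sketch of key-based partition selection. It is a simplification under stated assumptions: Kafka's default partitioner hashes the serialized key with murmur2, while this sketch uses `Arrays.hashCode` only to stay dependency-free. The class name `PartitionSketch` and the partition count are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Simplified model of key-based partition selection (not Kafka's real partitioner). */
final class PartitionSketch {

    /** Maps a record key to one of numPartitions partitions. */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Stand-in hash; the real client uses murmur2 on the serialized key bytes.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3; // hypothetical partition count for the topic
        for (String key : new String[] {"userName", "orderId", "userName"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

The point of the sketch is that records with the same key always land in the same partition, which is what preserves per-key ordering.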
1. Kafka relies on ZooKeeper for coordination, so ZooKeeper has to be installed first.
Pull the ZooKeeper image:
$ docker pull wurstmeister/zookeeper
2. Start a container from the image
## docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
$ docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
3. Pull the Kafka image
$ docker pull wurstmeister/kafka
4. Start a container from the Kafka image
## docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
$ docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
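Parameter notes:
-e KAFKA_BROKER_ID=0 In a Kafka cluster, every broker has a BROKER_ID that identifies it.
-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka The ZooKeeper address and path under which Kafka is managed, here 172.16.0.13:2181/kafka.
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 The Kafka address and port registered in ZooKeeper. For remote access, change this to the externally reachable IP; otherwise clients such as a Java program will fail to connect.
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 The address and port Kafka listens on.
-v /etc/localtime:/etc/localtime Keeps the container's time in sync with the virtual machine's time.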
5. Verify that Kafka works
5.1. Enter the container
$ docker exec -it kafka bash
5.2. Change to the /opt/kafka_2.12-2.3.0/bin/ directory
$ cd /opt/kafka_2.12-2.3.0/bin/
5.3. Run the Kafka console producer
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic sun
Send a message:
> {"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}
5.4. Run the Kafka console consumer to receive the messages
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.crazyit.cloud</groupId>
<artifactId>kafka-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.9</version>
</dependency>
</dependencies>
</project>
Producer:
package org.crazyit.cloud;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Producer
 * @author 杨恩雄
 */
public class ProducerMain {
    public static void main(String[] args) throws Exception {
        // Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "39.107.77.113:9092");
        // Serializer class for the record key
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for the record value
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Create the producer instance
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Create a new record; the first argument is the topic name
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("studyjava", "userName", "wujingjian");
        // Send the record
        producer.send(record);
        // close() flushes pending records before returning
        producer.close();
    }
}
Consumer 1:
package org.crazyit.cloud;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Consumer
 * @author 杨恩雄
 */
public class ConsumerMain {
    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "39.107.77.113:9092");
        // A consumer group must be specified
        props.put("group.id", "test");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the studyjava topic
        consumer.subscribe(Arrays.asList("studyjava"));
        // Poll the server for records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Consumer A, key: " + record.key() + ", value: " + record.value());
            }
        }
    }
}
Consumer 2:
package org.crazyit.cloud;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Consumer
 * @author 杨恩雄
 */
public class ConsumerMainB {
    public static void main(String[] args) {
        // Configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "39.107.77.113:9092");
        // A different consumer group: each group receives its own copy of every message
        props.put("group.id", "test2");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the same topic the producer writes to
        consumer.subscribe(Arrays.asList("studyjava"));
        // Poll the server for records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Consumer B, key: " + record.key() + ", value: " + record.value());
            }
        }
    }
}
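Consumer groups
Each consumer labels itself with a consumer group ID, and every record published to a topic is delivered to one consumer instance within each subscribing consumer group.
Put simply, a message is delivered to only one instance within a given consumer group. If the consumers belong to different consumer groups, every one of them receives the message, as in a broadcast.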
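The delivery rule can be illustrated with a small in-memory model. This is a sketch, not how Kafka is implemented: Kafka assigns partitions to group members rather than rotating per record, and the round-robin choice here is only an assumption that keeps the example short. The class `ConsumerGroupSketch` and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model: each group gets every record once, one member per record. */
final class ConsumerGroupSketch {

    // group id -> members, in the order they joined
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group id -> number of records delivered to that group so far
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    void join(String groupId, String consumer) {
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(consumer);
        delivered.putIfAbsent(groupId, 0);
    }

    /** Returns one "consumer <- record" entry per group. */
    List<String> publish(String record) {
        List<String> deliveries = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            int count = delivered.get(entry.getKey());
            // Simplification: rotate through the members of the group.
            String receiver = members.get(count % members.size());
            deliveries.add(receiver + " <- " + record);
            delivered.put(entry.getKey(), count + 1);
        }
        return deliveries;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        topic.join("test", "A1");
        topic.join("test", "A2");
        topic.join("test2", "B");
        System.out.println(topic.publish("m1"));
        System.out.println(topic.publish("m2"));
    }
}
```

In this model the single member of group `test2` sees both records, while the two members of group `test` see one record each.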
Developing messaging microservices
EurekaServer
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.crazyit.cloud</groupId>
<artifactId>spring-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
</dependencies>
</project>
application.yml
server:
  port: 8761
eureka:
  client:
    registerWithEureka: false
    fetchRegistry: false
  server:
    enable-self-preservation: false
logging:
  level:
    com.netflix: INFO
Application class
package org.crazyit.cloud;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication
@EnableEurekaServer
public class ServerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(ServerApplication.class).run(args);
}
}
spring-producer: the message producer
pom.xml, which adds spring-cloud-starter-stream-rabbit
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.crazyit.cloud</groupId>
<artifactId>spring-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
</project>
application.yml
server:
  port: 8081
spring:
  application:
    name: spring-producer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: xxxxxx
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
SendService
package org.crazyit.cloud;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.SubscribableChannel;
public interface SendService {
@Output("myInput")
SubscribableChannel sendOrder();
@Output("input")
SubscribableChannel output();
}
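The @Output annotation creates a message channel. Messages sent through the channel returned by the annotated method are delivered to the channel with the given name (myInput, input).
If @Output is given no argument, the method name is used as the channel name.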
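The naming rule can be sketched with plain reflection. Everything in this example is a stand-in written for illustration: `Out` is a hypothetical annotation that mimics the shape of @Output (a `value` that defaults to the empty string), and `DemoService` is a made-up interface; the real binding is done by Spring Cloud Stream, whose internals are not reproduced here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Illustrates "explicit name, else method name" with a stand-in annotation. */
final class ChannelNameSketch {

    /** Hypothetical stand-in for @Output; an empty value means "not specified". */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Out {
        String value() default "";
    }

    /** Hypothetical binding interface, loosely modeled on SendService. */
    interface DemoService {
        @Out("myInput")
        Object sendOrder();

        @Out
        Object output();
    }

    /** Maps each annotated method to the channel name it would be bound to. */
    static Map<String, String> channelNames(Class<?> bindingInterface) {
        Map<String, String> names = new TreeMap<>();
        for (Method method : bindingInterface.getDeclaredMethods()) {
            Out out = method.getAnnotation(Out.class);
            if (out == null) {
                continue; // not a channel method
            }
            // Fall back to the method name when no explicit name is given.
            String channel = out.value().isEmpty() ? method.getName() : out.value();
            names.put(method.getName(), channel);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(channelNames(DemoService.class));
    }
}
```

Here `sendOrder` resolves to the explicit name `myInput`, and `output` falls back to its own method name.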
ProducerController
package org.crazyit.cloud;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    @ResponseBody
    public String sendRequest() {
        // Build the message
        Message<byte[]> msg = MessageBuilder.withPayload("Hello World".getBytes()).build();
        // Send it through the myInput channel
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }

    @RequestMapping(value = "/test-source", method = RequestMethod.GET)
    @ResponseBody
    public String testSource() {
        // Build the message
        Message<byte[]> msg = MessageBuilder.withPayload("Hello World 2".getBytes()).build();
        // Send it through the input channel
        sendService.output().send(msg);
        return "SUCCESS";
    }
}
Calling sendService.sendOrder() returns the SubscribableChannel (channel) instance; calling send on it then publishes the message.
ProducerApplication
package org.crazyit.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value = {SendService.class, Source.class})
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
@EnableBinding turns on binding. It takes SendService.class and Source.class as arguments, and when the Spring container starts, the channels defined in those interfaces are bound automatically.
spring-consumer: the message consumer
pom.xml, which adds spring-cloud-starter-stream-rabbit
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.crazyit.cloud</groupId>
<artifactId>spring-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
</project>
application.yml
server:
  port: 8080
spring:
  application:
    name: spring-consumer
  cloud:
    stream:
      bindings:
        myInput:
          group: groupA
  rabbitmq:
    virtual-host: xxxxxx
    host: xxxxx
    username: xxxxx
    password: xxxxxx
    port: 5672
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
ReceiveService
package org.crazyit.cloud;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface ReceiveService {
@Input("myInput")
SubscribableChannel myInput();
}
ReceiveService defines the myInput input channel.
Application class
package org.crazyit.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(value = {ReceiveService.class, Sink.class})
public class ReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }

    // Handles messages arriving on the myInput channel
    @StreamListener("myInput")
    public void receive(byte[] msg) {
        System.out.println("Received message: " + new String(msg));
    }

    // Handles messages arriving on the built-in input channel
    @StreamListener(Sink.INPUT)
    public void receiveInput(byte[] msg) {
        System.out.println("receiveInput method, received message: " + new String(msg));
    }
}
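@EnableBinding enables binding and declares the channel interfaces. The receive method is annotated with @StreamListener, which subscribes it to messages on the myInput channel.
Simplifying development: Sink, Source and Processor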
Sink.class
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}
Source.class
package org.springframework.cloud.stream.messaging;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
Processor.class
package org.springframework.cloud.stream.messaging;
public interface Processor extends Source, Sink {
}
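As these interfaces show, the framework already provides two built-in channels, input and output. In most cases we do not need to write our own service interface (such as SendService), or even use the @Input and @Output annotations. In the consumer above, for example, Sink.class is simply passed to @EnableBinding.
Calling http://localhost:8081/send makes the consumer console print the line from the receive method, with the payload Hello World.
Calling http://localhost:8081/test-source makes the consumer console print the line from the receiveInput method, with the payload Hello World 2.
This shows that the input channel that Sink provides by default works as expected.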
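The examples above use RabbitMQ. To switch the binder to Kafka,
simply replace the dependency in the pom:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
Then add the corresponding Kafka configuration and the application works as before.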
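Consumer groups
When consumers share the same consumer group, each message is handled by only one of the consumer instances; when the consumer groups differ, the message is sent to every group.
The consumer configuration file shown earlier contains:
spring:
  application:
    name: spring-consumer
  cloud:
    stream:
      bindings:
        myInput:
          group: groupA
This states that, for the myInput channel, the consumer belongs to groupA.
Next, we make two more copies of the consumer and put both of them in groupB:
spring:
  application:
    name: spring-third-consumer
  cloud:
    stream:
      bindings:
        myInput:
          group: groupB
Now, when messages are sent, the original consumer (groupA) receives every message, while the two newly copied consumers (groupB) take turns receiving them, as shown in the figure below: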