Spring Cloud Series: Message-Driven Microservices


RabbitMQ

producer -----> Exchange ---binding---> queue

Four exchange types:

  • direct: routes a message to the queue whose binding key is exactly equal to the routing key supplied by the producer
  • topic: routes a message to the queue or queues whose binding pattern matches the routing key
  • fanout: delivers the message to every queue bound to the exchange, ignoring any routing key that was set
  • headers: routes a message based on its header attributes rather than the routing key
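The direct and topic rules above can be sketched in plain Java. This is an illustrative re-implementation, not RabbitMQ's actual matching code, and it is simplified in one respect: in real RabbitMQ a `#` can also match zero words, which this sketch does not handle.

```java
import java.util.regex.Pattern;

public class RoutingMatch {

    // direct exchange: deliver only when routing key equals binding key
    static boolean directMatch(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    // topic exchange: keys are dot-separated words; '*' matches exactly one
    // word, '#' matches one or more words (real RabbitMQ also lets '#'
    // match zero words, which this sketch deliberately skips)
    static boolean topicMatch(String bindingKey, String routingKey) {
        String regex = bindingKey
                .replace(".", "\\.")   // escape literal dots
                .replace("*", "[^.]+") // '*' -> one word
                .replace("#", ".*");   // '#' -> any remaining words
        return Pattern.matches(regex, routingKey);
    }

    public static void main(String[] args) {
        System.out.println(directMatch("hello", "hello"));          // true
        System.out.println(topicMatch("order.*", "order.created")); // true
        System.out.println(topicMatch("order.*", "order.a.b"));     // false
        System.out.println(topicMatch("order.#", "order.a.b"));     // true
    }
}
```

Note how `order.*` rejects `order.a.b` (the `*` may cover only one word) while `order.#` accepts it.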

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.crazyit.jms</groupId>
	<artifactId>rabbit-test</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>4.2.0</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.9</version>
		</dependency>
	</dependencies>

</project>

Sender:

package org.crazyit.jms;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Message producer
 * @author 杨恩雄
 *
 */
public class Send {

	public static void main(String[] args) throws Exception {
		// create the connection
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("xxxxxxx");
		factory.setVirtualHost("xxxx");
		factory.setUsername("xxxx");
		factory.setPassword("xxxx");
		Connection connection = factory.newConnection();
		// open a channel
		Channel channel = connection.createChannel();
		// declare the queue (durable=false, exclusive=false, autoDelete=false)
		String queueName = "hello";
		channel.queueDeclare(queueName, false, false, false, null);
		String message = "Hello World!555";
		// publish to the default exchange, using the queue name as the routing key
		channel.basicPublish("", queueName, null, message.getBytes());
		// close the channel and the connection
		channel.close();
		connection.close();
	}
}

Receiver:

package org.crazyit.jms;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Message consumer
 * @author 杨恩雄
 *
 */
public class Receive {

	public static void main(String[] argv) throws Exception {
		// create the connection
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("xxxxxxx");
		factory.setVirtualHost("xxxx");
		factory.setUsername("xxxx");
		factory.setPassword("xxxx");
		Connection connection = factory.newConnection();
		// open a channel
		Channel channel = connection.createChannel();
		// declare the queue (same arguments as the sender)
		String queueName = "hello";
		channel.queueDeclare(queueName, false, false, false, null);
		// create the consumer
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope,
					AMQP.BasicProperties properties, byte[] body)
					throws IOException {
				String message = new String(body, "UTF-8");
				System.out.println("Received message: " + message);
			}
		};
		// autoAck=true: messages are acknowledged automatically on delivery
		channel.basicConsume(queueName, true, consumer);
	}
}

The receiver keeps running; whenever a message arrives in the queue, it is printed.

Kafka

The overall structure is similar to RabbitMQ: producers send messages to the Kafka broker, and the broker delivers them to consumers. Produced messages are sent to a Topic; every record in a Topic is stored as a key/value pair. Each Topic consists of one or more physical partitions (Partition), which maintain the message contents and their indexes and may live on different servers. Clients do not need to care how the data is stored; they only need to know which Topic a message is sent to.
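The key/partition relationship can be sketched with a toy partitioner. Kafka's actual default partitioner hashes the serialized key with Murmur2; the version below substitutes Java's `String.hashCode` purely to illustrate the invariant that a given key always maps to the same partition.

```java
public class PartitionSketch {

    // Illustration only: Kafka's real default partitioner hashes the
    // serialized key with Murmur2 and takes it modulo the partition count.
    // String.hashCode is used here just to show the invariant.
    static int partitionFor(String key, int numPartitions) {
        // mask the sign bit so the result is never negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always maps to the same partition
        System.out.println(partitionFor("userName", 3) == partitionFor("userName", 3)); // true
        System.out.println(partitionFor("another-key", 3));
    }
}
```

This is why records with the same key keep their relative order: they all land in the same partition.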

The setup steps below are excerpted from another article.

1. Kafka is coordinated through ZooKeeper, so install ZooKeeper first.

Pull the zookeeper image:
$ docker pull wurstmeister/zookeeper

2. Start a ZooKeeper container from the image:
## docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
$ docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2  --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

3. Pull the kafka image:
$ docker pull wurstmeister/kafka

4. Start a Kafka container from the image:
## docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
$ docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092 -e KAFKA_LISTENERS=PLAINTEXT://172.16.0.13:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

Parameter notes:
-e KAFKA_BROKER_ID=0  In a Kafka cluster, every broker distinguishes itself by a unique BROKER_ID.

-e KAFKA_ZOOKEEPER_CONNECT=172.16.0.13:2181  The ZooKeeper address Kafka registers under (a chroot path such as 172.16.0.13:2181/kafka can be appended, as in the commented-out command).

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.0.13:9092  The address and port Kafka registers with ZooKeeper and advertises to clients. For remote access this must be an externally reachable IP, otherwise clients such as a Java program will fail to connect.

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  The address and port Kafka listens on.

-v /etc/localtime:/etc/localtime  Synchronizes the container's time with the host.

5. Verify that Kafka works.

5.1. Enter the container:
$ docker exec -it kafka bash

5.2. Change to the /opt/kafka_2.12-2.3.0/bin/ directory:
$ cd /opt/kafka_2.12-2.3.0/bin/

5.3. Run the console producer to send messages:
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic sun

Send a message:
> {"datas":[{"channel":"","metric":"temperature","producer":"ijinus","sn":"IJA0101-00002245","time":"1543207156000","value":"80"}],"ver":"1.0"}

5.4. Run the console consumer to receive messages:
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.crazyit.cloud</groupId>
	<artifactId>kafka-producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.11.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.9</version>
		</dependency>
	</dependencies>


</project>

Producer:

package org.crazyit.cloud;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Producer
 * @author 杨恩雄
 *
 */
public class ProducerMain {

	public static void main(String[] args) throws Exception {
		// configuration
		Properties props = new Properties();
		props.put("bootstrap.servers", "39.107.77.113:9092");
		// serializer class for record keys
		props.put("key.serializer",
				"org.apache.kafka.common.serialization.StringSerializer");
		// serializer class for record values
		props.put("value.serializer",
				"org.apache.kafka.common.serialization.StringSerializer");
		// create the producer instance
		Producer<String, String> producer = new KafkaProducer<String, String>(props);
		// create a new record; the first argument is the Topic name
		ProducerRecord<String, String> record =
				new ProducerRecord<String, String>("studyjava", "userName", "wujingjian");
		// send the record, then close the producer
		producer.send(record);
		producer.close();
	}
}

Consumer 1:

package org.crazyit.cloud;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Consumer
 * @author 杨恩雄
 *
 */
public class ConsumerMain {

	public static void main(String[] args) {
		// configuration
		Properties props = new Properties();
		props.put("bootstrap.servers", "39.107.77.113:9092");
		// a consumer group must be specified
		props.put("group.id", "test");
		props.put("key.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		// subscribe to the studyjava topic
		consumer.subscribe(Arrays.asList("studyjava"));
		// poll the server for records
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				System.out.println("Consumer A, key: " + record.key() + ", value: " + record.value());
			}
		}
	}
}

Consumer 2:

package org.crazyit.cloud;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Consumer
 * @author 杨恩雄
 *
 */
public class ConsumerMainB {

	public static void main(String[] args) {
		// configuration
		Properties props = new Properties();
		props.put("bootstrap.servers", "39.107.77.113:9092");
		// a different consumer group: each group receives its own copy of the message
		props.put("group.id", "test2");
		props.put("key.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer",
				"org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		// subscribe to the same studyjava topic as consumer A
		consumer.subscribe(Arrays.asList("studyjava"));
		// poll the server for records
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(100);
			for (ConsumerRecord<String, String> record : records) {
				System.out.println("Consumer B, key: " + record.key() + ", value: " + record.value());
			}
		}
	}
}

Consumer groups

Each consumer labels itself with a consumer-group id, and every record published to a Topic is delivered to exactly one consumer instance within each subscribing consumer group.
In short, a produced message goes to only one instance inside a given consumer group; consumers that belong to different groups each receive their own copy, as in a broadcast.
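This delivery rule can be illustrated with a small, purely hypothetical simulation. These classes are not part of the Kafka API, and within a group real Kafka balances work by partition assignment rather than the simple round-robin used here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupDispatch {

    // messages each instance has received, keyed by "group/instance"
    final Map<String, List<String>> received = new HashMap<>();
    // per-group counter used for the simple round-robin choice
    final Map<String, Integer> cursor = new HashMap<>();
    // group id -> instance ids
    final Map<String, List<String>> groups = new HashMap<>();

    void addConsumer(String group, String instance) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(instance);
        received.put(group + "/" + instance, new ArrayList<>());
    }

    // every group sees the message once; inside a group only one instance gets it
    void publish(String msg) {
        groups.forEach((group, instances) -> {
            int i = cursor.merge(group, 1, Integer::sum) - 1;
            String instance = instances.get(i % instances.size());
            received.get(group + "/" + instance).add(msg);
        });
    }

    public static void main(String[] args) {
        GroupDispatch d = new GroupDispatch();
        d.addConsumer("groupA", "A1");   // group A: one instance
        d.addConsumer("groupB", "B1");   // group B: two instances
        d.addConsumer("groupB", "B2");
        d.publish("m1");
        d.publish("m2");
        // A1 gets both messages; B1 and B2 get one each
        System.out.println(d.received);
    }
}
```

With one instance in groupA and two in groupB, publishing m1 and m2 leaves A1 with both messages while B1 and B2 split them, mirroring the broadcast-across-groups, share-within-a-group behavior described above.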


Developing the messaging microservices

EurekaServer

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.crazyit.cloud</groupId>
	<artifactId>spring-server</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Dalston.SR1</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka-server</artifactId>
		</dependency>
	</dependencies>
	
</project>

application.yml

server:
  port: 8761
eureka:
  client:
    registerWithEureka: false
    fetchRegistry: false
  server:
    enable-self-preservation: false
logging:
  level:
    com.netflix: INFO

Application class

package org.crazyit.cloud;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class ServerApplication {

	public static void main(String[] args) {
		new SpringApplicationBuilder(ServerApplication.class).run(args);
	}

}

spring-producer: the message producer

pom.xml, bringing in spring-cloud-starter-stream-rabbit

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.crazyit.cloud</groupId>
	<artifactId>spring-producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Dalston.SR1</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-config</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<!-- 
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
		</dependency>
		 -->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
	</dependencies>

</project>

application.yml

server:
  port: 8081
spring:
  application:
    name: spring-producer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: xxxxxx
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

SendService

package org.crazyit.cloud;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {

	@Output("myInput")
	SubscribableChannel sendOrder();
	
	@Output("input")
	SubscribableChannel output();
}

The @Output annotation creates a message channel; calling the annotated method returns a channel that delivers messages to the named destination (myInput, input).
If @Output is given no argument, the method name is used as the channel name.

ProducerController

package org.crazyit.cloud;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

	@Autowired
	SendService sendService;

	@RequestMapping(value = "/send", method = RequestMethod.GET)
	@ResponseBody
	public String sendRequest() {
		// build the message
		Message<byte[]> msg = MessageBuilder.withPayload("Hello World".getBytes()).build();
		// send it over the myInput channel
		sendService.sendOrder().send(msg);
		return "SUCCESS";
	}

	@RequestMapping(value = "/test-source", method = RequestMethod.GET)
	@ResponseBody
	public String testSource() {
		// build the message
		Message<byte[]> msg = MessageBuilder.withPayload("Hello World 2".getBytes()).build();
		// send it over the input channel
		sendService.output().send(msg);
		return "SUCCESS";
	}
}

Calling sendService.sendOrder() returns the SubscribableChannel (the channel), and calling send() on it publishes the message.

ProducerApplication

package org.crazyit.cloud;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value = {SendService.class, Source.class})
public class ProducerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class, args);
	}
}

@EnableBinding enables the binding feature. It takes SendService.class and Source.class as parameters; when the Spring container starts, the channels defined in the SendService interface are bound automatically.

spring-consumer: the message consumer

pom.xml, bringing in spring-cloud-starter-stream-rabbit

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.crazyit.cloud</groupId>
	<artifactId>spring-consumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Dalston.SR1</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-config</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<!-- 
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-kafka</artifactId>
		</dependency>
		-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
	</dependencies>
	
	
</project>

application.yml

server:
  port: 8080
spring:
  application:
    name: spring-consumer
  cloud:
    stream:
      bindings: 
        myInput:
          group: groupA
  rabbitmq:
    virtual-host: xxxxxx
    host: xxxxx
    username: xxxxx
    password: xxxxxx
    port: 5672
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

ReceiveService

package org.crazyit.cloud;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {

	@Input("myInput")
	SubscribableChannel myInput();
}

ReceiveService defines the myInput message input channel.

Application class

package org.crazyit.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(value = {ReceiveService.class, Sink.class})
public class ReceiverApplication {
	
	public static void main(String[] args) {
		SpringApplication.run(ReceiverApplication.class, args);
	}

	@StreamListener("myInput")
	public void receive(byte[] msg) {
		System.out.println("Received message: " + new String(msg));
	}

	@StreamListener(Sink.INPUT)
	public void receiveInput(byte[] msg) {
		System.out.println("receiveInput method, received message: " + new String(msg));
	}
}

@EnableBinding enables the binding and declares the channel interface classes. The receive method, annotated with @StreamListener, subscribes to messages on the myInput channel.

Simplified development: Sink / Source / Processor

Sink.class

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

Source.class

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

Processor.class

package org.springframework.cloud.stream.messaging;

public interface Processor extends Source, Sink {
}

As these interfaces show, the framework effectively builds in the input and output channels for us. In most cases we do not need to write our own service interface (like SendService), or even use the @Input and @Output annotations. Taking the consumer above as an example, it is enough to include Sink.class in the @EnableBinding annotation.

Calling http://localhost:8081/send makes the consumer console print: Received message: Hello World
Calling http://localhost:8081/test-source makes it print: receiveInput method, received message: Hello World 2
This shows that the default input channel implemented by Sink works out of the box.

The examples above use RabbitMQ; to switch the binder to Kafka:

Simply replace the dependency in the pom:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Then add the corresponding configuration and it is ready to use.
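As a sketch, a minimal configuration for the Kafka binder of this generation might look like the following. The broker and ZooKeeper addresses are placeholders, and the exact property names should be verified against the binder version in use.

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zkNodes: localhost:2181
```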

Consumer groups

When consumers share the same consumer group, each message is processed by only one of the instances; consumers in different groups all receive the message.

The consumer configuration file above contains:

spring:
  application:
    name: spring-consumer
  cloud:
    stream:
      bindings: 
        myInput:
          group: groupA

This states that, on the myInput channel, this consumer belongs to group groupA.

Next, make two more copies of the consumer and assign them to groupB:

spring:
  application:
    name: spring-third-consumer
  cloud:
    stream:
      bindings: 
        myInput:
          group: groupB

After a message is sent, the original consumer (groupA) receives every message, while the two newly copied consumers (groupB) receive messages in turn: each message goes to only one of them.


Title: Spring Cloud Series: Message-Driven Microservices
Author: 码农路上
URL: http://wujingjian.club/articles/2020/03/25/1585131170208.html