Fork me on GitHub

Spring Cloud Stream实现消息过滤消费

目录

TIPS

本文基于Spring Cloud Greenwich SR1 + spring-cloud-starter-stream-rocketmq 0.9.0

理论兼容:Spring Cloud Finchley+ + spring-cloud-starter-stream-rocketmq 0.2.2+

MQ使用的是RocketMQ,也可使用Kafka或者RabbitMQ。

本文探讨Spring Cloud Stream & RocketMQ过滤消息的各种姿势。

在实际项目中,我们可能需要实现消息消费的过滤。

举个例子:实现消息的分流处理:

生产者生产的消息,虽然消息体可能一样,但是header不一样。可编写两个或者更多的消费者,对不同header的消息做针对性的处理!

condition

生产者

生产者设置一下header,比如my-header,值根据你的需要填写:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Autowired
private Source source;

public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
.setHeader("my-header","你的header")
.build()
);
return "success";
}

消费者

1
2
3
4
5
6
7
8
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(value = Sink.INPUT,condition = "headers['my-header']=='你的header'")
public void receive(String messageBody) {
log.info("通过stream收到了消息:messageBody ={}", messageBody);
}
}

如代码所示,使用 StreamListener 注解的 condition 属性。当 headers['my-header']=='你的header' 条件满足,才会进入到方法体。

Tags

TIPS

该方式只支持RoketMQ,不支持Kafka/RabbitMQ

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Autowired
private Source source;

public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
// 注意:只能设置1个tag
.setHeader(RocketMQHeaders.TAGS, "tag1")
.build()
);
return "success";
}

消费者

  • 接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public interface MySink {
    String INPUT1 = "input1";
    String INPUT2 = "input2";

    @Input(INPUT1)
    SubscribableChannel input();

    @Input(INPUT2)
    SubscribableChannel input2();
    }
  • 注解

    1
    @EnableBinding({MySink.class})
  • 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    spring:
    cloud:
    stream:
    rocketmq:
    binder:
    name-server: 127.0.0.1:9876
    bindings:
    input1:
    consumer:
    # 表示input2消费带有tag1的消息
    tags: tag1
    input2:
    consumer:
    # 表示input2消费带有tag2或者tag3的消息
    tags: tag2||tag3
    bindings:
    input1:
    destination: test-topic
    group: test-group1
    input2:
    destination: test-topic
    group: test-group2
  • 消费代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Service
    @Slf4j
    public class MyTestStreamConsumer {
    /**
    * 我消费带有tag1的消息
    *
    * @param messageBody 消息体
    */
    @StreamListener(MySink.INPUT1)
    public void receive1(String messageBody) {
    log.info("带有tag1的消息被消费了:messageBody ={}", messageBody);
    }

    /**
    * 我消费带有tag1或者tag2的消息
    *
    * @param messageBody 消息体
    */
    @StreamListener(MySink.INPUT2)
    public void receive2(String messageBody) {
    log.info("带有tag2/tag3的消息被消费了:messageBody ={}", messageBody);
    }
    }
  • 日志:

    1
    2019-08-04 19:10:03.799  INFO 53760 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer      : 带有tag1的消息被消费了:messageBody =消息体

Sql 92

TIPS

  • 该方式只支持RoketMQ,不支持Kafka/RabbitMQ
  • 用了sql,就不要用Tag

RocketMQ支持使用SQL语法过滤消息。官方文档:http://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/

Spring Clous Stream RocketMQ也为此特性提供了支持。

开启SQL 92支持

默认情况下,RocketMQ的SQL过滤支持是关闭的,要想使用SQL 92过滤消息,需要:

  • conf/broker.conf 添加

    1
    enablePropertyFilter = true
  • 启动RocketMQ

    1
    nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

    生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Autowired
private Source source;

public String testStream() {
this.source.output()
.send(
MessageBuilder
.withPayload("消息体")
.setHeader("index", 1000)
.build()
);
return "success";
}

消费者

  • 接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public interface MySink {
    String INPUT1 = "input1";
    String INPUT2 = "input2";

    @Input(INPUT1)
    SubscribableChannel input();

    @Input(INPUT2)
    SubscribableChannel input2();
    }
  • 注解

    1
    @EnableBinding({MySink.class})
  • 配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    spring:
    cloud:
    stream:
    rocketmq:
    binder:
    name-server: 127.0.0.1:9876
    bindings:
    input1:
    consumer:
    sql: 'index < 1000'
    input2:
    consumer:
    sql: 'index >= 1000'
    bindings:
    input1:
    destination: test-topic
    group: test-group1
    input2:
    destination: test-topic
    group: test-group2
  • 消费代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Service
    @Slf4j
    public class MyTestStreamConsumer {
    /**
    * 我消费带有tag1的消息
    *
    * @param messageBody 消息体
    */
    @StreamListener(MySink.INPUT1)
    public void receive1(String messageBody) {
    log.info("index > 1000的消息被消费了:messageBody ={}", messageBody);
    }

    /**
    * 我消费带有tag1或者tag2的消息
    *
    * @param messageBody 消息体
    */
    @StreamListener(MySink.INPUT2)
    public void receive2(String messageBody) {
    log.info("index <=1000 的消息被消费了:messageBody ={}", messageBody);
    }
    }
  • 日志

    1
    2019-08-04 19:58:59.787  INFO 56375 --- [MessageThread_1] c.i.u.rocketmq.MyTestStreamConsumer      : index <=1000 的消息被消费了:messageBody =消息体

相关代码

org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties

参考文档

相关文章

评论系统未开启,无法评论!