A Complete Guide to the High-Performance Messaging Component Apache Pulsar

Contents

Introduction to Pulsar

Key Features of Pulsar

Pulsar vs Kafka

Pulsar Architecture

Pulsar Messaging Model

Pulsar Schema

Pulsar Functions

Pulsar Connectors

Pulsar Deployment

Pulsar Admin

Pulsar Manager

Pulsar Flink

What is Pulsar?

Apache Pulsar is a top-level Apache Software Foundation project and a next-generation cloud-native distributed messaging and streaming platform. It combines messaging, storage, and lightweight functional compute in one system, adopts an architecture that separates compute from storage, and supports multi-tenancy, persistent storage, and cross-datacenter geo-replication, offering strong consistency, high throughput, low latency, and highly scalable stream data storage.

Key Features of Pulsar

  • A single Pulsar instance natively supports multiple clusters and can seamlessly replicate messages across clusters and datacenters.
  • Very low publish latency and end-to-end latency.
  • Seamless scalability to over one million topics.
  • A simple client API with bindings for Java, Go, Python, and C++.
  • Multiple subscription types for topics (exclusive, shared, and failover).
  • Guaranteed message delivery backed by persistent message storage in Apache BookKeeper.
  • Stream-native data processing through Pulsar Functions, a lightweight serverless compute framework.
  • Pulsar IO, a serverless connector framework built on Pulsar Functions, which makes it easy to move data into and out of Apache Pulsar.
  • Tiered storage, which offloads data from hot storage to cold/long-term storage (such as S3 and GCS) as it ages.
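
For example, once a tiered-storage offloader is configured, older ledgers of a topic can be offloaded manually and the offload tracked; a sketch, where the topic name and size threshold are placeholders:

$PULSAR_HOME/bin/pulsar-admin topics offload \
--size-threshold 10M \
persistent://public/default/input-seed-avro-topic
$PULSAR_HOME/bin/pulsar-admin topics offload-status \
persistent://public/default/input-seed-avro-topic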

Pulsar vs Kafka

The links below point to a detailed Pulsar vs. Kafka comparison report, available for download:

https://streamnative.io/en/blog/tech/2020-07-08-pulsar-vs-kafka-part-1

https://streamnative.io/zh/blog/tech/2020-07-22-pulsar-vs-kafka-part-2

  • Performance and availability

Benchmark (StreamNative)

Data sources

https://mp.weixin.qq.com/s/UZJTOEpzX8foUJv9XMJxOw

https://streamnative.io/en/blog/tech/2020-11-09-benchmark-pulsar-kafka-performance

https://streamnative.io/whitepaper/benchmark-pulsar-vs-kafka

  • Throughput

With the same durability guarantees as Kafka, Pulsar reaches 605 MB/s publish and end-to-end throughput (the same as Kafka) and 3.5 GB/s catch-up read throughput (3.5x that of Kafka). Pulsar's throughput is unaffected by increasing the partition count or changing the durability level, whereas Kafka's throughput is severely affected by both.

  • Latency

Across test scenarios (different numbers of subscriptions, different numbers of topics, and different durability guarantees), Pulsar's latency is significantly lower than Kafka's. Pulsar's P99 latency stays between 5 and 15 milliseconds; Kafka's P99 latency can reach several seconds and is heavily affected by the number of topics, the number of subscriptions, and the durability guarantees.

  • Features

  1. Multi-language clients (C/C++, Python, Java, Go ...)
  2. Management tooling (Pulsar Manager vs. Kafka Manager)
  3. Built-in stream processing (Pulsar Functions vs. Kafka Streams)
  4. Rich integrations (Pulsar Connectors)
  5. Exactly-once processing
  6. Log compaction
  7. Multi-tenancy (Pulsar)
  8. Security management (Pulsar)

Architecture

Pulsar separates storage from compute. In the messaging space, Pulsar was the first open-source project to bring this cloud-native, compute-storage-separated architecture to production. Because the broker layer stores no data, the architecture gives users higher availability and more flexible scaling and management, and it avoids data rebalancing and catch-up reads.

In Pulsar's layered architecture, every node in the serving layer (brokers) and in the storage layer (BookKeeper) is a peer. Brokers only serve messages and store no data, which enables instant node scaling and seamless failure recovery in both layers.

Persistent storage

Pulsar uses Apache BookKeeper, a distributed log storage system, as its storage component; the underlying storage model is the log.

Pulsar stores all unacknowledged (i.e., unprocessed) messages on multiple "bookie" servers in BookKeeper.

BookKeeper achieves data consistency through quorum voting. Unlike a master/slave design, every BookKeeper node is a peer, and each piece of data is written concurrently to a configured number of storage nodes.

A topic is really a stream of ledgers. A ledger is itself a log, so a topic (the parent log) is made up of a sequence of sub-logs (ledgers).

Ledgers are appended to a topic, and entries (a message, or a batch of messages) are appended to ledgers. Once closed, a ledger is immutable. The ledger is also the smallest unit of deletion: you cannot delete individual entries, only whole ledgers.

Ledgers are in turn broken down into fragments. The fragment is the smallest unit of distribution in a BookKeeper cluster.

Each ledger (made up of one or more fragments) can be replicated across multiple BookKeeper nodes (bookies) for fault tolerance and better read performance. Each fragment is replicated on a different set of bookies (given enough bookies).
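
This topic-to-ledger mapping can be observed directly: a topic's internal stats list its ledgers and cursors (the topic name below is a placeholder):

$PULSAR_HOME/bin/pulsar-admin topics \
stats-internal persistent://public/default/input-seed-avro-topic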

conf/bookkeeper.conf

#############################################################################
## Server parameters
#############################################################################
# Directories BookKeeper outputs its write ahead log.
# Could define multiple directories to store write ahead logs, separated by ','.
journalDirectories=/data/appData/pulsar/bookkeeper/journal
#############################################################################
## Ledger storage settings
#############################################################################
# Directory Bookkeeper outputs ledger snapshots
# could define multi directories to store snapshots, separated by ','
ledgerDirectories=/data/appData/pulsar/bookkeeper/ledgers

conf/broker.conf

### --- Managed Ledger --- ###
# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=2
# Number of copies to store for each message
managedLedgerDefaultWriteQuorum=2
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2

Metadata storage

Both Pulsar and BookKeeper use Apache ZooKeeper to store metadata and to monitor node health.

$PULSAR_HOME/bin/pulsar zookeeper-shell
> ls /
[admin, bookies, counters, ledgers, loadbalance, managed-ledgers, namespace, pulsar, schemas, stream, zookeeper]

Messaging Model

Pulsar follows the publish-subscribe (pub-sub) pattern: producers publish messages to topics; consumers subscribe to topics, process the published messages, and send an acknowledgment once processing is complete.

Once a subscription has been created, Pulsar retains all of its messages even if the consumer disconnects. A message is deleted only after a consumer acknowledges that it has been processed successfully.

Topics

Logically, a topic is a log structure, and every message has an offset within it. Apache Pulsar uses cursors to track these offsets (cursor tracking).

Pulsar supports two basic topic types: persistent topics and non-persistent topics.

{persistent|non-persistent}://tenant/namespace/topic
  • Non-Partitioned topics
$PULSAR_HOME/bin/pulsar-admin topics \
list public/default
$PULSAR_HOME/bin/pulsar-admin topics \
create persistent://public/default/input-seed-avro-topic
$PULSAR_HOME/bin/pulsar-admin topics \
lookup persistent://public/default/input-seed-avro-topic
$PULSAR_HOME/bin/pulsar-admin topics \
delete persistent://public/default/input-seed-avro-topic
$PULSAR_HOME/bin/pulsar-admin topics \
stats persistent://public/default/input-seed-avro-topic
$ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool

Partitioned topics

$PULSAR_HOME/bin/pulsar-admin topics \
create-partitioned-topic persistent://public/default/output-seed-avro-topic \
--partitions 2
$PULSAR_HOME/bin/pulsar-admin topics \
list-partitioned-topics public/default
$PULSAR_HOME/bin/pulsar-admin topics \
get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic
$PULSAR_HOME/bin/pulsar-admin topics \
delete-partitioned-topic persistent://public/default/output-seed-avro-topic

Messages

Messages are the basic 'unit' of Pulsar.

public interface Message<T> {
    Map<String, String> getProperties();
    boolean hasProperty(String var1);
    String getProperty(String var1);
    byte[] getData();
    T getValue();
    MessageId getMessageId();
    long getPublishTime();
    long getEventTime();
    long getSequenceId();
    String getProducerName();
    boolean hasKey();
    String getKey();
    boolean hasBase64EncodedKey();
    byte[] getKeyBytes();
    boolean hasOrderingKey();
    byte[] getOrderingKey();
    String getTopicName();
    Optional<EncryptionContext> getEncryptionCtx();
    int getRedeliveryCount();
    byte[] getSchemaVersion();
    boolean isReplicated();
    String getReplicatedFrom();
}

Producers

public void send() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-100:6650";
    // final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";
    // http://pulsar.apache.org/docs/en/client-libraries-java/#client
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .connectionTimeout(10000, TimeUnit.MILLISECONDS)
            .build();
    final String topic = "persistent://public/default/topic-sensor-temp";
    // http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
    final Producer<byte[]> producer = client.newProducer()
            .producerName("sensor-temp")
            .topic(topic)
            .compressionType(CompressionType.LZ4)
            // Note: chunking cannot be enabled together with batching,
            // so it is left disabled here.
            // .enableChunking(true)
            .enableBatching(true)
            .batchingMaxBytes(1024)
            .batchingMaxMessages(10)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .blockIfQueueFull(true)
            .maxPendingMessages(512)
            .sendTimeout(1, TimeUnit.SECONDS)
            .create();
    MessageId mid = producer.send("sensor-temp".getBytes());
    System.out.printf("\nmessage with ID %s successfully sent", mid);
    mid = producer.newMessage()
            .key("sensor-temp-key")
            .value("sensor-temp-key".getBytes())
            .property("my-key", "my-value")
            .property("my-other-key", "my-other-value")
            .send();
    System.out.printf("message-key with ID %s successfully sent", mid);
    producer.close();
    client.close();
}

Consumers

public void consume() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-101:6650";
    final String topic = "input-seed-avro-topic";
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .enableTcpNoDelay(true)
            .build();
    final Consumer<byte[]> consumer = client
            .newConsumer()
            .consumerName("seed-avro-consumer")
            .subscriptionName("seed-avro-subscription")
            .subscriptionType(SubscriptionType.Exclusive)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .topic(topic)
            .receiverQueueSize(10)
            .subscribe();
    final AvroSchema<SeedEvent> schema = AvroSchema.of(SeedEvent.class);
    while (true) {
        try {
            final Message<byte[]> msg = consumer.receive();
            LOG.info("received:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                    Thread.currentThread().getId(),
                    msg.getTopicName(),
                    msg.getMessageId(),
                    msg.getSequenceId(),
                    msg.getEventTime(),
                    msg.getPublishTime(),
                    msg.getProducerName(),
                    msg.getKey(), schema.decode(msg.getValue()));
            try {
                consumer.acknowledge(msg);
            } catch (final PulsarClientException e) {
                consumer.negativeAcknowledge(msg);
                LOG.error("acknowledge:" + e.getLocalizedMessage(), e);
            }
        } catch (final PulsarClientException e) {
            LOG.error("receive:" + e.getLocalizedMessage(), e);
        }
    }
}

Subscriptions

Consumers consume a topic's messages through subscriptions. A subscription is the logical entity that owns a cursor (which tracks the offset), and a topic can have multiple subscriptions. A subscription holds no message data, only metadata and the cursor.

Each subscription stores a cursor: the current offset in the log. Subscriptions store their cursors in BookKeeper ledgers, which lets cursor tracking scale just like topics do.
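
A subscription can also be created ahead of any consumer with pulsar-admin; a sketch, where the topic and subscription names are placeholders:

$PULSAR_HOME/bin/pulsar-admin topics \
create-subscription persistent://public/default/input-seed-avro-topic \
--subscription my-subscription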

Subscription types (subscription-type)

  • Exclusive

Only one consumer on the subscription may consume messages.

  • Failover

Only one consumer on the subscription is active at a time, with one or more standby consumers. When the active consumer fails, a standby takes over; there are never two active consumers at once.

  • Shared

A subscription can have multiple consumers at the same time, and they share the topic's messages among them.

  • Key_Shared

Multiple consumers can attach to the subscription, but all messages carrying the same key are delivered to the same consumer (see the ordering notes below).

Ordering guarantee

If ordering matters, use the Exclusive or Failover subscription type: only one consumer consumes the topic at a time, so order is preserved.

With the Shared type, multiple consumers consume the same topic concurrently. Adding consumers dynamically speeds up consumption and reduces message backlog on the server.

Key_Shared guarantees that, even in a shared setup, messages with the same key go to the same consumer, giving per-key ordering alongside concurrency.
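
A minimal sketch of a Key_Shared consumer (the topic and subscription names are placeholders):

Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/topic-sensor-temp")
        .subscriptionName("key-shared-subscription")
        .subscriptionType(SubscriptionType.Key_Shared)
        .subscribe();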

Multi-topic subscriptions

Pattern:

  • persistent://public/default/.*
  • persistent://public/default/foo.*
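
A consumer can subscribe to every topic matching such a pattern; a minimal sketch (the pattern and subscription name are placeholders):

Pattern allTopics = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> consumer = client.newConsumer()
        .topicsPattern(allTopics)
        .subscriptionName("pattern-subscription")
        .subscribe();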

Reader

public void read() throws IOException {
    final String serviceUrl = "pulsar://server-101:6650";
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .build();
    // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
    final Reader<byte[]> reader = client.newReader()
            .topic("my-topic")
            .startMessageId(MessageId.earliest) // MessageId.latest
            .create();
    while (true) {
        final Message<byte[]> message = reader.readNext();
        System.out.println(new String(message.getData()));
    }
}

Partitioned topics

Message retention and expiry

If a topic has no retention policy, messages up to an offset are deleted automatically once the cursors of all of the topic's subscriptions have successfully consumed past that offset.

If the topic has a retention policy, acknowledged messages are deleted once they exceed the policy's thresholds (the topic's retained message size, or the message retention time).

conf/broker.conf

# Default message retention time
# Default 0; set here to 3 days = 60*24*3
defaultRetentionTimeInMinutes=4320
# Default retention size
# Default 0; set here to 10 GB
defaultRetentionSizeInMB=10240
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0

retention policy (for a namespace)

$PULSAR_HOME/bin/pulsar-admin namespaces \
get-retention public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool
$PULSAR_HOME/bin/pulsar-admin namespaces \
set-retention public/default \
--size 1024M \
--time 5m
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \
--header 'Content-Type:application/json' \
--data '{
  "retentionTimeInMinutes": 5,
  "retentionSizeInMB": 1024
}'

message expiry / message-ttl

$PULSAR_HOME/bin/pulsar-admin namespaces \
get-message-ttl public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL
$PULSAR_HOME/bin/pulsar-admin namespaces \
set-message-ttl public/default \
--messageTTL 1800
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \
--header 'Content-Type:application/json' \
--data '1800'

Pulsar Schema

Pulsar schemas enable you to use language-specific types of data when constructing and handling messages, from simple types to more complex application-specific types.

  • Type safety (serialization and deserialization)
  • Schemas help Pulsar preserve the meaning the data had in the systems it came from

Schema types (Schema type)

  • Primitive type
Producer<String> producer = client.newProducer(Schema.STRING).topic("my-topic").create();
producer.newMessage().value("Hello Pulsar!").send();
Consumer<String> consumer = client.newConsumer(Schema.STRING).topic("my-topic").subscriptionName("my-sub").subscribe();
consumer.receive();
  • Complex type

1. KeyValue: key/value pairs.

Schema<KeyValue<Integer, String>> schema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
// Producer
Producer<KeyValue<Integer, String>> producer = client.newProducer(schema)
    .topic(TOPIC)
    .create();
final int key = 100;
final String value = "value-100";
producer.newMessage().value(new KeyValue<>(key, value)).send();
// Consumer
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(schema)
    .topic(TOPIC).subscriptionName(SubscriptionName).subscribe();
Message<KeyValue<Integer, String>> msg = consumer.receive();

2. Struct: AVRO, JSON, and Protobuf.

Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).topic("user-topic").create();
producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();
Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).topic("user-topic").subscriptionName("user-sub").subscribe();
User user = consumer.receive().getValue();

How schemas work

Producer

Consumer
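
(The original diagrams are not reproduced here.) In outline: when a producer connects with a schema, the broker checks that schema against the topic's entry in the schema registry; if none exists the schema is registered, and otherwise it must pass the namespace's compatibility check before the producer is accepted. A consumer's schema goes through the analogous check when it subscribes.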

Schema management

Get a schema

$PULSAR_HOME/bin/pulsar-admin schemas \
get persistent://public/default/spirit-avro-topic
$PULSAR_HOME/bin/pulsar-admin schemas \
get persistent://public/default/spirit-avro-topic \
--version=2

Upload a schema

$PULSAR_HOME/bin/pulsar-admin schemas upload \
persistent://public/default/test-topic \
--filename $PULSAR_HOME/connectors/json-schema.json

Extract a schema

$PULSAR_HOME/bin/pulsar-admin schemas \
extract persistent://public/default/test-topic  \
--classname com.cloudwise.modal.Packet \
--jar ~/cloudwise-pulsar-1.0.0-RELEASE.jar \
--type json
public void schemaInfo() {
    System.out.println("AvroSchema:" + AvroSchema.of(SeedEvent.class).getSchemaInfo());
    System.out.println("Schema.AVRO:" + Schema.AVRO(SeedEvent.class).getSchemaInfo());
}

Delete a schema

$PULSAR_HOME/bin/pulsar-admin schemas \
delete persistent://public/default/spirit-avro-topic

Pulsar Functions

Programming model
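
A function receives each message from its input topics, applies its logic, and can publish results to an output topic. For simple cases Pulsar also accepts the plain java.util.function.Function interface; a minimal sketch (the class name is made up):

import java.util.function.Function;

public class EchoFunction implements Function<String, String> {
    @Override
    public String apply(String input) {
        // pass the message through unchanged
        return input;
    }
}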

Enabling Functions

  1. conf/bookkeeper.conf
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
  2. conf/broker.conf
functionsWorkerEnabled=true
  3. conf/functions_worker.yml
pulsarFunctionsCluster: pulsar-cluster
numFunctionPackageReplicas: 2

Windowing (window)

  • windowLengthCount: the number of messages per window
  • slidingIntervalCount: the number of messages by which the window slides
  • windowLengthDurationMs: the window duration, in milliseconds
  • slidingIntervalDurationMs: the duration by which the window slides, in milliseconds

Window functions

public class WordCountWindowFunction implements org.apache.pulsar.functions.api.WindowFunction<String, Void> {
    @Override
    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
        for (Record<String> input : inputs) {
            // process each record in the current window here
        }
        return null;
    }
}

Running functions

  • Time-based sliding window

--user-config '{"windowLengthDurationMs":"60000", "slidingIntervalDurationMs":"1000"}'

  • Time-based tumbling window

--user-config '{"windowLengthDurationMs":"60000"}'

  • Count-based sliding window

--user-config '{"windowLengthCount":"100", "slidingIntervalCount":"10"}'

  • Count-based tumbling window

--user-config '{"windowLengthCount":"100"}'

Java programming

pom.xml

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner</artifactId>
    <version>${pulsar.version}</version>
</dependency>
  1. WordCount
public class WordCountFunction implements org.apache.pulsar.functions.api.Function<String, Void> {
    @Override
    public Void process(String input, Context context) throws Exception {
        Arrays.asList(input.split(" ")).forEach(word -> {
            String counterKey = word.toLowerCase();
            if (context.getCounter(counterKey) == 0) {
                context.putState(counterKey, ByteBuffer.wrap(ByteUtils.from(100)));
            }
            context.incrCounter(counterKey, 1);
        });
        return null;
    }
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--broker-service-url pulsar://server-101:6650 \
--jar target/cloudwise-pulsar-functions-with-dependencies.jar \
--classname com.cloudwise.quickstart.pulsar.functions.WordCountFunction \
--tenant public \
--namespace default \
--name word-count-function \
--inputs persistent://public/default/sentences \
--output persistent://public/default/wordcount
  2. Dynamic routing
/**
 * Basic idea: inspect the content of each message and route it to a
 * different destination topic based on that content.
 */

public class RoutingFunction implements org.apache.pulsar.functions.api.Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        String regex = context.getUserConfigValue("regex").get().toString();
        String matchedTopic = context.getUserConfigValue("matched-topic").get().toString();
        String unmatchedTopic = context.getUserConfigValue("unmatched-topic").get().toString();
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(input);
        if (matcher.matches()) {
            context.newOutputMessage(matchedTopic, Schema.STRING).value(input).send();
        } else {
            context.newOutputMessage(unmatchedTopic, Schema.STRING).value(input).send();
        }
        return null;
    }
}
  3. log-topic
public class LoggingFunction implements org.apache.pulsar.functions.api.Function<String, Void> {
    @Override
    public Void process(String s, Context context) throws Exception {
        Logger LOG = context.getLogger();
        String messageId = context.getFunctionId();
        if (s.contains("danger")) {
            LOG.warn("A warning was received in message {}", messageId);
        } else {
            LOG.info("Message {} received\nContent: {}", messageId, s);
        }
        return null;
    }
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--jar cloudwise-pulsar-functions-1.0.0.jar \
--classname com.cloudwise.quickstart.pulsar.functions.LoggingFunction \
--log-topic persistent://public/default/logging-function-logs
  4. user-config
public class UserConfigFunction implements org.apache.pulsar.functions.api.Function<String, Void> {
    @Override
    public Void process(String s, Context context) throws Exception {
        Logger log = context.getLogger();
        Optional<Object> value = context.getUserConfigValue("word-of-the-day");
        if (value.isPresent()) {
            log.info("The word of the day is {}", value.get());
        } else {
            log.warn("No word of the day provided");
        }
        return null;
    }
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--broker-service-url pulsar://server-101:6650 \
--jar target/cloudwise-pulsar-functions-with-dependencies.jar \
--classname com.cloudwise.quickstart.pulsar.functions.UserConfigFunction \
--tenant public \
--namespace default \
--name user-config-function \
--inputs persistent://public/default/userconfig \
--user-config '{"word-of-the-day":"verdure"}'

Pulsar Connectors


Processing guarantees

  • at-most-once
  • at-least-once
  • effectively-once
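
The guarantee is chosen per function or connector at creation time with the --processing-guarantees flag; a sketch, where the jar, class, and topic names are placeholders:

$PULSAR_HOME/bin/pulsar-admin functions create \
--jar my-functions.jar \
--classname com.example.MyFunction \
--inputs my-input-topic \
--processing-guarantees EFFECTIVELY_ONCE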

Workflow (JDBC sink)

  1. Add a configuration file.
  2. Create a schema.
  3. Upload the schema to a topic.
  4. Create the JDBC sink.
  5. Stop the JDBC sink.
  6. Restart the JDBC sink.
  7. Update the JDBC sink.

Built-in connectors

Source connector

  • Canal
  • File
  • Flume
  • Kafka
  • RabbitMQ

Sink connector

  • ElasticSearch/Solr
  • Flume
  • HBase
  • HDFS2/HDFS3
  • InfluxDB
  • JDBC ClickHouse/MariaDB/PostgreSQL
  • Kafka
  • MongoDB
  • RabbitMQ
  • Redis

ClickHouse Sink

  1. Create the table
CREATE DATABASE IF NOT EXISTS monitor;
CREATE TABLE IF NOT EXISTS monitor.pulsar_clickhouse_jdbc_sink
(
    id   UInt32,
    name String
) ENGINE = TinyLog;
INSERT INTO monitor.pulsar_clickhouse_jdbc_sink (id, name)
VALUES (1, 'tmp');
SELECT *
FROM monitor.pulsar_clickhouse_jdbc_sink;
  2. Create the sink configuration
$ vi $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml
{
    "userName": "sysop",
    "password": "123456",
    "jdbcUrl": "jdbc:clickhouse://server-101:8123/monitor",
    "tableName": "pulsar_clickhouse_jdbc_sink"
}
  3. Create the schema
$ vi $PULSAR_HOME/connectors/json-schema.json
{
  "name": "",
  "schema": {
    "type": "record",
    "name": "SeedEvent",
    "namespace": "com.cloudwise.quickstart.model",
    "fields": [
      {
        "name": "id",
        "type": [
          "null",
          "int"
        ]
      },
      {
        "name": "name",
        "type": [
          "null",
          "string"
        ]
      }
    ]
  },
  "type": "JSON",
  "properties": {
    "__alwaysAllowNull": "true",
    "__jsr310ConversionEnabled": "false"
  }
}
  4. Upload the schema
$PULSAR_HOME/bin/pulsar-admin schemas upload \
pulsar-clickhouse-jdbc-sink-topic \
-f $PULSAR_HOME/connectors/json-schema.json
  5. Run the sink
$PULSAR_HOME/bin/pulsar-admin sinks create \
--tenant public \
--namespace default \
--name pulsar-clickhouse-jdbc-sink \
--inputs pulsar-clickhouse-jdbc-sink-topic \
--sink-config-file $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml \
--archive $PULSAR_HOME/connectors/pulsar-io-jdbc-clickhouse-2.6.2.nar \
--processing-guarantees EFFECTIVELY_ONCE \
--parallelism 1

Pulsar Deployment

Directory layout

/opt/pulsar-2.6.2
├── bin
│   ├── bookkeeper
│   ├── function-localrunner
│   ├── proto
│   ├── pulsar
│   ├── pulsar-admin
│   ├── pulsar-admin-common.sh
│   ├── pulsar-client
│   ├── pulsar-daemon
│   ├── pulsar-managed-ledger-admin
│   └── pulsar-perf
├── conf
│   ├── bkenv.sh
│   ├── bookkeeper.conf
│   ├── broker.conf
│   ├── client.conf
│   ├── discovery.conf
│   ├── filesystem_offload_core_site.xml
│   ├── functions-logging
│   ├── functions_worker.yml
│   ├── global_zookeeper.conf
│   ├── log4j2-scripts
│   ├── log4j2.yaml
│   ├── presto
│   ├── proxy.conf
│   ├── pulsar_env.sh
│   ├── pulsar_tools_env.sh
│   ├── schema_example.conf
│   ├── standalone.conf
│   ├── websocket.conf
│   └── zookeeper.conf
├── examples
│   ├── api-examples.jar
│   ├── example-function-config.yaml
│   ├── example-window-function-config.yaml
│   └── python-examples
├── instances
│   ├── deps
│   ├── java-instance.jar
│   └── python-instance
├── lib
│   └── presto
├── LICENSE
├── licenses
├── NOTICE
└── README

Standalone

# Start in the foreground
$PULSAR_HOME/bin/pulsar standalone
# Start as a daemon
$PULSAR_HOME/bin/pulsar-daemon start standalone
$ jps | grep -v Jps
1873 PulsarStandaloneStarter
# Stop the daemon
$PULSAR_HOME/bin/pulsar-daemon stop standalone -force

Cluster

  1. Deploy a ZooKeeper cluster.
  2. Initialize the cluster metadata (see the sketch after this list).
  3. Deploy a BookKeeper cluster.
  4. Deploy one or more Pulsar brokers.
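
Step 2 runs once against ZooKeeper before any broker starts; a sketch with placeholder hostnames:

$PULSAR_HOME/bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster \
--zookeeper server-101:2181 \
--configuration-store server-101:2181 \
--web-service-url http://server-101:8080 \
--broker-service-url pulsar://server-101:6650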

Client

# consumer
$PULSAR_HOME/bin/pulsar-client consume \
persistent://public/default/seed-avro-topic \
--subscription-name cli-pack-avro-subscription \
--subscription-type Exclusive \
--subscription-position Latest \
--num-messages 0
# producer
$PULSAR_HOME/bin/pulsar-client produce \
persistent://public/default/seed-avro-topic \
--num-produce 100 \
--messages 'Hello Pulsar' \
--separator ','

Pulsar Admin

API

  • pulsar-admin
  • REST API

Source: apache-pulsar-2.6.2-src/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/*.java

Bookies.java, Brokers.java, BrokerStats.java, Clusters.java, Functions.java,
Namespaces.java, NonPersistentTopics.java, PersistentTopics.java, ResourceQuotas.java,
SchemasResource.java, Tenants.java, Worker.java, WorkerStats.java
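
The same admin operations are exposed over HTTP; for example (the host is a placeholder):

$ curl http://server-101:8080/admin/v2/clusters
$ curl http://server-101:8080/admin/v2/tenants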

  • Java admin client
public void createNonPartitionedTopic() throws PulsarClientException {
    final String serviceHttpUrl = "http://10.2.2.26:8080";
    final PulsarAdmin admin = PulsarAdmin.builder()
            .serviceHttpUrl(serviceHttpUrl)
            .build();
    try {
        final String namespace = "public/monitor";
        List<String> topics = admin.topics().getList(namespace);
        topics.forEach(t -> System.err.println("before topic:" + t));
        // The following forms are equivalent:
        // final String topic = "input-3-seed-avro-topic";
        // final String topic = "public/monitor/input-seed-avro-topic";
        final String topic = "persistent://public/default/input-5-seed-avro-topic";
        if (!topics.contains(topic)) {
            admin.topics().createNonPartitionedTopic(topic);
            admin.schemas().createSchema(topic,
                    AvroSchema.of(SeedEvent.class).getSchemaInfo());
        }
        topics = admin.topics().getList(namespace);
        topics.forEach(t -> System.err.println("after topic:" + t));
        System.err.println("schema:" + admin.schemas().getSchemaInfo(topic));
    } catch (final PulsarAdminException e) {
        e.printStackTrace();
    }
    admin.close();
}

Manage Pulsar

  • Clusters
$PULSAR_HOME/bin/pulsar-admin clusters
  • Tenants
$PULSAR_HOME/bin/pulsar-admin tenants
  • Brokers
$PULSAR_HOME/bin/pulsar-admin brokers
  • Namespaces
$PULSAR_HOME/bin/pulsar-admin namespaces
  • Permissions
  • Persistent topics
  • Non-Persistent topics
  • Partitioned topics
  • Non-Partitioned topics
$PULSAR_HOME/bin/pulsar-admin topics
  • Schemas
$PULSAR_HOME/bin/pulsar-admin schemas
  • Functions
$PULSAR_HOME/bin/pulsar-admin functions

Pulsar Manager

http://pulsar.apache.org/docs/zh-CN/administration-pulsar-manager/

https://github.com/apache/pulsar-manager

WebUI

http://localhost:7750/ui/index.html

username/password: admin/123456

  • Environments
  • Management
  • Clusters
  • Tenants
  • Namespaces
  • Topics
  • Tokens

Pulsar Flink

https://github.com/streamnative/pulsar-flink

https://dl.bintray.com/streamnative/maven/io/streamnative/connectors/

pom.xml

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.11.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
    <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
        <id>bintray</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!--statebackend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!--pulsar -->
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-flink-${scala.binary.version}-${flink.version}</artifactId>
        <version>2.5.4.1</version>
    </dependency>
    <!-- format -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

FlinkPulsarSink

public class PulsarSinkJob {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkJob.class);
    public static SourceFunction<SeedEvent> getSeedSource() {
        final int interval = 5000;
        return new PeriodicEventSource<>(
                Integer.MAX_VALUE, interval, new PeriodicEventSource.Creator<SeedEvent>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Collection<SeedEvent> build(long i) {
                return Arrays.stream(new String[]{"TEM-A-01", "HUM-A-01", "PRS-A-01"})
                        .map(code -> {
                            final SeedEvent event = new SeedEvent(
                                    Instant.now().toEpochMilli(), code, Long.toString(i));
                            LOG.info("created event:[{}] {}", Thread.currentThread().getId(), event);
                            return event;
                        })
                        .collect(Collectors.toList());
            }
            @Override
            public Class<SeedEvent> clazz() {
                return SeedEvent.class;
            }
        });
    }
    public static FlinkPulsarSink<SeedEvent> getPulsarSink(ParameterTool params) {
        // String adminUrl = "http://server-101:8080,server-102:8080,server-103:8080";
        final String serviceUrl = params.get("serviceUrl", "pulsar://10.2.2.26:6650");
        final String adminUrl = params.get("adminUrl", "http://10.2.2.26:8080");
        final String outputTopic = params.get("topic", "output-seed-avro-topic");
        final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
        final String authParams = params.get("authParams");
        final Properties props = new Properties();
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
        props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000");
        final ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        clientConf.setConnectionTimeoutMs(6000);
        clientConf.setUseTcpNoDelay(true);
        if (!StringUtils.isNullOrWhitespaceOnly(authParams)) {
            clientConf.setUseTls(true);
            clientConf.setAuthPluginClassName(authPlugin);
            clientConf.setAuthParams(authParams);
        }
        final TopicKeyExtractor<SeedEvent> topicKeyExtractor = new TopicKeyExtractor<SeedEvent>() {
            private static final long serialVersionUID = 1L;
            @Override
            public byte[] serializeKey(SeedEvent element) {
                LOG.info("serializeKey:[{}] {}", Thread.currentThread().getId(), element);
                return element.getCode().getBytes();
            }
            @Override
            public String getTopic(SeedEvent element) {
                return null;
            }
        };
        final FlinkPulsarSink<SeedEvent> sink = new FlinkPulsarSink<>(
                adminUrl, Optional.of(outputTopic), clientConf, props, topicKeyExtractor, SeedEvent.class);
        return sink;
    }
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.setStateBackend(new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoint/")));
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // Retain checkpoint data on job cancellation and failure, so the job
        // can be restored from a chosen checkpoint when needed.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Ensure at least 500 ms between checkpoints (minimum checkpoint interval).
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint must complete within one minute or it is discarded (checkpoint timeout).
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getConfig().setGlobalJobParameters(params);
        // DataStream<SeedEvent> stream = env.fromCollection(getSeedEvents()).name("Collection");
        final DataStream<SeedEvent> stream = env.addSource(getSeedSource()).name("SourceFunction");
        final DataStream<SeedEvent> result = stream
                .keyBy(new KeySelector<SeedEvent, String>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public String getKey(SeedEvent value) throws Exception {
                        return value.getCode();
                    }
                })
                .process(new KeyedProcessFunction<String, SeedEvent, SeedEvent>() {
                    private static final long serialVersionUID = 1L;
                    private Map<String, String> infos;
                    private transient ListState<String> state;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        LOG.info("open...");
                        this.state = getRuntimeContext().getListState(
                                new ListStateDescriptor<>("state", String.class));
                        this.infos = new HashMap<>();
                        this.infos.put("open", LocalDateTime.now().toString());
                    }
                    @Override
                    public void close() throws Exception {
                        LOG.info("close...");
                    }
                    @Override
                    public void processElement(SeedEvent value,
                                               KeyedProcessFunction<String, SeedEvent, SeedEvent>.Context ctx,
                                               Collector<SeedEvent> out) throws Exception {
                        LOG.info("processElement...");
                        final StringBuffer buffer = new StringBuffer();
                        this.state.get().forEach(t -> buffer.append(t));
                        LOG.info("CurrentKey:{} Input:{} State:{} Infos:{}",
                                ctx.getCurrentKey(), value, buffer, this.infos);
                        value.setPayload("[Prev]" + value.getPayload());
                        this.state.clear();
                        this.state.add(value.toString());
                        out.collect(value);
                    }
                })
                .setParallelism(1);
        result
                .print()
                .setParallelism(1);
        result
                .addSink(getPulsarSink(params))
                .name("FlinkPulsarSink")
                .setParallelism(2);
        LOG.info("ExecutionPlan:{}", env.getExecutionPlan());
        try {
            env.execute("PulsarSinkJob");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }
}

FlinkPulsarSource

public class PulsarSourceJob {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceJob.class);
    public static FlinkPulsarSource<SeedEvent> getPulsarSource(ParameterTool params) {
        // String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";
        // String adminUrl = "http://server-101:8080,server-102:8080,server-103:8080";
        // final String serviceUrl = params.get("serviceUrl", "pulsar://server-101:6650");
        // final String adminUrl = params.get("adminUrl", "http://server-101:8080");
        final String serviceUrl = params.get("serviceUrl", "pulsar://10.2.2.26:6650");
        final String adminUrl = params.get("adminUrl", "http://10.2.2.26:8080");
        // final String inputTopic = params.get("topic", "input-1-seed-avro-topic");
        // final String subscription = params.get("subscription", "seed-subscription");
        final String inputTopics = params.get("topic", "persistent://public/yang11/zlp.gjsjbz.gjbzcd3");
        // final String inputTopics = params.get("topic", "public/monitor/input-0-seed-avro-topic");
        final String subscription = params.get("subscription", "mutil-seed-subscription");
        // final String inputTopicPatten = params.get("topicPatten", "input-1-seed-avro-topic");
        // final String subscription = params.get("subscription", "patten-seed-subscription");
        final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
        final String authParams = params.get("authParams");
        final Properties props = new Properties();
        // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
        props.setProperty(PulsarOptions.PULSAR_READER_OPTION_KEY_PREFIX + "receiverQueueSize", "2000");
        // props.setProperty(PulsarOptions.TOPIC_SINGLE_OPTION_KEY, inputTopic);
        props.setProperty(PulsarOptions.TOPIC_MULTI_OPTION_KEY, inputTopics);
        // props.setProperty(PulsarOptions.TOPIC_PATTERN_OPTION_KEY, inputTopicPatten);
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getPartitionDiscoveryIntervalInMillis
        props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000"); // topic discovery interval; default -1 (disabled)
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getClientCacheSize
        props.setProperty(PulsarOptions.CLIENT_CACHE_SIZE_OPTION_KEY, "5");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.flushOnCheckpoint
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.failOnWrite
        props.setProperty(PulsarOptions.FAIL_ON_WRITE_OPTION_KEY, "false");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getPollTimeoutMs
        props.setProperty(PulsarOptions.POLL_TIMEOUT_MS_OPTION_KEY, "120000");
        // org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher
        props.setProperty(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, subscription);
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getCommitMaxRetries
        props.setProperty(PulsarOptions.COMMIT_MAX_RETRIES, "3");
        // org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.PulsarFetcher
        props.setProperty(PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY, "false");
        final ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        clientConf.setConnectionTimeoutMs(6000);
        if (!StringUtils.isNullOrWhitespaceOnly(authParams)) {
            clientConf.setUseTls(true);
            clientConf.setAuthPluginClassName(authPlugin);
            clientConf.setAuthParams(authParams);
        }
        PulsarDeserializationSchema<SeedEvent> deserializer = null;
        deserializer = new PulsarDeserializationSchemaWrapper<>(AvroDeser.of(SeedEvent.class));
        // The wrapper above is replaced by an inline implementation that adds logging.
        deserializer = new PulsarDeserializationSchema<SeedEvent>() {
            private static final long serialVersionUID = 1L;
            private final DeserializationSchema<SeedEvent> schema = AvroDeser.of(SeedEvent.class);
            public void open(DeserializationSchema.InitializationContext context) throws Exception {
                this.schema.open(context);
            }
            @Override
            public TypeInformation<SeedEvent> getProducedType() {
                return this.schema.getProducedType();
            }
            @Override
            public boolean isEndOfStream(SeedEvent nextElement) {
                return this.schema.isEndOfStream(nextElement);
            }
            @Override
            public SeedEvent deserialize(@SuppressWarnings("rawtypes") Message message) throws IOException {
                LOG.info("{}", new String(message.getData()));
                final SeedEvent value = new SeedEvent();
                // final SeedEvent value = this.schema.deserialize(message.getData());
                LOG.info("received:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                        Thread.currentThread().getId(),
                        message.getTopicName(),
                        message.getMessageId(),
                        message.getSequenceId(),
                        message.getEventTime(),
                        message.getPublishTime(),
                        message.getProducerName(),
                        message.getKey(), value);
                return value;
            }
        };
        final FlinkPulsarSource<SeedEvent> source = new FlinkPulsarSource<>(
                adminUrl, clientConf, deserializer, props);
        source.setStartFromEarliest();
        // source.setStartFromSubscription(subscription);
        // source.setStartFromLatest();
        return source;
    }
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setStateBackend(new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoint/")));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        // Checkpointing must be enabled to resume from the last unconsumed position;
        // otherwise consumption restarts from the beginning.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        final DataStream<String> stream = env
                .addSource(getPulsarSource(params))
                .name("FlinkPulsarSource")
                .uid("PulsarSource")
                .setParallelism(1)
                .map(new MapFunction<SeedEvent, String>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public String map(SeedEvent value) throws Exception {
                        return "[SourceJob]" + value;
                    }
                });
        stream
                .print()
                .name("[Print]")
                .uid("PrintSink")
                .setParallelism(1);
        try {
            env.execute("PulsarSourceJob");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }
}

About the speaker: Larry Zhang (张磊) is an operations development engineer in Service Efficiency Division II of CloudWise's Service Engineering Department. He works on CloudWise's open-source Operations Management Platform (OMP), supporting rapid deployment of monitoring products for the internal delivery pipeline, and has extensive experience designing and building operations management and PaaS platforms.
