sharetwitterlinkedIn

零经验玩转隔离策略:多个 Pulsar 集群

June 3, 2021
head img

本文是「Pulsar 傻瓜手册」,手把手为零经验小白排坑避雷。只要按照指引操作,成功率 100%!

本文是《Pulsar 隔离策略系列》的第 2 篇,该系列的第 1 篇博客 — 《深度解析如何在 Pulsar 中实现隔离》— 重点介绍了 Pulsar 隔离策略以及如何通过以下 3 种方式实现资源隔离:

  • 多个 Pulsar 集群
  • 共享 BookKeeper 集群
  • 单个 Pulsar 集群

本文将详细讲解第 1 种方式,即如何在「多个 Pulsar 集群」的环境中,玩转 Pulsar 隔离策略,并为以下操作提供详细步骤:

  1. 验证数据隔离。
  2. 同步并迁移集群数据。
  3. 扩缩容节点。

准备环境

本文以在 macOS(版本 11.2.3, 内存 8G)上操作为示例。

如下图所示,本文部署 2 个 Pulsar 集群,每个集群提供以下服务:

  • 1 个 ZooKeeper 节点
  • 1 个 bookie 节点
  • 1 个 broker 节点


软件要求

  • Java 8

环境详情

下文即将部署 2 个 Pulsar 集群,组件详情如下表所示。

Pulsar Cluster 1

Pulsar Cluster 2

ZooKeeper1

localhost:2181

web-service-url localhost:8080

web-service-url-tls localhost:8443

broker-service-url localhost:6650

broker-service-url-tls localhost:6651

ZooKeeper1

localhost:2186 

web-service-url localhost:8081

web-service-url-tls localhost:8444

broker-service-url localhost:6660

broker-service-url-tls localhost:6661

Bookie1

prometheusStatsHttpPort=8002

httpServerPort=8002

(The default value of bookiePort is 3181, no need for manual configuration.)

Bookie1

prometheusStatsHttpPort=8003

httpServerPort=8003

bookiePort=3182

Broker1

zookeeperServers=127.0.0.1:2181

Broker1

zookeeperServers=127.0.0.1:2186

configurationStoreServers=127.0.0.1:2184

部署准备

  1. 下载 Pulsar 并解压。

    本文以安装 Pulsar 2.7.0 为例。

  2. 在本地任意位置,按照以下目录结构,创建相应的空文件夹。
```
|-separate-clusters
    |-configuration-store
        |-zk1
    |-cluster1
        |-zk1
        |-bk1
        |-broker1
    |-cluster2
        |-zk1
        |-bk1
        |-broker1
```
  1. 复制解压后 Pulsar 文件夹中的内容至上一步创建的每个文件夹中。
  2. 启动 configuration store.

    Configuration store 为 Pulsar 实例提供跨集群配置管理和任务协调。Pulsar 集群 1 和 Pulsar 集群 2 共享 configuration store。

    cd configuration-store/zk1
    
    bin/pulsar-daemon start configuration-store
    

部署 Pulsar 集群 1

  1. 启动 local ZooKeeper.

    为每个 Pulsar 集群部署 1 个 local ZooKeeper,它负责为该集群管理配置和协调任务。

    cd cluster1/zk1
    
    bin/pulsar-daemon start zookeeper
    
  2. 初始化元数据.

    设置好 configuration store 和 local ZooKeeper 后,你需要向 ZooKeeper 写入元数据。

    cd cluster1/zk1
    
    bin/pulsar initialize-cluster-metadata \
      --cluster cluster1 \
      --zookeeper localhost:2181 \
      --configuration-store localhost:2184 \
      --web-service-url http://localhost:8080/ \
      --web-service-url-tls https://localhost:8443/ \
      --broker-service-url pulsar://localhost:6650/ \
      --broker-service-url-tls pulsar+ssl://localhost:6651/
    
  3. 部署 BookKeeper.

    BookKeeper 为 Pulsar 提供消息持久化存储。每个 Pulsar broker 拥有 bookie。BookKeeper 集群和 Pulsar 集群共享 local ZooKeeper。

    (1) 配置 bookie。

    更改 cluster1/bk1/conf/bookkeeper.conf文件中以下配置项的值。

    allowLoopback=true
    prometheusStatsHttpPort=8002
    httpServerPort=8002
    

    (2) 启动 bookie。

    cd cluster1/bk1
    
    bin/pulsar-daemon start bookie
    

    验证是否成功启动 bookie。

    bin/bookkeeper shell bookiesanity
    

    输出

    Bookie sanity test succeeded
    
  4. 部署 broker。

    (1) 配置 broker。

    更改 cluster1/broker1/conf/broker.conf 文件中以下配置项的值。

    zookeeperServers=127.0.0.1:2181
    configurationStoreServers=127.0.0.1:2184
    clusterName=cluster1
    managedLedgerDefaultEnsembleSize=1
    managedLedgerDefaultWriteQuorum=1
    managedLedgerDefaultAckQuorum=1
    

    (2) 启动 broker。

    cd cluster1/broker1
    
    bin/pulsar-daemon start broker
    

部署 Pulsar 集群 2

  1. 部署 local ZooKeeper。

    (1) 配置 local ZooKeeper。

    • 更改 cluster2/zk1/conf/zookeeper.conf 文件中以下配置项的值。

      clientPort=2186
      admin.serverPort=9992
      
    • 将以下配置项添加至 cluster2/zk1/conf/pulsar_env.sh 文件。

      OPTS="-Dstats_server_port=8011"
      

    (2) 启动 local ZooKeeper。

    cd cluster2/zk1
    
    bin/pulsar-daemon start zookeeper
    
  2. 初始化元数据。

    bin/pulsar initialize-cluster-metadata \
      --cluster cluster2 \
      --zookeeper localhost:2186 \
      --configuration-store localhost:2184 \
      --web-service-url http://localhost:8081/ \
      --web-service-url-tls https://localhost:8444/ \
      --broker-service-url pulsar://localhost:6660/ \
      --broker-service-url-tls pulsar+ssl://localhost:6661/
    
  3. 部署 BookKeeper。

    (1) 配置 bookie。

    更改 cluster2/bk1/conf/bookkeeper.conf 文件中以下配置项的值。

    bookiePort=3182
    zkServers=localhost:2186
    allowLoopback=true
    prometheusStatsHttpPort=8003
    httpServerPort=8003
    

    (2) 启动 bookie。

    cd cluster2/bk1
    
    bin/pulsar-daemon start bookie
    

    验证是否成功启动 bookie。

    bin/bookkeeper shell bookiesanity
    

    输出

    Bookie sanity test succeeded
    
  4. 部署 broker。

    (1) 配置 broker。

    • 更改 cluster2/broker1/conf/broker.conf 文件中以下配置项的值。

      clusterName=cluster2
      zookeeperServers=127.0.0.1:2186
      configurationStoreServers=127.0.0.1:2184
      brokerServicePort=6660
      webServicePort=8081
      managedLedgerDefaultEnsembleSize=1
      managedLedgerDefaultWriteQuorum=1
      managedLedgerDefaultAckQuorum=1
      
    • 更改 cluster2/broker1/conf/client.conf 文件中以下配置项的值。

      webServiceUrl=http://localhost:8081/
      brokerServiceUrl=pulsar://localhost:6660/
      

    (2) 启动 broker。

    cd cluster2/broker1
    
    bin/pulsar-daemon start broker
    

验证数据隔离

本章验证 2 个 Pulsar 集群中的数据是否隔离。

  1. 创建 namespace1,并将 namespace1 分配给 cluster1。

    提示:namespace 的命名规则是 /。更多关于 namespace 的信息,参阅这里

```
cd cluster1/broker1

bin/pulsar-admin namespaces create -c cluster1 public/namespace1
```

验证结果



```
bin/pulsar-admin namespaces list public
```

**输出**

```
"public/default"
"public/namespace1"
```
  1. 设置 namespace1 的消息保留策略。

    注意:如果不设置消息保留策略且 topic 未被订阅,一段时间后,topic 的数据会被自动清理。

    bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/namespace1
    
  2. 在 namespace1 创建 topic1,并使用写入 1000 条数据。

    提示:pulsar-client 是发送和消费数据的命令行工具。更多关于 Pulsar 命令行工具的信息,参阅这里

```
bin/pulsar-client produce -m 'hello c1 to c2' -n 1000 public/namespace1/topic1

09:56:34.504 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1000 messages successfully produced
```

验证结果

```
bin/pulsar-admin --admin-url http://localhost:8080 topics stats-internal public/namespace1/topic1
```

**输出**

`entriesAddedCounter` 显示增加了 1000 条数据。

```
{
  "entriesAddedCounter" : 1000,
  "numberOfEntries" : 1000,
  "totalSize" : 65616,
  "currentLedgerEntries" : 1000,
  "currentLedgerSize" : 65616,
  "lastLedgerCreatedTimestamp" : "2021-04-22T10:24:00.582+08:00",
  "waitingCursorsCount" : 0,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "4:999",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 4,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : { },
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false
  }
}
```
  1. 通过 cluster2(localhost:8081)查看 public/namespace1/topic1 的数据。

    bin/pulsar-admin --admin-url http://localhost:8081 topics stats-internal public/namespace1/topic1
    

    输出

    查看失败。打印信息显示 public/namespace1 仅分配至 cluster1,未分配至 cluster 2。此时验证了数据已隔离。

    Namespace missing local cluster name in clusters list: local_cluster=cluster2 ns=public/namespace1 clusters=[cluster1]
    
    Reason: Namespace missing local cluster name in clusters list: local_cluster=cluster2 ns=public/namespace1 clusters=[cluster1]
    
  2. 在 cluster2 中,向 public/namespace1/topic1 写入数据。

    cd cluster2/broker1
    
    bin/pulsar-client produce -m 'hello c1 to c2' -n 1000 public/namespace1/topic1
    

    输出

    结果显示写入消息数量为 0,操作失败,因为 namespace1 仅分配至 cluster1,未分配至 cluster 2。此时验证了数据已隔离。

    12:09:50.005 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 0 messages successfully produced
    

迁移数据

以下步骤接着前文「验证数据隔离」继续操作。

确认数据已隔离后,本章讲述如何同步(使用跨地域复制功能)并迁移集群数据。

  1. 分配 namespace1 至 cluster2,即添加 cluster2 至 namesapce1 的 cluster 列表。

    该步骤启用跨地域复制功能,同步 cluster1 和 cluster2 的数据。

```
bin/pulsar-admin namespaces set-clusters --clusters cluster1,cluster2 public/namespace1
```

验证结果

```
bin/pulsar-admin namespaces get-clusters public/namespace1
```

**输出**

```
"cluster1"
"cluster2"
```
  1. 查看 cluster2 是否有 topic1。
```
bin/pulsar-admin --admin-url http://localhost:8081 topics stats-internal public/namespace1/topic1
```

**输出**

结果显示 cluster2 的 topic1 有 1000 条数据,说明 cluster1 的 topic1 数据已成功复制至 cluster2。

```
{
  "entriesAddedCounter" : 1000,
  "numberOfEntries" : 1000,
  "totalSize" : 75616,
  "currentLedgerEntries" : 1000,
  "currentLedgerSize" : 75616,
  "lastLedgerCreatedTimestamp" : "2021-04-23T12:02:52.929+08:00",
  "waitingCursorsCount" : 1,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "1:999",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 1,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "pulsar.repl.cluster1" : {
      "markDeletePosition" : "1:999",
      "readPosition" : "1:1000",
      "waitingReadOp" : true,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 1000,
      "cursorLedger" : 2,
      "cursorLedgerLastEntry" : 2,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-04-23T12:02:53.248+08:00",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  },
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false
  }
}
```
  1. 迁移 cluster1 的 producer 和 consumer 至 cluster2。

    PulsarClient pulsarClient1 = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
    // migrate the client to cluster2 pulsar://localhost:6660
    PulsarClient pulsarClient2 = PulsarClient.builder().serviceUrl("pulsar://localhost:6660").build();
    
  2. 从 namespace1 的 cluster 列表中移除 cluster1 。

    bin/pulsar-admin namespaces set-clusters --clusters cluster2 public/namespace1
    

    检查数据是否存储在 cluster1/topic1。

```
cd cluster1/broker1

bin/pulsar-admin --admin-url http://localhost:8080 topics stats-internal public/namespace1/topic1
```

**输出**

结果显示数据为空,说明数据已从 cluster1 的 topic1 中成功移除。

```
{
  "entriesAddedCounter" : 0,
  "numberOfEntries" : 0,
  "totalSize" : 0,
  "currentLedgerEntries" : 0,
  "currentLedgerSize" : 0,
  "lastLedgerCreatedTimestamp" : "2021-04-23T15:20:08.1+08:00",
  "waitingCursorsCount" : 1,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "3:-1",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 3,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "pulsar.repl.cluster2" : {
      "markDeletePosition" : "3:-1",
      "readPosition" : "3:0",
      "waitingReadOp" : true,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 0,
      "cursorLedger" : 4,
      "cursorLedgerLastEntry" : 0,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-04-23T15:20:08.122+08:00",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  },
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false
  }
}
```

至此,我们已将 cluster1 的 topic1 数据成功复制至 cluster2,之后移除了 cluster1 的 topic1 数据。

扩缩容节点

本章讲述如何扩缩容 broker 和 bookie 节点。

Broker

增加 broker 节点

本示例在 cluster1/broker1 创建 2 个 partitioned topic,再增加 2 个 broker 节点。之后卸载 partitioned topic 数据,并查看数据在 3 个 broker 之间的分配情况。

  1. 查看 cluster1 的 broker 信息。

    cd/cluster1/broker1
    
    bin/pulsar-admin brokers list cluster1
    

    输出

    结果说明当前 cluster1 只有 broker1。

    "192.168.0.105:8080"
    
  2. 在 cluster1/broker1 上创建 2 个 partitioned topic。

    为 partitioned-topic1 创建 6 个分区,为 partitioned-topic2 创建 7 个分区。

    bin/pulsar-admin topics create-partitioned-topic -p 6 public/namespace1/partitioned-topic1
    
    bin/pulsar-admin topics create-partitioned-topic -p 7 public/namespace1/partitioned-topic2
    

    查看结果。

    bin/pulsar-admin topics partitioned-lookup public/namespace1/partitioned-topic1
    

    输出

    结果显示,partitioned-topic1 所有数据属于 broker1。

    "persistent://public/namespace1/partitioned-topic1-partition-0    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic1-partition-1    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic1-partition-2    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic1-partition-3    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic1-partition-4    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic1-partition-5    pulsar://192.168.0.105:6650"
    

    输入

    bin/pulsar-admin topics partitioned-lookup public/namespace1/partitioned-topic2
    

    输出

    结果显示,partitioned-topic2 所有数据属于 broker1。

    "persistent://public/namespace1/partitioned-topic2-partition-0    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic2-partition-1    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic2-partition-2    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic2-partition-3    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic2-partition-4    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic2-partition-5    pulsar://192.168.0.105:6650"
    "persistent://public/namespace1/partitioned-topic2-partition-6    pulsar://192.168.0.105:6650"
    
  3. 新增 2 个 broker 节点:broker2 和 broker3。

    (1) 部署准备。

    在 cluster1 文件夹中创建 broker2 和 broker3 文件夹,复制解压后 Pulsar 文件夹中的内容至 broker2 和 broker3 文件夹。

    |-separate-clusters
        |-configuration-store
            |-zk1
        |-cluster1
            |-zk1
            |-bk1
            |-broker1
            |-broker2
            |-broker3
        |-cluster2
            |-zk1
            |-bk1
            |-broker1
    

    (2) 部署 broker。

    (2.a) 配置 broker。

    配置 broker2 配置 broker3
    更改cluster1/broker2/conf/broker.conf文件中以下配置项的值。 更改cluster1/broker3/conf/broker.conf文件中以下配置项的值。
    brokerServicePort=6652
    webServicePort=8082
    zookeeperServers=127.0.0.1:2181
    configurationStoreServers=127.0.0.1:2184
    clusterName=cluster1
    managedLedgerDefaultEnsembleSize=1
    managedLedgerDefaultWriteQuorum=1
    managedLedgerDefaultAckQuorum=1
    brokerServicePort=6653
    webServicePort=8083
    zookeeperServers=127.0.0.1:2181
    configurationStoreServers=127.0.0.1:2184
    clusterName=cluster1
    managedLedgerDefaultEnsembleSize=1
    managedLedgerDefaultWriteQuorum=1
    managedLedgerDefaultAckQuorum=1

    (2.b) 启动 broker。

    启动 broker2 启动 broker3
    输入
    cd cluster1/broker2
    bin/pulsar-daemon start broker
    输入
    cd cluster1/broker3
    bin/pulsar-daemon start broker

    (2.c) 查看 cluster1 中已启动的 broker。

```
bin/pulsar-admin brokers list cluster1
```

**输出**

```
"192.168.0.105:8080" // broker1
"192.168.0.105:8082" // broker2
"192.168.0.105:8083" // broker3
```
  1. 卸载 namespace 1 中 partitioned-topic1 的数据。
```
bin/pulsar-admin namespaces unload public/namespace1
```

验证结果。

(1) 查看 partitioned-topic1 的数据分配情况。



```
bin/pulsar-admin topics partitioned-lookup public/namespace1/partitioned-topic1
```

**输出**

结果显示,partitioned-topic1的数据平均分配在 broker1、broker2 和 broker3 上。

```
"persistent://public/namespace1/partitioned-topic1-partition-0    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic1-partition-1    pulsar://192.168.0.105:6653"
"persistent://public/namespace1/partitioned-topic1-partition-2    pulsar://192.168.0.105:6652"
"persistent://public/namespace1/partitioned-topic1-partition-3    pulsar://192.168.0.105:6653"
"persistent://public/namespace1/partitioned-topic1-partition-4    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic1-partition-5    pulsar://192.168.0.105:6653"
```
  
(2)  查看 partitioned-topic2 的数据分配情况。


```
bin/pulsar-admin topics partitioned-lookup public/namespace1/partitioned-topic2
```

结果显示,partitioned-topic2 的数据平均分配在 broker1、broker2 和 broker3 上。

**输出**

```
"persistent://public/namespace1/partitioned-topic2-partition-0    pulsar://192.168.0.105:6653"
"persistent://public/namespace1/partitioned-topic2-partition-1    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic2-partition-2    pulsar://192.168.0.105:6653"
"persistent://public/namespace1/partitioned-topic2-partition-3    pulsar://192.168.0.105:6652"
"persistent://public/namespace1/partitioned-topic2-partition-4    pulsar://192.168.0.105:6653"
"persistent://public/namespace1/partitioned-topic2-partition-5    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic2-partition-6    pulsar://192.168.0.105:6653"
```

减少 broker 节点

以下步骤接着前文「如何增加 broker 节点」继续操作。

本示例在 cluster1 中减少 1 个 broker 节点,并查看 partitioned topic 数据在其余 2 个 broker 之间的分配情况。

  1. 1.减少 1 个 broker 节点,即停止 broker3。

    cd/cluster1/broker3
    
    bin/pulsar-daemon stop broker
    

    验证结果。

    bin/pulsar-admin brokers list cluster1
    

    输出

    结果显示,当前 cluster1 仅启动了 broker1 和 broker2。

    "192.168.0.105:8080" // broker1
    "192.168.0.105:8082" // broker2
    
  2. 查看 partitioned-topic1 数据的分配情况。
```
bin/pulsar-admin topics partitioned-lookup public/namespace1/partitioned-topic1
```

**输出**

结果显示,partitioned-topic1 数据平均分配至 broker1 和 broker2,即原属于 broker3 的数据已被重新平均分配至 broker1 和 broker2。

```
"persistent://public/namespace1/partitioned-topic1-partition-0    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic1-partition-1    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic1-partition-2    pulsar://192.168.0.105:6652"
"persistent://public/namespace1/partitioned-topic1-partition-3    pulsar://192.168.0.105:6652"
"persistent://public/namespace1/partitioned-topic1-partition-4    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic1-partition-5    pulsar://192.168.0.105:6650"
```

同理,partitioned-topic2 的数据也被平均分配至 broker1 和 broker2。


```
bin/pulsar-admin topics partitioned-lookup public/namespace1/partitioned-topic2
```

**输出**

```
"persistent://public/namespace1/partitioned-topic2-partition-0    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic2-partition-1    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic2-partition-2    pulsar://192.168.0.105:6652"
"persistent://public/namespace1/partitioned-topic2-partition-3    pulsar://192.168.0.105:6652"
"persistent://public/namespace1/partitioned-topic2-partition-4    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic2-partition-5    pulsar://192.168.0.105:6650"
"persistent://public/namespace1/partitioned-topic2-partition-6    pulsar://192.168.0.105:6652"
```

Bookie

增加 bookie 节点

本示例在 cluster1/bookkeeper 1 已有 bookie1,增加 2 个 bookie 节点后,向 topic1 写入数据,并查看数据是否保存了多个副本。

  1. 查看 cluster1 的 bookie 信息。
```
cd cluster1/bk1

bin/bookkeeper shell listbookies -rw -h
```

**输出**

结果说明当前 cluster1 只有 bookie1。

```
12:31:34.933 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - ReadWrite Bookies :
12:31:34.946 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - BookieID:192.168.0.105:3181, IP:192.168.0.105, Port:3181, Hostname:192.168.0.105
```
  1. 允许 3 个 bookie 节点服务。

    更改 cluster1/broker1/conf/broker.conf 文件中以下配置项的值。

    managedLedgerDefaultEnsembleSize=3 // specify the number of bookies to use when creating a ledger
    managedLedgerDefaultWriteQuorum=3 // specify the number of copies to store for each message
    managedLedgerDefaultAckQuorum=2  // specify the number of guaranteed copies (acks to wait before writing is completed) 
    
  2. 重启 broker1,使配置生效。
```
cd cluster1/broker1

bin/pulsar-daemon stop broker

bin/pulsar-daemon start broker
```
  1. 设置 public/default 的消息保留策略。

    注意:如果不设置消息保留策略且 topic 未被订阅,一段时间后,topic 的数据会被自动清理。

```
cd cluster1/broker1

bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/default
```
  1. 在 public/default 创建 topic1,并写入 100 条数据。
```
bin/pulsar-client produce -m 'hello' -n 100 topic1
```

**输出**

结果显示 bookie 节点数量不足导致数据写入失败。

```
···

12:40:38.886 [pulsar-client-io-1-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0x56f92aff, L:/192.168.0.105:53069 - R:/192.168.0.105:6650] Received error from server: org.apache.bookkeeper.mledger.ManagedLedgerException: Not enough non-faulty bookies available

...

12:40:38.886 [main] ERROR org.apache.pulsar.client.cli.PulsarClientTool - Error while producing messages

…

12:40:38.890 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 0 messages successfully produced
```
  1. 新增 2 个 bookie 节点:bookie2 和 bookie3。

    (1) 部署准备。

    在 cluster1 中新增 bk2 和 bk3 文件夹,复制解压后 Pulsar 文件夹中的内容至 bk2 和 bk3 文件夹。

    |-separate-clusters
        |-configuration-store
            |-zk1
        |-cluster1
            |-zk1
            |-bk1
            |-bk2
            |-bk3
            |-broker1
        |-cluster2
            |-zk1
            |-bk1
            |-broker1
    

    (2) 部署 bookie。

    (2.a) 配置 bookie。

    配置 bookie2 配置 bookie3
    更改cluster1/bk2/conf/bookkeeper.conf文件中以下配置项的值。 更改cluster1/bk3/conf/bookkeeper.conf文件中以下配置项的值。
    allowLoopback=true
    bookiePort=3183
    prometheusStatsHttpPort=8004
    httpServerPort=8004
    allowLoopback=true
    bookiePort=3184
    prometheusStatsHttpPort=8005
    httpServerPort=8005

    (2.b) 启动 bookie。

    启动 bookie2 启动 bookie3
    输入
    cd cd cluster1/bk2
    bin/pulsar-daemon start bookie
    bin/bookkeeper shell bookiesanity
    输入
    cd cluster1/bk3
    bin/pulsar-daemon start bookie
    bin/bookkeeper shell bookiesanity

    (2.c) 检查 cluster1 中已启动的 bookie。

```
bin/bookkeeper shell listbookies -rw -h
```

**输出**

结果显示 cluster 1 中已启动 3 个 bookie:
- bookie1:192.168.0.105:3181
- bookie2:192.168.0.105:3183
- bookie3:192.168.0.105:3184

```
12:12:47.574 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - BookieID:192.168.0.105:3183, IP:192.168.0.105, Port:3183, Hostname:192.168.0.105 
12:12:47.575 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - BookieID:192.168.0.105:3184, IP:192.168.0.105, Port:3184, Hostname:192.168.0.105
12:12:47.576 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - BookieID:192.168.0.105:3181, IP:192.168.0.105, Port:3181, Hostname:192.168.0.105 
```
        
  1. 设置 public/default 的消息保留策略。

    注意:如果不设置消息保留策略且 topic 未被订阅,一段时间后,topic 的数据会被自动清理。

```
cd cluster1/broker1

bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/default
```
  1. 在 public/default 创建 topic1,并写入 100 条数据。

    bin/pulsar-client produce -m 'hello' -n 100 topic1
    

    输出

    结果显示数据写入成功。

    ...
    12:17:40.222 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 100 messages successfully produced
    
  2. 查看 topic1 的信息。
```
bin/pulsar-admin topics stats-internal topic1
```

**输出**

结果显示 ledgerId 5 保存了 topic1 的数据。

```
{
  "entriesAddedCounter" : 100,
  "numberOfEntries" : 100,
  "totalSize" : 5500,
  "currentLedgerEntries" : 100,
  "currentLedgerSize" : 5500,
  "lastLedgerCreatedTimestamp" : "2021-05-11T12:17:38.881+08:00",
  "waitingCursorsCount" : 0,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "5:99",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 5,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : { },
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false
  }
}
```
  1. 查看 ledgerid 5 存储在哪些 bookie 节点上。
```
bin/bookkeeper shell ledgermetadata -ledgerid 5
```

**输出**

结果显示正如前文所配置, ledgerid 5 存储在 bookie1(3181)、bookie2(3183) 和 bookie3(3184)上。

```
...
12:23:17.705 [main] INFO  org.apache.bookkeeper.tools.cli.commands.client.LedgerMetaDataCommand - ledgerID: 5
12:23:17.714 [main] INFO  org.apache.bookkeeper.tools.cli.commands.client.LedgerMetaDataCommand - LedgerMetadata{formatVersion=3, ensembleSize=3, writeQuorumSize=3, ackQuorumSize=2, state=OPEN, digestType=CRC32C, password=base64:, ensembles={0=[192.168.0.105:3184, 192.168.0.105:3181, 192.168.0.105:3183]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC90b3BpYzE=, application=base64:cHVsc2Fy}}
…
```

减少 bookie 节点

以下步骤接着前文「如何增加 bookie 节点」继续操作。

本示例在 cluster1 中减少 2 个 bookie 节点,再向 topic2 写入数据,并查看数据保存在哪些节点。

  1. 允许 1 个 bookie 节点服务。

    更改 cluster1/broker1/conf/broker.conf 文件中以下配置项的值。

    managedLedgerDefaultEnsembleSize=1 // specify the number of bookies to use when creating a ledger
    managedLedgerDefaultWriteQuorum=1 // specify the number of copies to store for each message
    managedLedgerDefaultAckQuorum=1  // specify the number of guaranteed copies (acks to wait before writing is completed) 
    
  2. 重启 broker1,使配置生效。
```
cd cluster1/broker1

bin/pulsar-daemon stop broker

bin/pulsar-daemon start broker
```
  1. 查看 cluster1 的 bookie 信息。

    cd cluster1/bk1
    
    bin/bookkeeper shell listbookies -rw -h
    

    输出

    结果说明当前 cluster1 已启动了 bookie1(3181)、bookie2(3183) 和 bookie3(3184)。

    ...
    15:47:41.370 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - ReadWrite Bookies :
    15:47:41.382 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - BookieID:192.168.0.105:3183, IP:192.168.0.105, Port:3183, Hostname:192.168.0.105
    15:47:41.383 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - BookieID:192.168.0.105:3184, IP:192.168.0.105, Port:3184, Hostname:192.168.0.105
    15:47:41.384 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - BookieID:192.168.0.105:3181, IP:192.168.0.105, Port:3181, Hostname:192.168.0.105
    …
    
  2. 减少 2 个 bookie 节点,即停止 bookie2 和 bookie3。

    提示:更多关于如何减少 bookie 的信息,参阅这里

```
cd cluster1/bk2

bin/bookkeeper shell listunderreplicated

bin/pulsar-daemon stop bookie

bin/bookkeeper shell decommissionbookie
```

**输入**

```
cd cluster1/bk3

bin/bookkeeper shell listunderreplicated

bin/pulsar-daemon stop bookie

bin/bookkeeper shell decommissionbookie
```
  1. 查看 cluster1 的 bookie 信息。
```
cd cluster1/bk1

bin/bookkeeper shell listbookies -rw -h
```

**输出**

结果说明当前 cluster1 仅启动了 bookie1(3181)。

```
...
16:05:28.690 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - ReadWrite Bookies :
16:05:28.700 [main] INFO  org.apache.bookkeeper.tools.cli.commands.bookies.ListBookiesCommand - BookieID:192.168.0.105:3181, IP:192.168.0.105, Port:3181, Hostname:192.168.0.105
...
```
  1. 设置 public/default 的消息保留策略。

    注意:如果不设置消息保留策略且没有订阅,一段时间后,数据会被自动清理。

```
cd cluster1/broker1

bin/pulsar-admin namespaces set-retention -s 100M -t 3d public/default
```
  1. 在 public/default 创建 topic2,并写入 100 条数据。
```
bin/pulsar-client produce -m 'hello' -n 100 topic2
```

**输出**

结果显示数据写入成功。

```
...
16:06:59.448 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 100 messages successfully produced
```
  1. 查看 topic2 的信息。
```
bin/pulsar-admin topics stats-internal topic2
```

**输出**

结果显示 ledgerId 7 保存了 topic 2 的数据。

```
{
  "entriesAddedCounter" : 100,
  "numberOfEntries" : 100,
  "totalSize" : 5400,
  "currentLedgerEntries" : 100,
  "currentLedgerSize" : 5400,
  "lastLedgerCreatedTimestamp" : "2021-05-11T16:06:59.058+08:00",
  "waitingCursorsCount" : 0,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "7:99",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 7,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : { },
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false
  }
}
```
  1. 查看 ledgerid 7 存储在哪些 bookie 节点上。
```
bin/bookkeeper shell ledgermetadata -ledgerid 7
```

**输出**

结果显示 ledgerid 7 存储在 bookie1(3181)上。

```
...
16:11:28.843 [main] INFO  org.apache.bookkeeper.tools.cli.commands.client.LedgerMetaDataCommand - ledgerID: 7
16:11:28.846 [main] INFO  org.apache.bookkeeper.tools.cli.commands.client.LedgerMetaDataCommand - LedgerMetadata{formatVersion=3, ensembleSize=1, writeQuorumSize=1, ackQuorumSize=1, state=OPEN, digestType=CRC32C, password=base64:, ensembles={0=[192.168.0.105:3181]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:cHVibGljL2RlZmF1bHQvcGVyc2lzdGVudC90b3BpYzM=, application=base64:cHVsc2Fy}}
...
```

总结

本文是「Pulsar 隔离策略系列」的第 2 篇,讲解了如何在「多个 Pulsar 集群」的环境中验证数据隔离、同步集群数据和扩缩容节点。

本系列的下一篇博客将详细分析如何在「共享 BookKeeper 集群」的环境中玩转 Pulsar 隔离策略,同样也会是「Pulsar 傻瓜手册」喔!敬请期待!

更多信息

如果你对「Pulsar 隔离策略系列」感兴趣,欢迎查阅更多相关资源:

© 北京原流科技有限公司Apache、Apache Pulsar、Apache BookKeeper、Apache Flink 及相关开源项目名称均为 Apache 软件基金会商标。条款隐私