文章转自公众号:腾讯云中间件,作者:王俊飞
如果 producer 端要发送 POJO 类型的数据,则 Pulsar 需要一套序列化和反序列化工具,先将对象转化为字节数据再发送出去,下面为有无 schema 的两种情况:
若在不指定 schema 的情况下创建 producer,则 producer 只能发送字节数组类型的消息。在有 POJO 类数据要发送时,需要在发送消息前将 POJO 序列化为字节。
代码示例:
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.create();
User user = new User(“Bill”, 40);
byte[] message = … // serialize the `user` by yourself;
producer.send(message);
若在指定 schema 的情况下创建 producer,则 producer 可以直接将类发送到 topic,无需考虑如何将 POJO 序列化为字节。
代码示例:
Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
.topic(topic)
.create();
User user = new User(“Bill”, 40);
producer.send(user);
此外,在上述 producer 发送数据、consumer 接收数据的流程中,还需考虑以下情况:
在这些情况下,为保证生产-消费模式的正常运行,所有 producer 与其相对应的 consumer 都需要进行相同的变化,若引入 schema 机制,可以简化上述操作。
Pulsar Schema 包含:
Pulsar Schema 支持的类型可分为 Primitive type
和 Complex type
Primitive type 包含的类型有 :
Primitive type | 描述 |
---|---|
BOOLEAN | 1 比特二进制数值 |
INT8 | 8 位有符号整数 |
INT16 | 16 位有符号整数 |
INT32 | 32 位有符号整数 |
INT64 | 64 位有符号整数 |
FLOATE | 单精度浮点数 |
DOUBLE | 双精度浮点数 |
BYTES | 字节序列 |
STRING | Unicode 字符集序列 |
TIMESTAMP(DATE, TIME) | 时间戳,保存形式为 64 位有符号整数 |
INSTANCE(2.7 版本新增) | 精度为纳秒的瞬时时间 |
LOCAL_DATE(2.7 版本新增) | 本地时间,格式为:yyyy-mm-dd |
LOCAL_TIME(2.7 版本新增) | 本地时间,格式为:hh-mm-ss |
LOCAL_DATE_TIME(2.7 版本新增) | 本地时间,格式为:yyyy-mm-dd : hh-mm-ss |
Complex type 目前支持的类型有:
该模式下,Pulsar 将键和值的 schemaInfo 存储在一起
Pulsar 提供以下两种编码方式:
下面是使用 INLINE 编码类型构造 key/value schema:
Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.INLINE
);
Pulsar 提供以下三种方式使用 Struct:
如果我们已知要发送消息的数据类型,可以使用 static schema
, 如下所示。
要发送的类为 User
,结构如下:
public class User {
String name;
int age;
}
使用 struct schema
创建生产者发送消息:
Producer producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage()
.value(User.builder().userName("Pulsar-user")
.userId(1L)
.build())
.send();
使用 struct schema
创建消费者接收消息:
Consumer consumer = client.newConsumer(Schema.AVRO(User.class)).create();
User user = consumer.receive();
如果我们不知道要发送消息的数据类型,可以使用 GenericSchemaBuilder
定义 struct schema,如下所示。
使用 RecordSchemaBuilder
构建一个 schema:
RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
recordSchemaBuilder.field("intField").type(SchemaType.INT32);
SchemaInfo schemaInfo =recordSchemaBuilder.build(SchemaType.AVRO);
Producer producer =client.newProducer(Schema.generic(schemaInfo))
.create();
使用 RecordSchemaBuilder
构建一个 struct schema:
producer.newMessage().value(schema.newRecordBuilder()
.set("intField", 32)
.build()).send();
可以通过 SchemaDefinition
生成一个 struct schema,示例如下。
要发送的类为 User
,结构如下:
public class User {
String name;
int age;
}
使用 Schema Definition
生成一个 producer 并发送消息:
SchemaDefinition<User> schemaDefinition =
SchemaDefinition.builder()
.withPojo(User.class)
.build();
Producer<User> producer = client.newProducer(schemaDefinition)
.create();
producer.newMessage()
.value(User.builder()
.userName("Pulsar-user")
.userId(1L).build())
.send();
使用 SchemaDefinition
生成一个 consumer 并发送消息:
SchemaDefinition<User> schemaDefinition = SchemaDefinition.builder()
.withPojo(User.class)
.build();
Consumer<User> consumer = client.newConsumer(schemaDefinition)
.subscribe();
User user = consumer.receive();
SchemaInfo 是定义 schema 的一种数据结构,它包含以下字段:
示例如下:
{
"name": "test-string-schema",
"type": "STRING",
"schema": "",
"properties": {}
}
在生产者端:
在消费者端:
Org. apache. Pulsar.common. schema
的 SchemaInfo With Version 有两个字段:long 类型的 version 和 SchemaInfo 类型的 schemaInfo。
Topic 下注册的 schema 会带有一个版本号,若版本号发生变化,需在原有版本号基础上 +1。Producer 发送带有 schemaInfo 的消息会附加一个版本号,所以当该消息被 consumer 消费时,客户端可以通过该版本号来获取对应的 schemaInfo,然后根据该 schemaInfo 对消息反序列化。
如果遇到业务发生变化的场景时,我们也许需要更新一下 schema,这种更新被称为 schema evolution,很显然,如果 schema 发生了更改,下游的 consumer 会受到影响,所以 schema evolution 应该能保证下游 consumer 能无缝处理旧版本和新版本的数据,这部分机制被称为 schema compatibility,该部分将在下一小节详细介绍。
以下为 schema evolution 的流程:
上小节介绍了 schema evolution,本小节将介绍 schema compatibility。Pulsar 有 8 种 schema 兼容性检查策略,如下表所示:
假设一个 topic 有三个 schema(V1, V2, V3),V1 是最早版本,V3 是最新版本。
兼容性检查策略名称 | 定义 | 是否允许更改 | 检查Schema | 优先级 |
---|---|---|---|---|
ALWAYS_COMPATIBLE | 总是兼容(禁止兼容性检查) | 允许所有更改 | 所有版本 | Any order |
ALWAYS_INCOMPATIBLE | 总是不兼容(禁止Schema Evolution) | 禁止所有更改 | 无 | 无 |
BACKWARD | 使用 schema v3的消费者可以处理使用 schema v2 或 v3 的生产者编写的数据 | - 添加可选字段 - 删除字段 | 最新版本 | Consumer |
BACKWARD_TRANSITIVE | 使用 schema v3的消费者可以处理使用 schema v1、v2 或 v3 的生产者编写的数据 | - 添加可选字段 - 删除字段 | 所有版本 | Consumer |
FORWARD | 使用 schema v2 或 v3 的消费者可以处理使用 schema v3 的生产者编写的数据 | - 添加字段 - 删除可选字段 | 最新版本 | Producer |
FORWARD_TEANSITIVE | 使用 schema v1、v2 或 v3 的消费者可以处理使用 schema v3的生产者编写的数据 | - 添加字段 - 删除可选字段 | 所有版本 | Producer |
FULL(默认策略) | 使用 schema v2 或 v3 的消费者可以处理使用 schema v2 或 v3的生产者编写的数据 | 修改可选字段 | 最新版本 | Any order |
FULL_TRANSITIVE | 使用 schema v1、v2 或 v3 的消费者可以处理使用 schema v1、v2 或 v3 的生产者编写的数据 | 修改可选字段 | 所有版本 | Any order |
如果不知道 topic 的模式类型,可以使用 Auto Schema
来生成,Auto Schema
有以下两种类型:
假设以下情况:
基于上面情况,可以使用 AUTO_PRODUCE
验证 K 生成的字节是否可以发送到 P
Produce<byte[]> PulsarProducer =
client.newProducer(Schema.AUTO_PRODUCE())
…
.create();
byte[] kafkaMessageBytes = … ;
PulsarProducer.produce(kafkaMessageBytes);
假设以下情况:
基于上面情况,可以使用 AUTO-CONSUME
验证 P 生成的字节是否可以发送到 MySQL
Consumer<GenericRecord> PulsarConsumer =
client.newConsumer(Schema.AUTO_CONSUME())
…
.subscribe();
Message<GenericRecord> msg = consumer.receive() ;
GenericRecord record = msg.getValue();
如果 schema 通过了 schema 兼容性检测,则 producer 将自己的 schema 版本与 topic schema 版本同步
对于生产者,AutoUpdate
的流程如下:
对于消费者,AutoUpdate
的流程如下: