本文整理自 2022 年 8 月 Apache Pulsar Meetup 上傅腾题为《Apache Pulsar 企业级安全实践》的分享。数据安全已经成为企业的一项重要竞争优势。傅腾针对集群环境分享 Apache Pulsar 的认证和授权实现,讲述企业如何构建安全的 Apache Pulsar 集群并打造满足个性化要求的 Pulsar 认证或授权插件,以满足不同的企业安全需求。
Apache Pulsar 提供了多种安全组合,包括认证(支持全链路组件认证)、授权(ACL)、传输加密(TLS 和 mTLS)以及端到端加密(仅生产者和消费者可加解密数据),企业可根据实际环境的安全需求来搭配使用。
上图为全链路可信场景。该场景通常见于开放集群或内部测试、功能验证等。该场景中 Broker、Bookie 均开放,生产者明文写入数据。
上面两图皆为内网可信场景。其中,第一种设计会做传输层加密和 Broker 认证授权,内网依旧开放。第二种设计将传输层加密放在外部与 Proxy 之间,Proxy 也要做认证。
上图为内网可信场景中的第三种设计,即常见的负载均衡场景,外部到负载均衡器做传输层加密,后者将数据交给需要认证的 Proxy,再流向 Broker。
在内网不可信场景中,内部节点可能因为各种原因有安全隐患,这就需要在每一传输层都开启加密和认证,在 Broker 开启授权。该场景常见于企业跨部门协作、与子公司协作、与外部沟通等情况。
服务不可信场景一般是指第三方(如云厂商)提供 Pulsar 服务,为此所有链路都要端到端加密,且只有生产者和消费者才能获取原始数据。
Apache Pulsar 有着简洁、非常容易扩展的安全框架。企业可方便地自定义安全插件,包括可插拔式的认证和授权、易集成的代码结构、同时支持多种认证的认证链、认证缓存和检测机制。
对于可插拔式的认证和授权:
Pulsar 服务端(如 Broker、Proxy)会对客户端做身份认证,并记录一个 role 作为后续授权的客户端身份标识。该 role 可视为用户实际登录的 token。
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken,org.apache.pulsar.broker.authentication.AuthenticationProviderBasic
Pulsar 支持两种鉴权插件
authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthentizationProviderToken
authorizationProvider=org.apache.pulsar.broker.authorization.MultiRolesTokenAuthorizationProvider
认证的触发时机发生在客户端建立连接时:
Pulsar 的权限体系中,支持的鉴权级别分别是:
插件实现有几点需要注意:initialize() 可以获取 Broker 级别的配置信息和管理元数据;每次鉴权需要先检查传进来的用户是否是 superuser 和 tenant admin,再检查具体的权限。还要注意,只有 superuser 或者 tenant admin 才能为用户赋权。
JSON Web Token(RFC-7519)是 Web 服务中常用的一种认证方案,简称 JWT 认证。JWT 基于一个 token 字符串来识别用户身份。该 token 有着严格的三层结构:
需要注意前两层是透明的,可以反解出来。
基于 JWT 的特性,在 Pulsar 中使用需要注意几点:
6.注意 token 参数的配置既可使用字符串形式,也可使用文件形式,可择优选择。
字符串形式
brokerClientAuthenticationParameters={"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.9OHgE9ZUDeBTZs7nSMEFIuGNEX18FLR3qvy8mqxSxXw"}
从文件读取
brokerClientAuthenticationParameters={"file":"///path/to/proxy-token.txt"}
7.使用 bin/pulsar tokens show 命令可以查看 token 的 header 和 payload:
bin/pulsar tokens show -i eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIiLCJleHAiOjE2NTY3NzYwOTh9.awbp6DreQwUyV8UCkYyOGXCFbfo4ZoV-dofXYTnFXO8
{"alg":"HS256"}
---
{"sub":"test-user","exp":1656776098}
8.使用 bin/pulsar tokens validate 命令可以用 secret key 或者 public key 验证 token:
bin/pulsar tokens validate -pk my-public.key -i "eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.ijp-Qw4JDn1aOQbYy4g4YGBbXYIgLA9lCVrnP-heEtPCdDq11_c-9pQdQwc6RdphvlSfoj50qwL5OtmFPysDuF2caSYzSV1kWRWN-tFzrt-04_LRN-vlgb6D06aWubVFJQBC4DyS-INrYqbXETuxpO4PI9lB6lLXo6px-SD5YJzQmcYwi2hmQedEWszlGPDYi_hDG9SeDYmnMpXTtPU3BcjaDcg9fO6PlHdbnLwq2MfByeIj-VS6EVhKUdaG4kU2EJf5uq2591JJAL5HHiuTZRSFD6YbRXuYqQriw4RtnYWSvSeVMMbcL-JzcSJblNbMmIOdiez43MPYFPTB7TMr8g"
{sub=admin}
Kerberos 是大数据领域常用的认证方案,以简单易用和稳定性著称。Pulsar 使用 Java 的 JAAS 机制来支持通过 Kerberos 做身份认证。JAAS 中一个用户信息对应一个 section 。对于 Kerberos 认证而言,一个用户信息中最重要的是 principle 和 keytab,现在可以方便地封装到一个 section 里,最后将这些用户信息拼装到 jaas.conf 文件中。
例如:
SectionName {
com.sun.security.auth.module.Krb5LoginModule required
usekeyTab=true
storekey=true
useTicketCache=false
keyTab="/etc/security/keytabs/pulsarbroker.keytab"
principal="broker/localhost@EXAMPLE.COM".
};
AnotherSectionName{
...
};
SectionName 是指定了用户名标识,内部封装了一个 Kerberos 用户信息。Pulsar 使用 Kerberos 认证时,从配置上而言,就是告知进程该以哪个 section 的身份来启动程序。以 broker 为例:
使用 Kerberos 做认证时有几点需要注意:
Broker 的 Principal 命名格式示例:
#正确示范 broker/host1@MY.REALM
#错误示例 broker@MY.REALM pulsarbroker/host1@MY.REALM
broker
、proxy
(其他会报 warn)。REALM 域名建议使用大写,注意不要复制错。
这里以一个鉴权插件为例介绍可插拔的认证和授权实践。案例背景是我们需要界面化的鉴权过程,为此选用大数据领域常用的鉴权组件 Ranger 进行界面化权限管理。具体的开发流程是:
编写 Ranger 服务定义文件时分为几步:
Pulsar 中的鉴权接口代码如下:
@Override
public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
String role,
TopicOperation operation,
AuthenticationDataSourceauthData) {
if (log.isDebugEnabled ()) {
log.debug ("'Check allowTopicOperationAsync ({}] on (t}].", operation.name (), topicName);
}
return validateTenantAdminAccess(topicName.getTenant(),role,authData)
.thenCompose(isSuperUserOrAdmin -> {
if (log. isDebugEnabled()) {
log.debug ("Verify if role (} is allowed to (} to topic {}: isSuperUserOrAdmin=(}",
role, operation, topicName, isSuperUserOrAdmin);
}
if (isSuperUserOrAdmin){
return CompletableFuture.completedFuture(true);
} else {
switch (operation) {
case LOOKUP:
case GET STATS:
case GET METADATA:
return canLookupAsync(topicName, role, authData);
case PRODUCE:
return canProduceAsync(topicName, role, authData);
case GET SUBSCRIPTIONS:
case CONSUME:
case SUBSCRIBE:
case UNSUBSCRIBE:
case SKIP:
case EXPIRE MESSAGES:
case PEEK MESSAGES:
case RESET_CURSOR:
case GET_BACKLOG_SIZE:
case SET_REPLICATED_SUBSCRIPTION_STATUS:
case GET_REPLICATED_SUBSCRIPTION_STATUS:
return canConsumeAsync(topicName, role, authData, authData.getSubscription());
case TERMINATE:
case COMPACT:
case OFFLOAD:
case UNLOAD:
case ADD_BUNDLE_RANGE:
case GET_BUNDLE_RANGE:
case DELETE_BUNDLE_RANGE:
return CompletableFuture.completedFuture(false);
default:
return FutureUtil.failedFuture(new IllegalStateException(
"TopicOperation [" + operation.name() + "] is not supported.')) :
}
}
});
}
Ranger 方法的代码如下:
@Override
public CompletableFuture<Boolean> canProduceAsync (TopicName topicName, String role,
AuthenticationDataSource authenticationData) {
CompletableFuture<Boolean> future = new CompletableFuture@();
RangerAccessResourceImpl resource = new RangerAccessResourceImpl();
resource.setValue(KEY_TENANT, topicName.getTenant());
resource.setValue(KEY_NAMESPACE, topicName.getNamespacePortion());
resource.setValue(KEY_TOPIC, topicName.getLocalName().split("-partition-") [0]);
//resource.setValue(KEY_TAG, "*");
RangerAccessRequestImplrequest = new RangerAccessRequestImpl();
request.setAccessType(AuthAction.produce.name());
request.setUser(role);
request-setResource(resource);
request.setAction (AuthAction.produce.name () ) ;
try {
RangerAccessResult result = rangerPlugin. isAccessAllowed (request);
loq-info("request--->{}", request);
log.info("result--->{}", result);
if (result.getIsAllowed()) {
future. complete (true);
} else {
String errMsg = String
.format ("User '%s' doesn't have produce access to %s, matched policy id = %d",
request.getUser(), topicName.toString(), result.getPolicyId());
log-error(errMsg);
future.completeExceptionally (new Exception(errMsg));
}
} catch (Exception e) {
//access allowed in abnormal situation
log.error("User (] encounter exception in (} produce authorization step."
request.getUser(), topicName. toString(), e);
future. complete (true);
}
return future;
}
最终完成效果如下:
Q:JWT 认证读取本地 token 文件时是否实时?是否需要重启 Proxy 或 Broker?
A:Client 是被动读取 token 文件的。客户端 token 文件变更不影响服务端,即不需要重启。
Q:生产者是否支持在 HTTP 发送消息过程中在连接时认证?
A:Pulsar 使用 Pulsar 协议发送消息,在第一次连接的时候发起认证。对于管理侧如 pulsar-admin 使用 HTTP 协议,而 HTTP 协议每次连接都会发起认证。
Q:对外鉴权时,是否只需配置 Proxy 而不管 Broker?
A:鉴权是由 Broker 来做的,因此需要配置 Broker。
Q:请具体介绍使用 JWT 时 Broker 对客户端认证信息的缓存策略。
A:有线程专门负责周期性检测所缓存的客户端认证是否过期。如果发现过期,就发 auth challenge 命令给客户端,客户端接收到 auth challenge 命令后会读取 token 文件发送给 Broker;如果检测到更新后的 token 文件,认证就能继续通过,Broker 端重新缓存认证信息。如果客户端在下次检查周期内没有返回认证数据,就会关闭连接。
傅腾,StreamNative 技术支持工程师,有着 9 年电信运营商行业大数据相关经验,熟悉大数据的平台架构、建设、安全和维护,对实时数仓、云原生和 AI 具有平台类产品建设经验。