本文翻译自 StreamNative 博客《Streaming Real-Time Chat Messages into Scylla with Apache Pulsar》,作者 Tim Spann。 译者简介:何城波,就职于深信服 PaaS 平台部门,从事 Kafka、Pulsar 等消息中间件基础设施建设。
在这篇博客中,我们将逐步演示如何使用各种 OSS 库、Schema、语言、框架和工具利用 ScyllaDB 构建实时消息流应用程序。我们还将介绍 MQTT、Web Sockets、Java、Golang、Python、NodeJS、Apache NiFi、Kafka on Pulsar、Pulsar 协议等方面的相关内容,为大家介绍如何快速将应用部署到生产云集群,并使用 Apache Pulsar 和 Scylla 集成构建快应用。
在我们讨论如何进行集成之前,先看看为什么这样的集成可以用来快速构建应用程序。Scylla 是一个超快、低延时、高吞吐量的开源 NoSQL 平台,它和 Cassandra 完全兼容。使用与 Scylla 兼容的 Pulsar IO Sink 向 Scylla 表写入数据无需任何复杂或者专门的代码,并且 Sink 使用一个指向 Pulsar 主题的简单配置文件可以轻松将数据写入 Scylla,该配置将 Pulsar 主题内所有的事件直接流向 Scylla 表。
在创建聊天应用程序之前,我们知道该应用程序在有人填写 Web 表单时会将消息发布到事件总线。消息发布之后,将对其中的“comments”字段进行情感分析,并将分析的结果输出到下游主题。
事件驱动的程序,比如我们的聊天应用程序,使用消息总线在松耦合、相互协作的服务之间进行通信,不同服务通过异步交换消息来相互通信。在微服务领域,这些消息通常被称为事件。
消息总线从生产者接收消息、过滤事件,并且将事件推送给消费者,而无需将事件绑定到单独的服务。其他服务可以利用订阅事件总线来接收这些事件并进行处理(消费者)。
Apache Pulsar 是一个云原生分布式消息传递和事件流平台,它扮演着消息总线的角色。Pulsar 通过不同的订阅类型和消费模式支持常见的消息传递范式。
Pulsar 支持 IO 连接器,它是我们集成所必需的功能。Pulsar IO 连接器支持使用简单的配置文件、基本的 CLI 工具和 REST API 创建、部署和管理连接器。我们将利用 Pulsar IO 连接器将数据从 Pulsar 主题输出到 Scylla DB。
首先,下载 Cassandra 连接器并将其部署到 Pulsar 集群,过程详情可以查看 Pulsar IO Cassandra Sink connector information 文档。
然后,将 pulsar-io-cassandra-X.nar 下载到连接器目录。Scylla DB 完全兼容 Cassandra,所以我们可以用这个连接器向它发送消息。
当使用 Pulsar IO 连接器(如演示中使用的 Scylla DB)时,你可以在 YAML 文件中指定配置细节,如下所示。
configs:
roots: "172.17.0.2:9042"
keyspace: "pulsar_test_keyspace"
columnFamily: "pulsar_test_table"
keyname: "key"
columnName: "col"
以上主要配置以 YAML 格式显示,定义了含端口的 root 服务器地址(roots)、键空间(keyspace)、列族(columnFamily)、键名(keyname)和列名(columnName)。
首先,我们需要创建一个用来消费的主题。
bin/pulsar-admin topics create persistent://public/default/chatresult2
部署连接器时,通过命令行调用传入这些配置属性,如下所示。
bin/pulsar-admin sinks create --tenant public --namespace default --name "scylla-test-sink" --sink-type cassandra --sink-config-file conf/scylla.yml --inputs chatresult2
对于新数据,创建键空间、表和索引,或使用现有的键空间、表和索引资源。
CREATE KEYSPACE pulsar_test_keyspace with replication = {‘class’:’SimpleStrategy’, ‘replication_factor’:1};
CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
CREATE INDEX on pulsar_test_table(col);
在之前的小节中,我们讨论了为什么 Apache Pulsar 非常适合事件驱动的应用程序。本节将介绍 Pulsar Functions—轻量级的无服务器计算框架(类似于 AWS Lambda)。我们将利用一个 Pulsar Function 部署我们的 ML 模型来转换或处理 Pulsar 中的消息。下图为聊天应用程序示例。
请记住:Pulsar Functions 支持用户与开发者灵活地使用 Java、Python 或 Go 来实现处理逻辑,便于为情感分析算法使用替代的语言库。
下面的代码演示了一个 Pulsar Function 如何对事件流进行情绪分析(该函数对每个事件都运行一次)。
from pulsar import Function
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import json
class Chat(Function):
def __init__(self):
pass
def process(self, input, context):
logger = context.get_logger()
logger.info("Message Content: {0}".format(input))
msg_id = context.get_message_id()
fields = json.loads(input)
sid = SentimentIntensityAnalyzer()
ss = sid.polarity_scores(fields["comment"])
logger.info("Polarity: {0}".format(ss['compound']))
sentimentVal = 'Neutral'
if ss['compound'] == 0.00:
sentimentVal = 'Neutral'
elif ss['compound'] < 0.00:
sentimentVal = 'Negative'
else:
sentimentVal = 'Positive'
row = { }
row['id'] = str(msg_id)
row['sentiment'] = str(sentimentVal)
row['userInfo'] = str(fields["userInfo"])
row['comment'] = str(fields["comment"])
row['contactInfo'] = str(fields["contactInfo"])
json_string = json.dumps(row)
return json_string
本示例使用 Vader Sentiment NLP ML Library 来分析用户对评论的情绪,用情绪丰富输入内容,并将其以 JSON 格式写入输出主题。
作为参考,可以使用 Pulsar 上下文进行日志记录,还可以将数据值推送到状态存储或记录一些指标。在本文中,我们只做一些日志记录。
部署脚本如下所示,你可以在 GitHub 目录中找到所有的选项和工具,必须确保在所有节点上都安装了 NLP 库。
bin/pulsar-admin functions create --auto-ack true
--py pulsar-pychat-function/src/sentiment.py --classname "sentiment.Chat" --inputs "persistent://public/default/chat2" --log-topic "persistent://public/default/chatlog2" --name Chat --namespace default --output "persistent://public/default/chatresult2" --tenant public
pip3 install vaderSentiment
现在我们已经构建了主题、Function 和 Sink,接下来构建应用程序。完整的网页参见 GitHub 目录,此处仅列举关键部分。对于这个单页应用程序(SPA),我使用了 JQuery 和数据表单,这些数据表单包含在他们公开的 CDN 中。
<form action="/datatable.html" method="post" enctype="multipart/form-data" id="form-id">
<div id="demo" name="demo"></demo>
<p><label>User: </label><input name="user" type="text" id="user-id" size="75" value="" maxlength="100"/></p>
<p><label>Question: </label><input type="text" name="other-field" type="text" id="other-field-id" size="75" maxlength="200" value=""/></p>
<p><label>Contact Info: </label><input name="contactinfo" type="text" id="contactinfo-id" size="75" maxlength="100" value=""/></p>
<p><input type="button" value="Send to Pulsar" onClick="loadDoc()" /></p>
</form>
在上面的 HTML 表单中,我们让用户在聊天中添加评论。
现在我们通过 WebSockets,使用 JavaScript 将表单数据以 JSON 格式发送到 Pulsar 主题。WebSockets 是 Apache Pulsar 支持的协议,其 URL 是 ws://pulsar1:8080/ws/v2/producer/persistent/public/default/chat2
。
其中 ws 是协议,pulsar1 是 Pulsar 服务器,8080 是 REST 端口,producer 是我们正在运行的生产者,persistent 是主题类型,public 是租户,default 是命名空间,chat2 是主题:我们填充一个对象并将其转换为 JSON 字符串,并将该有效负载编码为 Base64 编码的 ASCII 字符串。然后,我们将该编码字符串作为负载添加到一个新的 JSON 字符串中,该字符串包括 Pulsar 消息的负载、属性和上下文。WebSocket 协议需要这种格式才能在 Pulsar 主题中转换为常规消息。
<script>
function loadDoc() {
var xhttp = new XMLHttpRequest();
xhttp.onreadystatechange = function() {
if (this.readyState == 4 && this.status == 200) {
document.getElementById("demo").innerHTML = '';
}
};
var wsUri = "ws://pulsar1:8080/ws/v2/producer/persistent/public/default/chat2";
websocket = new WebSocket(wsUri);
const pulsarObject = {
userInfo: document.getElementById('user-id').value.substring(0,200),
contactInfo: document.getElementById('contactinfo-id').value.substring(0,200),
comment: document.getElementById('other-field-id').value.substring(0, 200)};
const jsonStr = JSON.stringify(pulsarObject);
var payloadStr = btoa(jsonStr);
const propertiesObject = {key: Date.now() }
var data = JSON.stringify({ "payload": payloadStr, "properties": propertiesObject, "context": "cs" });
websocket.onopen = function(evt) {
if (websocket.readyState === WebSocket.OPEN) {
websocket.send(data);
}
};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {}
websocket.onclose = function(evt) {
if (evt.wasClean) { console.log(evt);
} else { console.log('[close] Connection died');
}
};
}
var form = document.getElementById('form-id');
form.onsubmit = function() {
var formData = new FormData(form);
var action = form.getAttribute('action');
loadDoc();
return false;
}
</script>
在上面的代码中,我们从表单中获取字段的值,停止表单重新加载页面,然后将数据发送到 Pulsar。
现在,我们可以消费发送到 Sentiment Pulsar Function 结果主题中的消息。我们从如下代码中的 Pulsar 主题中消费:
ws://pulsar1:8080/ws/v2/consumer/persistent/public/default/chatresult2/chatrreader?subscriptionType=Shared&receiverQueueSize=500.
通过这个 URI 可以看出它与生产者 URI 有些不同,它额外包含了 receiverQueueSize
、消费者标签和共享订阅类型。
JavaScript:
$(document).ready(function() {
var t = $('#example').DataTable();
var wsUri = "ws://pulsar1:8080/ws/v2/consumer/persistent/public/default/chatresult2/chatrreader?subscriptionType=Shared&receiverQueueSize=500";
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {
console.log('open');
};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {
var dataPoints = JSON.parse(evt.data);
if ( dataPoints === undefined || dataPoints == null || dataPoints.payload === undefined || dataPoints.payload == null ) {
return;
}
if (IsJsonString(atob(dataPoints.payload))) {
var pulsarMessage = JSON.parse(atob(dataPoints.payload));
if ( pulsarMessage === undefined || pulsarMessage == null ) {
return;
}
var sentiment = "";
if ( !isEmpty(pulsarMessage.sentiment) ) {
sentiment = pulsarMessage.sentiment;
}
var publishTime = "";
if ( !isEmpty(dataPoints.publishTime) ) {
publishTime = dataPoints.publishTime;
}
var comment = "";
if ( !isEmpty(pulsarMessage.comment) ) {
comment = pulsarMessage.comment;
}
var userInfo= "";
if ( !isEmpty(pulsarMessage.userInfo) ) {
userInfo = pulsarMessage.userInfo;
}
var contactInfo= "";
if ( !isEmpty(pulsarMessage.contactInfo) ) {
contactInfo = pulsarMessage.contactInfo;
}
t.row.add( [ sentiment, publishTime, comment, userInfo, contactInfo ] ).draw(true);
}
};
} );
对于在 JavaScript WebSockets 中使用的消息,我们必须对负载进行 Base64 解码,并将 JSON 解析为一个对象,然后使用 DataTable 的 row.Add 方法将这些新表行添加到结果中。每当收到消息时都会发生这种情况。
在本博客中,我们介绍了如何在不用考虑数据源的情况下,使用 Apache Pulsar 构建简单的流应用程序。本文选择将兼容 Scylla 的 Sink 添加到聊天应用程序中;然而,我们也可以在 Apache Pulsar 的任何数据存储执行此操作。