sharetwitterlinkedIn

通过 Apache Pulsar + ScyllaDB 构建实时聊天消息流

May 20, 2022
head img

本文翻译自 StreamNative 博客《Streaming Real-Time Chat Messages into Scylla with Apache Pulsar》,作者 Tim Spann。 译者简介:何城波,就职于深信服 PaaS 平台部门,从事 Kafka、Pulsar 等消息中间件基础设施建设。

在这篇博客中,我们将逐步演示如何使用各种 OSS 库、Schema、语言、框架和工具利用 ScyllaDB 构建实时消息流应用程序。我们还将介绍 MQTT、Web Sockets、Java、Golang、Python、NodeJS、Apache NiFi、Kafka on Pulsar、Pulsar 协议等方面的相关内容,为大家介绍如何快速将应用部署到生产云集群,并使用 Apache Pulsar 和 Scylla 集成构建快应用。

在我们讨论如何进行集成之前,先看看为什么这样的集成可以用来快速构建应用程序。Scylla 是一个超快、低延时、高吞吐量的开源 NoSQL 平台,它和 Cassandra 完全兼容。使用与 Scylla 兼容的 Pulsar IO Sink 向 Scylla 表写入数据无需任何复杂或者专门的代码,并且 Sink 使用一个指向 Pulsar 主题的简单配置文件可以轻松将数据写入 Scylla,该配置将 Pulsar 主题内所有的事件直接流向 Scylla 表。

为什么选择 Apache Pulsar 构建基于流事件的应用程序

在创建聊天应用程序之前,我们知道该应用程序在有人填写 Web 表单时会将消息发布到事件总线。消息发布之后,将对其中的“comments”字段进行情感分析,并将分析的结果输出到下游主题。

事件驱动的程序,比如我们的聊天应用程序,使用消息总线在松耦合、相互协作的服务之间进行通信,不同服务通过异步交换消息来相互通信。在微服务领域,这些消息通常被称为事件。

消息总线从生产者接收消息、过滤事件,并且将事件推送给消费者,而无需将事件绑定到单独的服务。其他服务可以利用订阅事件总线来接收这些事件并进行处理(消费者)。

Apache Pulsar 是一个云原生分布式消息传递和事件流平台,它扮演着消息总线的角色。Pulsar 通过不同的订阅类型和消费模式支持常见的消息传递范式。

Pulsar 支持 IO 连接器,它是我们集成所必需的功能。Pulsar IO 连接器支持使用简单的配置文件、基本的 CLI 工具和 REST API 创建、部署和管理连接器。我们将利用 Pulsar IO 连接器将数据从 Pulsar 主题输出到 Scylla DB。

Scylla DB 的 Pulsar IO 连接器

首先,下载 Cassandra 连接器并将其部署到 Pulsar 集群,过程详情可以查看 Pulsar IO Cassandra Sink connector information 文档。

然后,将 pulsar-io-cassandra-X.nar 下载到连接器目录。Scylla DB 完全兼容 Cassandra,所以我们可以用这个连接器向它发送消息。

当使用 Pulsar IO 连接器(如演示中使用的 Scylla DB)时,你可以在 YAML 文件中指定配置细节,如下所示。

configs:
    roots: "172.17.0.2:9042"
    keyspace: "pulsar_test_keyspace"
    columnFamily: "pulsar_test_table"
    keyname: "key"
    columnName: "col"

以上主要配置以 YAML 格式显示,定义了含端口的 root 服务器地址(roots)、键空间(keyspace)、列族(columnFamily)、键名(keyname)和列名(columnName)。

首先,我们需要创建一个用来消费的主题。

bin/pulsar-admin topics create persistent://public/default/chatresult2

部署连接器时,通过命令行调用传入这些配置属性,如下所示。

bin/pulsar-admin sinks create --tenant public --namespace default --name "scylla-test-sink" --sink-type cassandra --sink-config-file conf/scylla.yml --inputs chatresult2

对于新数据,创建键空间、表和索引,或使用现有的键空间、表和索引资源。

CREATE KEYSPACE pulsar_test_keyspace with replication = {‘class’:’SimpleStrategy’, ‘replication_factor’:1};
CREATE TABLE pulsar_test_table (key text PRIMARY KEY, col text);
CREATE INDEX on pulsar_test_table(col);

使用 Pulsar Function 添加 ML 功能

在之前的小节中,我们讨论了为什么 Apache Pulsar 非常适合事件驱动的应用程序。本节将介绍 Pulsar Functions—轻量级的无服务器计算框架(类似于 AWS Lambda)。我们将利用一个 Pulsar Function 部署我们的 ML 模型来转换或处理 Pulsar 中的消息。下图为聊天应用程序示例。

请记住:Pulsar Functions 支持用户与开发者灵活地使用 Java、Python 或 Go 来实现处理逻辑,便于为情感分析算法使用替代的语言库。

下面的代码演示了一个 Pulsar Function 如何对事件流进行情绪分析(该函数对每个事件都运行一次)。

from pulsar import Function
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import json

class Chat(Function):
    def __init__(self):
        pass

    def process(self, input, context):
        logger = context.get_logger()
        logger.info("Message Content: {0}".format(input))
        msg_id = context.get_message_id()

        fields = json.loads(input)
        sid = SentimentIntensityAnalyzer()
        ss = sid.polarity_scores(fields["comment"])
        logger.info("Polarity: {0}".format(ss['compound']))
        sentimentVal = 'Neutral'
        if ss['compound'] == 0.00:
            sentimentVal = 'Neutral'
        elif ss['compound'] < 0.00:
            sentimentVal = 'Negative'
        else:
            sentimentVal = 'Positive'
        row = { }

        row['id'] = str(msg_id)
        row['sentiment'] = str(sentimentVal)
        row['userInfo'] = str(fields["userInfo"])
        row['comment'] = str(fields["comment"])
        row['contactInfo'] = str(fields["contactInfo"])
        json_string = json.dumps(row)
        return json_string

本示例使用 Vader Sentiment NLP ML Library 来分析用户对评论的情绪,用情绪丰富输入内容,并将其以 JSON 格式写入输出主题。

作为参考,可以使用 Pulsar 上下文进行日志记录,还可以将数据值推送到状态存储或记录一些指标。在本文中,我们只做一些日志记录。

部署 Function

部署脚本如下所示,你可以在 GitHub 目录中找到所有的选项和工具,必须确保在所有节点上都安装了 NLP 库。

bin/pulsar-admin functions create --auto-ack true 
--py pulsar-pychat-function/src/sentiment.py --classname "sentiment.Chat" --inputs "persistent://public/default/chat2" --log-topic "persistent://public/default/chatlog2" --name Chat --namespace default --output "persistent://public/default/chatresult2" --tenant public

pip3 install vaderSentiment

运行聊天程序

现在我们已经构建了主题、Function 和 Sink,接下来构建应用程序。完整的网页参见 GitHub 目录,此处仅列举关键部分。对于这个单页应用程序(SPA),我使用了 JQuery 和数据表单,这些数据表单包含在他们公开的 CDN 中。

         <form action="/datatable.html" method="post" enctype="multipart/form-data" id="form-id">    
<div id="demo" name="demo"></demo>    
<p><label>User: </label><input name="user" type="text" id="user-id" size="75" value="" maxlength="100"/></p>
<p><label>Question: </label><input type="text"  name="other-field" type="text" id="other-field-id" size="75" maxlength="200" value=""/></p>
<p><label>Contact Info: </label><input name="contactinfo" type="text" id="contactinfo-id" size="75" maxlength="100" value=""/></p>
<p><input type="button" value="Send to Pulsar" onClick="loadDoc()" /></p>
    </form>

在上面的 HTML 表单中,我们让用户在聊天中添加评论。

现在我们通过 WebSockets,使用 JavaScript 将表单数据以 JSON 格式发送到 Pulsar 主题。WebSockets 是 Apache Pulsar 支持的协议,其 URL 是 ws://pulsar1:8080/ws/v2/producer/persistent/public/default/chat2

其中 ws 是协议,pulsar1 是 Pulsar 服务器,8080 是 REST 端口,producer 是我们正在运行的生产者,persistent 是主题类型,public 是租户,default 是命名空间,chat2 是主题:我们填充一个对象并将其转换为 JSON 字符串,并将该有效负载编码为 Base64 编码的 ASCII 字符串。然后,我们将该编码字符串作为负载添加到一个新的 JSON 字符串中,该字符串包括 Pulsar 消息的负载、属性和上下文。WebSocket 协议需要这种格式才能在 Pulsar 主题中转换为常规消息。

​​<script>
function loadDoc() {
  var xhttp = new XMLHttpRequest();
  xhttp.onreadystatechange = function() {
    if (this.readyState == 4 && this.status == 200) {
      document.getElementById("demo").innerHTML = '';
    }
  };
var wsUri = "ws://pulsar1:8080/ws/v2/producer/persistent/public/default/chat2";

websocket = new WebSocket(wsUri);

const pulsarObject = { 
         userInfo: document.getElementById('user-id').value.substring(0,200), 
        contactInfo: document.getElementById('contactinfo-id').value.substring(0,200), 
        comment: document.getElementById('other-field-id').value.substring(0, 200)};
const jsonStr = JSON.stringify(pulsarObject);
var payloadStr = btoa(jsonStr);
const propertiesObject = {key: Date.now() }
var data = JSON.stringify({ "payload": payloadStr, "properties": propertiesObject, "context": "cs" });

websocket.onopen = function(evt) {
   if (websocket.readyState === WebSocket.OPEN) {
           websocket.send(data);
   }
};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {}
websocket.onclose = function(evt) {
  if (evt.wasClean) {    console.log(evt);
  } else {    console.log('[close] Connection died');
  }
};
}
var form = document.getElementById('form-id');
form.onsubmit = function() {
    var formData = new FormData(form);
    var action = form.getAttribute('action');
    loadDoc();
    return false;
  }
</script>

在上面的代码中,我们从表单中获取字段的值,停止表单重新加载页面,然后将数据发送到 Pulsar。

现在,我们可以消费发送到 Sentiment Pulsar Function 结果主题中的消息。我们从如下代码中的 Pulsar 主题中消费: ws://pulsar1:8080/ws/v2/consumer/persistent/public/default/chatresult2/chatrreader?subscriptionType=Shared&receiverQueueSize=500.

通过这个 URI 可以看出它与生产者 URI 有些不同,它额外包含了 receiverQueueSize、消费者标签和共享订阅类型。

JavaScript:

$(document).ready(function() {
    var t = $('#example').DataTable();

var wsUri = "ws://pulsar1:8080/ws/v2/consumer/persistent/public/default/chatresult2/chatrreader?subscriptionType=Shared&receiverQueueSize=500";
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {
   console.log('open');
};
websocket.onerror = function(evt) {console.log('ERR', evt)};
websocket.onmessage = function(evt) {

    var dataPoints = JSON.parse(evt.data);
    if ( dataPoints === undefined || dataPoints == null || dataPoints.payload === undefined || dataPoints.payload == null ) {
            return;
    }
    if (IsJsonString(atob(dataPoints.payload))) {
         var pulsarMessage = JSON.parse(atob(dataPoints.payload));
         if ( pulsarMessage === undefined || pulsarMessage == null ) {
                 return;
         }
         var sentiment = "";
         if ( !isEmpty(pulsarMessage.sentiment) ) {
                 sentiment = pulsarMessage.sentiment;
         }
         var publishTime = "";
         if ( !isEmpty(dataPoints.publishTime) ) {
                 publishTime = dataPoints.publishTime;
         }
         var comment = "";
         if ( !isEmpty(pulsarMessage.comment) ) {
                 comment = pulsarMessage.comment;
         }
         var userInfo= "";
         if ( !isEmpty(pulsarMessage.userInfo) ) {
                userInfo = pulsarMessage.userInfo;
         }
         var contactInfo= "";
         if ( !isEmpty(pulsarMessage.contactInfo) ) {
                 contactInfo = pulsarMessage.contactInfo;
         }

                 t.row.add( [ sentiment, publishTime, comment, userInfo, contactInfo ] ).draw(true);
       }
};

} );

对于在 JavaScript WebSockets 中使用的消息,我们必须对负载进行 Base64 解码,并将 JSON 解析为一个对象,然后使用 DataTable 的 row.Add 方法将这些新表行添加到结果中。每当收到消息时都会发生这种情况。

总结

在本博客中,我们介绍了如何在不用考虑数据源的情况下,使用 Apache Pulsar 构建简单的流应用程序。本文选择将兼容 Scylla 的 Sink 添加到聊天应用程序中;然而,我们也可以在 Apache Pulsar 的任何数据存储执行此操作。

相关资源

© 北京原流科技有限公司Apache、Apache Pulsar、Apache BookKeeper、Apache Flink 及相关开源项目名称均为 Apache 软件基金会商标。条款隐私