sharetwitterlinkedIn

ActorCloud(物联网平台)中的 Apache Pulsar 使用案例

head img

背景

EMQ 是一家开源软件公司,为 5G 时代的物联网平台和应用提供高度可扩展的实时消息传递和流引擎。 目前,EMQ 是全球使用最广泛的 MQTT 消息代理之一,已成功支持全球各种客户,包括 HPE、爱立信、华为、中国移动、中国银联等。

ActorCloud 是 EMQ 推出的开源物联网平台,在安全可靠的基础上为设备提供多协议访问、消息流管理、数据解析和数据处理能力。 ActorCloud 使用 Apache Pulsar 存储和处理流数据,利用 Apache Pulsar Functions 更快地处理数据,并通过暴露给上层的 SQL 引擎分析 IoT 数据。

问题

ActorCloud 作为一个物联网平台,需要访问数据、管理设备、存储和分析数据,并向上层提供编程接口,方便开发者开发应用。 由于 ActorCloud 设备多,数据量大,需要具备横向扩展能力,才能满足业务需求。

为什么 Pulsar 最适合

为了解决前面提到的问题,我们需要一个高可用、分布式、可扩展的消息传递平台,即 Apache Pulsar。

  • Apache Pulsar 是一个高度可用和可扩展的消息传递平台,易于部署和维护。
  • Apache Pulsar 实现了一个分区每秒 180 万条消息的高吞吐量,完全满足了我们对海量数据的需求。
  • Apache Pulsar Functions 是轻量级计算进程,它使用来自一个或多个 Pulsar 主题的消息,将用户提供的处理逻辑应用于每条消息,并将计算结果发布到另一个主题。 Apache Pulsar Functions 支持三种运行时:线程、进程和 Kubernetes,为编写、运行和部署 Functions 提供了高度的灵活性。 因此,我们只需要关注计算逻辑,而不是处理复杂的配置和管理,这有助于我们轻松构建流媒体平台。

凭借高可用和可扩展的能力、Function 和连接器,Pulsar 帮助我们更快地开发 ActorCloud,从而最终选择 Pulsar 作为我们的消息传递平台。

我们如何在 ActorCloud 使用 Pulsar

ActorCloud 将用 SQL 编写的业务逻辑通过 API 传输到引擎,并将业务规则转换为 Pulsar 中的连接器和函数。 Sources 通过共享订阅消费来自 EMQ X Brokers 的数据,然后 Pulsar 将这些数据持久化并使用 Functions 实时处理,并通过 sinks 发送到外部系统。

Example

  
    { 
        "id": "mailTest",
        "sql": "SELECT temp FROM sensor WHERE temp > 0",
        "enabled": true,
        "actions": [{
            "mail": {
                "title": "temperature warning",
                "content": "temperature is ${temp} degrees, please take action promptly",
                "emails": [ "alert@emqx.io" ]
            }
        }]
    }
    

我们如何在 ActorCloud 中使用 Pulsar 函数

Apache Pulsar 为无服务器功能提供本机支持,其中数据以流式方式到达后立即处理,并提供灵活的部署选项(线程、进程、容器)。 我们只需要关注计算逻辑,而不是处理复杂的配置或管理,这有助于我们更快、更方便地构建流媒体平台。

  • 为了更好地支持批处理和流处理场景,ActorCloud 使用了 Pulsar 窗口函数。 目前,Pulsar 支持基于计数的窗口和基于时间的窗口。
  • ActorCloud 使用 Pulsar Functions API 和 Pulsar 管理工具(创建、删除、更新、重启、停止、获取等)来操作函数,简化了部署和管理。
  • ActorCloud 使用 Pulsar 的共享订阅模式来扩展数据消费能力。 此外,Pulsar 支持独占、故障转移和 key_shared 订阅模式。
  • Pulsar Functions 提供三种消息传递语义,即最多一次传递、至少一次传递和有效一次传递。 ActorCloud 使用至少一次传递来确保发送到函数的每条消息至少被处理一次。
  • Pulsar 部署了计算和存储分离的多层架构。 存储数据时,ActorCloud 配置消息保留策略以选择数据保留期限。 同时,Pulsar 与 Presto SQL 集成,允许用户使用 Presto SQL 查询存储在 BookKeeper 中的数据。 因此,ActorCloud 使用 Presto SQL 来查询实时和历史数据以处理分析任务。

我们如何在 ActorCloud 使用 Pulsar IO 连接器

Pulsar IO 连接器有两种类型:

  • 来源从其他系统向 Pulsar 提供数据,常见来源包括其他消息传递系统和 firehose 式数据管道 API。
  • Sinks 由 Pulsar 提供数据,常见的 sinks 包括其他消息传递系统、SQL 和 NoSQL 数据库。

ActorCloud 充分利用了 Pulsar IO 连接器,并创建了各种源和接收器来满足不同的需求。

  1. EMQ source

    从 EMQ 读取数据,向 Pulsar 主题写入数据(从 EMQ 与 Pulsar 同步数据)。

  2. Mail sink

    接收来自 Pulsar 主题的数据并发送电子邮件。

  3. Publish sink

    Receive data from Pulsar topics and send data to external systems using initialized HttpClient.

  4. DB sink

    Receive data from Pulsar topics and send data to external systems. DB sink encapsulates JDBC and supports sending data from Pulsar topics to SQLite, MySQL, and PostgreSQL.

  5. EMQ sink

    Receive data from Pulsar topics and send data to EMQ X topics.

总结

ActorCloud 结合 EMQ X 和 Apache Pulsar,实现 IoT 设备数据访问、设备管理、数据存储、数据分析,并提供灵活的编程接口来开发满足 IoT 垂直行业特定需求的 IoT 应用,实现设备的横向扩展 访问和数据处理。

© 北京原流科技有限公司Apache、Apache Pulsar、Apache BookKeeper、Apache Flink 及相关开源项目名称均为 Apache 软件基金会商标。条款隐私