
How Proxima Beta Implemented CQRS and Event Sourcing on Top of Apache Pulsar and ScyllaDB

November 15, 2022

Background

As a part of Tencent Interactive Entertainment Group Global (IEG Global), Proxima Beta is committed to supporting our teams and studios to bring unique, exhilarating games to millions of players around the world. At Proxima Beta, our team is responsible for managing a wide range of risks to our business. As such, we must build an efficient real-time analytics system to consistently monitor all kinds of activities in our business domain.

In this blog, I will talk about our experience of building a real-time analytics system on top of Apache Pulsar and ScyllaDB. Before I share our practices in detail, I will introduce two major architectures for data manipulation, namely CRUD and CQRS. I will also explain our reasons for combining CQRS and Event Sourcing to implement our service architecture, as well as their advantages over CRUD-based systems. Lastly, I will dive deeper into our practices of leveraging distinguishing features of Apache Pulsar for better data governance, such as multitenancy and geo-replication.

A stereotypical CRUD system

CRUD is the acronym for Create, Read, Update and Delete. It is one of the most common data processing methods for microservices development. These four operations are essential for managing persistent data, often used for relational database applications.

Figure 1

In Figure 1, this CRUD architecture is built on a backing data storage system. Typically, this system is a Relational Database Management System (RDBMS), but it does not have to be. It could be as simple as a key-value store, an object database, or even a plain text file. A key feature of the backing storage is that it represents the current state of the objects in a domain, where a domain refers to the area of focus of a functional team.

Beneath the backing data storage lies an application server, which contains the business logic of the system. This is also where the validation and orchestration logic exists for processing the requests sent to the application server.

Note that this is an oversimplified architecture. A real-world system may also use other patterns working together with this stereotypical approach. For example, you may have a gateway layer in front of the application and a data layer between the application server and the backing storage.

This example shows the idea of using the application server to abstract the data storage of a system and to provide a centralized location for business logic. This practice has become extremely popular in recent years, and in many circumstances it is considered the default architecture. Many tools have also been built around it to increase developer productivity.

To summarize, traditional architectures based on CRUD use the same data model for reads and writes with the following characteristics.

  • Simple and straightforward. The CRUD pattern is easy to understand and implement.
  • Clients adhere to the application API contract defined by application developers. Developers design domain models and DTOs, which are part of the contract, in light of functional requirements.
  • The system pivots on states. An object is a collection of states. You may call “states” differently depending on the programming language. For example, they are known as “fields” in Golang, “variables” in C++, and “attributes” in Python (in some cases, “properties”). Essentially, all of them are states. The reason why I say this approach pivots on states is that the backing storage mostly only keeps the current states. We usually have objects first and then determine what transactions or actions are needed later. By looking at your database schema, you probably have no idea of what actions should be taken on a particular object.
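
To make the "pivots on states" point concrete, here is a minimal sketch of a CRUD-style store. The `Player` type and the in-memory `CrudStore` are hypothetical names used only for illustration; they are not part of our system.

```python
from dataclasses import dataclass


@dataclass
class Player:
    player_id: str
    nickname: str
    level: int


class CrudStore:
    """Hypothetical in-memory store that keeps only the current state."""

    def __init__(self):
        self._rows = {}

    def create(self, player):
        self._rows[player.player_id] = player

    def read(self, player_id):
        return self._rows.get(player_id)

    def update(self, player_id, **fields):
        row = self._rows[player_id]
        for name, value in fields.items():
            setattr(row, name, value)  # the previous value is overwritten

    def delete(self, player_id):
        self._rows.pop(player_id, None)


store = CrudStore()
store.create(Player("p1", "Joe", level=1))
store.update("p1", level=2)
store.update("p1", level=3)
current = store.read("p1")  # only the latest state is left
```

After the two updates, the store can tell you that the player is at level 3, but not how or when the player got there. The actions themselves are gone.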

CQRS: Separating reads and writes for your data store

CQRS stands for Command Query Responsibility Segregation. Greg Young coined this term in his CQRS Documents. According to the documents, CQRS originated from Bertrand Meyer's Command-Query Separation (CQS) principle. Wikipedia defines the principle as:

It states that every method should either be a command that performs an action, or a query that returns data to the caller, but not both. In other words, asking a question should not change the answer. More formally, methods should return a value only if they are referentially transparent and hence possess no side effects.

CQRS was originally considered just to be an extension of this CQS Principle at a higher level. Eventually, after much confusion between the two concepts, they were finally deemed to be different patterns.

CQRS uses exactly the same definition of Commands and Queries that Meyer used. The fundamental difference is that in CQRS, objects are split into two categories, one containing the Commands and the other containing the Queries.

Figure 2

This is another definition of CQRS in Amanda Bennett's blog:

The Command and Query Responsibility Segregation (CQRS) pattern separates read and write operations for a data store. Reads and writes may take entirely different paths through the application and may be applied to different data stores. CQRS relies on asynchronous replication to progressively apply writes to the read view, so that changes to the application state instigated by the writer are eventually observed by the reader.

The key idea of CQRS is to explicitly build data models that serve reads and writes respectively instead of doing them against the same data model. This pattern is not very interesting by itself. However, it becomes extremely interesting when working together with Event Sourcing from an architectural point of view.
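
The sketch below illustrates this split under simple assumptions: a write model that only accepts commands, a read model shaped for one specific query, and a queue standing in for asynchronous replication. All names (`WriteModel`, `ReadModel`, `grant_coins`, `top_players`) are hypothetical.

```python
from collections import deque


class WriteModel:
    """Handles commands and validates them; it never answers queries."""

    def __init__(self, outbox):
        self._balances = {}
        self._outbox = outbox

    def grant_coins(self, player_id, amount):
        if amount <= 0:
            raise ValueError("amount must be positive")
        balance = self._balances.get(player_id, 0) + amount
        self._balances[player_id] = balance
        self._outbox.append((player_id, balance))  # change to replicate


class ReadModel:
    """Serves queries from a view that is updated by replicated changes."""

    def __init__(self):
        self._balances = {}

    def apply(self, change):
        player_id, balance = change
        self._balances[player_id] = balance

    def top_players(self, n):
        ranked = sorted(self._balances.items(), key=lambda kv: (-kv[1], kv[0]))
        return [player_id for player_id, _ in ranked[:n]]


outbox = deque()
writes = WriteModel(outbox)
reads = ReadModel()

writes.grant_coins("p1", 50)
writes.grant_coins("p2", 80)
writes.grant_coins("p1", 40)
before_sync = reads.top_players(2)  # the read view has not caught up yet

while outbox:  # stand-in for asynchronous replication
    reads.apply(outbox.popleft())
after_sync = reads.top_players(2)
```

The gap between `before_sync` and `after_sync` is the eventual consistency mentioned in the definition above: the reader observes the writes only after they have been replicated.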

Event Sourcing: Handling operations on data driven by events

The fundamental idea of Event Sourcing is to ensure every change to the state of an application is captured in an event object. Event objects are stored in the sequence in which they were applied to the application state. With the Event Sourcing pattern, instead of storing just the current state of the data in a domain, you use an append-only store to record the full series of actions taken on that data.

This idea is simple but really powerful because the event store acts as a system of record and can be used to materialize domain objects and views. As events represent every action that has been recorded, any possible model describing the system can be built from the events.

In reality, there are many cases where Event Sourcing is applied. A good example of Event Sourcing is a bank statement, as shown in the table below.

Timestamp | Comment   | Change | Balance
2022/1/1  | Deposit 1 | 1000   | 1000
2022/1/2  | Check 1   | (100)  | 900
2022/1/3  | Coffee    | (5)    | 895
2022/1/5  | Deposit 2 | 1000   | 1895

In short, Event Sourcing tracks changes by capturing the sequence of actions instead of destructively overwriting states, which is what a CRUD system usually does.
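
As a minimal sketch, the bank statement above can be modeled as an append-only list of events, with the balance derived by replaying them. The `Transaction` type and the `balance_as_of` helper are hypothetical names chosen for this example.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class Transaction:
    when: date
    comment: str
    change: int


# Append-only log: entries are never updated or deleted.
LEDGER = [
    Transaction(date(2022, 1, 1), "Deposit 1", 1000),
    Transaction(date(2022, 1, 2), "Check 1", -100),
    Transaction(date(2022, 1, 3), "Coffee", -5),
    Transaction(date(2022, 1, 5), "Deposit 2", 1000),
]


def balance_as_of(events, cutoff=None):
    """Replay events in order; ignore events after `cutoff` if it is given."""
    balance = 0
    for event in events:
        if cutoff is not None and event.when > cutoff:
            break
        balance += event.change
    return balance


current_balance = balance_as_of(LEDGER)                 # 1895
earlier_balance = balance_as_of(LEDGER, date(2022, 1, 3))  # 895
```

Because the log is complete, the balance at any point in time can be recomputed, which is not possible when only the latest balance is stored.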

Why CQRS and Event Sourcing

CQRS-based implementations are often used together with the Event Sourcing pattern.

On the one hand, CQRS allows you to use Event Sourcing as a data storage mechanism, which is very important when building a non-trivial CQRS-based system. Although you can maintain separate relational models for reading and writing, this is costly, since an event model is still required to synchronize the two. As mentioned above, CQRS fundamentally separates reads and writes into different models. This means that with Event Sourcing, you can use the event model itself as the persistence model on the write side.

Figure 3

On the other hand, one of the major issues of using Event Sourcing alone is that you cannot run a query like “Give me all users whose first names are Joe” against the system. This is impossible because there is no representation of the current state. The only valid query against an Event Sourcing system alone is GetEventById. The responsibility of maintaining the current state is shifted to event processors. Different processors can generate different views from the same events.
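
Here is a small sketch of that idea: two event processors read the same event list and build two different views, one of which can answer the "Joe" query. The event shapes (`UserRegistered`, `UserRenamed`) and the function names are assumptions made for illustration.

```python
# Hypothetical event log; in a real system this would live in the event store.
events = [
    {"id": 1, "type": "UserRegistered", "user_id": "u1", "first_name": "Joe"},
    {"id": 2, "type": "UserRegistered", "user_id": "u2", "first_name": "Ann"},
    {"id": 3, "type": "UserRenamed", "user_id": "u2", "first_name": "Joe"},
    {"id": 4, "type": "UserRegistered", "user_id": "u3", "first_name": "Joe"},
    {"id": 5, "type": "UserRenamed", "user_id": "u1", "first_name": "Joseph"},
]


def project_first_names(log):
    """View 1: the current first name of every user."""
    names = {}
    for event in log:
        if event["type"] in ("UserRegistered", "UserRenamed"):
            names[event["user_id"]] = event["first_name"]
    return names


def project_rename_counts(log):
    """View 2: how many times each user has changed their name."""
    counts = {}
    for event in log:
        if event["type"] == "UserRenamed":
            counts[event["user_id"]] = counts.get(event["user_id"], 0) + 1
    return counts


current_names = project_first_names(events)
joes = sorted(uid for uid, name in current_names.items() if name == "Joe")
rename_counts = project_rename_counts(events)
```

The query is answered from the materialized view (`current_names`), not from the event log directly. The second view shows that a different processor can derive something else entirely from the same events.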

Here, I would like to share a real-world example to further explain why we selected CQRS with Event Sourcing for our service architecture.

Figure 4

This is a screenshot from Tower of Fantasy, a 3D-action role-playing game, which was released recently. Players can use this dialog to file a report against another player for different reasons. If you use a stereotypical CRUD system for it, how do you keep those records for follow-ups? And what are the potential problems?

The first challenge would be deciding which team owns the database that stores this form. A report can be filed for different reasons, so a case may be handled by different functional teams. However, no single functional team can fully own the form. There is even an option called “Others”, but we don’t have a team to handle it.

Therefore, it is a natural choice for us to capture this case as an event, like “report a case”. All the information is captured in this event as is. All functional teams only need to subscribe to this event and do their own filtering. If they think the case falls into their domain, they can just capture it and trigger further actions.
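
The following in-memory sketch shows the shape of this approach: one event type, published once, and each team applying its own filter. The `CaseReported` fields, the reason values, and the team names are hypothetical, and the `Topic` class is only a stand-in for a real Pulsar topic, where each team would typically have its own subscription.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CaseReported:
    reporter_id: str
    target_id: str
    reason: str
    detail: str = ""


class Subscription:
    """One functional team's view of the event stream."""

    def __init__(self, team, accepts):
        self.team = team
        self._accepts = accepts  # the team's own filtering rule
        self.inbox = []

    def deliver(self, event):
        if self._accepts(event):
            self.inbox.append(event)


class Topic:
    """In-memory stand-in for a topic: every subscription sees every event."""

    def __init__(self):
        self._subscriptions = []

    def subscribe(self, subscription):
        self._subscriptions.append(subscription)
        return subscription

    def publish(self, event):
        for subscription in self._subscriptions:
            subscription.deliver(event)


topic = Topic()
anti_cheat = topic.subscribe(
    Subscription("anti-cheat", lambda e: e.reason == "cheating"))
chat_moderation = topic.subscribe(
    Subscription("chat-moderation",
                 lambda e: e.reason == "abusive_chat" or "insult" in e.detail.lower()))

topic.publish(CaseReported("p1", "p9", "cheating"))
topic.publish(CaseReported("p2", "p9", "others", "Insulted me in world chat"))
topic.publish(CaseReported("p3", "p7", "abusive_chat"))
```

Note that the report filed under "others" is still picked up by the chat moderation filter. No team has to own the form, and a new team can subscribe later without changing the producer.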

I hope this real-world example shows why we chose CQRS with Event Sourcing as the service architecture of our solution. Here are some scenarios where you may consider using CQRS with Event Sourcing over CRUD.

  • Team integration. Collaborative domains where many users access the same data in parallel. CQRS allows you to define commands with enough granularity to minimize merge conflicts at the domain level.
  • Different work styles. In the stereotypical architecture, functional teams tend to work in their silo, without too much concern about whether their API contract is friendly enough to the production team. In a worst-case scenario, the same information may be passed to different functional teams in different ways because they have different design styles.

For more information about when you should use CQRS with Event Sourcing, see this page.

Note that this blog is not intended to convince you that the CRUD model is no longer applicable. In fact, it is still widely used across different use cases. If you think a CRUD-based architecture is sufficient for your organization, you should definitely use it.

Apache Pulsar: A cloud-native, distributed messaging and streaming system

As I give you an overview of our implementation of CQRS and Event Sourcing, I will also introduce some key features of Pulsar that distinguish it from other streaming and messaging alternatives. They were essential to our decision to build our solution on top of it.

Architecture characteristics

  • Compute-storage separation. This cloud-native architecture of Pulsar allows for independent scaling of both the serving and storage layers, handled by brokers and bookies respectively. For example, if you don’t need high throughput while hoping to keep your data for a longer period of time, you can simply scale up bookies. Alternatively, you can take advantage of the tiered storage feature of Pulsar to offload your data to cloud storage. On the flip side, if you want to increase the throughput, you can add more broker nodes.
  • Node parity. There is no master node in a Pulsar cluster. Pulsar brokers are stateless and are equivalent to each other. This means you can scale your cluster much faster and easier with better fault tolerance.

Multitenancy and workload isolation

Pulsar features a three-level hierarchy of tenants, namespaces, and topics for message isolation, with tenants being the first-class citizen. If you are going to run a Pulsar instance that is shared within a large organization, multitenancy is a must-have feature to isolate the workload of different departments. Within a tenant, Pulsar allows you to further divide the workload by namespace.
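
The hierarchy is visible in Pulsar topic names, which take the form `persistent://tenant/namespace/topic` (or `non-persistent://...`). The helper below is a small sketch that builds and parses such names; the `TopicName` class, the `parse_topic` function, and the tenant and namespace values are hypothetical and are not part of the Pulsar client API.

```python
from dataclasses import dataclass

DOMAINS = ("persistent", "non-persistent")


@dataclass(frozen=True)
class TopicName:
    tenant: str
    namespace: str
    topic: str
    domain: str = "persistent"

    def __str__(self):
        return f"{self.domain}://{self.tenant}/{self.namespace}/{self.topic}"


def parse_topic(name):
    """Split a fully qualified topic name into its three levels."""
    domain, sep, rest = name.partition("://")
    parts = rest.split("/")
    if not sep or domain not in DOMAINS or len(parts) != 3 or not all(parts):
        raise ValueError(f"not a fully qualified topic name: {name!r}")
    tenant, namespace, topic = parts
    return TopicName(tenant, namespace, topic, domain)


# Hypothetical tenant and namespace names.
name = TopicName("risk-control", "reports", "case-reported")
full_name = str(name)
round_trip = parse_topic(full_name)
```

Since the tenant and the namespace are part of every topic name, policies set at those levels apply to all topics underneath them.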

Geo-replication

You can enable geo-replication in Pulsar to synchronize data across multiple data centers. As shown in the figure below, consumers and producers in different clusters (usually geo-diversified) can have the same view of event streams (topics). This ensures you will still be able to access the data in the event of a cloud failure.

Figure 5

Pulsar and ScyllaDB in our service architecture

As we can see from the diagram below, event storage is pivotal to successfully implementing a system based on CQRS and Event Sourcing.

Figure 6

In this architecture, we use Apache Pulsar as the event storage solution because it meets the following needs:

  1. Multitenancy and workload isolation. This feature is critical to large organizations like ours. As multiple teams are working on the same set of data (event streams) in parallel, you must have fine-grained access control and prevent workloads from interfering with each other.
  2. Scalability and elasticity. Since all activities will be captured as events and all events will be recorded for a certain amount of time, we need the ability to scale our cluster according to the volume of incoming traffic.
  3. Geo-replication. Running a business across the globe is a challenging task, as we need to take different factors into consideration, such as policy compliance and network latency.

I will explain how Pulsar has helped us in these aspects in more detail in the next section.

On the read side of the system, any SQL or NoSQL solution that fits your query workload could be a good candidate. It is also possible to have more than one state store and optimize each of them for a certain kind of query. In our use case, since we are dealing with hundreds of thousands of game-playing sessions in parallel, we finally landed on ScyllaDB, a NoSQL database compatible with Apache Cassandra, as the state storage.

A multi-cluster solution built on Apache Pulsar

There are different reasons for building a multi-cluster system as shown below:

  • Achieve the Recovery Time Objectives (RTOs) and the Recovery Point Objectives (RPOs) of your organization
  • Lower network latency for better user experience
  • Comply with rules and regulations

In our case, low network latency and regulation compliance are top priorities. We are trying our best to make sure data is processed and saved in the right region. Let me quickly walk you through some typical approaches to deploying a multi-cluster system.

Independent clusters in different regions

This approach runs multiple independent instances in different regions with no intercommunication. In some cases, eliminating cross-region connectivity is necessary. For example, you may need to deploy a dedicated cluster in a customer's private data center. The downside is that the maintenance cost will surge as you have more clusters. However, it does give you a high level of confidence in compliance, since there is no way you can accidentally process or save data in the wrong location.

Application-level federation

This solution pushes the complexity to the application layer. The application server coordinates with other peers to make sure data is saved to the right location. If your organization only runs one application and doesn't have a heterogeneous infrastructure, this approach probably makes more sense. This is because no matter how complex the implementation is, you only have to do it once.

In reality, however, a large organization may have hundreds of applications. We think it is not reasonable to ask every application developer to deal with a multi-cluster deployment. To make our developers less worried about complicated compliance issues, we took another approach, which we call the Global Data Ring.

Global Data Ring

This solution is a combination of policies and technologies. Every application only has access to local endpoints. Every cluster contains a Pulsar instance and a ScyllaDB instance. There is no interconnectivity between applications. This ensures that no application can accidentally access a region that it should not touch. Our Platform team can enforce this implementation without involving individual application developers.

Figure 7

In this architecture, we are using Pulsar namespaces as geofencing data containers. Currently, we have three types of namespaces:

  • Global. Geo-replication is enabled for the global namespace among all clusters. Applications running in different regions can share the same view of the namespace. Any data written to the global namespace is automatically replicated to the rest of the regions.
  • Regional. Geo-replication is not enabled for the regional namespace (or local namespace). The data will be stored in the same region as the writer.
  • Cross-region. Geo-replication is enabled for the cross-region namespace only with selected clusters. The reason is that in accordance with some local rules and regulations, we can copy the data out of a country in some cases, as long as the original copy stays within the border. This gives us great flexibility to move our workload to nearby regions, helping optimize our overall cost.
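
The three namespace types can be thought of as three rules for choosing a namespace's replication clusters. The sketch below expresses those rules as a plain function; the cluster names and the `replication_clusters` helper are hypothetical, and this is not our actual configuration tooling.

```python
# Hypothetical cluster names, one per region.
CLUSTERS = frozenset({"us-east", "eu-west", "ap-southeast"})


def replication_clusters(kind, home, allowed_peers=()):
    """Return the set of clusters a namespace should replicate to.

    kind          -- "global", "regional", or "cross-region"
    home          -- the cluster where the data is written
    allowed_peers -- extra clusters permitted for cross-region namespaces
    """
    if home not in CLUSTERS:
        raise ValueError(f"unknown home cluster: {home}")
    if kind == "global":
        return set(CLUSTERS)   # replicated to every cluster
    if kind == "regional":
        return {home}          # data never leaves the writer's region
    if kind == "cross-region":
        peers = set(allowed_peers)
        unknown = peers - CLUSTERS
        if unknown:
            raise ValueError(f"unknown clusters: {sorted(unknown)}")
        return {home} | peers  # the original copy stays in the home region
    raise ValueError(f"unknown namespace type: {kind}")


global_ns = replication_clusters("global", "eu-west")
regional_ns = replication_clusters("regional", "eu-west")
cross_ns = replication_clusters("cross-region", "eu-west", ["us-east"])
```

In Pulsar, a set like this would be applied to a namespace as its replication clusters, for example with `pulsar-admin namespaces set-clusters`. Keeping the rule in one place makes it straightforward to review which data may leave which region.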

In this practice, the compliance policy can be represented by the system configuration of Pulsar and ScyllaDB, which is entirely under the control of the Platform team. Those structured configurations are much easier to audit and visualize, which eventually helps us build better governance within our organization.

What’s next for your organization

In this blog, I explained CRUD and CQRS, and why we should combine CQRS and Event Sourcing together. I hope our experience of implementing CQRS and Event Sourcing on top of Pulsar and ScyllaDB can be helpful to those who want to build similar architectures. And here are my suggestions for you in terms of short- and long-term planning.

  1. Start by trying to build materialized views for queries.
  2. Figure out what domain events your system can produce with your client.
  3. Implement your data model based on the domain events of your client and try to establish a global data ring with your organization.

Reference

CQRS Documents | Greg Young

Introduction to CQRS | Amanda Bennett

CQRS pattern - Azure Architecture Center | Microsoft Learn

Event Sourcing pattern - Azure Architecture Center | Microsoft Learn
