Pika Cluster Horizontal Scaling: Unlimited Performance

By | September 15, 2020

Background

Pika is a persistent large-capacity redis storage service, compatible with most of the interfaces of string, hash, list, zset, and set (compatibility details), and solves the capacity bottleneck of redis that is insufficient due to the huge amount of stored data. Users can migrate from redis to pika service without modifying any code. It has good compatibility and stability. It has been used by our company with more than 3000 instances and the github community has exceeded 3.8K stars. Since the capacity of a single machine pika is limited by the capacity of a single hard disk, our company’s business and communities have increasingly strong demand for distributed pika clusters. Therefore, we launched a native distributed pika cluster and released pika version v3.4. Compared with the pika+codis cluster solution, Codis’s support for pika creation and management of slot operations is not friendly and requires a lot of intervention by operation and maintenance personnel. The pika native cluster does not require the additional deployment of the codis-proxy module.

Cluster deployment structure

image.png
Taking a cluster of 3 pika nodes as an example, the cluster deployment structure is shown in the figure above:
  1. Deploy Etcd cluster as the meta-information store of pika manager.
  2. Deploy pika manager on the three physical machines and configure the Etcd service port. Pika manager will register with etcd and compete to become the leader. There is only one pika manager in the cluster that can become the leader and write cluster data to etcd.
  3. Deploy pika nodes on the three physical machines, and then add the information of the pika nodes to the pika manager.
  4. For load balancing, register the service port of pika to LVS.

Data distribution

image.png
In order to isolate data according to business, Pika cluster introduces the concept of table, and different business data are stored in different tables. The business data is stored in the corresponding slot according to the hash value of the key. Each slot will have multiple copies to form a replication group. All slot replicas in the replication group have the same slot ID. One of the slot replicas is the leader and the other replicas are the followers. In order to ensure data consistency, only the leader provides read and write services. You can use pika manager to schedule and migrate slots, so that data and read and write pressure are evenly distributed to the entire pika cluster, thus ensuring the full utilization of the entire cluster resources and horizontal expansion and shrinking according to business pressure and storage capacity needs. Pika uses rocksdb as the storage engine, and each slot creates a corresponding rocksdb. Each slot in pika supports 5 data structures for reading and writing redis. Therefore, it is particularly convenient when data is migrated, just migrate the slots in pika. But at the same time there is the problem of excessive resource usage. The current pika creates 5 rocksdb by default when creating a slot to store 5 data structures. When the table contains a large number of slots or creates a large number of tables, a single pika node will contain multiple slots, which in turn creates too many instances of rocksdb and consumes too much system resources. In subsequent versions, on the one hand, it will support the creation of one or more data structures based on business needs when creating a slot. On the other hand, it will continue to optimize the blackwidow interface layer in pika to reduce the use of rocksdb.

data processing

image.png
  1. When the pika node receives a user request, the parsing layer processes and parses the redis protocol, and passes the resolved result to the router layer for judgment.
  2. The router finds the slot corresponding to the key according to the hash result of the key, and judges whether the slot is on the local node.
  3. If the slot where the key is located is on another node, a task is created according to the request and placed in the queue, and the request is forwarded to the peer node for processing. When the task receives the processing result of the request, it returns the request to the client.
  4. If the slot where the key is located belongs to the local node, the request is directly processed locally and returned to the client.
  5. For write requests that need to be processed locally, first write binlog through the replication manager module and asynchronously replicate to other slot replicas. The process layer writes into the leader slot according to the requirements of consistency. Among them, blackwidow is an interface encapsulation to rocksdb.
We embed the proxy in the pika, which does not need to be deployed separately. Compared with the Redis cluster, the client does not need to be aware of the existence of the proxy and only needs to use the cluster as a stand-alone machine. You can mount the service port of the pika node to the LVS to achieve load balancing of the pressure in the entire cluster.

Log replication

The replication manager module in pika is responsible for the master-slave synchronization of the log. In order to be compatible with Redis, pika supports non-consistent log replication. The leader slot writes data directly in the DB without waiting for the ack response from the follower slot. At the same time, it also supports log replication in the raft consistency protocol, and it needs to meet the ack of receiving most copies before writing to DB.
Inconsistent log replication
image.png
The processing flow in non-consistent scenarios is as follows:
  1. The processing thread receives the client’s request writes to the binlog and operates the DB directly after locking.
  2. The processing thread returns the client response.
  3. The auxiliary thread sends a BinlogSync synchronization request to the follower slot to synchronize logs.
  4. The follower slot returns BinlogSyncAck to report the synchronization status.
Consistent log replication
image.png
In a consistent log replication scenario:
  1. The processing thread writes the client request to the binlog file
  2. Synchronize to the slave library by sending BinlogSync request
  3. Return BinlogSyncAck from the library to report synchronization status
  4. Check that the response from the library satisfies the majority and write the corresponding request to the db
  5. Return the response to the client

Cluster metadata processing

We developed pika manager (PM for short) on the basis of codis-dashboard, which serves as the global control node of the entire cluster for deployment and scheduling management of the cluster. The metadata and routing information of the entire cluster are stored in the PM.
  • The function of creating multiple tables in the cluster is added to facilitate business data isolation based on different tables.
  • Supports specifying the number of slots and the number of replicas when creating a table, facilitating operation and maintenance to create a table based on the scale of the business and fault tolerance.
  • Logically change the concept of the group to replication group, so that the original process-level data and log replication is transformed into slot-level replication.
  • Supports creating a password when creating a table to isolate business usage. The client only needs to execute the auth and select statements to authenticate and operate on the specified table.
  • Support slot migration to facilitate expansion and contraction according to business needs.
  • Integrating the sentinel module, PM will continuously send heartbeats to the pika nodes in the cluster to monitor the survival status. When PM finds that the leader slot is down, it will automatically promote the slave slot with the largest binlog offset as the leader.
  • The storage backend supports writing metadata to etcd to ensure high availability of metadata.
  • Pika manager becomes the leader by constantly competing for locks with etcd to achieve high availability of pika manager.

Postscript

The introduction of the pika native cluster solves the limitation of the disk capacity of a single pika, and can be expanded horizontally according to business needs. But there are still some shortcomings, such as the lack of the internal automatic main selection function based on the raft, the data distribution based on the range, and the display board of monitoring information. We will solve these problems one by one in subsequent versions.