Tag Archives: Scalability

Redis Replication

Continuing my series of introductory posts on Redis, today I’ll address the subject of replication.

 

Definition:

  • Replication is a method by which other servers receive a continuously updated copy of the data as it’s being written, so that the replicas can service read queries.

 

Basic info (redis.io):

  • Redis uses asynchronous replication. Starting with Redis 2.8 there is however a periodic (one time every second) acknowledge of the replication stream processed by slaves.
  • A master can have multiple slaves.
  • Slaves are able to accept other slaves connections. Aside from connecting a number of slaves to the same master, slaves can also be connected to other slaves in a graph-like structure.
  • Redis replication is non-blocking on the master side: the master will continue to serve queries while one or more slaves perform the first synchronization.
  • Replication is also non-blocking on the slave side: while the slave is performing the first synchronization it can reply to queries using the old version of the data set, assuming you configured Redis to do so in redis.conf. Otherwise you can configure Redis slaves to send clients an error if the link with the master is down. However, there is a moment when the slave must delete the old dataset and load the new one, and during that window it blocks incoming connections.
  • Replication can be used both for scalability, in order to have multiple slaves for read-only queries (for example, heavy SORT operations can be offloaded to slaves), and simply for data redundancy.
  • It is possible to use replication to avoid the saving process on the master side: just configure your master redis.conf to avoid saving (just comment all the “save” directives), then connect a slave configured to save from time to time.

 

How Redis replication works (redis.io):

  • If you set up a slave, upon connection it sends a SYNC command, regardless of whether it’s the first time it has connected or a re-connection.
  • The master then starts background saving, and collects all new commands received that will modify the dataset. When the background saving is complete, the master transfers the database file to the slave, which saves it on disk, and then loads it into memory. The master will then send to the slave all accumulated commands, and all new commands received from clients that will modify the dataset. This is done as a stream of commands and is in the same format of the Redis protocol itself.
  • You can try it yourself via telnet. Connect to the Redis port while the server is doing some work and issue the SYNC command. You’ll see a bulk transfer and then every command received by the master will be re-issued in the telnet session.
  • Slaves are able to automatically reconnect when the master <-> slave link goes down for some reason. If the master receives multiple concurrent slave synchronization requests, it performs a single background save in order to serve all of them.
  • Before Redis 2.8, a full re-sync was always performed when a master and a slave reconnected after the link went down. Starting with Redis 2.8, a partial re-synchronization is also possible.
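To make the full synchronization flow above more tangible, here is a minimal in-memory sketch in Java. It is a toy model and not Redis code: the Master and Slave classes, their method names, and the use of plain maps instead of an RDB file are all assumptions made for illustration. It only shows the order of events: snapshot, buffering of new writes, loading the snapshot, replaying the buffer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy, in-memory model of a full resynchronization; not Redis code. */
final class FullSyncSketch {

    /** Hypothetical stand-in for a master holding string keys and values. */
    static final class Master {
        final Map<String, String> data = new HashMap<>();
        // Writes that arrive while a snapshot is being transferred (null = no sync running).
        private List<String[]> backlog = null;

        void set(String key, String value) {
            data.put(key, value);
            if (backlog != null) {
                backlog.add(new String[] {key, value});
            }
        }

        /** Step 1: take a point-in-time copy and start collecting new writes. */
        Map<String, String> startSync() {
            backlog = new ArrayList<>();
            return new HashMap<>(data);
        }

        /** Step 2: hand over the writes collected since startSync(). */
        List<String[]> finishSync() {
            List<String[]> collected = (backlog == null) ? new ArrayList<>() : backlog;
            backlog = null;
            return collected;
        }
    }

    /** Hypothetical stand-in for a slave. */
    static final class Slave {
        final Map<String, String> data = new HashMap<>();

        void load(Map<String, String> snapshot) {
            data.clear();          // the old dataset is dropped...
            data.putAll(snapshot); // ...and the snapshot is loaded
        }

        void apply(List<String[]> commands) {
            for (String[] command : commands) {
                data.put(command[0], command[1]);
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Slave slave = new Slave();
        master.set("a", "1");

        Map<String, String> snapshot = master.startSync();
        master.set("b", "2");             // arrives during the transfer
        slave.load(snapshot);
        slave.apply(master.finishSync()); // replay the accumulated writes

        System.out.println(slave.data.equals(master.data)); // prints true
    }
}
```

The write to "b" is not part of the snapshot, yet the slave still ends up with it because the master buffered it and replayed it after the transfer.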

 

To configure replication, all you have to do is add the line below to the redis.conf file of the slave (or issue the same command from the CLI on the slave).

  • SLAVEOF <master_ip> <master_port>             (ex. SLAVEOF 127.0.0.1 6379)

 

To tune the replication process you can play with the following options in the redis.conf file:

  • requirepass <password> – Require clients to issue AUTH <password> before processing any other commands. This might be useful in environments in which you do not trust others with access to the host running redis-server (eg. when you don’t run your own servers)
  • masterauth <master-password> – If the master is password protected (using the “requirepass” configuration directive above) it is possible to tell the slave to authenticate before starting the replication synchronization process, otherwise the master will refuse the slave request
  • slave-serve-stale-data <yes|no> – When a slave loses its connection with the master, or when the replication is still in progress, the slave can act in two different ways:
    • still reply to client requests, possibly with out-of-date data (the default behavior if the switch is set to “yes”)
    • or reply with an error “SYNC with master in progress” to all kinds of commands except INFO and SLAVEOF (if the switch is set to “no”)
  • slave-read-only <yes|no> – You can configure a slave instance to accept writes or not. Writing against a slave instance may be useful to store some ephemeral data (because data written on a slave will be easily deleted after re-sync with the master anyway), but may also cause problems if clients are writing to it because of a misconfiguration
  • repl-ping-slave-period <seconds> – The master sends PINGs to its slaves at a predefined interval. This interval can be changed with the repl-ping-slave-period option, also from the CLI. The default value is 10 seconds.
  • repl-timeout <seconds> – This option sets a timeout for both Bulk transfer I/O timeout and master data or ping response timeout. The default value is 60 seconds. It is important to make sure that this value is greater than the value specified for repl-ping-slave-period otherwise a timeout will be detected every time there is low traffic between the master and the slave.
  • repl-disable-tcp-nodelay <yes|no> – Controls whether to disable TCP_NODELAY on the slave socket after SYNC. If you select “yes” Redis will use a smaller number of TCP packets and less bandwidth to send data to slaves. But this can add a delay for the data to appear on the slave side, up to 40 milliseconds with Linux kernels using a default configuration. If you select “no” the delay for data to appear on the slave side will be reduced but more bandwidth will be used for replication. Default value of “no” is an optimization for low latency, but in very high traffic conditions or when the master and slaves are many hops away, turning this to “yes” may be a good idea.
  • slave-priority <integer> – The slave priority is an integer number published by Redis in the INFO output. It is used by Redis Sentinel in order to select a slave to promote into a master if the master is no longer working correctly. A slave with a low priority number is considered better for promotion, so for instance if there are three slaves with priority 10, 100, 25 Sentinel will pick the one with priority 10, that is the lowest. However a special priority of 0 marks the slave as not able to perform the role of master, so a slave with priority of 0 will never be selected by Redis Sentinel for promotion.
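The slave-priority rule described above can be sketched in a few lines of Java. This is only an illustration of the rule as stated in the text (lowest priority wins, 0 is never promoted); it is not Sentinel's actual code, which takes other factors into account as well. The class name, method name and slave names are made up for the example.

```java
import java.util.Map;
import java.util.Optional;

/** Illustrates only the priority rule from the text; not Sentinel's real selection logic. */
final class SlavePrioritySketch {

    /** Returns the name of the slave with the lowest non-zero priority, if there is one. */
    static Optional<String> pickForPromotion(Map<String, Integer> priorities) {
        return priorities.entrySet().stream()
                .filter(entry -> entry.getValue() > 0) // priority 0 means "never promote"
                .min(Map.Entry.comparingByValue())     // lower number = better candidate
                .map(Map.Entry::getKey);
    }

    public static void main(String[] args) {
        Map<String, Integer> slaves = Map.of("slave-a", 10, "slave-b", 100, "slave-c", 25);
        System.out.println(pickForPromotion(slaves)); // prints Optional[slave-a]
    }
}
```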

 

Allowing writes only with N attached replicas (redis.io):

  • Starting with Redis 2.8 it is possible to configure a Redis master in order to accept write queries only if at least N slaves are currently connected to the master, in order to improve data safety.
  • However, because Redis uses asynchronous replication it is not possible to ensure that the slaves actually received a given write, so there is always a window for data loss.
  • This is how the feature works:
    • Redis slaves ping the master every second, acknowledging the amount of replication stream processed.
    • A Redis master will remember the last time it received a ping from every slave.
    • The user can configure a minimum number of slaves that have a lag not greater than a maximum number of seconds.
    • If there are at least N slaves, with a lag less than M seconds, then the write will be accepted.
  • There are two configuration parameters for this feature:
    • min-slaves-to-write <number of slaves>
    • min-slaves-max-lag <number of seconds>
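The acceptance check described above boils down to counting slaves whose last acknowledgement is recent enough. Below is a minimal Java sketch of that idea. It is not how Redis implements it internally; the method name, the millisecond timestamps and the parameter names (modelled after the two configuration options) are assumptions for illustration.

```java
import java.util.List;

/** Sketch of the "at least N slaves with lag not greater than M seconds" rule; not Redis code. */
final class MinSlavesSketch {

    /**
     * @param lastAckMillis          time of the last ping/ack received from each slave
     * @param nowMillis              current time
     * @param minSlavesToWrite       N, the required number of "good" slaves
     * @param minSlavesMaxLagSeconds M, the maximum tolerated lag in seconds
     */
    static boolean acceptWrite(List<Long> lastAckMillis, long nowMillis,
                               int minSlavesToWrite, int minSlavesMaxLagSeconds) {
        long maxLagMillis = minSlavesMaxLagSeconds * 1000L;
        long goodSlaves = lastAckMillis.stream()
                .filter(ack -> nowMillis - ack <= maxLagMillis)
                .count();
        return goodSlaves >= minSlavesToWrite;
    }

    public static void main(String[] args) {
        long now = 100_000L;
        // Three slaves with lags of 0.5 s, 2 s and 20 s.
        List<Long> acks = List.of(99_500L, 98_000L, 80_000L);
        System.out.println(acceptWrite(acks, now, 2, 10)); // prints true
        System.out.println(acceptWrite(acks, now, 3, 10)); // prints false
    }
}
```

Even when the check passes, the write is still replicated asynchronously, so this narrows the window for data loss rather than closing it.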

 

Have a nice weekend!

Redis data sharding – part 2 – hash-based keys

In my previous post on Redis data sharding I introduced the concept of data sharding/partitioning and provided a small Java code example to illustrate the idea. As you noticed, I was creating fixed-size “emailbuckets”, containing 1024 emails each. An email address was the key of my hash, while the user id was the value. For a shard identifier I used a simple integer value (shardKey) obtained as the result of the “i / shardSize” (integer division) operation.

While such an approach illustrates the concept well, it’s impractical in “real life” applications. The reason is simple: knowing the email address alone (which may often be the only thing you know at some point of the app execution flow; for example when you’re using Spring Security and requesting emails “as usernames” during the sign-in process), you wouldn’t be able to retrieve the corresponding userId directly. Knowing the algorithm by which shardKey was generated doesn’t help, because it depends on the user’s sequence number rather than on the email; you could still traverse the emailbuckets one by one looking for the appropriate email, but you wouldn’t be able to tell which shard the email address you’re looking for ended up in.

One solution to that problem is to use a different key for sharding data; something that is computed directly from the data you’re interested in partitioning. An obvious candidate here is the email address itself. If you can generate shardKey from the email address, you can reproduce the same computation every time a user provides his email during sign-in and retrieve his userId (which you could then use, for example, to query another hash in Redis – “users:id” – that stores the complete user profile).

 

This seems like an ideal task for a hash function… Let’s start first with some background on hashing. According to Neil Coffey’s Javamex article Introduction to hashing:

  • Hashing means using some function or algorithm to map object data (eg. content of a String object) to some representative integer value. This so-called hash code (or simply hash) can then be used as a way to narrow down our search when looking for the item…

 

Also, when you search Wikipedia for the Java hashCode() function, you’ll get the following definition:

  • In the Java programming language, every class must provide a hashCode() method which digests the data stored in an instance of the class into a single hash value (a 32-bit signed integer). This hash is used by other code when storing or manipulating the instance – the values are intended to be evenly distributed for varied inputs in order to use in clustering. This property is important to the performance of hash tables and other data structures that store objects in groups (“buckets”) based on their computed hash values.

 

Looks like this is exactly what we’re interested in – …evenly distributed values for varied inputs…, which …is important to the performance of data structures that store objects in groups (shards in our case) based on their computed hash values.

Conclusion: Java’s hashCode() function is what we’ll proceed with.

 

More from Wikipedia on Java hashCode():

  • Starting with Java version 1.2, the java.lang.String class implements its hashCode() using a product sum algorithm over the entire text of the string.
  • An instance s of the java.lang.String class, would have a hash code h(s) defined by:
    h(s) = \sum_{i=0}^{n-1} s[i] \cdot 31^{n-1-i}

     

  • where terms are summed using Java 32-bit int addition, s[i] denotes the i-th character of the string, and n is the length of s.

 

Now, looking at the Java docs for the String.hashCode() function, we can read:

  • The hash code for a String object is computed as
    s[0]*31^(n-1) + s[1]*31^(n-2) + … + s[n-1]

     

  • using int arithmetic, where s[i] is the ith character of the string, n is the length of the string, and ^ indicates exponentiation. (The hash value of the empty string is zero.)
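As a quick sanity check of the formula above, the following Java sketch evaluates the sum term by term and compares it with what String.hashCode() returns. The class and method names are made up for the example; the deliberately naive power loop is there only to mirror the formula, not to be efficient.

```java
/** Evaluates the documented String hash formula literally, for comparison with hashCode(). */
final class StringHashSketch {

    /** Computes s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1] using int arithmetic. */
    static int polynomialHash(String s) {
        int n = s.length();
        int sum = 0;
        for (int i = 0; i < n; i++) {
            int power = 1;
            for (int k = 0; k < n - 1 - i; k++) {
                power *= 31; // silently wraps around on overflow, as int arithmetic does
            }
            sum += s.charAt(i) * power;
        }
        return sum;
    }

    public static void main(String[] args) {
        // 'a' is 97 and 'b' is 98, so the sum is 97*31 + 98 = 3105.
        System.out.println(polynomialHash("ab")); // prints 3105
        System.out.println("ab".hashCode());      // prints 3105
    }
}
```

Because int overflow simply wraps around, the term-by-term sum and the incremental loop used by the JDK produce the same value for long strings as well.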

 

Finally, let’s take a look at the Java source code of the String class, showing how hashCode() has actually been implemented:

public int hashCode() {
    int h = hash;
    if (h == 0 && value.length > 0) {
        char val[] = value;

        for (int i = 0; i < value.length; i++) {
            h = 31 * h + val[i];
        }
        hash = h;
    }
    return h;
}

 

An alternative (faster) implementation may look like this (from Apache Harmony JDK):

public int hashCode() {
    if (hashCode == 0) {
        int hash = 0, multiplier = 1;
        for (int i = offset + count - 1; i >= offset; i--) {
            hash += value[i] * multiplier;
            int shifted = multiplier << 5;
            multiplier = shifted - multiplier;
        }
        hashCode = hash;
    }
    return hashCode;
}

What’s the difference between the two code snippets above? As you can see, multiplication can be replaced by a bitwise shift operation and a subtraction for better performance. “(multiplier << 5) - multiplier” is just 31*multiplier after all (however, VMs nowadays do this optimization automatically). If you’re interested in a good read on the subject of binary numbers I strongly recommend Neil Coffey’s Javamex article: Introduction to binary numbers.
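The shift-and-subtract trick is easy to verify for yourself. The sketch below (class and method names are hypothetical) rewrites the left-to-right hashing loop without any multiplication and checks that it still agrees with String.hashCode().

```java
/** Shows that (m << 5) - m equals 31 * m and can drive the same hash loop. */
final class ShiftHashSketch {

    /** 31 * m written as a shift and a subtraction: (32 * m) - m. */
    static int times31(int m) {
        return (m << 5) - m;
    }

    /** Same left-to-right loop as String.hashCode(), but without a multiplication. */
    static int hashWithShift(String s) {
        int h = 0;
        for (int i = 0; i < s.length(); i++) {
            h = times31(h) + s.charAt(i);
        }
        return h;
    }

    public static void main(String[] args) {
        String email = "me@mariuszprzydatek.com";
        System.out.println(times31(7) == 31 * 7);                     // prints true
        System.out.println(hashWithShift(email) == email.hashCode()); // prints true
    }
}
```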

 

OK, applying all this knowledge to our sharding code example results in the following implementation:

int i = 0;
while (i < 1000000) {
    String userId = String.valueOf(i++);
    String emailAddress = String.format("user_%s@mariuszprzydatek.com", userId);
    int shardKey = emailAddress.hashCode();
    redisTemplate.opsForHash().put(String.format("emailsbucket:%s", shardKey), emailAddress, userId);
}
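One thing worth keeping in mind: hashCode() can return any 32-bit integer, so when it is used directly as shardKey nearly every email ends up in a bucket of its own. A common variation is to reduce the hash to a fixed number of buckets with a modulo. The sketch below shows that idea together with the lookup by email; the bucket count of 1024 is an assumption, and a plain in-memory map stands in for the Redis hashes so the example runs without a server.

```java
import java.util.HashMap;
import java.util.Map;

/** Hash-based bucketing with a fixed bucket count; an in-memory map stands in for Redis. */
final class HashShardingSketch {

    static final int SHARD_COUNT = 1024; // assumed value, tune to your data volume

    /** Derives the bucket name from the email alone, so it can be recomputed at sign-in. */
    static String bucketFor(String emailAddress) {
        // floorMod keeps the result in 0..SHARD_COUNT-1 even for negative hash codes.
        int shardKey = Math.floorMod(emailAddress.hashCode(), SHARD_COUNT);
        return "emailsbucket:" + shardKey;
    }

    // Stand-in for Redis hashes: bucket name -> (email -> userId).
    private final Map<String, Map<String, String>> buckets = new HashMap<>();

    void put(String emailAddress, String userId) {
        buckets.computeIfAbsent(bucketFor(emailAddress), name -> new HashMap<>())
               .put(emailAddress, userId);
    }

    /** Returns the userId stored for the email, or null if the email is unknown. */
    String findUserId(String emailAddress) {
        Map<String, String> bucket = buckets.get(bucketFor(emailAddress));
        return bucket == null ? null : bucket.get(emailAddress);
    }

    int bucketCount() {
        return buckets.size();
    }

    public static void main(String[] args) {
        HashShardingSketch store = new HashShardingSketch();
        for (int i = 0; i < 10000; i++) {
            String userId = String.valueOf(i);
            store.put(String.format("user_%s@mariuszprzydatek.com", userId), userId);
        }
        System.out.println(store.findUserId("user_42@mariuszprzydatek.com")); // prints 42
        System.out.println(store.bucketCount() <= SHARD_COUNT);               // prints true
    }
}
```

With Spring Data Redis the same bucketFor() result would be used as the key passed to opsForHash() for both the write and the read.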

 

Happy coding! 🙂

 

 


Redis data sharding – part 1

In one of my previous posts on Redis I provided a definition of data sharding, quoting the great book “Redis in Action” by Dr. Josiah L Carlson:

  • “Sharding is a method by which you partition your data into different pieces. In this case, you partition your data based on IDs embedded in the keys, based on the hash of keys, or some combination of the two. Through partitioning your data, you can store and fetch the data from multiple machines, which can allow a linear scaling in performance for certain problem domains.”

 

Today I’d like to elaborate some more on data sharding based on IDs embedded in the keys.

 

Let’s start with an example of some hypothetical data stored in a Redis instance:

redis 127.0.0.1:6379>keys *
(empty list or set)
redis 127.0.0.1:6379>set emails:1 me@mariuszprzydatek.com
OK
redis 127.0.0.1:6379>get emails:1
"me@mariuszprzydatek.com"

What I did here is use the basic String data type to store an email of a user. As you can see, I embedded the user id within the key (‘emails:1’). Now, if a front-end application asks for the email address of the user with id=1, on the back-end side I concatenate the keyword I usually use to denote keys I store emails under (ie. ‘emails’) with the id of the user (‘1’), add a colon (‘:’) in between, and this way I get the resulting key (‘emails:1’) to look up when making a call to the Redis instance.

 

This solution is nice, but if I have 1 million users registered in my system and use Redis as the data store for keeping mappings between the identifier of a user and his email, I will end up with 1 million keys (‘emails:1’, ‘emails:2’, ‘emails:3’, etc.). This is a volume my Redis instance will easily handle (see my previous post on Redis Performance Basics) and it will use a little more than 190 MB to store everything in memory (so much due to a lot of overhead when storing small keys and values; the ratio is much better with large keys/values), but this is only one attribute we’re talking about – what about firstName, lastName, etc.? Obviously, if my system had millions of registered users and I used Redis as my primary data store for user-related info, I would already be running multiple instances of Redis and routing queries to a specific instance based on the id of a user, but there’s still a lot we can do to optimize costs prior to thinking about scaling.

 

A small code snippet to generate 1M emails stored in Redis using the String data structure (and Spring Data Redis, mentioned in my other post):

int i = 0;
while(i<1000000) {
    redisTemplate.opsForValue().set(String.format("emails:%s", i++), "me@mariuszprzydatek.com");
}

The loop above executes in 2 mins on my Windows 8 64-bit i7 laptop and the ‘redis-server’ process allocates ca. 190 MB of memory.

 

Now, what will happen if we change the data structure to, let’s say, a Redis Hash?

The next code snippet does just that:

int i = 0;
while(i<1000000) {
    String userId = String.valueOf(i++);
    String emailAddress = String.format("user_%s@mariuszprzydatek.com", userId);
    redisTemplate.opsForHash().put("emails", emailAddress, userId);
}

2 mins and 165 MB of memory allocated – a 15 % gain absolutely for free.

 

Let’s try with data sharding/partitioning. Another code snippet using Redis Hash data structure and there you go:

int shardSize = 1024;
int i = 0;
while(i<1000000) {
    int shardKey = i/shardSize;
    String userId = String.valueOf(i++);
    String emailAddress = String.format("user_%s@mariuszprzydatek.com", userId);
    redisTemplate.opsForHash().put(String.format("emailsbucket:%s", shardKey), emailAddress, userId);
}

2 mins later and… only 30 MB allocated – now you’re talking Mariusz!

A staggering 530 % increase in memory allocation efficiency!
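For reference, the bucket arithmetic used in the loop above can be isolated into a tiny Java sketch. The class and method names are hypothetical; only the integer division by a shard size of 1024 and the “emailsbucket:” key prefix come from the snippet above.

```java
/** Isolates the "i / shardSize" bucket arithmetic from the loop above. */
final class RangeShardingSketch {

    static final int SHARD_SIZE = 1024;

    /** Users 0..1023 land in bucket 0, users 1024..2047 in bucket 1, and so on. */
    static String bucketFor(int userIndex) {
        return "emailsbucket:" + (userIndex / SHARD_SIZE);
    }

    /** Number of buckets needed for the given number of users (rounded up). */
    static int bucketCount(int totalUsers) {
        return (totalUsers + SHARD_SIZE - 1) / SHARD_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(bucketFor(1023));       // prints emailsbucket:0
        System.out.println(bucketFor(1024));       // prints emailsbucket:1
        System.out.println(bucketCount(1_000_000)); // prints 977
    }
}
```

So instead of 1 million top-level keys, Redis ends up holding 977 hashes with at most 1024 fields each.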

 

Hope you enjoyed the first part of this brief tutorial.

 

Cheers!

 

 
