суббота, 7 апреля 2018 г.

Using PgRocks to sync and rewind data on PostgreSQL nodes

Introduction

The following article discusses the usage of PgRocks on hosts co-running the same web app (i.e. nodes) with a goal to provide a solution for the task of syncing data between PostgreSQL tables on these nodes.

The software we need on them is built up for a web application running on Apache and making connection to a PostgreSQL database. We will assume a copy of our own corporate web application is running on each node.

The data used by our app is such that there are local tables as well as 'global' tables, while only this latter data must be entirely shared by all nodes.

So we are going to deal with this situation by syncing only these tables to all of our PG nodes, while the rest of the tables contain locally significant data and are not to be in sync between the nodes. This approach is further discussed in this article.

Here is described how to setup PgRocks on a single Linux machine. We will compile and install PostgreSQL and RocksDB with PgRocks on each of the nodes according to that guide.

Then we are concerned with a choice of network layer implementation because we have to share the bottom layer of data between our nodes. The bottom level of data is supposed to reside in a RocksDB store that is accessible from all the nodes for read and write. While there are several solutions how to setup such RocksDB store, for simplicity in this article we suppose a network file system (NFS) is the setup sufficient for demonstration of key principles of our approach (see Fig. 1).

Figure 1. The network layer implementation scheme.

Also for simplicity further in this article when we show examples we will limit to only two nodes.

1. Preparing initial Cache-0

We will refer as Cache-0 to all those tables that have to be identical on all nodes, so that our goal is to keep Cache-0 tables in the state of synchronization with each other between the nodes.

We assume that Cache-0 tables will normally be accessed by the users of our application through their web interface. The access to them by superusers via console must not interfere with the web application.

Before our web application is launched on the nodes, everything has to get prepared on each node.

First, we assign numbers to each PostgreSQL node through postgresql.conf new parameter node_number, namely:

node_number = 1

and

node_number = 2
...

and so on, on all nodes, respectively.

Second, we prepare a PL/PGSQL library that contains functions we need for communication between PostgreSQL nodes, the sync engine (see later) and RocksDB store. We will discuss the details of these functions later in this article. Most important for us now, this library contains functions for creating RocksDB store, packing initial data to RocksDB store and filling the Cache-0 from the RocksDB store. We have to deploy this library on all our participating nodes.

Third, only on one of our nodes we execute functions from the library in order to create a new RocksDB store on a mounted NFS partition, pack our initial data into it and simultaneously create the initial DNA (Data Numeric Audit) table v1_dna with keys to our data just packed into the RocksDB store.

That's how DNA version one table (i.e. v1_dna) looks like after packing e.g. table 'players' into the RocksDB store #1:

select * from v1_dna_1;

  
tab   | rev |        key       
---------+-----+-------------------
 players |   1 | 11522687676247617
 players |   1 | 11522687676247896
 players |   1 | 11522687676247929

....
....
....

Fourth, we clone this initial v1_dna to all other nodes, which is, we send its text pg_dump output and then restore with psql on each node.

Fifth, we mount NFS partition of our RocksDB store on all other nodes and then use our library on each of the nodes to create initial Cache-0 tables and fill them with data from the RocksDB store.

Now that we have identical v1_dna and Cache-0 on each of the participating nodes, we are ready to start the sync engine and then to start the web app.

2. PL/PGSQL library

This library contains a number of SQL functions for PG nodes to communicate with the sync engine and with the RocksDB store, e.g. here is a function that does packing data from the initial PG table to the RocksDB store:

CREATE OR REPLACE FUNCTION _w_v1_dna(text, text, int)
RETURNS integer AS $$
DECLARE
    result integer := 0;
BEGIN
EXECUTE format('truncate tmp');
EXECUTE format('insert into tmp select * from %s where %s = %s',$1,$2,$3);
EXECUTE format('insert into v1_dna_%s (tab, rev, key) select ''%s'',1,row_to_csv_rocks(%s,tmp) from tmp',$3,$1,$3);
EXECUTE format('select rocks_close()');
    return result;
END;$$ LANGUAGE plpgsql;


We have assumed here that the initial table has a column of type integer that serves to us as parameter for subdividing this table (see the second EXECUTE line in the code above); this column's name is the second and its filter value is the third parameter of _w_v1_dna(text,text,int).

We can send our initial data to different RocksDB stores based on value of this parameter. We will further assume that for all initial data this parameter is set to 1, so that we have to deal only with RocksDB store#1 and v1_dna_1. We also initialize our v1_dna with rev column set to 1 for all new rows. (See how this revision works later at 'Data rewind'.)

As soon as the RocksDB store is created and v1_dna filled with new keys, we are ready to proceed and unpack the data into the Cache-0 table (see the following code for that). The result mytable_c0_1 differs from the initial table mytable in an additional column key for keeping the RocksDB store key:

if $1 = 'mytable' then
EXECUTE format('create table if not exists %s_c0_%s with oids as select key, d.* 
from v1_dna_%s, rocks_csv_to_record(%s,v1_dna_%s.key)

-- we have to describe our fields here
d(field1 field1_type, ...,..., fieldN fieldN_type)
-- why we select only positive revisions from dna table here, see "Data rewind"
where v1_dna_%s.rev  > 0 and v1_dna_%s.tab = ''%s''',$1,$2,$2,$2,$2,$2,$2,$1); elsif $1 = 'myothertable'
...
...
end if;


The result Cache-0 table has got an important trigger for CUD events occurring on it:

EXECUTE format('CREATE TRIGGER %s_c0_%s_i
BEFORE INSERT OR UPDATE OR DELETE
ON %s_c0_%s
FOR EACH ROW
EXECUTE PROCEDURE _w_new_row(''%s'',%s);',$1,$2,$1,$2,$1,$2);


The procedure _w_new_row called from this trigger depending on type of event does the following (here is an example of its code for INSERT):

(...)

IF (TG_OP = 'INSERT') THEN
EXECUTE 'select rocks_get_node_number()' into v;
IF (v = floor(NEW.key/10000000000000000)) THEN
drop table if exists tmp;
CREATE TEMP TABLE tmp on commit drop as select NEW.*;
ALTER TABLE tmp drop column key;
EXECUTE format('select row_to_csv_rocks(%s,tmp) from tmp',TG_ARGV[1]) into v;
EXECUTE format('select rocks_close()');
EXECUTE format('select cast(%s as text)',v) into vc;
payload := (SELECT TG_ARGV[1] || ',' || json_build_object('tab',TG_ARGV[0],'rev',1,'key',vc) );
  perform pg_notify('v1_dna_insert', payload);
EXECUTE format('insert into v1_dna_%s (tab, rev, key) values(''%s'',1,%s)',TG_ARGV[1],TG_ARGV[0],v);
NEW.key = v;
END IF;
RETURN NEW;


(...)

In this code, we first check if the newly arrived row is originating from our node (i.e. from the web app), if it is really 'new' then it has to be packed to the RocksDB store. The sync engine will be notified of this new row via pg_notify with payload of a JSON object containing the new row's key, revision number (which  is '1' as it is for all new data) and the table name, i.e. all the data for INSERT into v1_dna. This notification's payload is processed by the sync engine that executes this INSERT on all other nodes.

If the first check failed, this row must have been added to our local Cache-0 by the sync engine and the data had already been packed into the RocksDB store from another originating node.

3. Sync engine

The sync engine is a dedicated TCP server that keeps connections to all the nodes. Once an INSERT or UPDATE event occurred on a node's Cache-0 table, the triggered procedure sends pg_notify to the sync engine with a payload containing new RocksDB record key, revision and table name. The sync engine then makes all other nodes write this information into their DNAs and then makes them fetch this single row from the RocksDB store into their Cache-0.

It is worth mentioning that UPDATE and INSERT both create a new record in the RocksDB store and in the v1_dna. The former data of the updated record is not removed from the RocksDB store, but its rev value in v1_dna changes to (-1)*rev, and the one corresponding to the updated record becomes (rev + 1).

The sync engine also handles DELETE events for all Cache-0 in sync with the originating node. However, even as some row gets deleted from the Cache-0 nothing at all changes in the RocksDB store. Only in v1_dna its rev value becomes '0' due to the fact the data row has been deleted from the Cache-0.

Handling the rev values this way, it is possible to dig all previous revisions of our data (see 'Data rewind').

In general, how the sync engine is supposed to work in terms of PostgreSQL replication schemes complies with the definition "middleware for synchronous multimaster replication". Moving the actually shared data away into external store and limiting the direct traffic between nodes only to v1_dna updates will reduce the communication overhead that is often regarded as characteristic of this scheme.

We have developed the code for the sync engine based on node.js. The code for this server is at https://github.com/kl3eo/PgRocks.

4. Data consistency

Making up such a hybrid architecture we have to be concerned with known issues arising in the multi-node environment - e.g. double-spending transactions, "read-modify-write" loops, etc. Whereas there are many ways in which PostgreSQL provides for its data consistency through MVCC and locking, we have to set up one of our own based upon locking.

First, we see that the UPDATE and DELETE events occurring on Cache-0 tables may pose problems for us if two or more nodes simultaneously draw this kind of events on a data row sharing the same key.

To avoid such problems we have to make sure that no actual writing to the RocksDB store is done by our "workhorse" function row_to_csv_rocks unless the originating node has notified the sync engine about a forthcoming UPDATE. Upon receiving this notification, the sync engine checks the nodes list and applies an appropriate locking on the rows of interest on all other nodes from the checklist. If no locking could be applied on a node, this node has to be dropped from the active nodes list. The sync engine sends back a confirmation of locking to the originating node and only then the trigger function on a Cache-0 table will proceed and write a new row to the RocksDB store, perform pg_notify, and so on (see example trigger code in 'PL/PGSQL library').

A similar mechanism works when the DELETE event occurs, although in this case there is no writing to the RocksDB store, only locking the Cache-0 record (or may be even the whole table) while its v1_dna is being synchronized and Cache-0 table is updated.

5. Data rewind

Having the keys constructed from the epoch time at the moment of packing, now it is easy to see how we can bring back the data into the Cache-0 table exactly as it was at a certain moment in past. Namely, for that we will have another column added to our DNA table named "ancestor" of type bigint, in which we hold the keys of the "time zero" records of our Cache-0 table, so that each updated row remembers the key of its ancestor with rev=1. This modified version of DNA table has now four columns and we call it v2_dna.

The mechanism for data rewind is following: the revision of the current row with data is incremented by 1 each time the row was updated, while the previous revision changes its sign. All rows at the beginning have revision "1" in the DNA table, one UPDATE on a row in Cache-0 table will result in a new row with a new key in DNA table, and the rev value is "2" for it, while the record in the DNA that previously referred to the modified row of the Cache-0 will change its rev to (-1). When the record is deleted from the Cache-0 table, its rev in the DNA table becomes "0".

The following psql output illustrates the process of data revision change occurring in the DNA table.

We have our data in table 'players' and proceed with the filter 'club_id = 5' to create a Cache-0 table 'players_c0_5':

test=# select * from players where club_id = 5;
      name      |    aka    |    dob     | weight | height |      last_seen      | club_id
----------------+-----------+------------+--------+--------+---------------------+---------

 Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00 |       5
 Mike Bryan     | Joker     | 1984-08-21 |     80 |    180 | 2018-04-08 06:25:00 |       5
 Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00 |       5
 Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00 |       5
 Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00 |       5
(5 rows)


Initialize v2_dna:


test=# select _i_v2_dna('players','club_id',4,6);
 _i_v2_dna
-----------
         1
(1 row)


Pack the data into the RocksDB store#5 and scan it:

test=# select _wc_v2_dna('players','club_id',4,6);
NOTICE:  Executing _w_v2_dna('players','club_id',5):
 _wc_v2_dna
------------
          0
(1 row)


test=# select *,'0x' || LPAD(to_hex(key), 16, '0') as hex_key from v2_dna_5;
   tab   | rev |        key        |     ancestor      |      hex_key      
---------+-----+-------------------+-------------------+--------------------
 players |   1 | 11523440602279181 | 11523440602279181 | 0x0028f08217b06d0d
 players |   1 | 11523440602279198 | 11523440602279198 | 0x0028f08217b06d1e
 players |   1 | 11523440602279215 | 11523440602279215 | 0x0028f08217b06d2f
 players |   1 | 11523440602279232 | 11523440602279232 | 0x0028f08217b06d40
 players |   1 | 11523440602279249 | 11523440602279249 | 0x0028f08217b06d51
(5 rows)


postgres@quercus:~$ ldb --db=/tmp/rocksdb_5 scan --key_hex
0x0028F08217B06D0D : Petra Che|Baby|"1989-05-30"|62.3|180|565701600000000|5|
0x0028F08217B06D1E : Mike Bryan|Joker|"1984-08-21"|80|180|576483900000000|5|
0x0028F08217B06D2F : Alicia Silver|Checkmate|"1995-09-02"|57.8|168|548931900000000|5|
0x0028F08217B06D40 : Fernan Ozy|Beast|"1967-12-12"|92.7|177|567965400000000|5|
0x0028F08217B06D51 : Ivan Lebed|Ruso|"1959-01-10"|77.4|180|575735400000000|5|



Unpack the data from the RocksDB store#5 to Cache-0 table:

test=# select _p_c0('players',4,6,1);
NOTICE:  Executing _checkout_c0('players',5,1):
NOTICE:  table "players_c0_5" does not exist, skipping
 _p_c0
-------
     0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279198 | Mike Bryan     | Joker     | 1984-08-21 |     80 |    180 | 2018-04-08 06:25:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
(5 rows)



Update a record:

test=# update players_c0_5 set weight=80.2 where name = 'Mike Bryan';
NOTICE:  table "tmp" does not exist, skipping
UPDATE 1
test=# select * from v2_dna_5;
   tab   | rev |        key        |     ancestor     
---------+-----+-------------------+-------------------
 players |   1 | 11523440602279181 | 11523440602279181
 players |   1 | 11523440602279215 | 11523440602279215
 players |   1 | 11523440602279232 | 11523440602279232
 players |   1 | 11523440602279249 | 11523440602279249
 players |   2 | 11523440812685302 | 11523440602279198
 players |  -1 | 11523440602279198 | 11523440602279198
(6 rows)



Update the same record again:

test=# update players_c0_5 set height=181 where name = 'Mike Bryan';
NOTICE:  table "tmp" does not exist, skipping
UPDATE 1
test=# select * from v2_dna_5;
   tab   | rev |        key        |     ancestor     
---------+-----+-------------------+-------------------
 players |   1 | 11523440602279181 | 11523440602279181
 players |   1 | 11523440602279215 | 11523440602279215
 players |   1 | 11523440602279232 | 11523440602279232
 players |   1 | 11523440602279249 | 11523440602279249
 players |  -1 | 11523440602279198 | 11523440602279198
 players |   3 | 11523440850187058 | 11523440602279198
 players |  -2 | 11523440812685302 | 11523440602279198
(7 rows)


Delete this record and scan the RocksDB store:

test=# delete from players_c0_5 where name = 'Mike Bryan';
DELETE 1
 

test=# select *,'0x' || LPAD(to_hex(key), 16, '0') as hex_key from v2_dna_5;
   tab   | rev |        key        |     ancestor      |      hex_key     
---------+-----+-------------------+-------------------+--------------------
 players |   1 | 11523440602279181 | 11523440602279181 | 0x0028f08217b06d0d
 players |   1 | 11523440602279215 | 11523440602279215 | 0x0028f08217b06d2f
 players |   1 | 11523440602279232 | 11523440602279232 | 0x0028f08217b06d40
 players |   1 | 11523440602279249 | 11523440602279249 | 0x0028f08217b06d51
 players |  -1 | 11523440602279198 | 11523440602279198 | 0x0028f08217b06d1e
 players |  -2 | 11523440812685302 | 11523440602279198 | 0x0028f082243af7f6
 players |   0 | 11523440887503450 | 11523440602279198 | 0x0028f08228b09a5a
 players |  -3 | 11523440850187058 | 11523440602279198 | 0x0028f08226773332
(8 rows)



postgres@quercus:~$ ldb --db=/tmp/rocksdb_5 scan --key_hex
0x0028F08217B06D0D : Petra Che|Baby|"1989-05-30"|62.3|180|565701600000000|5|
0x0028F08217B06D1E : Mike Bryan|Joker|"1984-08-21"|80|180|576483900000000|5|
0x0028F08217B06D2F : Alicia Silver|Checkmate|"1995-09-02"|57.8|168|548931900000000|5|
0x0028F08217B06D40 : Fernan Ozy|Beast|"1967-12-12"|92.7|177|567965400000000|5|
0x0028F08217B06D51 : Ivan Lebed|Ruso|"1959-01-10"|77.4|180|575735400000000|5|
0x0028F082243AF7F6 : Mike Bryan|Joker|"1984-08-21"|80.2|180|576483900000000|
0x0028F08226773332 : Mike Bryan|Joker|"1984-08-21"|80.2|181|576483900000000|
 


Now, to accomplish our task of rewinding Cache-0 data to a certain moment in past all we have to do is select keys that satisfy to the given clause and feed them to our rocks_csv_to_record function:

CREATE OR REPLACE FUNCTION _rewind_c0(text,int,timestamptz)
RETURNS integer AS $$
BEGIN
if ($1 = 'players') then
else
RETURN -1;
end if;
EXECUTE format('drop table if exists %s_c0_%s',$1,$2);
EXECUTE format('create temp table tmp on commit drop as select max(abs(rev)) as max, min(abs(rev)) as min, ancestor from v2_dna_%s where right(key::text,16)::bigint < EXTRACT(EPOCH FROM timestamptz ''%s'')*1000000 and tab = ''%s'' group by ancestor',$2,$3,$1);
if $1 = 'players' then
EXECUTE format('create unlogged table if not exists %s_c0_%s with oids as select key, d.* 
from v2_dna_%s, tmp, rocks_csv_to_record(%s,v2_dna_%s.key)
d(name text, aka text, dob date, weight float, height int, last_seen timestamp
) where abs(v2_dna_%s.rev) = tmp.max and tmp.min != 0 and v2_dna_%s.ancestor = tmp.ancestor',$1,$2,$2,$2,$2,$2,$2);
end if;
EXECUTE format('select rocks_close()');
EXECUTE format('CREATE TRIGGER %s_c0_%s_i
BEFORE INSERT OR UPDATE OR DELETE
ON %s_c0_%s
FOR EACH ROW
EXECUTE PROCEDURE _w_new_row(''%s'',%s);',$1,$2,$1,$2,$1,$2);
RETURN 0;
END;$$ LANGUAGE plpgsql;


Now run this function four times with 30 sec interval around the time we modified and then deleted our record:

test=# select _rewind_c0('players',5,'2018-04-11 13:00:00');
 _rewind_c0
------------
          0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279198 | Mike Bryan     | Joker     | 1984-08-21 |     80 |    180 | 2018-04-08 06:25:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
(5 rows)


test=# select _rewind_c0('players',5,'2018-04-11 13:00:30');
 _rewind_c0
------------
          0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
 11523440812685302 | Mike Bryan     | Joker     | 1984-08-21 |   80.2 |    180 | 2018-04-08 06:25:00
(5 rows)

test=# select _rewind_c0('players',5,'2018-04-11 13:01');
 _rewind_c0
------------
          0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
 11523440850187058 | Mike Bryan     | Joker     | 1984-08-21 |   80.2 |    181 | 2018-04-08 06:25:00
(5 rows)

test=# select _rewind_c0('players',5,'2018-04-11 13:01:30');
 _rewind_c0
------------
          0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
(4 rows)



It is clear that the correct working of the described algorithm depends on the consistency of the time measure and uniqueness of the keys assigned while packing the data into the RocksDB store. We have to either synchronize the clocks of all nodes better than 1 microsecond, or make all nodes receive the time from the same clock. The latter solution works fine with NTP time converted to the epoch time on each node. As for the uniqueness of the keys, a simple check in the code of PgRocks is enough to make sure that no two subsequent records are assigned the same key while packing data into the RocksDB store even if it comes out so fast that some rows are written within 1 ms of each other.

Conclusion
In this article we have shown how a hybrid SQL-NoSQL architecture is implemented in a new plugin developed by our team within the last half a year. The code of PgRocks is released under GNU GPL license and can be found at the developers site at https://github.com/kl3eo/PgRocks.

There is a simple live demo at the site http://pgrocks.com that shows how two PostgreSQL nodes work in sync following the methods described in this article.




вторник, 3 апреля 2018 г.

How To Set Up PostgreSQL With Plugin To RocksDB On Linux

Introduction

PostgreSQL is a well-known SQL database management system.

RocksDB is a persistent key-value store and C/C++ library.

PgRocks is a new plugin that uses DNA-like PostgreSQL data structure to link and easily migrate data between a PostgreSQL database and a RocksDB store.

In this tutorial, you'll install and configure PgRocks, and by doing so you will set up a data bridge between PostgreSQL and RocksDB.

This will require modifications in source code of both PostgreSQL and RocksDB, their compilation and installation, or re-installation, if they are already on the system.

In the end of this guide you will see examples of how the plugin works.

When you're finished, you'll basically understand how to flow data from PostgreSQL to RocksDB and back with the new plugin.


Prerequisites

Before you begin this guide you'll need the following:

  • - a typical modern Linux system, like CentOS 7, Ubuntu 16, or Debian 9;
  • - a root access (or non-root user with sudo privileges) on this system;
  • - git and optionally wget installed on the system;
  • - gcc compiler version 4.8 and later, also make.


Step 1 - Preparing all necessary source code trees

First, download the source code for PostgreSQL. Unpack the source code tree into a new work directory, e.g. work_pgrocks:

mkdir work_pgrocks
cd work_pgrocks
wget https://ftp.postgresql.org/pub/source/v10.3/postgresql-10.3.tar.gz
tar zxvf postgresql-10.3.tar.gz


In the same work directory (i.e. work_pgrocks), get the code for RocksDB and PgRocks with git:


git clone https://github.com/facebook/rocksdb
git clone https://github.com/kl3eo/PgRocks


You have three new catalogs in your work directory, postgresql-10.3, rocksdb and PgRocks.

Further in Steps 2 and 3 you will modify the RocksDB and PostgreSQL with files from the PgRocks tree.


Step 2 - Patching the RocksDB tree, then compiling and installing the new RocksDB libraries

First, locate the file with the code for patching RocksDB. It is the file named rocksdb_get2.patch at the top level of PgRocks catalog.

Apply this patch:

cd rocksdb
git apply ../PgRocks/rocksdb_get2.patch


Before you proceed with compilation of the new RocksDB libraries, check that all the libs required for RocksDB are installed on the system. Methods of their installation depending on the system are explained at https://github.com/facebook/rocksdb/blob/master/INSTALL.md

For example, to install the required libs on Debian or Ubuntu, run the following command as root (or with sudo prefix if run from a user account):

apt-get install libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev libnuma-dev

Now you're ready to compile and install (or possibly re-install) RocksDB. Go ahead with it:

make static_lib
make shared_lib


Everything fine, go ahead and install (as root):

make install

Optionally, you may compile and install a useful tool ldb for working with RocksDB store. Go to the tools and run these commands:

g++ -std=gnu++11 ldb.cc -o ldb -lrocksdb
cp -a ldb /usr/local/bin

cd ../..

Now you're ready to proceed to Step 3 and modify the code in PostgreSQL tree, then compile and install it.

Step 3 - Modifying the code in PostgreSQL tree, compiling and installing it

Run the following commands to overwrite the source files in PostgreSQL tree with newer files of the same name from PgRocks/src catalog:

cd postgresql-10.3
cp -a ../PgRocks/src/* src/


Then configure the PostgreSQL source code:

./configure

This command creates a file named Makefile.global in the src catalog. You will have to open it in the editor and modify it, manually replacing only one line, the one that starts with "LIBS". Replace it with another line you will find in the file named compile at the top level of PgRocks source tree.

These lines only differ in libs required by gcc to compile the code.

If in your src/Makefile.global there is the line:
 

LIBS = -lpthread -lz -lreadline -lrt -lcrypt -ldl -lm

than you have to replace it with this line:

LIBS = -lpthread -lz -lreadline -lrt -lcrypt -ldl -lm -lsnappy -lgflags -lbz2 -llz4 -lzstd -lnuma -lrocksdb

After editing the src/Makefile.global, you are ready to compile PostgreSQL.

Go on and do it:

make

Now that the compilation is complete and you're ready to install, please make sure that no older version of PostgreSQL exists on your system.

If another version of PostgreSQL had been automatically installed (and probably never used), remove it now, e.g. on Debian and Ubuntu run the following command as root:

apt remove PostgreSQL-10


Now as root run the following command from the top level of PostgreSQL tree:

make install

Make sure the newly installed libs will be found by the linker:

ldconfig


Only if you're installing PostgreSQL for the very first time, then run the following commands, also as root:

adduser postgres
mkdir /usr/local/pgsql/data
chown postgres /usr/local/pgsql/data
su - postgres
/usr/local/pgsql/bin/initdb -D /usr/local/pgsql/data


Start the new PostgreSQL daemon as user postgres to see if everything is fine:

su - postgres
/usr/local/pgsql/bin/postgres -D /usr/local/pgsql/data >logfile 2>&1 &
/usr/local/pgsql/bin/psql template1


If you are able to connect to the daemon with this command, then you see this prompt:

template1=#

You're ready to make new functions visible by PostgreSQL at the next Step 4.


Step 4 - Making new internal functions visible by PostgreSQL

Open the file named pg_catalog.pg_proc.txt found at the top level of PgRocks catalog in the editor, select, copy and then paste everything into the prompt that you have opened at the previous Step. This will result in many lines of output like this:

(...)

drop table if exists jar;
DROP TABLE
delete from pg_catalog.pg_proc where proname='rocks_get_node_number';
DELETE 0
create table jar as select * from pg_catalog.pg_proc where oid = 89;
SELECT 1


(...)

If you have made mistakes while copy-pasting, repeat the whole Step 4 from beginning to end, copy and paste ALL the lines from the pg_catalog.pg_proc.txt into the prompt.

This is it, and now you are ready to start using PgRocks.


Step 5 - Creating a test suit for the plugin and running examples

In this Step you will see how the plugin works.

You will only use three of new PostgreSQL/PgRocks functions, one for writing data to RocksDB, one for reading from it, and one for closing the open connection.

-- First, create a test database as user postgres.

su - postgres
/usr/local/pgsql/bin/createdb test


-- Second, create there a table named players and populate it with test data:

/usr/local/pgsql/bin/psql test

create table players(name text, aka text, dob date, weight float, height int, last_seen timestamp); 


CREATE TABLE

insert into players values ('Peter Stevens','Boss', '1956-06-30', 85.4,169,'2017-10-25 17:30');
insert into players values ('Mike Palmer','Hippy', '1988-12-06', 75.5,184,'2018-02-28 14:20');
insert into players values ('Dorothy Brown','Miss D', '1992-07-12', 64.3,172,'2018-03-15 09:25');
insert into players values ('Linda Chambers','Jamaica', '1987-03-10', 57.7,180,'2018-04-01 14:00');
insert into players values ('Claude Moulin','French', '1944-10-17', 68.1,170,'2017-09-15 12:30');


select * from players;      

name      |   aka   |    dob     | weight | height |      last_seen     
----------------+---------+------------+--------+--------+---------------------
 Peter Stevens  | Boss    | 1956-06-30 |   85.4 |    169 | 2017-10-25 17:30:00
 Mike Palmer    | Hippy   | 1988-12-06 |   75.5 |    184 | 2018-02-28 14:20:00
 Dorothy Brown  | Miss D  | 1992-07-12 |   64.3 |    172 | 2018-03-15 09:25:00
 Linda Chambers | Jamaica | 1987-03-10 |   57.7 |    180 | 2018-04-01 14:00:00
 Claude Moulin  | French  | 1944-10-17 |   68.1 |    170 | 2017-09-15 12:30:00

(5 rows)
 


-- Third, create another table named venues and also populate it with test data:

create table venues (address text, capacity int);
insert into venues values ('Baker St. 221b', 330);
insert into venues values ('Downing St. 10', 50);
insert into venues values ('Marylebone Rd', 1000);


-- Fourth, create an empty "DNA" table v1_dna_1 (v1 stands for "version one") for RocksDB "store #1";

create table v1_dna_1 (tbl text, rev int, key bigint);

-- Fifth, pack the data from the two test table into a new RocksDB store located at /tmp/rocksdb_1 on your file system - i.e. to store#1.

For this you will use the new internal PostgreSQL/PgRocks function row_to_json_rocks, which creates a new store #1, if there is none,
and writes all rows from the given PG table into this new store as JSON data.

insert into v1_dna_1 (tbl, rev, key) select 'players', 1, row_to_json_rocks(1,players) from players;
insert into v1_dna_1 (tbl, rev, key) select 'venues', 1, row_to_json_rocks(1,venues) from venues;
select rocks_close();


Make sure to close the RocksDB connection each time after write operations calling function rocks_close.

N.B. You have inserted "1" into rev column of DNA-table. By doing this you assume that all the rows are of revision one. You won't need the data revisions yet.

The function row_to_json_rocks just used takes two arguments, (1) the number of the store and (2) the name of the table.

It returns the key of type bigint where the first digit is the number of the node (1 by default), and the other digits are epoch time in microseconds, measured at the moment of packing the record into the store.

These keys are guaranteed to be unique and as such they are assigned to the values in the RocksDB store.

You will keep these keys in the column key of the DNA table:

select * from v1_dna_1;

  
tbl   | rev |        key       
---------+-----+-------------------
 players |   1 | 11522687676247617
 players |   1 | 11522687676247896
 players |   1 | 11522687676247929
 players |   1 | 11522687676247951
 players |   1 | 11522687676247972
 venues  |   1 | 11522687676275189
 venues  |   1 | 11522687676275254
 venues  |   1 | 11522687676275269
 

(8 rows)

-- Sixth, drop the tables players and venues  - since you keep all the data in RocksDB, you don't need the original tables any more.


drop table players;
drop table venues;



-- Seventh, take a look at our data how it is stored in RocksDB.

You will need the bigint keys printed as hexadecimal for searching with ldb:

select '0x' || LPAD(to_hex(key), 16, '0') as hex_key from v1_dna_1;      

hex_key      
--------------------
 0x0028efd2c9ccee41
 0x0028efd2c9ccef58
 0x0028efd2c9ccef79
 0x0028efd2c9ccef8f
 0x0028efd2c9ccefa4
 0x0028efd2c9cd59f5
 0x0028efd2c9cd5a36
 0x0028efd2c9cd5a45
(8 rows)


Quit the psql shell and look at our data packed as JSON in our RocksDB store:

ldb --db=/tmp/rocksdb_1 scan --key_hex


0x0028EFD2C9CCEE41 : {"name":"Peter Stevens","aka":"Boss","dob":"1956-06-30","weight":85.4,"height":169,"last_seen":"2017-10-25T17:30:00"}
0x0028EFD2C9CCEF58 : {"name":"Mike Palmer","aka":"Hippy","dob":"1988-12-06","weight":75.5,"height":184,"last_seen":"2018-02-28T14:20:00"}
0x0028EFD2C9CCEF79 : {"name":"Dorothy Brown","aka":"Miss D","dob":"1992-07-12","weight":64.3,"height":172,"last_seen":"2018-03-15T09:25:00"}
0x0028EFD2C9CCEF8F : {"name":"Linda Chambers","aka":"Jamaica","dob":"1987-03-10","weight":57.7,"height":180,"last_seen":"2018-04-01T14:00:00"}
0x0028EFD2C9CCEFA4 : {"name":"Claude Moulin","aka":"French","dob":"1944-10-17","weight":68.1,"height":170,"last_seen":"2017-09-15T12:30:00"}
0x0028EFD2C9CD59F5 : {"address":"Baker St. 221b","capacity":330}
0x0028EFD2C9CD5A36 : {"address":"Downing St. 10","capacity":50}
0x0028EFD2C9CD5A45 : {"address":"Marylebone Rd","capacity":1000}



You may now search the data in this store by the key:

ldb --db=/tmp/rocksdb_1 get 0x0028EFD2C9CCEF8F --key_hex 

{"name":"Linda Chambers","aka":"Jamaica","dob":"1987-03-10","weight":57.7,"height":180,"last_seen":"2018-04-01T14:00:00"}

-- Eighth, get back the data into table players dropped earlier, - suppose you need it again.

You will use the counterpart function rocks_json_to_record, which also takes two arguments - (1) number of the store and (2) the key, and returns the stored record:

create table players as select d.* from v1_dna_1, rocks_json_to_record(1,v1_dna_1.key) d(name text, aka text, dob date, weight float, height int, last_seen timestamp) where v1_dna_1.tbl = 'players';
 

SELECT 5

select * from players;
      


name      |   aka   |    dob     | weight | height |      last_seen     
----------------+---------+------------+--------+--------+---------------------
 Peter Stevens  | Boss    | 1956-06-30 |   85.4 |    169 | 2017-10-25 17:30:00
 Mike Palmer    | Hippy   | 1988-12-06 |   75.5 |    184 | 2018-02-28 14:20:00
 Dorothy Brown  | Miss D  | 1992-07-12 |   64.3 |    172 | 2018-03-15 09:25:00
 Linda Chambers | Jamaica | 1987-03-10 |   57.7 |    180 | 2018-04-01 14:00:00
 Claude Moulin  | French  | 1944-10-17 |   68.1 |    170 | 2017-09-15 12:30:00
 

(5 rows)

-- Ninth, create a temporary table in PostgreSQL from the RocksDB store, do calculations and drop it on commit, such as:

CREATE OR REPLACE FUNCTION sql_main_sum_field(int,text,text)
RETURNS float AS $$
DECLARE
    result float := 0;
BEGIN
    EXECUTE format('create temp table temp_table on commit drop as select d.* from v1_dna_%s, rocks_json_to_record(%s,v1_dna_%s.key) d(%s text) where v1_dna_%s.tbl = ''%s'' ',$1,$1,$1,$3,$1,$2);
    EXECUTE format('select sum(%s::float) from temp_table',$3) into result;
    return result;

END;$$ LANGUAGE plpgsql;


select sql_main_sum_field(1,'venues','capacity') as total;
 total
-------
  1380
(1 row)



Conclusion

In this tutorial, you've explored PgRocks which is a plugin to flow data between PostgreSQL and RocksDB.

You've also seen how it is possible to enhance the PostgreSQL to create new internal functions for migrating its native formats data to external storage.

You may further explore PgRocks and take a look at a live demo you'll find at http://pgrocks.com about how PgRocks can be used for syncing PostgreSQL nodes over the network.