суббота, 7 апреля 2018 г.

Using PgRocks to sync and rewind data on PostgreSQL nodes

Introduction

The following article discusses the usage of PgRocks on hosts co-running the same web app (i.e. nodes) with a goal to provide a solution for the task of syncing data between PostgreSQL tables on these nodes.

The software we need on them is built up for a web application running on Apache and making connection to a PostgreSQL database. We will assume a copy of our own corporate web application is running on each node.

The data used by our app is such that there are local tables as well as 'global' tables, while only this latter data must be entirely shared by all nodes.

So we are going to deal with this situation by syncing only these tables to all of our PG nodes, while the rest of the tables contain locally significant data and are not to be in sync between the nodes. This approach is further discussed in this article.

Here is described how to setup PgRocks on a single Linux machine. We will compile and install PostgreSQL and RocksDB with PgRocks on each of the nodes according to that guide.

Then we are concerned with a choice of network layer implementation because we have to share the bottom layer of data between our nodes. The bottom level of data is supposed to reside in a RocksDB store that is accessible from all the nodes for read and write. While there are several solutions how to setup such RocksDB store, for simplicity in this article we suppose a network file system (NFS) is the setup sufficient for demonstration of key principles of our approach (see Fig. 1).

Figure 1. The network layer implementation scheme.

Also for simplicity further in this article when we show examples we will limit to only two nodes.

1. Preparing initial Cache-0

We will refer as Cache-0 to all those tables that have to be identical on all nodes, so that our goal is to keep Cache-0 tables in the state of synchronization with each other between the nodes.

We assume that Cache-0 tables will normally be accessed by the users of our application through their web interface. The access to them by superusers via console must not interfere with the web application.

Before our web application is launched on the nodes, everything has to get prepared on each node.

First, we assign numbers to each PostgreSQL node through postgresql.conf new parameter node_number, namely:

node_number = 1

and

node_number = 2
...

and so on, on all nodes, respectively.

Second, we prepare a PL/PGSQL library that contains functions we need for communication between PostgreSQL nodes, the sync engine (see later) and RocksDB store. We will discuss the details of these functions later in this article. Most important for us now, this library contains functions for creating RocksDB store, packing initial data to RocksDB store and filling the Cache-0 from the RocksDB store. We have to deploy this library on all our participating nodes.

Third, only on one of our nodes we execute functions from the library in order to create a new RocksDB store on a mounted NFS partition, pack our initial data into it and simultaneously create the initial DNA (Data Numeric Audit) table v1_dna with keys to our data just packed into the RocksDB store.

That's how DNA version one table (i.e. v1_dna) looks like after packing e.g. table 'players' into the RocksDB store #1:

select * from v1_dna_1;

  
tab   | rev |        key       
---------+-----+-------------------
 players |   1 | 11522687676247617
 players |   1 | 11522687676247896
 players |   1 | 11522687676247929

....
....
....

Fourth, we clone this initial v1_dna to all other nodes, which is, we send its text pg_dump output and then restore with psql on each node.

Fifth, we mount NFS partition of our RocksDB store on all other nodes and then use our library on each of the nodes to create initial Cache-0 tables and fill them with data from the RocksDB store.

Now that we have identical v1_dna and Cache-0 on each of the participating nodes, we are ready to start the sync engine and then to start the web app.

2. PL/PGSQL library

This library contains a number of SQL functions for PG nodes to communicate with the sync engine and with the RocksDB store, e.g. here is a function that does packing data from the initial PG table to the RocksDB store:

CREATE OR REPLACE FUNCTION _w_v1_dna(text, text, int)
RETURNS integer AS $$
DECLARE
    result integer := 0;
BEGIN
EXECUTE format('truncate tmp');
EXECUTE format('insert into tmp select * from %s where %s = %s',$1,$2,$3);
EXECUTE format('insert into v1_dna_%s (tab, rev, key) select ''%s'',1,row_to_csv_rocks(%s,tmp) from tmp',$3,$1,$3);
EXECUTE format('select rocks_close()');
    return result;
END;$$ LANGUAGE plpgsql;


We have assumed here that the initial table has a column of type integer that serves to us as parameter for subdividing this table (see the second EXECUTE line in the code above); this column's name is the second and its filter value is the third parameter of _w_v1_dna(text,text,int).

We can send our initial data to different RocksDB stores based on value of this parameter. We will further assume that for all initial data this parameter is set to 1, so that we have to deal only with RocksDB store#1 and v1_dna_1. We also initialize our v1_dna with rev column set to 1 for all new rows. (See how this revision works later at 'Data rewind'.)

As soon as the RocksDB store is created and v1_dna filled with new keys, we are ready to proceed and unpack the data into the Cache-0 table (see the following code for that). The result mytable_c0_1 differs from the initial table mytable in an additional column key for keeping the RocksDB store key:

if $1 = 'mytable' then
EXECUTE format('create table if not exists %s_c0_%s with oids as select key, d.* 
from v1_dna_%s, rocks_csv_to_record(%s,v1_dna_%s.key)

-- we have to describe our fields here
d(field1 field1_type, ...,..., fieldN fieldN_type)
-- why we select only positive revisions from dna table here, see "Data rewind"
where v1_dna_%s.rev  > 0 and v1_dna_%s.tab = ''%s''',$1,$2,$2,$2,$2,$2,$2,$1); elsif $1 = 'myothertable'
...
...
end if;


The result Cache-0 table has got an important trigger for CUD events occurring on it:

EXECUTE format('CREATE TRIGGER %s_c0_%s_i
BEFORE INSERT OR UPDATE OR DELETE
ON %s_c0_%s
FOR EACH ROW
EXECUTE PROCEDURE _w_new_row(''%s'',%s);',$1,$2,$1,$2,$1,$2);


The procedure _w_new_row called from this trigger depending on type of event does the following (here is an example of its code for INSERT):

(...)

IF (TG_OP = 'INSERT') THEN
EXECUTE 'select rocks_get_node_number()' into v;
IF (v = floor(NEW.key/10000000000000000)) THEN
drop table if exists tmp;
CREATE TEMP TABLE tmp on commit drop as select NEW.*;
ALTER TABLE tmp drop column key;
EXECUTE format('select row_to_csv_rocks(%s,tmp) from tmp',TG_ARGV[1]) into v;
EXECUTE format('select rocks_close()');
EXECUTE format('select cast(%s as text)',v) into vc;
payload := (SELECT TG_ARGV[1] || ',' || json_build_object('tab',TG_ARGV[0],'rev',1,'key',vc) );
  perform pg_notify('v1_dna_insert', payload);
EXECUTE format('insert into v1_dna_%s (tab, rev, key) values(''%s'',1,%s)',TG_ARGV[1],TG_ARGV[0],v);
NEW.key = v;
END IF;
RETURN NEW;


(...)

In this code, we first check if the newly arrived row is originating from our node (i.e. from the web app), if it is really 'new' then it has to be packed to the RocksDB store. The sync engine will be notified of this new row via pg_notify with payload of a JSON object containing the new row's key, revision number (which  is '1' as it is for all new data) and the table name, i.e. all the data for INSERT into v1_dna. This notification's payload is processed by the sync engine that executes this INSERT on all other nodes.

If the first check failed, this row must have been added to our local Cache-0 by the sync engine and the data had already been packed into the RocksDB store from another originating node.

3. Sync engine

The sync engine is a dedicated TCP server that keeps connections to all the nodes. Once an INSERT or UPDATE event occurred on a node's Cache-0 table, the triggered procedure sends pg_notify to the sync engine with a payload containing new RocksDB record key, revision and table name. The sync engine then makes all other nodes write this information into their DNAs and then makes them fetch this single row from the RocksDB store into their Cache-0.

It is worth mentioning that UPDATE and INSERT both create a new record in the RocksDB store and in the v1_dna. The former data of the updated record is not removed from the RocksDB store, but its rev value in v1_dna changes to (-1)*rev, and the one corresponding to the updated record becomes (rev + 1).

The sync engine also handles DELETE events for all Cache-0 in sync with the originating node. However, even as some row gets deleted from the Cache-0 nothing at all changes in the RocksDB store. Only in v1_dna its rev value becomes '0' due to the fact the data row has been deleted from the Cache-0.

Handling the rev values this way, it is possible to dig all previous revisions of our data (see 'Data rewind').

In general, how the sync engine is supposed to work in terms of PostgreSQL replication schemes complies with the definition "middleware for synchronous multimaster replication". Moving the actually shared data away into external store and limiting the direct traffic between nodes only to v1_dna updates will reduce the communication overhead that is often regarded as characteristic of this scheme.

We have developed the code for the sync engine based on node.js. The code for this server is at https://github.com/kl3eo/PgRocks.

4. Data consistency

Making up such a hybrid architecture we have to be concerned with known issues arising in the multi-node environment - e.g. double-spending transactions, "read-modify-write" loops, etc. Whereas there are many ways in which PostgreSQL provides for its data consistency through MVCC and locking, we have to set up one of our own based upon locking.

First, we see that the UPDATE and DELETE events occurring on Cache-0 tables may pose problems for us if two or more nodes simultaneously draw this kind of events on a data row sharing the same key.

To avoid such problems we have to make sure that no actual writing to the RocksDB store is done by our "workhorse" function row_to_csv_rocks unless the originating node has notified the sync engine about a forthcoming UPDATE. Upon receiving this notification, the sync engine checks the nodes list and applies an appropriate locking on the rows of interest on all other nodes from the checklist. If no locking could be applied on a node, this node has to be dropped from the active nodes list. The sync engine sends back a confirmation of locking to the originating node and only then the trigger function on a Cache-0 table will proceed and write a new row to the RocksDB store, perform pg_notify, and so on (see example trigger code in 'PL/PGSQL library').

A similar mechanism works when the DELETE event occurs, although in this case there is no writing to the RocksDB store, only locking the Cache-0 record (or may be even the whole table) while its v1_dna is being synchronized and Cache-0 table is updated.

5. Data rewind

Having the keys constructed from the epoch time at the moment of packing, now it is easy to see how we can bring back the data into the Cache-0 table exactly as it was at a certain moment in past. Namely, for that we will have another column added to our DNA table named "ancestor" of type bigint, in which we hold the keys of the "time zero" records of our Cache-0 table, so that each updated row remembers the key of its ancestor with rev=1. This modified version of DNA table has now four columns and we call it v2_dna.

The mechanism for data rewind is following: the revision of the current row with data is incremented by 1 each time the row was updated, while the previous revision changes its sign. All rows at the beginning have revision "1" in the DNA table, one UPDATE on a row in Cache-0 table will result in a new row with a new key in DNA table, and the rev value is "2" for it, while the record in the DNA that previously referred to the modified row of the Cache-0 will change its rev to (-1). When the record is deleted from the Cache-0 table, its rev in the DNA table becomes "0".

The following psql output illustrates the process of data revision change occurring in the DNA table.

We have our data in table 'players' and proceed with the filter 'club_id = 5' to create a Cache-0 table 'players_c0_5':

test=# select * from players where club_id = 5;
      name      |    aka    |    dob     | weight | height |      last_seen      | club_id
----------------+-----------+------------+--------+--------+---------------------+---------

 Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00 |       5
 Mike Bryan     | Joker     | 1984-08-21 |     80 |    180 | 2018-04-08 06:25:00 |       5
 Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00 |       5
 Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00 |       5
 Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00 |       5
(5 rows)


Initialize v2_dna:


test=# select _i_v2_dna('players','club_id',4,6);
 _i_v2_dna
-----------
         1
(1 row)


Pack the data into the RocksDB store#5 and scan it:

test=# select _wc_v2_dna('players','club_id',4,6);
NOTICE:  Executing _w_v2_dna('players','club_id',5):
 _wc_v2_dna
------------
          0
(1 row)


test=# select *,'0x' || LPAD(to_hex(key), 16, '0') as hex_key from v2_dna_5;
   tab   | rev |        key        |     ancestor      |      hex_key      
---------+-----+-------------------+-------------------+--------------------
 players |   1 | 11523440602279181 | 11523440602279181 | 0x0028f08217b06d0d
 players |   1 | 11523440602279198 | 11523440602279198 | 0x0028f08217b06d1e
 players |   1 | 11523440602279215 | 11523440602279215 | 0x0028f08217b06d2f
 players |   1 | 11523440602279232 | 11523440602279232 | 0x0028f08217b06d40
 players |   1 | 11523440602279249 | 11523440602279249 | 0x0028f08217b06d51
(5 rows)


postgres@quercus:~$ ldb --db=/tmp/rocksdb_5 scan --key_hex
0x0028F08217B06D0D : Petra Che|Baby|"1989-05-30"|62.3|180|565701600000000|5|
0x0028F08217B06D1E : Mike Bryan|Joker|"1984-08-21"|80|180|576483900000000|5|
0x0028F08217B06D2F : Alicia Silver|Checkmate|"1995-09-02"|57.8|168|548931900000000|5|
0x0028F08217B06D40 : Fernan Ozy|Beast|"1967-12-12"|92.7|177|567965400000000|5|
0x0028F08217B06D51 : Ivan Lebed|Ruso|"1959-01-10"|77.4|180|575735400000000|5|



Unpack the data from the RocksDB store#5 to Cache-0 table:

test=# select _p_c0('players',4,6,1);
NOTICE:  Executing _checkout_c0('players',5,1):
NOTICE:  table "players_c0_5" does not exist, skipping
 _p_c0
-------
     0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279198 | Mike Bryan     | Joker     | 1984-08-21 |     80 |    180 | 2018-04-08 06:25:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
(5 rows)



Update a record:

test=# update players_c0_5 set weight=80.2 where name = 'Mike Bryan';
NOTICE:  table "tmp" does not exist, skipping
UPDATE 1
test=# select * from v2_dna_5;
   tab   | rev |        key        |     ancestor     
---------+-----+-------------------+-------------------
 players |   1 | 11523440602279181 | 11523440602279181
 players |   1 | 11523440602279215 | 11523440602279215
 players |   1 | 11523440602279232 | 11523440602279232
 players |   1 | 11523440602279249 | 11523440602279249
 players |   2 | 11523440812685302 | 11523440602279198
 players |  -1 | 11523440602279198 | 11523440602279198
(6 rows)



Update the same record again:

test=# update players_c0_5 set height=181 where name = 'Mike Bryan';
NOTICE:  table "tmp" does not exist, skipping
UPDATE 1
test=# select * from v2_dna_5;
   tab   | rev |        key        |     ancestor     
---------+-----+-------------------+-------------------
 players |   1 | 11523440602279181 | 11523440602279181
 players |   1 | 11523440602279215 | 11523440602279215
 players |   1 | 11523440602279232 | 11523440602279232
 players |   1 | 11523440602279249 | 11523440602279249
 players |  -1 | 11523440602279198 | 11523440602279198
 players |   3 | 11523440850187058 | 11523440602279198
 players |  -2 | 11523440812685302 | 11523440602279198
(7 rows)


Delete this record and scan the RocksDB store:

test=# delete from players_c0_5 where name = 'Mike Bryan';
DELETE 1
 

test=# select *,'0x' || LPAD(to_hex(key), 16, '0') as hex_key from v2_dna_5;
   tab   | rev |        key        |     ancestor      |      hex_key     
---------+-----+-------------------+-------------------+--------------------
 players |   1 | 11523440602279181 | 11523440602279181 | 0x0028f08217b06d0d
 players |   1 | 11523440602279215 | 11523440602279215 | 0x0028f08217b06d2f
 players |   1 | 11523440602279232 | 11523440602279232 | 0x0028f08217b06d40
 players |   1 | 11523440602279249 | 11523440602279249 | 0x0028f08217b06d51
 players |  -1 | 11523440602279198 | 11523440602279198 | 0x0028f08217b06d1e
 players |  -2 | 11523440812685302 | 11523440602279198 | 0x0028f082243af7f6
 players |   0 | 11523440887503450 | 11523440602279198 | 0x0028f08228b09a5a
 players |  -3 | 11523440850187058 | 11523440602279198 | 0x0028f08226773332
(8 rows)



postgres@quercus:~$ ldb --db=/tmp/rocksdb_5 scan --key_hex
0x0028F08217B06D0D : Petra Che|Baby|"1989-05-30"|62.3|180|565701600000000|5|
0x0028F08217B06D1E : Mike Bryan|Joker|"1984-08-21"|80|180|576483900000000|5|
0x0028F08217B06D2F : Alicia Silver|Checkmate|"1995-09-02"|57.8|168|548931900000000|5|
0x0028F08217B06D40 : Fernan Ozy|Beast|"1967-12-12"|92.7|177|567965400000000|5|
0x0028F08217B06D51 : Ivan Lebed|Ruso|"1959-01-10"|77.4|180|575735400000000|5|
0x0028F082243AF7F6 : Mike Bryan|Joker|"1984-08-21"|80.2|180|576483900000000|
0x0028F08226773332 : Mike Bryan|Joker|"1984-08-21"|80.2|181|576483900000000|
 


Now, to accomplish our task of rewinding Cache-0 data to a certain moment in past all we have to do is select keys that satisfy to the given clause and feed them to our rocks_csv_to_record function:

CREATE OR REPLACE FUNCTION _rewind_c0(text,int,timestamptz)
RETURNS integer AS $$
BEGIN
if ($1 = 'players') then
else
RETURN -1;
end if;
EXECUTE format('drop table if exists %s_c0_%s',$1,$2);
EXECUTE format('create temp table tmp on commit drop as select max(abs(rev)) as max, min(abs(rev)) as min, ancestor from v2_dna_%s where right(key::text,16)::bigint < EXTRACT(EPOCH FROM timestamptz ''%s'')*1000000 and tab = ''%s'' group by ancestor',$2,$3,$1);
if $1 = 'players' then
EXECUTE format('create unlogged table if not exists %s_c0_%s with oids as select key, d.* 
from v2_dna_%s, tmp, rocks_csv_to_record(%s,v2_dna_%s.key)
d(name text, aka text, dob date, weight float, height int, last_seen timestamp
) where abs(v2_dna_%s.rev) = tmp.max and tmp.min != 0 and v2_dna_%s.ancestor = tmp.ancestor',$1,$2,$2,$2,$2,$2,$2);
end if;
EXECUTE format('select rocks_close()');
EXECUTE format('CREATE TRIGGER %s_c0_%s_i
BEFORE INSERT OR UPDATE OR DELETE
ON %s_c0_%s
FOR EACH ROW
EXECUTE PROCEDURE _w_new_row(''%s'',%s);',$1,$2,$1,$2,$1,$2);
RETURN 0;
END;$$ LANGUAGE plpgsql;


Now run this function four times with 30 sec interval around the time we modified and then deleted our record:

test=# select _rewind_c0('players',5,'2018-04-11 13:00:00');
 _rewind_c0
------------
          0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279198 | Mike Bryan     | Joker     | 1984-08-21 |     80 |    180 | 2018-04-08 06:25:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
(5 rows)


test=# select _rewind_c0('players',5,'2018-04-11 13:00:30');
 _rewind_c0
------------
          0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
 11523440812685302 | Mike Bryan     | Joker     | 1984-08-21 |   80.2 |    180 | 2018-04-08 06:25:00
(5 rows)

test=# select _rewind_c0('players',5,'2018-04-11 13:01');
 _rewind_c0
------------
          0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
 11523440850187058 | Mike Bryan     | Joker     | 1984-08-21 |   80.2 |    181 | 2018-04-08 06:25:00
(5 rows)

test=# select _rewind_c0('players',5,'2018-04-11 13:01:30');
 _rewind_c0
------------
          0
(1 row)

test=# select * from players_c0_5;
        key        |      name      |    aka    |    dob     | weight | height |      last_seen     
-------------------+----------------+-----------+------------+--------+--------+---------------------
 11523440602279181 | Petra Che      | Baby      | 1989-05-30 |   62.3 |    180 | 2017-12-04 11:20:00
 11523440602279215 | Alicia Silver  | Checkmate | 1995-09-02 |   57.8 |    168 | 2017-05-24 09:05:00
 11523440602279232 | Fernan Ozy     | Beast     | 1967-12-12 |   92.7 |    177 | 2017-12-30 16:10:00
 11523440602279249 | Ivan Lebed     | Ruso      | 1959-01-10 |   77.4 |    180 | 2018-03-30 14:30:00
(4 rows)



It is clear that the correct working of the described algorithm depends on the consistency of the time measure and uniqueness of the keys assigned while packing the data into the RocksDB store. We have to either synchronize the clocks of all nodes better than 1 microsecond, or make all nodes receive the time from the same clock. The latter solution works fine with NTP time converted to the epoch time on each node. As for the uniqueness of the keys, a simple check in the code of PgRocks is enough to make sure that no two subsequent records are assigned the same key while packing data into the RocksDB store even if it comes out so fast that some rows are written within 1 ms of each other.

Conclusion
In this article we have shown how a hybrid SQL-NoSQL architecture is implemented in a new plugin developed by our team within the last half a year. The code of PgRocks is released under GNU GPL license and can be found at the developers site at https://github.com/kl3eo/PgRocks.

There is a simple live demo at the site http://pgrocks.com that shows how two PostgreSQL nodes work in sync following the methods described in this article.




Комментариев нет:

Отправить комментарий