Big Data: We're Not There Yet - Part I

by Vincent Robaye on September 29, 2016

Recently, one of our customers introduced an old-fashioned data solution: an error-prone ETL flow coded in C to move flat files to Postgres. We wanted to demonstrate how this could be done with technologies such as Drill, Cassandra, Phoenix and Impala. The constraint we had to cope with was that the data should ultimately be consumable by Postgres through a Foreign Data Wrapper (FDW).

Piece of cake, right? Wrong!

Our customer has transactional data in CSV format. Let's assume these are event logs. Referential data and master data are stored in Postgres, so the logs need to be incorporated into Postgres for further analysis. Initially, Postgres could handle the load perfectly, but as the data volume increased, it started reaching its limits and the customer had to implement an alternative solution to keep performance at an acceptable level.

The company then started using Postgres's Foreign Data Wrapper functionality. This allows Postgres to access external data (think file system, MySQL, Hadoop, ...). An FDW is a C library that translates SQL queries into native function calls on the remote system, so that data can be manipulated on the remote side before it is sent to Postgres. The logs were cleansed and stored in a binary format on a remote filesystem to distribute the load. The FDW then allows users to combine these remote logs with the data in Postgres.

Moving the data out of the RDBMS into a specialized system, and using each system for what it does best, does sound like a good solution. However, we felt that an easier, more flexible solution could be built using cutting-edge technology. We started brainstorming with these objectives in mind:

  • Performant and cost-efficient
  • Streaming-enabled (in case the logs would be delivered continuously instead of in chunks)
  • Integration with Postgres with as little impact as possible

We arrived at potential to-be architectures that we wanted to try. Yay, a contest: Implement the same (logical) architecture with different storage technologies, and evaluate them using the same generated data set based on:

  • Performance benchmarks using reference queries
  • Estimated total cost

We generated a large data set using Elastic MapReduce jobs on Amazon's cloud platform. Just for fun! Using Amazon S3 storage also made the data easily available to the three of us. Note that we didn't use S3 as a storage platform for any of the solutions.

Phoenix & HBase

Our first contestant: Apache Phoenix, which adds a relational layer on top of HBase. HBase allows low latency random access in a scalable manner. Out of the box however, it can only be queried using a verbose API. Phoenix adds both client-side and cluster-side components that allow SQL queries to be translated into native HBase operations. This might fit into our to-be architecture.

Phoenix doesn't provide an FDW, but it comes with a JDBC driver. We know of two JDBC-compatible FDWs, but only their source code is available. The instructions for compiling, installing and using the FDWs are clearly documented, so we were able to access the data without too much hassle.

We skip the installation instructions for Phoenix & HBase: for our tests we kept it simple and used the Hortonworks Data Platform 2.4, which packages Phoenix 4.4. In Phoenix, we created two tables, one with dummy data and the other with our log data, containing 10M and 6.75M records respectively.

The next excerpt shows the installation commands for the first wrapper: JDBC_FDW. First, the source code of your Postgres is downloaded and compiled. Then comes the download, the compilation and the installation of the FDW. As a last step, the client driver of Phoenix is copied to a location accessible to Postgres:

Foreign Data Wrapper Compilation & Installation

# Download and build the Postgres sources
wget https://ftp.postgresql.org/pub/source/v9.2.16/postgresql-9.2.16.tar.gz
tar -xvf postgresql-9.2.16.tar.gz
cd postgresql-9.2.16
./configure
gmake

# Fetch the FDW sources into the contrib directory, then build and install them
cd ~/postgres/sources/postgresql-9.2.16/contrib/
git clone git://github.com/atris/JDBC_FDW.git
cd JDBC_FDW  # the make commands must run inside the cloned directory

sudo PATH=/usr/pgsql-9.2/bin/:$PATH make USE_PGXS=1 clean
sudo PATH=/usr/pgsql-9.2/bin/:$PATH make USE_PGXS=1 install

# Download Phoenix and copy its client driver to a location Postgres can read
cd ~
wget https://archive.apache.org/dist/phoenix/phoenix-4.4.0-HBase-1.1/bin/phoenix-4.4.0-HBase-1.1-bin.tar.gz
tar -xvf phoenix-4.4.0-HBase-1.1-bin.tar.gz
cp phoenix-4.4.0-HBase-1.1-bin/phoenix-4.4.0-HBase-1.1-client.jar /usr/pgsql-9.2/lib

After its installation, the FDW can be used within Postgres to create remote tables that access the data stored in Phoenix:

Foreign Table Creation

CREATE EXTENSION jdbc_fdw;
CREATE SERVER jdbc_phoenix_srv
FOREIGN DATA WRAPPER jdbc_fdw
OPTIONS(
  drivername 'org.apache.phoenix.jdbc.PhoenixDriver',
  url 'jdbc:phoenix:localhost:2181:/hbase-unsecure',
  querytimeout '15',
  jarfile '/usr/pgsql-9.2/lib/phoenix-4.4.0-HBase-1.1-client.jar',
  maxheapsize '600'
);
 
CREATE USER MAPPING FOR postgres SERVER jdbc_phoenix_srv;
 
CREATE FOREIGN TABLE logs_remote (
    logTime timestamp not null,
    clientId varchar not null,
    isSecured boolean,
    logLevel integer,
    componentId integer,
    eventId integer,
    latitude double precision,
    longitude double precision
) SERVER jdbc_phoenix_srv OPTIONS (
    table 'logs'
);
 
CREATE FOREIGN TABLE foo_remote_table (
    k int,
    v varchar
) SERVER jdbc_phoenix_srv OPTIONS (
    table 'foo'
);

Finally, the remotely hosted data can be queried from Postgres.

Query Results

postgres=# select * from foo_remote_table
    where k = 123456;
   k    |     v
--------+------------
 123456 | test123456
(1 row)
  
Time: 97508.510 ms
 
postgres=# select * from logs_remote
    where logTime = '2016-05-05 02:36:15.350'
    and clientId = 'C-DAF2-3F1';
        logTime         |  clientId  | eventId | componentId | latitude | longitude
------------------------+------------+---------+-------------+----------+-----------
 2016-05-05 02:36:15.35 | C-DAF2-3F1 |     581 |  1602752138 |  50.8504 |  4.348780

Given that the first query runs in less than one second when executed directly in Phoenix, an execution time of more than 97 seconds is far from satisfactory. The second query shows that filtering works, even on timestamp columns. The explanation for these results is simple: the predicates are not pushed down to the remote system. Instead, Postgres fetches the entire remote table and applies the predicates afterwards. Disqualified!
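
The difference between the two strategies can be illustrated with a small Python simulation. This is a sketch under simplifying assumptions, not the code of any FDW: the function names are hypothetical, the "remote store" is an in-memory list, and the toy table is far smaller than our 10M-record `foo` table.

```python
def remote_scan(rows, predicate=None):
    """Simulate the remote store: yield rows, optionally filtering at the source."""
    for row in rows:
        if predicate is None or predicate(row):
            yield row


def query_without_pushdown(rows, predicate):
    """Fetch every remote row, then filter locally."""
    transferred = list(remote_scan(rows))
    matches = [row for row in transferred if predicate(row)]
    return matches, len(transferred)


def query_with_pushdown(rows, predicate):
    """Ship the predicate to the remote side so only matching rows travel."""
    transferred = list(remote_scan(rows, predicate))
    return transferred, len(transferred)


if __name__ == "__main__":
    # Toy table shaped like `foo`: k is the key, v is 'test' followed by k.
    foo = [{"k": k, "v": f"test{k}"} for k in range(200_000)]

    def wanted(row):
        return row["k"] == 123456

    for strategy in (query_without_pushdown, query_with_pushdown):
        matches, transferred = strategy(foo, wanted)
        print(f"{strategy.__name__}: {len(matches)} match(es), "
              f"{transferred} row(s) transferred")
```

Both strategies return the same row, but without pushdown every record crosses the link between the two systems, so the cost grows with the size of the table rather than with the size of the result.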

The second FDW that we tested was JDBC2_FDW. Installing it and using it to create tables works much like it does for the previous FDW. The only difference is that it is not compatible with Postgres 9.2, so we tested it on versions 9.3 and 9.4.

Query Results

postgres=# select * from foo_remote_table
    where k = 123456;
   k    |     v
--------+------------
 123456 | test123456
(1 row)
 
Time: 909.116 ms
 
postgres=# select componentId from logs_remote limit 10;
ERROR:  invalid input syntax for type timestamp: "1572486348"
 
postgres=# select * from logs_remote
    where logTime = '2016-05-05 02:36:15.350'
    and clientId = 'C-DAF2-3F1';
ERROR:  org.apache.phoenix.schema.TypeMismatchException:
ERROR 203 (22005): Type mismatch.
    TIMESTAMP and VARCHAR for LOGTIME = '2016-05-05 02:36:15.35'
 
postgres=# select * from logs_remote where
    logTime = to_timestamp('2016-05-05 02:36:15.350','yyyy-MM-dd HH:mi:ss.MS')
    and clientId = 'C-DAF2-3F1';
 logTime | clientId | eventId | componentId | latitude | longitude
---------+----------+---------+-------------+----------+-----------
(0 rows)

With this FDW, query performance is excellent and almost comparable to that obtained when executing the query directly in Phoenix. Unfortunately, it showed functional limitations, as illustrated by the three other queries: selecting a subset of columns failed, and filtering on a timestamp column either raised a type mismatch or returned no rows at all. Disqualified as well!

Cassandra

Second competitor: Cassandra. Like HBase, it builds on the design described in Google's Bigtable paper. Its storage architecture is similar to that of HBase, since it's also based on an LSM-tree. Unlike HBase, Cassandra is masterless. Each node of the cluster has exactly the same role: to manage a range of data and coordinate incoming queries. Cassandra's query language, CQL, has a lot in common with SQL, but it also has some differences and limitations. For example, the OR operator isn't supported. Despite these shortcomings, Cassandra's query performance makes it a valid candidate.
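
To give an idea of what "each node manages a range of data" means, here is a deliberately simplified Python sketch of a token ring. It is not Cassandra's implementation: Cassandra's default partitioner is Murmur3 and it adds virtual nodes and replication, whereas this sketch uses MD5 and a single token per node purely for illustration. The class name, the node names and the use of `clientId` as partition key are assumptions.

```python
import bisect
import hashlib


def token_for(key):
    """Hash a key onto the ring (MD5 here purely for illustration)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


class TokenRing:
    """Each node owns the range of tokens that ends at its own token."""

    def __init__(self, nodes):
        self._ring = sorted((token_for(node), node) for node in nodes)
        self._tokens = [token for token, _ in self._ring]

    def owner(self, partition_key):
        # First node whose token is >= the key's token; wrap around at the end.
        index = bisect.bisect_left(self._tokens, token_for(partition_key))
        return self._ring[index % len(self._ring)][1]


if __name__ == "__main__":
    ring = TokenRing(["node-1", "node-2", "node-3"])
    for client_id in ("C-DAF2-3F1", "B-AC31-E6D", "F-A1E4-H1A"):
        print(client_id, "->", ring.owner(client_id))
```

Because the owner of a key follows from the hash alone, any node can work out where a record lives and act as coordinator, which is why no master is needed.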

For fairness’ sake, the two tables that we used for evaluating Phoenix were created in Cassandra and loaded with exactly the same number of records.

We found two FDWs for Cassandra: one uses Cassandra's C++ driver, the other the Python driver. The first, cassandra2_fdw, was tough to install because of issues with the libuv library, but we think we managed to set up a remote link to Cassandra. Unfortunately, all queries timed out without much information in the console output or log files, so we're not entirely sure that the remote link was set up correctly.

The second FDW for Cassandra, pgCassandra, relies on Multicorn, which must be installed before the FDW itself:

easy_install pgxnclient
PATH=/usr/pgsql-9.2/bin:$PATH
pgxn install multicorn
 
pip install cassandra-driver
 
git clone https://github.com/wjch-krl/pgCassandra
cd pgCassandra
python setup.py install

Using the multicorn extension, servers and foreign tables can be created in order to access the data hosted in Cassandra:

CREATE EXTENSION multicorn;
CREATE SERVER foo_cass_srv
FOREIGN DATA WRAPPER multicorn options (
  wrapper 'pgCassandra.CassandraFDW',
  port '9042',
  columnfamily 'foo',
  keyspace 'vroks',
  hosts '127.0.0.1'
);
CREATE SERVER logs_cass_srv
FOREIGN DATA WRAPPER multicorn options (
  wrapper 'pgCassandra.CassandraFDW',
  port '9042',
  columnfamily 'logs',
  keyspace 'vroks',
  hosts '127.0.0.1'
);
CREATE FOREIGN TABLE logs_cass (
    logTime timestamp not null,
    clientId varchar not null,
    isSecured boolean,
    eventId integer,
    componentId integer,
    logLevel integer,
    latitude double precision,
    longitude double precision
) SERVER logs_cass_srv;
 
CREATE FOREIGN TABLE foo_cass  (
    k int,
    v text
) SERVER foo_cass_srv ;

Now is the moment of truth! Will the queries perform better here than on Phoenix?

postgres=# select * from foo_cass where k = 123456;
NOTICE:  CQL query: SELECT k,v FROM vroks.foo WHERE k = 123456  ALLOW FILTERING
   k    |     v
--------+------------
 123456 | test123456
(1 row)
  
Time: 4.822 ms
 
postgres=# select * from logs_cass where
    logTime = to_timestamp('2016-05-05 02:36:15.350', 'yyyy-MM-dd HH:mi:ss.MS')
    and clientId = 'C-DAF2-3F1';
NOTICE:  CQL query: SELECT longitude,clientId,latitude,logTime,eventId,componentId,isSecured,logLevel FROM vroks.logs WHERE clientId = 'C-DAF2-3F1'  AND logTime = 2016-05-05 02:36:15  ALLOW FILTERING
ERROR:  Error in python: SyntaxException
DETAIL:  
Time: 3.823 ms
 
postgres=# select * from logs_cass
    where clientId in ('B-AC31-E6D','F-A1E4-H1A');
NOTICE:  CQL query: SELECT longitude,clientId,latitude,logTime,eventId,componentId FROM vroks.logs ALLOW FILTERING
      logTime        |   clientId  | eventId | componentId |  latitude  | longitude
---------------------+-------------+---------+-------------+------------+-----------
 2016-05-05 00:00:00 | F-A1E4-H1A  |    1659 |  1388861553 |    51.0535 |    3.7304
 2016-05-06 00:00:00 | B-AC31-E6D  |    4328 |   638099536 | 50.4669000 | 4.8674600
 2016-05-08 00:00:00 | B-AC31-E6D  |    3855 |  1231333269 |    50.6412 |    5.5718
 2016-05-08 00:00:00 | F-A1E4-H1A  |    4452 |   962307120 |    51.2192 |    4.4029
(4 rows)
Time: 924332.175 ms

Simple queries such as the first example work fine and are very fast: there is almost no difference compared to direct execution on Cassandra. In a real-life scenario, the response time would also depend on the speed of the network connection between Postgres and Cassandra.

The second query does not work at all, once again because of a problem translating the filter on a timestamp column: the value is sent to Cassandra as an unquoted string. While the third query returns the records it should, its execution time of more than 15 minutes is unacceptable.

The good thing with this FDW is that it shows the query that is sent to the remote system. We can see that the "IN" operator is not correctly translated and that the remote table is fully retrieved by Postgres.
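
Both translation problems can be illustrated with a short Python sketch of how filters could be rendered as a CQL WHERE clause. This is not pgCassandra's code: `Qual` is a hypothetical stand-in for the filter objects an FDW receives from Postgres, and `cql_literal` and `cql_where` are names we made up for the example. Whether Cassandra accepts the resulting clause still depends on the table's primary key and indexes.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in for the filter objects an FDW receives from Postgres.
Qual = namedtuple("Qual", ["field", "operator", "value"])


def cql_literal(value):
    """Render a Python value as a CQL literal, quoting strings and timestamps."""
    if isinstance(value, bool):  # checked before int: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, datetime):
        # Millisecond precision, wrapped in single quotes.
        millis = value.microsecond // 1000
        return "'" + value.strftime("%Y-%m-%d %H:%M:%S") + f".{millis:03d}'"
    # Strings: wrap in single quotes and double any embedded quote.
    return "'" + str(value).replace("'", "''") + "'"


def cql_where(quals):
    """Translate a list of quals into a CQL WHERE clause, including IN lists."""
    parts = []
    for qual in quals:
        if qual.operator == "IN":
            values = ", ".join(cql_literal(v) for v in qual.value)
            parts.append(f"{qual.field} IN ({values})")
        else:
            parts.append(f"{qual.field} {qual.operator} {cql_literal(qual.value)}")
    return " WHERE " + " AND ".join(parts) if parts else ""


if __name__ == "__main__":
    print("SELECT * FROM vroks.logs" + cql_where([
        Qual("clientId", "=", "C-DAF2-3F1"),
        Qual("logTime", "=", datetime(2016, 5, 5, 2, 36, 15, 350000)),
    ]))
    print("SELECT * FROM vroks.logs" + cql_where([
        Qual("clientId", "IN", ["B-AC31-E6D", "F-A1E4-H1A"]),
    ]))
```

Rendering literals by type keeps the timestamp quoted, and handling the list case explicitly means the IN filter reaches Cassandra instead of being dropped and evaluated in Postgres after a full table fetch.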

So this contestant didn’t make it to the finish line either.

Time for a break

So far, we have failed to build a Phoenix-based or Cassandra-based solution that works within the confines of our contest. Join us next time to see if we’ll have more success with the ultra-scalable SQL query engine Drill or good old trustworthy Impala.

In the meantime, definitely let us know what your experiences are coupling legacy with new repository technologies. Requests for specific topics are always welcome. See you in a few weeks for Part II.

Special thanks to Gert Nelissen, Niels Nuyttens and Carlo Wouters for their help creating this post.
