19 Apr 2016

Feeding Postgres Triggers into the Firehose

Log Architecture

I’ve been considering architectures lately that allow for realtime updates across many disparate systems. We use one of these at work and it allows for a near infinite number of subscribers to watch a Kafka stream(s) for updates. Many different systems feed into this pipeline and many systems consume the data. In case of rare/non-existent Kafka downtime all events are stored temporarily into S3.

The architecture that informed this system was documented in a LinkedIn technical article here: https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying. Go spend twenty minutes reading and digesting the implications of that article.

The Problem

I was recently presented with the problem where many applications modify many records in various postgres databases that all need to be indexed by ElasticSearch. Bulk indexing is possible and actively done, but realtime updates are preferable.

The Solution

The solution to this problem rests with Posgres, NOTIFY/LISTEN, an intermediary application, a Kafka stream, and consumers who know how to update records from DB -> ElasticSearch.

Here’s how I prototyped the solution:

  • Register functions that are called whenever a PG database table performs an INSERT/UPDATE/DELETE, aka a database trigger.
  • Function calls NOTIFY <CHANNEL> <PAYLOAD> (or pg_notify(CHANNEL, PAYLOAD)) where channel is a string identifier of where to publish a stream of those events. Payload is a string of arbitrary data, which I set as table=<NAME>,action=<INSERT|UPDATE|DELETE>,id=<ROW_ID>. This payload configuration is similar in concept to a query string and passes just enough information that an event can be registered on Kafka stream.
  • Intermediary application is registered to listen on <CHANNEL> and call a callback of its own for each message. This parses the message and encodes it in a more advanced/portable manner.
  • Intermediary app posts that formatted event onto Kafka stream.
  • Workers consuming stream pick up that event and fire off a re-index of the row.

For simplicity and because of current excitement about Elixir, I built the intermediary application using Elixir and Boltun. Elixir gives me a good degree of confidence in its reliability and uptime because of the built in OTP architecture with GenServer and Supervisor trees. It’s also approachable for other engineers, regardless of their current familiarity with Elixir.

# Requires setting DB connection details in config/config.exs per Boltun Readme

defmodule Listener do
  use Boltun, otp_app: :listener

  listen do
    channel "watchers", :my_callback
  end

  def my_callback(channel, payload) do
    # Send to Kafka and S3
    IO.puts channel
    IO.puts payload
  end
end

Listener.start_link

Database functions and triggers for NOTIFY/LISTEN

-- DROP TABLE example_table;

CREATE TABLE example_table (id serial primary key, name varchar);

-- create function for DELETE action
-- Uses OLD id instead of NEW because the ID after action will be null
CREATE FUNCTION delete_event() RETURNS trigger AS $$
DECLARE
BEGIN
  PERFORM pg_notify('watchers', 'table=' || TG_TABLE_NAME || ',action=' || TG_OP || ',id=' || OLD.id );
  RETURN OLD;
END;
$$ LANGUAGE plpgsql;

-- create function for INSERT/UPDATE action
CREATE FUNCTION insert_or_update_event() RETURNS trigger AS $$
DECLARE
BEGIN
  PERFORM pg_notify('watchers', 'table=' || TG_TABLE_NAME || ',action=' || TG_OP || ',id=' || NEW.id );
  RETURN new;
END;
$$ LANGUAGE plpgsql;

-- attach insert_or_update_event fn to the update_trigger
-- Which is how we specify to act on INSERT/UPDATE
CREATE TRIGGER updates_trigger BEFORE insert or update ON example_table
FOR EACH ROW EXECUTE PROCEDURE insert_or_update_event();

-- Attach delete_event to deletion_trigger for DELETE
CREATE TRIGGER deletion_trigger BEFORE delete ON example_table
FOR EACH ROW EXECUTE PROCEDURE delete_event();

Now start the Elixir application for watching the NOTIFY stream. Execute insert/updates/deletes.

At this point, notifications will go out via PG’s NOTIFY as:

NOTIFY 'watchers', 'table=example_table,action=INSERT,id=2'

You’ll see Elixir logging those events in realtime via NOTIFY. Which is the equivalent of the following function call.

Listener.callback('watchers', 'table=example_table,action=INSERT,id=2')
-- Sample code for running the above SQL and doing row modifications.
-- PSQL <DBNAME>
-- $ psql mytestdb
-- <DBNAME># \i complete_action.sql
-- <DBNAME># insert into example_table (name) VALUES ('something');
-- <DBNAME># insert into example_table (name) VALUES ('something_else');
-- <DBNAME># DELETE from example_table where id=1;
-- <DBNAME># DELETE from example_table where id=2;

Conclusion

With larger datasets and more realtime data streaming through tech companies, I look forward to seeing and working on more log based architectures. These systems provide resilience, fault tolerance, simplicity, and scalability. By funneling events such as PG’s row modifications into a Kafka stream, we build a robust system of keeping ElasticSearch in near-realtime sync with Postgres.

Feed in data from Postgres, from various server logs, from user events, and from business metrics. Soon the kafka firehose is a central river of data running through the organization.