24 Jul 2016

Reflections on Migrating Redis and PG

I had the task of migrating three production databases with minimal downtime. Here are the takeaways.

Moving Redis with persistent data

Redis needed to move off a couple of providers and onto another. This had to happen inside a 30 min maintenance window for one application (which performs critical writes), but losing a small number of low-value writes from other applications was an acceptable tradeoff for having zero downtime on those other services.

One db was easily imported using the DB host’s Import tool. Another db could not use that mechanism and was transferred with redis-transfer. I enjoyed extending the tool to make it work well for this purpose.

Postgres

This was the simplest of all: generate a Heroku backup, download it from its public URL, and import it into the other db.

#!/usr/bin/env bash

# References https://devcenter.heroku.com/articles/heroku-postgres-import-export
#
# Requires heroku commandline tool.
# The following ENV are required
# HEROKU_API_KEY=

# The following envs are required for the destination DB and are automatically
# used by PG.
# PGPASSWORD=
# PGUSER=
# PGHOST=
# PGPORT=

# Set this for simpler scripting
# PGDATABASE=

# Install heroku toolkit https://zph.xargs.io/heroku-toolkit-install.sh | bash
# sudo apt-get install postgresql
# HEROKU_APP (app name) and HEROKU_BIN (path to the heroku binary) must also be set.
OUTPUT_FILE="latest.dump"
APP_NAME="$HEROKU_APP"
heroku="$HEROKU_BIN"
"$heroku" pg:backups -a "$APP_NAME" capture && \
  curl -o "$OUTPUT_FILE" "$("$heroku" pg:backups -a "$APP_NAME" public-url)" && \
  pg_restore --verbose --clean --no-acl --no-owner -d "$PGDATABASE" "$OUTPUT_FILE"

The Day Of

I ran through all the steps, outlined them, then set up working scripts for each portion of the process. Those were then set up as commands in a command-station-style tmux session.

Each Tmux tab was a phase of the process: maintenance_mode:on, redis_migrations, maintenance_mode:off, pg_migrations, logging

Each tab held the commands I would need to run, one per pane of the window:

|-----------------|------------------|
| redis1_migration| redis_migration2 |
|-----------------|------------------|
| point to new r1 | point to new r2  |
|-----------------|------------------|

Performing the Migration

  • Notified stakeholders in advance
  • Prepared steps, conducted trials against staging
  • Set up migration scripts
  • Walked through the checklist 15 min before the window
  • Set one Heroku app to maintenance mode
  • Imported 2 redis dbs
    • Verified the result
    • Ran the script to point to those new endpoints
  • Turned maintenance mode off
  • Migrated PG
    • Verified the results
    • Ran the script to point to the new endpoints

Conclusion

Glad redis-transfer was available to help with a recalcitrant server. And I’m glad to be preparing postgres for more active duty in our stack.

My takeaway from accomplishing this migration was that careful planning leads to quick and uneventful maintenance windows. Also, I’d rather migrate pg than redis.

And have a migration buddy :). It makes the work far more enjoyable and gives you extra hands in case things go wrong.

24 Jul 2016

Added Shortlinks To Hugo Blog

I got a bee in my bonnet today about adding unobtrusive Twitter share links to this blog.

It involved the following steps:

  • Finding out how to do it without using Twitter’s SDK on page
  • Wiring that into a Hugo template
  • Adding fragment to share links
  • Adding mechanism for shortlinks on blog

Twitter Shares without their SDK

I prefer not to include Third Party JS on pages for security and purity reasons.

I searched around on NPM and found something simple that reflected this attitude: SocialMediaLinks and then built off of there for just the functionality I needed.

Wiring that into Hugo

I embed a few data attributes on .twitter-share using a Hugo partial.

<a href="#"
   target="_blank"
   class="twitter-share in-headline"
   data-url="{{.Permalink}}"
   data-via="_ZPH"
   data-title="{{.Title}}"
   {{ if .IsPage }}
     data-aliases="{{ .Aliases | jsonify }}"
   {{ end }}
   ><i class="fa fa-2x fa-twitter"></i></a>

When the page loads, the anchor’s href is filled in using this fn:

document.addEventListener("DOMContentLoaded", function() {
  _.each(document.querySelectorAll('.twitter-share'), function(el) {
    const { via, title, aliases } = el.dataset
    var ax, url
    try {
      ax = JSON.parse(aliases)
      // Prefer the shortest alias (the shortlink); fall back to the permalink
      url = _.sortBy(ax, 'length')[0] || el.dataset.url
    } catch (e) {
      url = el.dataset.url
    }
    const href = SocialMediaLinks.create({account: 'twitter', url: url, title: title, via: via})
    el.href = href
  })
});

Parsing/Stringifying Urls

This is my happiest implementation of URL parsing in JavaScript so far. The concept is adapted from https://gist.github.com/jlong/2428561 and updated to suit ES6. The clever trick is getting the browser to do the parsing by assigning the URL to the href of an a element.

import * as _ from 'lodash'

export default class Link {
  constructor(u) {
    this.url = this.parseURL(u);
  }

  parseURL(url) {
    // Credit: https://www.abeautifulsite.net/parsing-urls-in-javascript
    // And Originally: https://gist.github.com/jlong/2428561
    var parser = document.createElement('a')
    // Let the browser do the work
    parser.href = url;
    // Available on parser:
    //   protocol
    //   host
    //   hostname
    //   port
    //   pathname
    //   search (aka queryParams)
    //   hash
    return parser;
  }

  getQueryParams() {
    const kvs = this.url.search.replace(/^\?/, '').split('&');
    return _.reduce(kvs, function(acc, kv) {
      // Destructure the pair; `var k, v = ...` would leave k undefined
      const [k, v] = kv.split('=');
      if (!_.isEmpty(k)) {
        acc[k] = v
      }
      // Always hand the accumulator back to reduce
      return acc
    }, {})
  }

  setQueryParam(k, obj) {
    const qp = this.getQueryParams()
    qp[k] = obj;
    // Keep Parser in Sync so we can use href
    this.url.search = this.queryParamsToString(qp)
    return qp
  }

  emptyOr(v, ifEmpty, notEmpty) {
    if (_.isEmpty(v)) {
      return ifEmpty
    } else {
      return notEmpty
    }
  }

  queryParamsToString(qp) {
    return _.map(qp, function(v, k) {
      return [k, v].join("=")
    }).join("&")
  }

  toString() {
    return this.url.href;
  }
}

The SocialMediaLinks.create() function appends a query param that’s a hashed value so that retweets and content pathways can be tracked for analytics.
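A minimal sketch of that idea in Go rather than the blog's JavaScript. The parameter name "ref", the 8-character length, and the choice of SHA-256 over a seed string are all assumptions made for illustration; the source does not say what is hashed or what the param is called.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net/url"
)

// withTrackingParam returns rawURL with a short hash of seed attached as a
// query parameter. The name "ref" and the 8-character length are assumptions
// made for this sketch, not the blog's actual values.
func withTrackingParam(rawURL, seed string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256([]byte(seed))
	q := u.Query() // keeps any query params already on the URL
	q.Set("ref", hex.EncodeToString(sum[:])[:8])
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	out, err := withTrackingParam("http://mysite.tld/posts/my-original-url", "twitter")
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Going through net/url instead of string concatenation means existing params survive and the value is escaped correctly.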

When building the Twitter link, we check for a shortcode in the Aliases portion of the page metadata and fall back to the full link. Because the shortcode lives in the aliases frontmatter, Hugo autogenerates a redirect for each of these entries. It is an HTML meta-refresh page rather than a true HTTP 301.

The redirects work by generating an html document at that alias location like so (from the Hugo docs):

<html>
	<head>
		<link rel="canonical" href="http://mysite.tld/posts/my-original-url"/>
		<meta http-equiv="content-type" content="text/html; charset=utf-8"/>
		<meta http-equiv="refresh" content="0;url=http://mysite.tld/posts/my-original-url"/>
	</head>
</html>

And Finally

My post-new script for creating new posts on blog has a function in it to take the filename of the post, md5 hash it, and take the first 6 chars. That value’s inserted into the page frontmatter.
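The shortcode step could look roughly like this in Go. It is a sketch, not the post-new script itself: the file name is hypothetical, and whether the real script hashes the bare name, a path, or a trailing newline is not stated, so this hashes the bare name.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// shortcode returns the first 6 hex characters of the md5 of filename.
func shortcode(filename string) string {
	sum := md5.Sum([]byte(filename))
	return hex.EncodeToString(sum[:])[:6]
}

func main() {
	// Hypothetical post file name, used only for illustration.
	code := shortcode("added-shortlinks-to-hugo-blog.md")
	// Emit a TOML frontmatter line that Hugo would treat as an alias.
	fmt.Printf("aliases = [\"/%s\"]\n", code)
}
```

Six hex characters give about 16.7 million possible codes, which is plenty for a personal blog, though collisions are not impossible.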

Try it out ;-) aliased link

Full code

24 Jul 2016

Using Hugo Static Site Generator

I reworked this blog to use Hugo static site generator because my Octopress site was a bit long in the tooth.

It’s now using the following:

The tooling for compiling and releasing is here:

23 Jul 2016

On Being a 10x Engineer

Wise words:

21 Jul 2016

Find and Remove Pesky pLists

While looking up a system crash on OSX today, I found that Console.app was reporting recurrent issues with two old programs that had been migrated to this laptop from my prior one. Neither app was still in use, but their plist configurations (init.d type startup scripts) were still trying to run.

So here’s how I nuked them

find ~/ -name "*.plist" | parallel --dry-run 'rm -f {}'

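--dry-run only prints the commands parallel would run. As written, the find matches every plist under the home directory, so narrow the -name pattern to the offending apps first; then either rm -f the matches or mv them out of the way.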

21 Jul 2016

Spacemacs Go Mode and go set project

I’m using go-mode and needed the following script to correctly guess my GOPATH:

(defun go-set-project-with-guard ()
  (let* ((g (go-guess-gopath))
          (d (concat (getenv "HOME") "/src/golang"))
          (r (concat (getenv "HOME") "/src:"))
          (has-match (string-match r g)))

    (if (not (eq 0 has-match))
        (setenv "GOPATH" d)
        (setenv "GOPATH" g))))

(eval-after-load 'go-mode
  '(add-hook 'go-mode-hook 'go-set-project-with-guard))

The problem with using go-mode’s script was that my default path for code is ~/src, which means that go-guess-gopath will recurse up to the top of that path rather than stopping at ~/src/golang.

So I wrapped the script with my own fn to check if script returns incorrect pathing, and set it correctly if so.

PS - I’d prefer to use apply-partially here for concat '(getenv "HOME")' but didn’t get it working in 10 min and figured my time was better spent posting this blog entry.

21 Jul 2016

Dull and Reliable Golang

I’ve been working on systems lately that are suited for Golang:

  • Memory sensitive
  • Performance sensitive
  • Stability sensitive

And have been very happy with the outcomes of developing tools in Golang.

Here’s a set of links to my recent work in Golang (some are my own full creations while others build on, extend, or remix others’ work):

It’s been productive and performant. I also sense that I could revisit these projects in a year or two and still grok what’s happening. Given how much I need to context switch between languages right now, I appreciate projects that are easily picked back up after a hiatus.

TL;DR - Go’s going well and I tend to reach for it when solving systems issues.

Links and src below

Big thanks to @adarqui for putting their code on Github. It helped with a data migration I was doing. And also YAY to open source since they merged back in my updates and improvements, using SCAN vs KEYS, to the project :).

package main

import (
	"flag"
	"fmt"
	"gopkg.in/redis.v4"
	"log"
	"net"
	"net/url"
	"os"
	"reflect"
	"strconv"
	"sync"
)

type redisKey string
type pattern string

type RedisPipe struct {
	from     *RedisServer
	to       *RedisServer
	keys     string
	shutdown chan bool
}

type RedisServer struct {
	client *redis.Client
	host   string
	port   int
	db     int
	pass   string
}

type Discrepancy struct {
	key redisKey
	src interface{}
	dst interface{}
}

func parseRedisURI(s string) (server *RedisServer, err error) {
	// Defaults
	host := "localhost"
	password := ""
	port := 6379
	db := 0

	u, err := url.Parse(s)
	if err != nil {
		log.Fatal(err)
	}
	if u.Scheme != "redis" {
		log.Fatal("Scheme must be redis")
	}
	q := u.Query()
	dbS := q.Get("db")
	if u.User != nil {
		var ok bool
		password, ok = u.User.Password()
		if !ok {
			password = ""
		}
	}

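	// SplitHostPort fails when the URI has no port. Keep the defaults in that
	// case instead of overwriting host with an empty string.
	if h, p, splitErr := net.SplitHostPort(u.Host); splitErr == nil {
		if h != "" {
			host = h
		}
		if p != "" {
			port, err = strconv.Atoi(p)
			if err != nil {
				log.Fatalf("Unable to convert port to integer for %s", p)
			}
		}
	} else if u.Host != "" {
		host = u.Host
	}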

	if dbS != "" {
		db, err = strconv.Atoi(dbS)
		if err != nil {
			log.Fatalf("Unable to convert db to integer for %s", dbS)
		}
	}

	client := CreateClient(host, password, port, db)
	return &RedisServer{client, host, port, db, password}, nil
}

func (s *RedisServer) scanner(match pattern, wg *sync.WaitGroup) chan redisKey {
	keyChan := make(chan redisKey, 1000)
	split := make(chan []string)

	splitter := func() {
		defer wg.Done()
		defer close(keyChan)
		for {
			select {
			case ks, ok := <-split:
				if !ok {
					return
				}
				for _, k := range ks {
					keyChan <- redisKey(k)
				}
			}
		}
	}

	keyScanner := func(c chan redisKey) {
		defer wg.Done()
		var cursor uint64
		var n int
		for {
			var keys []string
			var err error
			// REDIS SCAN
			// http://redis.io/commands/scan
			// Preferable because it doesn't lock complete database on larger keysets for 250ms+.
			keys, cursor, err = s.client.Scan(cursor, string(match), 1000).Result()
			if err != nil {
				log.Fatal("KeysRedis: error obtaining keys list from redis: ", err)
			}
			split <- keys

			n += len(keys)
			if cursor == 0 {
				close(split)
				return
			}
		}
	}

	wg.Add(1)
	go splitter()

	wg.Add(1)
	go keyScanner(keyChan)

	return keyChan
}

func (p *RedisPipe) compare(src, dst *RedisServer, key redisKey) (interface{}, interface{}, bool) {
	s, err := src.client.Get(string(key)).Result()
	if err != nil {
		log.Printf("Unable to get expected key %s from src: %+v", key, src.client)
	}
	d, _ := dst.client.Get(string(key)).Result()
	isMatch := reflect.DeepEqual(s, d)
	return s, d, isMatch

}

func (p *RedisPipe) CompareKeys(c chan redisKey, mismatches chan *Discrepancy, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case _, ok := <-p.shutdown:
				if !ok {
					return
				}
			case k, ok := <-c:
				if !ok {
					return
				}
				s, d, isMatch := p.compare(p.from, p.to, k)
				if !isMatch {
					mismatches <- &Discrepancy{k, s, d}
				}
			}
		}
	}()
}

func CreateClient(host, pass string, port, db int) *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", host, port),
		Password: pass,
		DB:       db,
	})
}

func writer(c chan *Discrepancy, wg *sync.WaitGroup, del *string) {
	defer wg.Done()
	i := *del
	for d := range c {
		fmt.Printf("%s%s%s%s%s\n", d.key, i, d.src, i, d.dst)
	}
}

func main() {
	src := flag.String("src", "", "Format redis://:password@host:port?db=0")
	dst := flag.String("dst", "redis://localhost:6379", "redis://:password@host:port?db=0")
	threads := flag.Int("parallel", 20, "Threading count. Default `20`")
	match := flag.String("keys", "*", "Match subset of keys `*`")
	delimiter := flag.String("delimiter", "|", "Delimiter that will be used to separate output")
	flag.Parse()
	if *src == "" {
		flag.Usage()
		os.Exit(1)
	}
	from, _ := parseRedisURI(*src)
	to, _ := parseRedisURI(*dst)

	var wg sync.WaitGroup
	shutdown := make(chan bool, 1)
	discrepancies := make(chan *Discrepancy)
	pipe := &RedisPipe{from, to, *match, shutdown}
	keyChan := pipe.from.scanner(pattern(*match), &wg)

	tx := *threads
	for i := 0; i < tx; i++ {
		p := &RedisPipe{from, to, *match, shutdown}
		p.CompareKeys(keyChan, discrepancies, &wg)
	}

	// Setup Writer
	var wgWriter sync.WaitGroup
	wgWriter.Add(1)
	go writer(discrepancies, &wgWriter, delimiter)

	// Wait for threads to complete
	wg.Wait()
	// Start cleanup routine for writer
	close(discrepancies)
	// Wait for writer to close fn
	wgWriter.Wait()
}

package main

import (
	"crypto/rand"
	"flag"
	"fmt"
	"github.com/rlmcpherson/s3gof3r"
	"io"
	"log"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"
)

var (
	bucketName        = flag.String("bucket", "", "Upload bucket")
	keyPrefix         = flag.String("prefix", "", "S3 key prefix, eg bucket/prefix/output")
	mongodump         = flag.String("mongodump", "mongodump", "Mongodump bin name")
	db                = flag.String("db", "", "db name")
	username          = flag.String("username", "", "user name")
	password          = flag.String("password", "", "password")
	host              = flag.String("host", "", "host:port")
	excludeCollection = flag.String("excludeCollection", "", "collections to exclude")
	pReader, pWriter  = io.Pipe()

	wg sync.WaitGroup

	bucket *s3gof3r.Bucket
	date   string
)

func mustGetEnv(key string) string {
	s := os.Getenv(key)
	if s == "" {
		log.Fatalf("Missing ENV %s", key)
	}
	return s
}

func createBackup() error {
	// Register with the WaitGroup before deferring Done so the two stay paired.
	wg.Add(1)
	defer wg.Done()
	// Closing the pipe signals EOF to the uploader reading from pReader.
	defer pWriter.Close()
	name, err := exec.LookPath(*mongodump)
	if err != nil {
		log.Fatalf("Mongodump cannot be found on path")
	}
	// TODO: test for newness of mongo Archive requires newish >= 3.1 version of mongodump
	// 3.0.5 in homebrew is missing --archive
	// 3.2 is where archive to STDOUT became available
	args := []string{"--archive", "--db=" + *db, "--username=" + *username, "--password=" + *password, "--host=" + *host, "--gzip"}
	// Only pass the exclusion flag when one was requested; otherwise an empty
	// string would be handed to mongodump as a stray positional argument.
	if *excludeCollection != "" {
		args = append(args, "--excludeCollection="+*excludeCollection)
	}
	cmd := exec.Command(name, args...)
	cmd.Stdout = pWriter
	cmd.Stderr = os.Stderr
	// cmd.Args already starts with the binary name.
	// Note: this logs the password along with the other arguments.
	log.Printf("CMD: $ %s", strings.Join(cmd.Args, " "))
	return cmd.Run()
}

func pseudo_uuid() (uuid string) {
	// Credit: http://stackoverflow.com/a/25736155
	b := make([]byte, 16)
	_, err := rand.Read(b)
	if err != nil {
		fmt.Println("Error: ", err)
		return
	}

	uuid = fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])

	return
}

func setupFlags() {
	flag.Parse()
	flags := []string{"bucket", "mongodump", "db", "username", "password", "host"}
	fatal := false
	for _, f := range flags {
		fl := flag.Lookup(f)
		s := fl.Value.String()
		if s == "" {
			fatal = true
			log.Printf("Flag missing -%s which requires %s", fl.Name, fl.Usage)
		}
	}
	if fatal {
		log.Fatal("Exiting because of missing flags.")
	}
}

func setupS3() *s3gof3r.Bucket {
	awsAccessKey := mustGetEnv("AWS_ACCESS_KEY_ID")
	awsSecretKey := mustGetEnv("AWS_SECRET_ACCESS_KEY")
	keys := s3gof3r.Keys{
		AccessKey: awsAccessKey,
		SecretKey: awsSecretKey,
	}
	s3 := s3gof3r.New("", keys)
	return s3.Bucket(*bucketName)
}

func generateS3Key() string {
	now := time.Now().Format("2006-01-02/15")
	prefix := ""
	if *keyPrefix != "" {
		prefix = *keyPrefix + "/"
	}
	uuid := pseudo_uuid()
	return fmt.Sprintf("%s%s/%s/%s.tar.gz", prefix, *db, now, uuid)
}

func main() {
	setupFlags()
	bucket := setupS3()

	go createBackup()

	s3Key := generateS3Key()
	output := fmt.Sprintf("s3://%s/%s", *bucketName, s3Key)
	w, err := bucket.PutWriter(s3Key, nil, nil)
	if err != nil {
		log.Fatalf("Error with bucket (%s/%s) PutWriter: %s", *bucketName, s3Key, err)
	}

	log.Printf("Uploading to %s", output)
	written, err := io.Copy(w, pReader)
	if err != nil {
		w.Close()
		log.Fatalf("Error uploading to %s, ERROR: %s", output, err)
	}

	wg.Wait()

	// The upload is only finished once Close returns, so check its error
	// before reporting success.
	if err := w.Close(); err != nil {
		log.Fatalf("Error finishing upload to %s, ERROR: %s", output, err)
	}
	log.Printf("Successfully uploaded %d bytes to %s", written, output)
}

19 Jul 2016

File watching revisited

#!/usr/bin/env bash
# Usage:
# re-run ~/go-project make build


if [[ ! -x $(which fswatch) ]];then
  echo "Must install fswatch"
  exit 1
fi

main(){
  DIR_OR_FILE=$1
  shift
  CMD="$@"
  fswatch -or ${DIR_OR_FILE} | xargs -n1 -I{} -- ${CMD}
}

main "$@"

18 Jul 2016

How to Verify Links After Blog Upgrade

Scrape full link set from site:

wget -r -l4 --spider -D blog.xargs.io http://blog.xargs.io

Analyze link set from site

tree -J -f blog.xargs.io | grep file | grep -o 'name.*' | \
  awk -F":" '{print $2}' | tr -d '",}' | sort -u

Curl to see which currently work

# Deal with multiple saved copies of same entry from wget
cat current_links.log | grep -v "\.[[:digit:]]*$" | \
sed 's/blog.xargs.io/http:\/\/blog.xargs.io/g' | \
  parallel -- \
    "curl -o /dev/null --silent --head --write-out '%{http_code} %{url_effective}\n' {}" | \
  sort -u | tail -r > current_links_master.log

Then run current_links_master.log through a processor to compare your staging site to your production site. Watch for those 404s and make sure your 302s look good.

cat current_links_master.log | sed 's/blog.xargs.io/localhost:5000/g' | \
  parallel -- \
    "curl -o /dev/null --silent --head --write-out '%{http_code} %{url_effective}\n' {}" | \
  sort -u | tail -r | grep -Ev "^(200|302) "
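The final filtering step can also be sketched in Go. This assumes the "<status> <url>" line format produced by the curl --write-out template above; the function name brokenLinks and the sample report are made up for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// brokenLinks reads "<status> <url>" lines and returns the lines whose
// status is neither 200 nor 302.
func brokenLinks(report string) []string {
	var broken []string
	sc := bufio.NewScanner(strings.NewReader(report))
	for sc.Scan() {
		fields := strings.Fields(sc.Text())
		if len(fields) != 2 {
			continue // ignore blank or malformed lines
		}
		if fields[0] != "200" && fields[0] != "302" {
			broken = append(broken, sc.Text())
		}
	}
	return broken
}

func main() {
	// Hypothetical sample report against a local staging server.
	report := "200 http://localhost:5000/\n" +
		"404 http://localhost:5000/missing\n" +
		"302 http://localhost:5000/old\n"
	for _, line := range brokenLinks(report) {
		fmt.Println(line)
	}
}
```

Matching on the status field alone avoids false hits when a URL happens to contain "200" or "302".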

Credit for these scripts:

19 Apr 2016

Feeding Postgres Triggers into the Firehose

Log Architecture

I’ve been considering architectures lately that allow for realtime updates across many disparate systems. We use one of these at work and it allows for a near infinite number of subscribers to watch a Kafka stream(s) for updates. Many different systems feed into this pipeline and many systems consume the data. In case of rare/non-existent Kafka downtime all events are stored temporarily into S3.

The architecture that informed this system was documented in a LinkedIn technical article here: https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying. Go spend twenty minutes reading and digesting the implications of that article.
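The S3 fallback mentioned earlier can be pictured with a small Go sketch. Everything here is hypothetical: the Sink interface, FallbackPublisher, and the in-memory stand-ins are invented for illustration and are not how the system at work is built.

```go
package main

import (
	"errors"
	"fmt"
)

// Sink is anything that can accept an event (hypothetical interface).
type Sink interface {
	Publish(event string) error
}

// FallbackPublisher tries Primary first and only uses Fallback when
// Primary reports an error.
type FallbackPublisher struct {
	Primary  Sink
	Fallback Sink
}

func (p FallbackPublisher) Publish(event string) error {
	err := p.Primary.Publish(event)
	if err == nil {
		return nil
	}
	if fbErr := p.Fallback.Publish(event); fbErr != nil {
		return fmt.Errorf("primary failed (%v) and fallback failed: %w", err, fbErr)
	}
	return nil
}

// memorySink is an in-memory stand-in for Kafka or S3 in this sketch.
type memorySink struct {
	down   bool
	events []string
}

func (m *memorySink) Publish(event string) error {
	if m.down {
		return errors.New("sink unavailable")
	}
	m.events = append(m.events, event)
	return nil
}

func main() {
	kafka := &memorySink{down: true} // simulate Kafka downtime
	s3 := &memorySink{}
	pub := FallbackPublisher{Primary: kafka, Fallback: s3}
	if err := pub.Publish("some-event"); err != nil {
		panic(err)
	}
	fmt.Println("kafka:", len(kafka.events), "s3:", len(s3.events))
}
```

A real system would also need to replay the parked events into the stream once it recovers, which this sketch leaves out.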

The Problem

I was recently presented with the problem where many applications modify many records in various postgres databases that all need to be indexed by ElasticSearch. Bulk indexing is possible and actively done, but realtime updates are preferable.

The Solution

The solution to this problem rests with Postgres, NOTIFY/LISTEN, an intermediary application, a Kafka stream, and consumers who know how to update records from DB -> ElasticSearch.

Here’s how I prototyped the solution:

  • Register functions that are called whenever a PG database table performs an INSERT/UPDATE/DELETE, aka a database trigger.
  • The function calls NOTIFY <CHANNEL>, <PAYLOAD> (or pg_notify(CHANNEL, PAYLOAD)) where channel is a string identifier of where to publish a stream of those events. Payload is a string of arbitrary data, which I set as table=<NAME>,action=<INSERT|UPDATE|DELETE>,id=<ROW_ID>. This payload configuration is similar in concept to a query string and passes just enough information that an event can be registered on the Kafka stream.
  • Intermediary application is registered to listen on <CHANNEL> and call a callback of its own for each message. This parses the message and encodes it in a more advanced/portable manner.
  • Intermediary app posts that formatted event onto Kafka stream.
  • Workers consuming stream pick up that event and fire off a re-index of the row.

For simplicity and because of current excitement about Elixir, I built the intermediary application using Elixir and Boltun. Elixir gives me a good degree of confidence in its reliability and uptime because of the built in OTP architecture with GenServer and Supervisor trees. It’s also approachable for other engineers, regardless of their current familiarity with Elixir.

# Requires setting DB connection details in config/config.exs per Boltun Readme

defmodule Listener do
  use Boltun, otp_app: :listener

  listen do
    channel "watchers", :my_callback
  end

  def my_callback(channel, payload) do
    # Send to Kafka and S3
    IO.puts channel
    IO.puts payload
  end
end

Listener.start_link

Database functions and triggers for NOTIFY/LISTEN

-- DROP TABLE example_table;

CREATE TABLE example_table (id serial primary key, name varchar);

-- create function for DELETE action
-- Uses OLD id instead of NEW because the ID after action will be null
CREATE FUNCTION delete_event() RETURNS trigger AS $$
DECLARE
BEGIN
  PERFORM pg_notify('watchers', 'table=' || TG_TABLE_NAME || ',action=' || TG_OP || ',id=' || OLD.id );
  RETURN OLD;
END;
$$ LANGUAGE plpgsql;

-- create function for INSERT/UPDATE action
CREATE FUNCTION insert_or_update_event() RETURNS trigger AS $$
DECLARE
BEGIN
  PERFORM pg_notify('watchers', 'table=' || TG_TABLE_NAME || ',action=' || TG_OP || ',id=' || NEW.id );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Attach insert_or_update_event fn to updates_trigger
-- Which is how we specify to act on INSERT/UPDATE
CREATE TRIGGER updates_trigger BEFORE insert or update ON example_table
FOR EACH ROW EXECUTE PROCEDURE insert_or_update_event();

-- Attach delete_event to deletion_trigger for DELETE
CREATE TRIGGER deletion_trigger BEFORE delete ON example_table
FOR EACH ROW EXECUTE PROCEDURE delete_event();

Now start the Elixir application for watching the NOTIFY stream. Execute insert/updates/deletes.

At this point, notifications will go out via PG’s NOTIFY as:

NOTIFY watchers, 'table=example_table,action=INSERT,id=2';

You’ll see Elixir logging those events in realtime via NOTIFY. Which is the equivalent of the following function call.

Listener.my_callback("watchers", "table=example_table,action=INSERT,id=2")

-- Sample code for running the above SQL and doing row modifications.
-- PSQL <DBNAME>
-- $ psql mytestdb
-- <DBNAME># \i complete_action.sql
-- <DBNAME># insert into example_table (name) VALUES ('something');
-- <DBNAME># insert into example_table (name) VALUES ('something_else');
-- <DBNAME># DELETE from example_table where id=1;
-- <DBNAME># DELETE from example_table where id=2;

Conclusion

With larger datasets and more realtime data streaming through tech companies, I look forward to seeing and working on more log based architectures. These systems provide resilience, fault tolerance, simplicity, and scalability. By funneling events such as PG’s row modifications into a Kafka stream, we build a robust system of keeping ElasticSearch in near-realtime sync with Postgres.

Feed in data from Postgres, from various server logs, from user events, and from business metrics. Soon the Kafka firehose is a central river of data running through the organization.