20 Jan 2017

Announcing MoreSQL (Realtime Mongo -> PG streaming)

I’m proud to announce that MoreSQL is live and production ready :)!

We’ve been using it in production for a few months to stream production data mutations from Mongo to PostgreSQL. The latency is normally subsecond and has scaled well with a small footprint in the two production use cases.


It’s written in Golang and conceived of based on a Ruby project called mosql (built by the wonderful folks over at Stripe). After using that in production our telemetry revealed that it was lagging behind during mutation intensive periods. We were further stymied from upgrading MongoDB due to version incompatibilities in MoSQL.

Implementing the project from the ground up in Golang allowed for better concurrency, lower latency, and a small memory footprint (often 20-50MB under load).


Moresql is released under a permissive license and is open source software. You’re welcome to set it up in production environments and submit pull requests to improve the project.


If you’d like a turnkey solution implemented for your business in order to have realtime data from Mongo sent to Postgres for reporting, analytics, query performance or as a mongo to postgres migration strategy, send me a note. As the author of Moresql and with running this solution in production on different systems, I’m ready to solve your problem!


We used this project in production from 2017-2019 for our primary data storage and using MoreSQL was a critical component of moving off of MongoDB.

12 Sep 2016

Good Dull Best Practices in Operations

Tonight I read this paper: pdf or archive.

And I was impressed by the boring and solid guidelines therein.

My takeaways were:

  • Immutable/recreatable servers and infrastructure. Poignant b/c of an EC2 hardware failure today and needing to recover by snapshotting the root drive and reattaching to new server.
  • Instrument all the things.
  • Spread testing across unit, integration and multi-service tests
  • Gradual deployment of new services/updates. As a colleague and friend would say, “bake it in production for a little while”.
  • “Proven technology is almost always better than operating on the bleeding edge. Stable software is better than an early copy, no matter how valuable the new feature seems.” Bleeding edge is named that way for a reason. I find this tension between proven technology and bleeding edge to be on my mind lately.

07 Sep 2016

Using Clojure on AWS Lambda

AWS Lambda is great for ad hoc services without needing to manage additional infrastructure. I’ve used it on a couple tasks for syncing S3 buckets.

The workflow goes like this:

  • Register a lambda function
  • Setup appropriate role and ARN permissions
  • Setup a trigger, ie a circumstance that should invoke this function
  • Build code to respond to the trigger
  • Upload, debug, etc

So this weekend I built an AWS Lambda in python to transform some textfiles that were stored in EDN format into JSON and then partition them according to one key. EDN is a json-ish format from the Clojure world (https://en.wikipedia.org/wiki/Extensible_Data_Notation). These EDN files were on S3 and gzip compressed.

I built the lambda in python, used boto3, and edn_format for freeing the data from EDN. I packaged those dependencies up into a zipfile and shipped it to staging environment.

It worked marvelously on files that were up to 1MB in size. Then larger files started timing out… because AWS Lambda has an upper time limit of 300 seconds per execution. I found the culprit files, mostly ~ 7MB of gzipped EDN, tried them locally, performance profiled it, and realized the issue was in deserializing EDN data in Python. Woops! As you might expect, EDN libraries are few and far between compared to JSON. And they tend to be less robust and don’t delegate to C extensions.

Now clojure is the logical choice for this EDN -> JSON partitioning task. But AWS only officially supports Java, Python and Node.js.

But clojure is really just java under the hood… so I found an article with the basic guidelines and set to work. (Article: https://aws.amazon.com/blogs/compute/clojure/).

The trick to using clojure is needing to expose a static method with the appropriate signature for AWS Lambda and then using a few project.clj configurations.

project.clj - Note the uberjar profile with :aot :all and the aws lambda clojar. Include [com.amazonaws/aws-lambda-java-core “1.0.0”] as dependency and set :profiles {:uberjar {:aot :all}}

Then to help with the aws-lambda protocol, I followed instructions from the original article, along with a secondary source of information from @kobmic on Github. I’m particularly happy with their implementation of the deflambda macro, copied to here:

;; convenience macro for generating gen-class and handleRequest
(defmacro deflambda [name args & body]
  (let [class-name (->> (clojure.string/split (str name) #"-")
                     (mapcat clojure.string/capitalize)
                     (apply str))
        fn-name (symbol (str "handle-" name "-event"))]
    `(do (gen-class
           :name ~(symbol class-name)
           :prefix ~(symbol (str class-name "-"))
           :implements [com.amazonaws.services.lambda.runtime.RequestStreamHandler])

         (defn ~(symbol (str class-name "-handleRequest")) [this# is# os# context#]
           (let [~fn-name (fn ~args ~@body)
                 w# (io/writer os#)]
             (-> (json/read (io/reader is#) :key-fn keyword)
               (json/write w#))
             (.flush w#))))))

Used like

(deflambda s3-split [event]
  (example.core/handler event)

And in the AWS Lambda dashboard, the handler name is S3Split::handleRequest.

So where the Python version of this code was timing out at 300 seconds without completing the task, my clojure lambda burns through it in 20-70 seconds and has been working well.

Additional Code for Deploying/Updating/Building

Create lambda function

#!/usr/bin/env bash

aws lambda create-function --function-name example-lambda --handler S3Put::handleRequest --runtime java8 --memory 512 --timeout 120 --role arn:aws:iam::<ID>:role/example-role-lambda --zip-file fileb://./target/example-0.1.0-SNAPSHOT-standalone.jar

Update lambda function

#!/usr/bin/env bash

aws lambda update-function-code \
  --function-name example-lambda \
  --zip-file fileb://./target/example-1.0.0-SNAPSHOT-standalone.jar


lein uberjar

24 Jul 2016

Reflections on Migrating Redis and PG

I had the task of deploying three production databases with minimal downtime. Here’s the takeaways.

Moving Redis with persistent data

Redis needed to move off a couple providers and into another provider. This needed to happen inside a 30 min maintenance window for one application (which performs critial writes) but some novelty loss of other low value writes was an acceptable tradeoff for having 0 downtime of other services.

One db was easily imported using DB host’s Import tool. Another db was not able to use that mechanism and was transfered by redis-transfer. I enjoyed extending the tool to make it work well for this purpose.


Simplest of all, it was a matter of generating Heroku backup, downloading that link and importing it into other db.

#!/usr/bin/env bash

# References https://devcenter.heroku.com/articles/heroku-postgres-import-export
# Requires heroku commandline tool.
# The following ENV are required

# The following envs are required for the destination DB and are automatically
# used by PG.

# Set this for simpler scripting

# Install heroku toolkit https://zph.xargs.io/heroku-toolkit-install.sh | bash
# sudo apt-get install postgresql
$heroku pg:backups -a $APP_NAME capture && \
  curl -o $OUTPUT_FILE `$heroku pg:backups -a $APP_NAME public-url` && \
  pg_restore --verbose --clean --no-acl --no-owner -d $PGDATABASE $OUTPUT_FILE

The Day Of

I ran through all the steps, outlined them, then setup working scripts for each portion of process. Those were then setup as commands in a command station type tmux session.

Each Tmux tab was a phase of the process: maintenance_mode:on, redis_migrations, maintenance_mode:off, pg_migrations, logging

Inside each tab it had the commands I would need to one, one per section of the window:

| redis1_migration| redis_migration2 |
| point to new r1 | point to new r2  |

Performing the Migration

  • Notified stakeholders in advance
  • Prepared steps, conducted trials against staging
  • Setup migration scripts
  • Walk through checklist in 15 min before time
  • Set one heroku app to maintenance mode
  • Import 2 redis dbs
    • Verify result
    • Run script to point to those new endpoints
  • Maintenance mode off
  • PG migrate
    • Verify results
    • Run script to point to new endpoints


Glad redis-transfer was available to help with a recalcitrant server. And I’m glad to be preparing postgres for more active duty in our stack.

My takeaway from accomplishing this migration was that careful planning leads to quick and uneventful maintenance windows. Also, I’d rather migrate pg than redis.

And have a migration buddy :). Makes it far more enjoyable and extra hands in case things go wrong.

24 Jul 2016

Added Shortlinks To Hugo Blog

I got a bee in my bonnet today about adding unobtrusive Twitter share links to this blog.

It involved the following steps:

  • Finding out how to do it without using Twitter’s SDK on page
  • Wiring that into a Hugo template
  • Adding fragment to share links
  • Adding mechanism for shortlinks on blog

Twitter Shares without their SDK

I prefer not to include Third Party JS on pages for security and purity reasons.

I searched around on NPM and found something simple that reflected this attitude: SocialMediaLinks and then built off of there for just the functionality I needed.

Wiring that into Hugo

I embed a few data attributes on .twitter_share using a Hugo partial.

<a href="#"
   class="twitter-share in-headline"
   {{ if .IsPage }}
     data-aliases="{{ .Aliases | jsonify }}"
   {{ end }}
   ><i class="fa fa-2x fa-twitter"></i></a>

When the page loads, the div’s href is filled in using this fn:

document.addEventListener("DOMContentLoaded", function() {
  _.each(document.querySelectorAll('.twitter-share'), function(el) {
    const { via, title, aliases } = el.dataset
    var ax, url
    try {
      ax = JSON.parse(aliases)
      url = _.sortBy(ax, length)[0]
    } catch (e) {
      url = el.dataset.url
    const href = SocialMediaLinks.create({account: 'twitter', url: url, title: title, via: via})
    el.href = href

Parsing/Stringifying Urls

This is my happiest implementation of url parsing so far in Javascript. The concept is adapted from https://gist.github.com/jlong/2428561 and adapted to suit ES6. The clever trick is getting the browser to do the parsing by making it an a element.

import * as _ from 'lodash'

export default class Link {
  constructor(u) {
    this.url = this.parseURL(u);

  parseURL(url) {
    // Credit: https://www.abeautifulsite.net/parsing-urls-in-javascript
    // And Originally: https://gist.github.com/jlong/2428561
    var parser = document.createElement('a')
    // Let the browser do the work
    parser.href = url;
		//  Available on parser
		// 	protocol
		// 	host
		// 	hostname
		// 	port
		// 	pathname
		// 	search aka queryParams
		// 	hash
    return parser;

  getQueryParams() {
    const kvs = this.url.search.replace(/^\?/, '').split('&');
    return _.reduce(kvs, function(acc, kv) {
      var k, v = kv.split('=');
      if (_.isEmpty(k)) {
        return acc
      } else {
        return acc[k] = v
    }, {})

  setQueryParam(k, obj) {
    const qp = this.getQueryParams()
    qp[k] = obj;
    // Keep Parser in Sync so we can use href
    this.url.search = this.queryParamsToString(qp)
    return qp

  emptyOr(v, ifEmpty, notEmpty) {
    if (_.isEmpty(v)) {
      return ifEmpty
    } else {
      return notEmpty

  queryParamsToString(qp) {
    return _.map(qp, function(v, k) {
      return [k, v].join("=")

  toString() {
    return this.url.href;

The ShareSocialMedia.create() function appends a query param that’s a hashed value so that retweets and content pathways can be tracked for analytics.

When building the twitter link, we check for a shortcode in the Aliases portion of page metadata and fallback to using the full link. By using aliases frontmatter for this Hugo will autogenerate redirect urls for each of these entries with a 301 link

The redirects work by generating an html document at that alias location like so (from the Hugo docs):

		<link rel="canonical" href="http://mysite.tld/posts/my-original-url"/>
		<meta http-equiv="content-type" content="text/html; charset=utf-8"/>
		<meta http-equiv="refresh" content="0;url=http://mysite.tld/posts/my-original-url"/>

And Finally

My post-new script for creating new posts on blog has a function in it to take the filename of the post, md5 hash it, and take the first 6 chars. That value’s inserted into the page frontmatter.

Try it out ;-) aliased link

Full code

24 Jul 2016

Using Hugo Static Site Generator

I reworked this blog to use Hugo static site generator because my Octopress site was a bit long in the tooth.

It’s now using the following:

The tooling for compiling and releasing is here:

23 Jul 2016

On Being a 10x Engineer

Wise words:

21 Jul 2016

Find and Remove Pesky pLists

While looking up a system crash on OSX today, I found that Console.app was reporting recurrent issues with two old programs that had been migrated to this laptop from prior one. Neither app was still in use, but their plist configurations (init.d type startup scripts) were still trying to run.

So here’s how I nuked them

find ~/ -name "*.plist" | parallel --dry-run 'rm -f {}'

rm -f or mv {} $(basename {}) those out of the way.

21 Jul 2016

Spacemacs Go Mode and go set project

I’m using go-mode and needed the following script to correctly guess my GOPATH:

(defun go-set-project-with-guard ()
  (let* ((g (go-guess-gopath))
          (d (concat (getenv "HOME") "/src/golang"))
          (r (concat (getenv "HOME") "/src:"))
          (has-match (string-match r g)))

    (if (not (eq 0 has-match))
        (setenv "GOPATH" d)
        (setenv "GOPATH" g))))

(eval-after-load 'go-mode
  '(add-hook 'go-mode-hook 'go-set-project-with-guard))

The problem with using go-mode’s script was that my default path for code is ~/src. Which means that go-guess-gopath will recurse up to the top of that path rather than remaining at ~/src/golang.

So I wrapped the script with my own fn to check if script returns incorrect pathing, and set it correctly if so.

PS - I’d prefer to use apply-partially here for concat '(getenv "HOME")' but didn’t get it working in 10 min and figured my time was better spent posting this blog entry.

21 Jul 2016

Dull and Reliable Golang

I’ve been working on systems lately that are suited for Golang:

  • Memory sensitive
  • Performance sensitive
  • Stability sensitive

And have been very happy with the outcomes of developing tools in Golang.

Here’s a set of links to my recent work in Golang (some my own full creations while others are building on others work or extending/remixing their work):

It’s been productive and performant. I also sense that I could revisit these projects in a year or two and still grok what’s happening. Given how much I need to context switch between languages right now, I appreciate projects that are easily picked back up after a hiatus.

TL;DR - Go’s going well and I tend to reach for it when solving systems issues.

Links and src below

Big thanks to @adarqui for putting their code on Github. It helped with a data migration I was doing. And also YAY to open source since they merged back in my updates and improvements, using SCAN vs KEYS, to the project :).

package main

import (

type redisKey string
type pattern string

type RedisPipe struct {
	from     *RedisServer
	to       *RedisServer
	keys     string
	shutdown chan bool

type RedisServer struct {
	client *redis.Client
	host   string
	port   int
	db     int
	pass   string

type Discrepancy struct {
	key redisKey
	src interface{}
	dst interface{}

func parseRedisURI(s string) (server *RedisServer, err error) {
	// Defaults
	host := "localhost"
	password := ""
	port := 6379
	db := 0

	u, err := url.Parse(s)
	if err != nil {
	if u.Scheme != "redis" {
		log.Fatal("Scheme must be redis")
	q := u.Query()
	dbS := q.Get("db")
	if u.User != nil {
		var ok bool
		password, ok = u.User.Password()
		if !ok {
			password = ""

	var p string
	host, p, _ = net.SplitHostPort(u.Host)

	if p != "" {
		port, err = strconv.Atoi(p)
		if err != nil {
			log.Fatalf("Unable to convert port to integer for %s", err)

	if dbS != "" {
		db, err = strconv.Atoi(dbS)
		if err != nil {
			log.Fatalf("Unable to convert db to integer for %s", dbS)

	client := CreateClient(host, password, port, db)
	return &RedisServer{client, host, port, db, password}, nil

func (s *RedisServer) scanner(match pattern, wg *sync.WaitGroup) chan redisKey {
	keyChan := make(chan redisKey, 1000)
	split := make(chan []string)

	splitter := func() {
		defer wg.Done()
		defer close(keyChan)
		for {
			select {
			case ks, ok := <-split:
				if !ok {
				for _, k := range ks {
					keyChan <- redisKey(k)

	keyScanner := func(c chan redisKey) {
		defer wg.Done()
		var cursor uint64
		var n int
		for {
			var keys []string
			var err error
			// http://redis.io/commands/scan
			// Preferable because it doesn't lock complete database on larger keysets for 250ms+.
			keys, cursor, err = s.client.Scan(cursor, string(match), 1000).Result()
			if err != nil {
				log.Fatal("KeysRedis: error obtaining keys list from redis: ", err)
			split <- keys

			n += len(keys)
			if cursor == 0 {

	go splitter()

	go keyScanner(keyChan)

	return keyChan

func (p *RedisPipe) compare(src, dst *RedisServer, key redisKey) (interface{}, interface{}, bool) {
	s, err := src.client.Get(string(key)).Result()
	if err != nil {
		log.Printf("Unable to get expected key %s from src: %+v", key, src.client)
	d, _ := dst.client.Get(string(key)).Result()
	isMatch := reflect.DeepEqual(s, d)
	return s, d, isMatch


func (p *RedisPipe) CompareKeys(c chan redisKey, mismatches chan *Discrepancy, wg *sync.WaitGroup) {
	go func() {
		defer wg.Done()
		for {
			select {
			case _, ok := <-p.shutdown:
				if !ok {
			case k, ok := <-c:
				if !ok {
				s, d, isMatch := p.compare(p.from, p.to, k)
				if !isMatch {
					mismatches <- &Discrepancy{k, s, d}

func CreateClient(host, pass string, port, db int) *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", host, port),
		Password: pass,
		DB:       db,

func writer(c chan *Discrepancy, wg *sync.WaitGroup, del *string) {
	defer wg.Done()
	i := *del
	for d := range c {
		fmt.Printf("%s%s%s%s%s\n", d.key, i, d.src, i, d.dst)

func main() {
	src := flag.String("src", "", "Format redis://:password@host:port?db=0")
	dst := flag.String("dst", "redis://localhost:6379", "redis://:password@host:port?db=0")
	threads := flag.Int("parallel", 20, "Threading count. Default `20`")
	match := flag.String("keys", "*", "Match subset of keys `*`")
	delimiter := flag.String("delimiter", "|", "Delimiter that will be used to separate output")
	if *src == "" {
	from, _ := parseRedisURI(*src)
	to, _ := parseRedisURI(*dst)

	var wg sync.WaitGroup
	shutdown := make(chan bool, 1)
	discrepancies := make(chan *Discrepancy)
	pipe := &RedisPipe{from, to, *match, shutdown}
	keyChan := pipe.from.scanner(pattern(*match), &wg)

	tx := *threads
	for i := 0; i < tx; i++ {
		p := &RedisPipe{from, to, *match, shutdown}
		p.CompareKeys(keyChan, discrepancies, &wg)

	// Setup Writer
	var wgWriter sync.WaitGroup
	go writer(discrepancies, &wgWriter, delimiter)

	// Wait for threads to complete
	// Start cleanup routine for writer
	// Wait for writer to close fn
package main

import (

var (
	bucketName        = flag.String("bucket", "", "Upload bucket")
	keyPrefix         = flag.String("prefix", "", "S3 key prefix, eg bucket/prefix/output")
	mongodump         = flag.String("mongodump", "mongodump", "Mongodump bin name")
	db                = flag.String("db", "", "db name")
	username          = flag.String("username", "", "user name")
	password          = flag.String("password", "", "password")
	host              = flag.String("host", "", "host:port")
	excludeCollection = flag.String("excludeCollection", "", "collections to exclude")
	pReader, pWriter  = io.Pipe()

	wg sync.WaitGroup

	bucket *s3gof3r.Bucket
	date   string

func mustGetEnv(key string) string {
	s := os.Getenv(key)
	if s == "" {
		log.Fatalf("Missing ENV %s", key)
	return s

func createBackup() error {
	defer pWriter.Close()
	defer wg.Done()
	name, err := exec.LookPath(*mongodump)
	if err != nil {
		log.Fatalf("Mongodump cannot be found on path")
	// TODO: test for newness of mongo Archive requires newish >= 3.1 version of mongodump
	// 3.0.5 in homebrew is missing --archive
	// 3.2 is where archive to STDOUT became available
	if *excludeCollection != "" {
		*excludeCollection = "--excludeCollection=" + *excludeCollection
	args := []string{"--archive", "--db=" + *db, "--username=" + *username, "--password=" + *password, "--host=" + *host, *excludeCollection, "--gzip"}
	cmd := exec.Command(name, args...)
	cmd.Stdout = pWriter
	cmd.Stderr = os.Stderr
	log.Printf("CMD: $ %s %s", name, strings.Join(cmd.Args, " "))
	err = cmd.Run()
	if err != nil {
		return err
	return nil

func pseudo_uuid() (uuid string) {
	// Credit: http://stackoverflow.com/a/25736155
	b := make([]byte, 16)
	_, err := rand.Read(b)
	if err != nil {
		fmt.Println("Error: ", err)

	uuid = fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])


func setupFlags() {
	flags := []string{"bucket", "mongodump", "db", "username", "password", "host"}
	fatal := false
	for _, f := range flags {
		fl := flag.Lookup(f)
		s := fl.Value.String()
		if s == "" {
			fatal = true
			log.Printf("Flag missing -%s which requires %s", fl.Name, fl.Usage)
	if fatal {
		log.Fatal("Exiting because of missing flags.")

func setupS3() *s3gof3r.Bucket {
	awsAccessKey := mustGetEnv("AWS_ACCESS_KEY_ID")
	awsSecretKey := mustGetEnv("AWS_SECRET_ACCESS_KEY")
	keys := s3gof3r.Keys{
		AccessKey: awsAccessKey,
		SecretKey: awsSecretKey,
	s3 := s3gof3r.New("", keys)
	return s3.Bucket(*bucketName)

func generateS3Key() string {
	now := time.Now().Format("2006-01-02/15")
	prefix := ""
	if *keyPrefix != "" {
		prefix = *keyPrefix + "/"
	uuid := pseudo_uuid()
	return fmt.Sprintf("%s%s/%s/%s.tar.gz", prefix, *db, now, uuid)

func main() {
	bucket := setupS3()

	go createBackup()

	s3Key := generateS3Key()
	output := fmt.Sprintf("s3://%s/%s", *bucketName, s3Key)
	w, err := bucket.PutWriter(s3Key, nil, nil)
	if err != nil {
		log.Fatalf("Error with bucket (%s/%s) PutWriter: %s", *bucketName, s3Key, err)
	defer func() {
		log.Printf("Successfully uploaded %s", output)

	log.Printf("Uploading to %s", output)
	written, err := io.Copy(w, pReader)
	if err != nil {
		log.Printf("Error Uploading to %s, ERROR: %s", output, err)


	log.Printf("Attempting to write %d bytes", written)