20 Jan 2017

Announcing MoreSQL (Realtime Mongo -> PG streaming)

I’m proud to announce that MoreSQL is live and production ready :)!

We’ve been using it in production for a few months to stream production data mutations from Mongo to PostgreSQL. The latency is normally subsecond and has scaled well with a small footprint in the two production use cases.

Background

It’s written in Golang and conceived of based on a Ruby project called mosql (built by the wonderful folks over at Stripe). After using that in production our telemetry revealed that it was lagging behind during mutation intensive periods. We were further stymied from upgrading MongoDB due to version incompatibilities in MoSQL.

Implementing the project from the ground up in Golang allowed for better concurrency, lower latency, and a small memory footprint (often 20-50MB under load).

License

Moresql is released under a permissive license and is open source software. You’re welcome to set it up in production environments and submit pull requests to improve the project.

Consulting

If you’d like a turnkey solution implemented for your business in order to have realtime data from Mongo sent to Postgres for reporting, analytics, query performance or as a mongo to postgres migration strategy, send me a note. As the author of Moresql and with running this solution in production on different systems, I’m ready to solve your problem!

Update

We used this project in production from 2017-2019 for our primary data storage and using MoreSQL was a critical component of moving off of MongoDB.

21 Jul 2016

Dull and Reliable Golang

I’ve been working on systems lately that are suited for Golang:

  • Memory sensitive
  • Performance sensitive
  • Stability sensitive

And have been very happy with the outcomes of developing tools in Golang.

Here’s a set of links to my recent work in Golang (some my own full creations while others are building on others work or extending/remixing their work):

It’s been productive and performant. I also sense that I could revisit these projects in a year or two and still grok what’s happening. Given how much I need to context switch between languages right now, I appreciate projects that are easily picked back up after a hiatus.

TL;DR - Go’s going well and I tend to reach for it when solving systems issues.

Links and src below

Big thanks to @adarqui for putting their code on Github. It helped with a data migration I was doing. And also YAY to open source since they merged back in my updates and improvements, using SCAN vs KEYS, to the project :).

package main

import (
	"flag"
	"fmt"
	"gopkg.in/redis.v4"
	"log"
	"net"
	"net/url"
	"os"
	"reflect"
	"strconv"
	"sync"
)

type redisKey string
type pattern string

type RedisPipe struct {
	from     *RedisServer
	to       *RedisServer
	keys     string
	shutdown chan bool
}

type RedisServer struct {
	client *redis.Client
	host   string
	port   int
	db     int
	pass   string
}

type Discrepancy struct {
	key redisKey
	src interface{}
	dst interface{}
}

func parseRedisURI(s string) (server *RedisServer, err error) {
	// Defaults
	host := "localhost"
	password := ""
	port := 6379
	db := 0

	u, err := url.Parse(s)
	if err != nil {
		log.Fatal(err)
	}
	if u.Scheme != "redis" {
		log.Fatal("Scheme must be redis")
	}
	q := u.Query()
	dbS := q.Get("db")
	if u.User != nil {
		var ok bool
		password, ok = u.User.Password()
		if !ok {
			password = ""
		}
	}

	var p string
	host, p, _ = net.SplitHostPort(u.Host)

	if p != "" {
		port, err = strconv.Atoi(p)
		if err != nil {
			log.Fatalf("Unable to convert port to integer for %s", err)
		}
	}

	if dbS != "" {
		db, err = strconv.Atoi(dbS)
		if err != nil {
			log.Fatalf("Unable to convert db to integer for %s", dbS)
		}
	}

	client := CreateClient(host, password, port, db)
	return &RedisServer{client, host, port, db, password}, nil
}

func (s *RedisServer) scanner(match pattern, wg *sync.WaitGroup) chan redisKey {
	keyChan := make(chan redisKey, 1000)
	split := make(chan []string)

	splitter := func() {
		defer wg.Done()
		defer close(keyChan)
		for {
			select {
			case ks, ok := <-split:
				if !ok {
					return
				}
				for _, k := range ks {
					keyChan <- redisKey(k)
				}
			}
		}
	}

	keyScanner := func(c chan redisKey) {
		defer wg.Done()
		var cursor uint64
		var n int
		for {
			var keys []string
			var err error
			// REDIS SCAN
			// http://redis.io/commands/scan
			// Preferable because it doesn't lock complete database on larger keysets for 250ms+.
			keys, cursor, err = s.client.Scan(cursor, string(match), 1000).Result()
			if err != nil {
				log.Fatal("KeysRedis: error obtaining keys list from redis: ", err)
			}
			split <- keys

			n += len(keys)
			if cursor == 0 {
				close(split)
				return
			}
		}
	}

	wg.Add(1)
	go splitter()

	wg.Add(1)
	go keyScanner(keyChan)

	return keyChan
}

func (p *RedisPipe) compare(src, dst *RedisServer, key redisKey) (interface{}, interface{}, bool) {
	s, err := src.client.Get(string(key)).Result()
	if err != nil {
		log.Printf("Unable to get expected key %s from src: %+v", key, src.client)
	}
	d, _ := dst.client.Get(string(key)).Result()
	isMatch := reflect.DeepEqual(s, d)
	return s, d, isMatch

}

func (p *RedisPipe) CompareKeys(c chan redisKey, mismatches chan *Discrepancy, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case _, ok := <-p.shutdown:
				if !ok {
					return
				}
			case k, ok := <-c:
				if !ok {
					return
				}
				s, d, isMatch := p.compare(p.from, p.to, k)
				if !isMatch {
					mismatches <- &Discrepancy{k, s, d}
				}
			}
		}
	}()
}

func CreateClient(host, pass string, port, db int) *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", host, port),
		Password: pass,
		DB:       db,
	})
}

func writer(c chan *Discrepancy, wg *sync.WaitGroup, del *string) {
	defer wg.Done()
	i := *del
	for d := range c {
		fmt.Printf("%s%s%s%s%s\n", d.key, i, d.src, i, d.dst)
	}
}

func main() {
	src := flag.String("src", "", "Format redis://:password@host:port?db=0")
	dst := flag.String("dst", "redis://localhost:6379", "redis://:password@host:port?db=0")
	threads := flag.Int("parallel", 20, "Threading count. Default `20`")
	match := flag.String("keys", "*", "Match subset of keys `*`")
	delimiter := flag.String("delimiter", "|", "Delimiter that will be used to separate output")
	flag.Parse()
	if *src == "" {
		flag.Usage()
		os.Exit(1)
	}
	from, _ := parseRedisURI(*src)
	to, _ := parseRedisURI(*dst)

	var wg sync.WaitGroup
	shutdown := make(chan bool, 1)
	discrepancies := make(chan *Discrepancy)
	pipe := &RedisPipe{from, to, *match, shutdown}
	keyChan := pipe.from.scanner(pattern(*match), &wg)

	tx := *threads
	for i := 0; i < tx; i++ {
		p := &RedisPipe{from, to, *match, shutdown}
		p.CompareKeys(keyChan, discrepancies, &wg)
	}

	// Setup Writer
	var wgWriter sync.WaitGroup
	wgWriter.Add(1)
	go writer(discrepancies, &wgWriter, delimiter)

	// Wait for threads to complete
	wg.Wait()
	// Start cleanup routine for writer
	close(discrepancies)
	// Wait for writer to close fn
	wgWriter.Wait()
}
package main

import (
	"crypto/rand"
	"flag"
	"fmt"
	"github.com/rlmcpherson/s3gof3r"
	"io"
	"log"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"
)

var (
	bucketName        = flag.String("bucket", "", "Upload bucket")
	keyPrefix         = flag.String("prefix", "", "S3 key prefix, eg bucket/prefix/output")
	mongodump         = flag.String("mongodump", "mongodump", "Mongodump bin name")
	db                = flag.String("db", "", "db name")
	username          = flag.String("username", "", "user name")
	password          = flag.String("password", "", "password")
	host              = flag.String("host", "", "host:port")
	excludeCollection = flag.String("excludeCollection", "", "collections to exclude")
	pReader, pWriter  = io.Pipe()

	wg sync.WaitGroup

	bucket *s3gof3r.Bucket
	date   string
)

func mustGetEnv(key string) string {
	s := os.Getenv(key)
	if s == "" {
		log.Fatalf("Missing ENV %s", key)
	}
	return s
}

func createBackup() error {
	defer pWriter.Close()
	defer wg.Done()
	wg.Add(1)
	name, err := exec.LookPath(*mongodump)
	if err != nil {
		log.Fatalf("Mongodump cannot be found on path")
	}
	// TODO: test for newness of mongo Archive requires newish >= 3.1 version of mongodump
	// 3.0.5 in homebrew is missing --archive
	// 3.2 is where archive to STDOUT became available
	if *excludeCollection != "" {
		*excludeCollection = "--excludeCollection=" + *excludeCollection
	}
	args := []string{"--archive", "--db=" + *db, "--username=" + *username, "--password=" + *password, "--host=" + *host, *excludeCollection, "--gzip"}
	cmd := exec.Command(name, args...)
	cmd.Stdout = pWriter
	cmd.Stderr = os.Stderr
	log.Printf("CMD: $ %s %s", name, strings.Join(cmd.Args, " "))
	err = cmd.Run()
	if err != nil {
		return err
	}
	return nil
}

func pseudo_uuid() (uuid string) {
	// Credit: http://stackoverflow.com/a/25736155
	b := make([]byte, 16)
	_, err := rand.Read(b)
	if err != nil {
		fmt.Println("Error: ", err)
		return
	}

	uuid = fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])

	return
}

func setupFlags() {
	flag.Parse()
	flags := []string{"bucket", "mongodump", "db", "username", "password", "host"}
	fatal := false
	for _, f := range flags {
		fl := flag.Lookup(f)
		s := fl.Value.String()
		if s == "" {
			fatal = true
			log.Printf("Flag missing -%s which requires %s", fl.Name, fl.Usage)
		}
	}
	if fatal {
		log.Fatal("Exiting because of missing flags.")
	}
}

func setupS3() *s3gof3r.Bucket {
	awsAccessKey := mustGetEnv("AWS_ACCESS_KEY_ID")
	awsSecretKey := mustGetEnv("AWS_SECRET_ACCESS_KEY")
	keys := s3gof3r.Keys{
		AccessKey: awsAccessKey,
		SecretKey: awsSecretKey,
	}
	s3 := s3gof3r.New("", keys)
	return s3.Bucket(*bucketName)
}

func generateS3Key() string {
	now := time.Now().Format("2006-01-02/15")
	prefix := ""
	if *keyPrefix != "" {
		prefix = *keyPrefix + "/"
	}
	uuid := pseudo_uuid()
	return fmt.Sprintf("%s%s/%s/%s.tar.gz", prefix, *db, now, uuid)
}

func main() {
	setupFlags()
	bucket := setupS3()

	go createBackup()

	s3Key := generateS3Key()
	output := fmt.Sprintf("s3://%s/%s", *bucketName, s3Key)
	w, err := bucket.PutWriter(s3Key, nil, nil)
	if err != nil {
		log.Fatalf("Error with bucket (%s/%s) PutWriter: %s", *bucketName, s3Key, err)
	}
	defer func() {
		w.Close()
		log.Printf("Successfully uploaded %s", output)
	}()

	log.Printf("Uploading to %s", output)
	written, err := io.Copy(w, pReader)
	if err != nil {
		log.Printf("Error Uploading to %s, ERROR: %s", output, err)
	}

	wg.Wait()

	log.Printf("Attempting to write %d bytes", written)
}