20 Jan 2017

Announcing MoreSQL (Realtime Mongo -> PG streaming)

I’m proud to announce that MoreSQL is live and production ready :)!

We’ve been using it in production for a few months to stream production data mutations from Mongo to PostgreSQL. The latency is normally subsecond and has scaled well with a small footprint in the two production use cases.

Background

It’s written in Golang and was inspired by a Ruby project called mosql. After using mosql in production, our telemetry revealed that it was lagging behind during mutation-intensive periods. Implementing the project from the ground up in Golang allowed for better concurrency, lower latency, and a small memory footprint (often 20-50MB under load).

License

MoreSQL is released under a permissive license and is open source software. You’re welcome to set it up in production environments and submit pull requests to improve the project.

Consulting

If you’d like a turnkey solution implemented for your business in order to have realtime data from Mongo sent to Postgres for reporting, analytics, query performance, or as a Mongo-to-Postgres migration strategy, send me a note. As the author of MoreSQL, with experience running this solution in production on different systems, I’m ready to solve your problem!

21 Jul 2016

Dull and Reliable Golang

I’ve been working on systems lately that are suited for Golang:

  • Memory sensitive
  • Performance sensitive
  • Stability sensitive

I have been very happy with the outcomes of developing tools in Golang.

Here’s a set of links to my recent work in Golang (some are entirely my own creations, while others build on, extend, or remix other people’s work):

It’s been productive and performant. I also sense that I could revisit these projects in a year or two and still grok what’s happening. Given how much I need to context switch between languages right now, I appreciate projects that are easily picked back up after a hiatus.

TL;DR - Go’s going well and I tend to reach for it when solving systems issues.

Links and src below

Big thanks to @adarqui for putting their code on Github. It helped with a data migration I was doing. And also YAY to open source since they merged back in my updates and improvements, using SCAN vs KEYS, to the project :).

  • Redis Diff

    package main
    
    import (
    	"flag"
    	"fmt"
    	"gopkg.in/redis.v4"
    	"log"
    	"net"
    	"net/url"
    	"os"
    	"reflect"
    	"strconv"
    	"sync"
    )
    
    // redisKey is the name of a single Redis key, as produced by the key scanner.
    type redisKey string
    // pattern is the match expression handed to the Redis SCAN command.
    type pattern string
    
    // RedisPipe pairs a source and a destination server whose keys are compared.
    type RedisPipe struct {
    	// from is the server whose values are treated as the expected ones.
    	from     *RedisServer
    	// to is the server being checked against from.
    	to       *RedisServer
    	// keys holds the match pattern the pipe was created with.
    	keys     string
    	// shutdown stops CompareKeys workers once it is closed.
    	shutdown chan bool
    }
    
    // RedisServer bundles a connected client with the connection settings it was
    // built from.
    type RedisServer struct {
    	// client is the go-redis client used for every command against this server.
    	client *redis.Client
    	// host and port form the address the client dials.
    	host   string
    	port   int
    	// db is the numeric Redis database index selected on connect.
    	db     int
    	// pass is the password sent to the server; empty means none.
    	pass   string
    }
    
    // Discrepancy records one key whose value differs between the two servers.
    type Discrepancy struct {
    	// key is the Redis key that was compared.
    	key redisKey
    	// src is the value read from the source server.
    	src interface{}
    	// dst is the value read from the destination server.
    	dst interface{}
    }
    
    // parseRedisURI turns a URI of the form redis://:password@host:port?db=0 into
    // a connected RedisServer.
    //
    // Every component is optional and falls back to a default: host "localhost",
    // port 6379, db 0 and an empty password. A URI that omits the port (for
    // example redis://myhost) keeps the given host and uses the default port.
    //
    // Malformed input terminates the process through log.Fatal, so the returned
    // error is always nil; it is kept so the signature stays stable for callers.
    func parseRedisURI(s string) (server *RedisServer, err error) {
    	// Defaults
    	host := "localhost"
    	password := ""
    	port := 6379
    	db := 0

    	u, err := url.Parse(s)
    	if err != nil {
    		log.Fatal(err)
    	}
    	if u.Scheme != "redis" {
    		log.Fatal("Scheme must be redis")
    	}

    	if u.User != nil {
    		// Password yields "" when the URI carries no password.
    		password, _ = u.User.Password()
    	}

    	if u.Host != "" {
    		h, p, splitErr := net.SplitHostPort(u.Host)
    		if splitErr != nil {
    			// SplitHostPort rejects a bare host. Retry with the default port
    			// appended so "myhost" and "[::1]" are accepted, while anything
    			// that is still malformed is reported below.
    			h, p, splitErr = net.SplitHostPort(u.Host + ":" + strconv.Itoa(port))
    		}
    		if splitErr != nil {
    			log.Fatalf("Unable to parse host %q: %s", u.Host, splitErr)
    		}
    		if h != "" {
    			host = h
    		}
    		if p != "" {
    			port, err = strconv.Atoi(p)
    			if err != nil {
    				log.Fatalf("Unable to convert port to integer for %s", p)
    			}
    		}
    	}

    	if dbS := u.Query().Get("db"); dbS != "" {
    		db, err = strconv.Atoi(dbS)
    		if err != nil {
    			log.Fatalf("Unable to convert db to integer for %s", dbS)
    		}
    	}

    	client := CreateClient(host, password, port, db)
    	return &RedisServer{client, host, port, db, password}, nil
    }
    
    // scanner streams every key on the server that matches the given pattern.
    //
    // Two goroutines are started and registered on wg: one pages through the
    // keyspace with SCAN, the other flattens each page into individual keys on the
    // returned channel. The returned channel is closed after the final page has
    // been forwarded. A SCAN failure terminates the process.
    func (s *RedisServer) scanner(match pattern, wg *sync.WaitGroup) chan redisKey {
    	out := make(chan redisKey, 1000)
    	pages := make(chan []string)

    	// Flattener: one page of key names in, individual keys out.
    	wg.Add(1)
    	go func() {
    		defer wg.Done()
    		defer close(out)
    		for page := range pages {
    			for _, name := range page {
    				out <- redisKey(name)
    			}
    		}
    	}()

    	// Pager: walks the SCAN cursor until the server reports completion.
    	wg.Add(1)
    	go func() {
    		defer wg.Done()
    		defer close(pages)
    		cursor := uint64(0)
    		for {
    			// REDIS SCAN
    			// http://redis.io/commands/scan
    			// Preferable because it doesn't lock complete database on larger keysets for 250ms+.
    			page, next, err := s.client.Scan(cursor, string(match), 1000).Result()
    			if err != nil {
    				log.Fatal("KeysRedis: error obtaining keys list from redis: ", err)
    			}
    			pages <- page

    			// A zero cursor is how SCAN signals that iteration is complete.
    			if next == 0 {
    				return
    			}
    			cursor = next
    		}
    	}()

    	return out
    }
    
    // compare reads key from both servers with GET and reports whether the two
    // sides agree.
    //
    // It returns the source value, the destination value and a match flag. The
    // flag is true only when both reads had the same outcome (both succeeded or
    // both failed) and the values are equal, so a key that is missing on one side
    // is never mistaken for an empty string on the other.
    func (p *RedisPipe) compare(src, dst *RedisServer, key redisKey) (interface{}, interface{}, bool) {
    	name := string(key)

    	s, srcErr := src.client.Get(name).Result()
    	if srcErr != nil {
    		// The key came from scanning src, so a failed read here is unexpected.
    		log.Printf("Unable to get expected key %s from src: %v", key, srcErr)
    	}

    	// A failed read on dst (typically a missing key) is not logged; it is
    	// surfaced as a mismatch instead.
    	d, dstErr := dst.client.Get(name).Result()

    	sameOutcome := (srcErr == nil) == (dstErr == nil)
    	isMatch := sameOutcome && reflect.DeepEqual(s, d)
    	return s, d, isMatch
    }
    
    // CompareKeys starts one background worker, registered on wg, that compares
    // every key received on c between the pipe's two servers and sends a
    // Discrepancy on mismatches for each key that differs.
    //
    // The worker exits when c is closed or when the pipe's shutdown channel is
    // closed. A value received on shutdown while it is still open is ignored.
    func (p *RedisPipe) CompareKeys(c chan redisKey, mismatches chan *Discrepancy, wg *sync.WaitGroup) {
    	worker := func() {
    		defer wg.Done()
    		for {
    			select {
    			case _, open := <-p.shutdown:
    				if open {
    					continue
    				}
    				return
    			case key, open := <-c:
    				if !open {
    					return
    				}
    				if srcVal, dstVal, same := p.compare(p.from, p.to, key); !same {
    					mismatches <- &Discrepancy{key, srcVal, dstVal}
    				}
    			}
    		}
    	}

    	wg.Add(1)
    	go worker()
    }
    
    // CreateClient returns a Redis client for host:port that authenticates with
    // pass and selects database db.
    //
    // The address is assembled with net.JoinHostPort so that IPv6 literal hosts
    // are bracketed correctly; other hosts produce the plain "host:port" form.
    func CreateClient(host, pass string, port, db int) *redis.Client {
    	return redis.NewClient(&redis.Options{
    		Addr:     net.JoinHostPort(host, strconv.Itoa(port)),
    		Password: pass,
    		DB:       db,
    	})
    }
    
    // writer prints each discrepancy received on c to standard output as one line
    // of the form key, source value, destination value, separated by *del. It
    // returns, marking wg as done, once c has been closed and drained.
    func writer(c chan *Discrepancy, wg *sync.WaitGroup, del *string) {
    	defer wg.Done()
    	sep := *del
    	for {
    		item, open := <-c
    		if !open {
    			return
    		}
    		fmt.Printf("%s%s%s%s%s\n", item.key, sep, item.src, sep, item.dst)
    	}
    }
    
    // main compares the keys matching -keys between the -src and -dst Redis
    // servers and prints one delimited line per differing key.
    //
    // Keys are scanned from the source server and fanned out to -parallel
    // comparison workers; a single writer goroutine serialises the output.
    func main() {
    	src := flag.String("src", "", "Format redis://:password@host:port?db=0")
    	dst := flag.String("dst", "redis://localhost:6379", "redis://:password@host:port?db=0")
    	threads := flag.Int("parallel", 20, "Threading count. Default `20`")
    	match := flag.String("keys", "*", "Match subset of keys `*`")
    	delimiter := flag.String("delimiter", "|", "Delimiter that will be used to separate output")
    	flag.Parse()
    	if *src == "" {
    		flag.Usage()
    		os.Exit(1)
    	}
    	// With no workers the scanner would fill its buffer and block forever.
    	if *threads < 1 {
    		log.Fatalf("-parallel must be at least 1, got %d", *threads)
    	}

    	from, err := parseRedisURI(*src)
    	if err != nil {
    		log.Fatalf("Invalid -src %q: %s", *src, err)
    	}
    	to, err := parseRedisURI(*dst)
    	if err != nil {
    		log.Fatalf("Invalid -dst %q: %s", *dst, err)
    	}

    	var wg sync.WaitGroup
    	shutdown := make(chan bool, 1)
    	discrepancies := make(chan *Discrepancy)
    	pipe := &RedisPipe{from, to, *match, shutdown}
    	keyChan := from.scanner(pattern(*match), &wg)

    	// Every worker reads the same servers and shutdown channel, so one pipe
    	// value can back all of them.
    	for i := 0; i < *threads; i++ {
    		pipe.CompareKeys(keyChan, discrepancies, &wg)
    	}

    	// Setup Writer
    	var wgWriter sync.WaitGroup
    	wgWriter.Add(1)
    	go writer(discrepancies, &wgWriter, delimiter)

    	// Wait for the scanner and all comparison workers to complete
    	wg.Wait()
    	// No more discrepancies can arrive; let the writer drain and stop
    	close(discrepancies)
    	// Wait for the writer to finish printing
    	wgWriter.Wait()
    }
    
  • GH and Mongo archival to S3

    package main
    
    import (
    	"crypto/rand"
    	"flag"
    	"fmt"
    	"github.com/rlmcpherson/s3gof3r"
    	"io"
    	"log"
    	"os"
    	"os/exec"
    	"strings"
    	"sync"
    	"time"
    )
    
    // Command-line flags and process-wide state shared by the backup and upload
    // steps.
    var (
    	// Destination bucket and optional key prefix for the uploaded archive.
    	bucketName        = flag.String("bucket", "", "Upload bucket")
    	keyPrefix         = flag.String("prefix", "", "S3 key prefix, eg bucket/prefix/output")
    	// Name (or path) of the mongodump binary, resolved via the PATH lookup in createBackup.
    	mongodump         = flag.String("mongodump", "mongodump", "Mongodump bin name")
    	// Connection settings forwarded to mongodump as command-line arguments.
    	db                = flag.String("db", "", "db name")
    	username          = flag.String("username", "", "user name")
    	password          = flag.String("password", "", "password")
    	host              = flag.String("host", "", "host:port")
    	excludeCollection = flag.String("excludeCollection", "", "collections to exclude")
    	// In-memory pipe: createBackup writes mongodump's stdout to pWriter and main
    	// copies pReader to the S3 writer.
    	pReader, pWriter  = io.Pipe()

    	// wg tracks the createBackup goroutine so main can wait for it.
    	wg sync.WaitGroup

    	// NOTE(review): bucket and date are not referenced by the visible code (main
    	// declares its own local bucket) - confirm before relying on them.
    	bucket *s3gof3r.Bucket
    	date   string
    )
    
    // mustGetEnv returns the value of the environment variable key. An unset or
    // empty variable is treated as a fatal configuration error and terminates the
    // process.
    func mustGetEnv(key string) string {
    	if value := os.Getenv(key); value != "" {
    		return value
    	}
    	log.Fatalf("Missing ENV %s", key)
    	return ""
    }
    
    func createBackup() error {
    	defer pWriter.Close()
    	defer wg.Done()
    	wg.Add(1)
    	name, err := exec.LookPath(*mongodump)
    	if err != nil {
    		log.Fatalf("Mongodump cannot be found on path")
    	}
    	// TODO: test for newness of mongo Archive requires newish >= 3.1 version of mongodump
    	// 3.0.5 in homebrew is missing --archive
    	// 3.2 is where archive to STDOUT became available
    	if *excludeCollection != "" {
    		*excludeCollection = "--excludeCollection=" + *excludeCollection
    	}
    	args := []string{"--archive", "--db=" + *db, "--username=" + *username, "--password=" + *password, "--host=" + *host, *excludeCollection, "--gzip"}
    	cmd := exec.Command(name, args...)
    	cmd.Stdout = pWriter
    	cmd.Stderr = os.Stderr
    	log.Printf("CMD: $ %s %s", name, strings.Join(cmd.Args, " "))
    	err = cmd.Run()
    	if err != nil {
    		return err
    	}
    	return nil
    }
    
    // pseudo_uuid returns 16 random bytes rendered as upper-case hex in the
    // 8-4-4-4-12 grouping of a UUID. If the random source fails, the error is
    // printed and an empty string is returned.
    //
    // Credit: http://stackoverflow.com/a/25736155
    func pseudo_uuid() (uuid string) {
    	var raw [16]byte
    	if _, err := rand.Read(raw[:]); err != nil {
    		fmt.Println("Error: ", err)
    		return ""
    	}
    	return fmt.Sprintf("%X-%X-%X-%X-%X", raw[0:4], raw[4:6], raw[6:8], raw[8:10], raw[10:])
    }
    
    // setupFlags parses the command line and verifies that every required flag
    // has a non-empty value. Each missing flag is logged, and the process then
    // terminates if any were missing.
    func setupFlags() {
    	flag.Parse()

    	missing := false
    	for _, name := range [...]string{"bucket", "mongodump", "db", "username", "password", "host"} {
    		f := flag.Lookup(name)
    		if f.Value.String() != "" {
    			continue
    		}
    		missing = true
    		log.Printf("Flag missing -%s which requires %s", f.Name, f.Usage)
    	}

    	if missing {
    		log.Fatal("Exiting because of missing flags.")
    	}
    }
    
    // setupS3 builds an S3 client from the AWS_ACCESS_KEY_ID and
    // AWS_SECRET_ACCESS_KEY environment variables and returns a handle to the
    // bucket named by the -bucket flag. A missing variable terminates the process.
    func setupS3() *s3gof3r.Bucket {
    	creds := s3gof3r.Keys{
    		AccessKey: mustGetEnv("AWS_ACCESS_KEY_ID"),
    		SecretKey: mustGetEnv("AWS_SECRET_ACCESS_KEY"),
    	}
    	// NOTE(review): the empty first argument presumably selects the library's
    	// default endpoint - confirm against the s3gof3r docs.
    	return s3gof3r.New("", creds).Bucket(*bucketName)
    }
    
    // generateS3Key builds the object key for a new archive in the form
    // [prefix/]db/YYYY-MM-DD/HH/<uuid>.tar.gz, using the current local time and a
    // fresh pseudo-UUID. The prefix segment is omitted when -prefix is empty.
    func generateS3Key() string {
    	hourPath := time.Now().Format("2006-01-02/15")

    	var prefix string
    	if *keyPrefix != "" {
    		prefix = *keyPrefix + "/"
    	}

    	return fmt.Sprintf("%s%s/%s/%s.tar.gz", prefix, *db, hourPath, pseudo_uuid())
    }
    
    // main streams a mongodump archive straight into an S3 object.
    //
    // The dump runs in a background goroutine that writes into the shared pipe
    // while main copies the pipe into the S3 writer. The upload is finalised, and
    // reported as successful, only when the copy, the dump and the closing of the
    // S3 writer all succeed. Any failure exits non-zero without closing the
    // writer, so a truncated archive is never finalised as a good backup.
    func main() {
    	setupFlags()
    	bucket := setupS3()

    	// Buffered so the goroutine can finish even if main exits early.
    	backupDone := make(chan error, 1)
    	go func() {
    		backupDone <- createBackup()
    	}()

    	s3Key := generateS3Key()
    	output := fmt.Sprintf("s3://%s/%s", *bucketName, s3Key)
    	w, err := bucket.PutWriter(s3Key, nil, nil)
    	if err != nil {
    		log.Fatalf("Error with bucket (%s/%s) PutWriter: %s", *bucketName, s3Key, err)
    	}

    	log.Printf("Uploading to %s", output)
    	written, err := io.Copy(w, pReader)
    	if err != nil {
    		// Exit right away: waiting for the dump here could block forever
    		// because nothing is draining the pipe any more.
    		log.Fatalf("Error Uploading to %s, ERROR: %s", output, err)
    	}

    	// The pipe reached end of stream; make sure that was a clean dump.
    	if err := <-backupDone; err != nil {
    		log.Fatalf("Backup command failed, not finalizing %s: %s", output, err)
    	}
    	log.Printf("Wrote %d bytes", written)

    	// Closing the writer is what completes the upload, so its error matters.
    	if err := w.Close(); err != nil {
    		log.Fatalf("Error finalizing upload to %s, ERROR: %s", output, err)
    	}
    	log.Printf("Successfully uploaded %s", output)
    }