21 Jul 2016

Dull and Reliable Golang

I’ve been working on systems lately that are suited for Golang:

  • Memory sensitive
  • Performance sensitive
  • Stability sensitive

And have been very happy with the outcomes of developing tools in Golang.

Here’s a set of links to my recent work in Golang (some are entirely my own creations, while others build on, extend, or remix other people’s work):

It’s been productive and performant. I also sense that I could revisit these projects in a year or two and still grok what’s happening. Given how much I need to context switch between languages right now, I appreciate projects that are easily picked back up after a hiatus.

TL;DR - Go’s going well and I tend to reach for it when solving systems issues.

Links and src below

Big thanks to @adarqui for putting their code on GitHub. It helped with a data migration I was doing. Also, YAY for open source: they merged my updates and improvements (using SCAN instead of KEYS) back into the project :).

package main

import (
	"flag"
	"fmt"
	"gopkg.in/redis.v4"
	"log"
	"net"
	"net/url"
	"os"
	"reflect"
	"strconv"
	"sync"
)

// redisKey is the name of a single Redis key, as produced by scanner and
// passed to GET in compare.
type redisKey string
// pattern is the match expression handed to SCAN to select which keys are
// compared.
type pattern string

// RedisPipe pairs a source and a destination server whose values are compared.
//
// from and to are the two servers read by CompareKeys. keys holds the key
// match pattern the pipe was created with. shutdown is polled by CompareKeys
// workers, which return once the channel has been closed; nothing in the
// visible code closes it.
type RedisPipe struct {
	from     *RedisServer
	to       *RedisServer
	keys     string
	shutdown chan bool
}

// RedisServer bundles a connected client with the connection settings it was
// built from (host, port, database index and password), as filled in by
// parseRedisURI.
type RedisServer struct {
	client *redis.Client
	host   string
	port   int
	db     int
	pass   string
}

// Discrepancy records one key whose value differs between the two servers:
// key is the key name, src the value read from the source and dst the value
// read from the destination.
type Discrepancy struct {
	key redisKey
	src interface{}
	dst interface{}
}

// parseRedisURI turns a URI of the form redis://:password@host:port?db=N into
// a connected RedisServer.
//
// Missing parts fall back to defaults: host "localhost", port 6379, db 0 and
// an empty password. Any malformed input terminates the process via log.Fatal,
// so the returned error is always nil; it is kept for interface compatibility.
func parseRedisURI(s string) (server *RedisServer, err error) {
	// Defaults
	host := "localhost"
	password := ""
	port := 6379
	db := 0

	u, err := url.Parse(s)
	if err != nil {
		log.Fatal(err)
	}
	if u.Scheme != "redis" {
		log.Fatal("Scheme must be redis")
	}

	if u.User != nil {
		if pw, ok := u.User.Password(); ok {
			password = pw
		}
	}

	h, p, err := net.SplitHostPort(u.Host)
	if err != nil {
		// SplitHostPort rejects a host without a port ("redis://myhost").
		// Retry with the default port appended so the host is still
		// extracted (and IPv6 brackets are stripped) instead of being
		// silently replaced by an empty string.
		h, p, err = net.SplitHostPort(u.Host + ":" + strconv.Itoa(port))
		if err != nil {
			log.Fatalf("Unable to parse host %q: %s", u.Host, err)
		}
	}
	if h != "" {
		host = h
	}

	if p != "" {
		port, err = strconv.Atoi(p)
		if err != nil {
			log.Fatalf("Unable to convert port to integer for %s", p)
		}
	}

	if dbS := u.Query().Get("db"); dbS != "" {
		db, err = strconv.Atoi(dbS)
		if err != nil {
			log.Fatalf("Unable to convert db to integer for %s", dbS)
		}
	}

	client := CreateClient(host, password, port, db)
	return &RedisServer{client, host, port, db, password}, nil
}

// scanner walks every key matching match on s and streams the key names on
// the returned channel, which is closed once the scan is complete.
//
// Two goroutines are started and registered on wg: one pages through the
// keyspace with SCAN, the other flattens each page into individual keys. A
// SCAN error terminates the process.
func (s *RedisServer) scanner(match pattern, wg *sync.WaitGroup) chan redisKey {
	out := make(chan redisKey, 1000)
	pages := make(chan []string)

	// Flattener: turns pages of key names into single keys and closes the
	// output once the pager has closed pages.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for page := range pages {
			for _, name := range page {
				out <- redisKey(name)
			}
		}
	}()

	// Pager: REDIS SCAN (http://redis.io/commands/scan) is used rather than
	// KEYS because it does not lock the complete database on larger keysets
	// for 250ms+.
	wg.Add(1)
	go func() {
		defer wg.Done()
		cursor := uint64(0)
		for {
			page, next, err := s.client.Scan(cursor, string(match), 1000).Result()
			if err != nil {
				log.Fatal("KeysRedis: error obtaining keys list from redis: ", err)
			}
			pages <- page

			// A zero cursor is how SCAN signals the end of the iteration.
			if next == 0 {
				close(pages)
				return
			}
			cursor = next
		}
	}()

	return out
}

// compare reads key with GET from both src and dst and reports whether the
// two values are equal.
//
// It returns the source value, the destination value and the match flag. A
// failed read leaves the corresponding value as the empty string. Read errors
// are logged, except for redis.Nil on the destination: a key missing there is
// the ordinary mismatch this tool exists to report.
func (p *RedisPipe) compare(src, dst *RedisServer, key redisKey) (interface{}, interface{}, bool) {
	s, err := src.client.Get(string(key)).Result()
	if err != nil {
		log.Printf("Unable to get expected key %s from src: %s", key, err)
	}
	d, err := dst.client.Get(string(key)).Result()
	if err != nil && err != redis.Nil {
		log.Printf("Unable to get key %s from dst: %s", key, err)
	}
	return s, d, reflect.DeepEqual(s, d)
}

// CompareKeys starts one worker goroutine, registered on wg, that compares
// every key received from c between p.from and p.to and sends a Discrepancy on
// mismatches for each key whose values differ.
//
// The worker returns when c is closed or when p.shutdown is closed. A value
// received on p.shutdown (as opposed to a close) is discarded and the worker
// keeps running.
func (p *RedisPipe) CompareKeys(c chan redisKey, mismatches chan *Discrepancy, wg *sync.WaitGroup) {
	worker := func() {
		defer wg.Done()
		for {
			select {
			case _, open := <-p.shutdown:
				if !open {
					return
				}
			case key, open := <-c:
				if !open {
					return
				}
				srcVal, dstVal, same := p.compare(p.from, p.to, key)
				if same {
					continue
				}
				mismatches <- &Discrepancy{key, srcVal, dstVal}
			}
		}
	}

	wg.Add(1)
	go worker()
}

// CreateClient returns a Redis client for host:port that authenticates with
// pass and selects database db.
func CreateClient(host, pass string, port, db int) *redis.Client {
	return redis.NewClient(&redis.Options{
		// JoinHostPort wraps IPv6 literals in brackets, which a plain
		// "%s:%d" format would not, producing an unusable address.
		Addr:     net.JoinHostPort(host, strconv.Itoa(port)),
		Password: pass,
		DB:       db,
	})
}

// writer prints each Discrepancy received from c to standard output as
// key, source value and destination value separated by *del, one per line.
// It returns when c is closed and signals wg on the way out.
func writer(c chan *Discrepancy, wg *sync.WaitGroup, del *string) {
	defer wg.Done()
	sep := *del
	for {
		d, open := <-c
		if !open {
			return
		}
		fmt.Printf("%s%s%s%s%s\n", d.key, sep, d.src, sep, d.dst)
	}
}

// main compares the keys matching -keys between the -src and -dst Redis
// servers and prints one delimiter-separated line per differing key.
//
// Keys are scanned from the source and fanned out to -parallel comparison
// workers; a single writer goroutine prints the discrepancies they find.
func main() {
	src := flag.String("src", "", "Format redis://:password@host:port?db=0")
	dst := flag.String("dst", "redis://localhost:6379", "redis://:password@host:port?db=0")
	threads := flag.Int("parallel", 20, "Threading count. Default `20`")
	match := flag.String("keys", "*", "Match subset of keys `*`")
	delimiter := flag.String("delimiter", "|", "Delimiter that will be used to separate output")
	flag.Parse()
	if *src == "" {
		flag.Usage()
		os.Exit(1)
	}
	// Without at least one worker nothing drains the key channel: the scan
	// would stall once its buffer fills, or finish having compared nothing.
	if *threads < 1 {
		log.Fatalf("-parallel must be at least 1, got %d", *threads)
	}

	from, err := parseRedisURI(*src)
	if err != nil {
		log.Fatalf("Invalid -src %q: %s", *src, err)
	}
	to, err := parseRedisURI(*dst)
	if err != nil {
		log.Fatalf("Invalid -dst %q: %s", *dst, err)
	}

	var wg sync.WaitGroup
	shutdown := make(chan bool, 1)
	discrepancies := make(chan *Discrepancy)
	keyChan := from.scanner(pattern(*match), &wg)

	for i := 0; i < *threads; i++ {
		p := &RedisPipe{from, to, *match, shutdown}
		p.CompareKeys(keyChan, discrepancies, &wg)
	}

	// Setup Writer. It has its own WaitGroup because it must outlive the
	// scanner and the workers tracked by wg.
	var wgWriter sync.WaitGroup
	wgWriter.Add(1)
	go writer(discrepancies, &wgWriter, delimiter)

	// Wait for the scanner and all comparison workers to complete.
	wg.Wait()
	// No more senders remain, so closing lets the writer drain and return.
	close(discrepancies)
	// Wait for the writer to finish printing.
	wgWriter.Wait()
}
package main

import (
	"crypto/rand"
	"flag"
	"fmt"
	"github.com/rlmcpherson/s3gof3r"
	"io"
	"log"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"
)

// Command-line flags, plus the in-memory pipe that connects the dump to the
// upload: createBackup writes mongodump's standard output to pWriter and main
// copies from pReader into the S3 writer.
var (
	bucketName        = flag.String("bucket", "", "Upload bucket")
	keyPrefix         = flag.String("prefix", "", "S3 key prefix, eg bucket/prefix/output")
	mongodump         = flag.String("mongodump", "mongodump", "Mongodump bin name")
	db                = flag.String("db", "", "db name")
	username          = flag.String("username", "", "user name")
	password          = flag.String("password", "", "password")
	host              = flag.String("host", "", "host:port")
	excludeCollection = flag.String("excludeCollection", "", "collections to exclude")
	pReader, pWriter  = io.Pipe()

	// wg tracks the createBackup goroutine; main waits on it before exiting.
	wg sync.WaitGroup

	// NOTE(review): bucket and date are not referenced by the visible code
	// (main declares its own local bucket) — confirm before removing.
	bucket *s3gof3r.Bucket
	date   string
)

// mustGetEnv returns the value of the environment variable key. If the
// variable is unset or empty the process is terminated with a log message.
func mustGetEnv(key string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	log.Fatalf("Missing ENV %s", key)
	return ""
}

// createBackup runs mongodump with --archive and --gzip and streams its
// standard output into pWriter, so the reader side of the pipe receives the
// compressed archive. mongodump's standard error is passed through.
//
// It returns the error from running mongodump, if any. The same error is
// delivered to the pipe's reader, so a failed dump is not mistaken for a
// clean end of stream. If the mongodump binary cannot be found the process
// is terminated.
func createBackup() error {
	var runErr error
	// Deferred calls run last-in-first-out: wg.Done fires first, then the
	// pipe is closed with the dump's outcome (a nil error reads as EOF).
	defer func() { pWriter.CloseWithError(runErr) }()
	defer wg.Done()
	wg.Add(1)
	name, err := exec.LookPath(*mongodump)
	if err != nil {
		log.Fatalf("Mongodump cannot be found on path")
	}
	// TODO: test for newness of mongo Archive requires newish >= 3.1 version of mongodump
	// 3.0.5 in homebrew is missing --archive
	// 3.2 is where archive to STDOUT became available
	args := []string{"--archive", "--db=" + *db, "--username=" + *username, "--password=" + *password, "--host=" + *host}
	// Only add the option when it is set: an empty argument would reach
	// mongodump as a stray positional parameter. The flag value itself is
	// left untouched so repeated calls do not stack prefixes.
	if *excludeCollection != "" {
		args = append(args, "--excludeCollection="+*excludeCollection)
	}
	args = append(args, "--gzip")

	cmd := exec.Command(name, args...)
	cmd.Stdout = pWriter
	cmd.Stderr = os.Stderr

	// Log the command line with the password masked.
	shown := make([]string, len(args))
	copy(shown, args)
	for i, a := range shown {
		if strings.HasPrefix(a, "--password=") {
			shown[i] = "--password=****"
		}
	}
	log.Printf("CMD: $ %s %s", name, strings.Join(shown, " "))

	runErr = cmd.Run()
	return runErr
}

// pseudo_uuid returns 16 random bytes from crypto/rand formatted as
// upper-case hex in the 8-4-4-4-12 grouping of a UUID. It does not set the
// UUID version or variant bits. If the random source fails, the error is
// printed and the empty string is returned.
//
// Credit: http://stackoverflow.com/a/25736155
func pseudo_uuid() (uuid string) {
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		fmt.Println("Error: ", err)
		return ""
	}
	return fmt.Sprintf("%X-%X-%X-%X-%X", raw[:4], raw[4:6], raw[6:8], raw[8:10], raw[10:])
}

// setupFlags parses the command line and checks that every required flag has
// a non-empty value. Each missing flag is logged; if any are missing the
// process is terminated afterwards.
func setupFlags() {
	flag.Parse()

	missing := 0
	for _, name := range [...]string{"bucket", "mongodump", "db", "username", "password", "host"} {
		f := flag.Lookup(name)
		if f.Value.String() != "" {
			continue
		}
		missing++
		log.Printf("Flag missing -%s which requires %s", f.Name, f.Usage)
	}

	if missing > 0 {
		log.Fatal("Exiting because of missing flags.")
	}
}

// setupS3 returns a handle for the bucket named by the -bucket flag, using
// credentials from the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment
// variables. The process is terminated if either variable is missing.
func setupS3() *s3gof3r.Bucket {
	creds := s3gof3r.Keys{
		AccessKey: mustGetEnv("AWS_ACCESS_KEY_ID"),
		SecretKey: mustGetEnv("AWS_SECRET_ACCESS_KEY"),
	}
	// NOTE(review): the empty domain presumably selects the library's
	// default endpoint — confirm against s3gof3r.New.
	return s3gof3r.New("", creds).Bucket(*bucketName)
}

// generateS3Key builds the object key for a new backup in the form
// [prefix/]db/YYYY-MM-DD/HH/<pseudo-uuid>.tar.gz, using the current local
// time. The prefix segment is only present when -prefix is set.
func generateS3Key() string {
	var prefix string
	if p := *keyPrefix; p != "" {
		prefix = p + "/"
	}
	hour := time.Now().Format("2006-01-02/15")
	return fmt.Sprintf("%s%s/%s/%s.tar.gz", prefix, *db, hour, pseudo_uuid())
}

// main streams a gzip-compressed mongodump archive straight into an S3
// object: createBackup writes the dump into the pipe while this goroutine
// copies the pipe into the S3 writer.
//
// The upload is only finalized and reported as successful when the dump, the
// copy and the final Close all succeeded; any failure exits non-zero.
func main() {
	setupFlags()
	bucket := setupS3()

	// Run the dump concurrently and keep its result so a failed mongodump
	// is not silently ignored.
	backupErr := make(chan error, 1)
	go func() { backupErr <- createBackup() }()

	s3Key := generateS3Key()
	output := fmt.Sprintf("s3://%s/%s", *bucketName, s3Key)
	w, err := bucket.PutWriter(s3Key, nil, nil)
	if err != nil {
		log.Fatalf("Error with bucket (%s/%s) PutWriter: %s", *bucketName, s3Key, err)
	}

	log.Printf("Uploading to %s", output)
	written, copyErr := io.Copy(w, pReader)
	if copyErr != nil {
		log.Printf("Error Uploading to %s, ERROR: %s", output, copyErr)
		// Unblock the dump: with nobody reading the pipe any more its
		// writes would otherwise block forever and the waits below hang.
		pReader.CloseWithError(copyErr)
	}

	wg.Wait()
	// Receiving the result also guarantees createBackup has returned.
	dumpErr := <-backupErr

	// On failure the writer is deliberately not closed.
	// NOTE(review): Close presumably completes the upload, which would
	// publish a truncated backup — confirm against s3gof3r.
	switch {
	case copyErr != nil:
		log.Fatalf("Upload of %s failed after %d bytes", output, written)
	case dumpErr != nil:
		log.Fatalf("Backup command failed, not finalizing %s: %s", output, dumpErr)
	}

	if err := w.Close(); err != nil {
		log.Fatalf("Error finalizing upload %s: %s", output, err)
	}
	log.Printf("Successfully uploaded %s (%d bytes)", output, written)
}