21 Jul 2016

Dull and Reliable Golang

I’ve been working on systems lately that are suited for Golang:

  • Memory sensitive
  • Performance sensitive
  • Stability sensitive

I have been very happy with the outcomes of developing tools in Golang.

Here’s a set of links to my recent work in Golang (some are entirely my own creations, while others build on, extend, or remix other people’s work):

It’s been productive and performant. I also sense that I could revisit these projects in a year or two and still grok what’s happening. Given how much I need to context switch between languages right now, I appreciate projects that are easily picked back up after a hiatus.

TL;DR - Go’s going well and I tend to reach for it when solving systems issues.

Links and src below

Big thanks to @adarqui for putting their code on GitHub. It helped with a data migration I was doing. And also YAY to open source, since they merged my updates and improvements (using SCAN instead of KEYS) back into the project :).

  • Redis Diff

    package main
    
    import (
    	"flag"
    	"fmt"
    	"gopkg.in/redis.v4"
    	"log"
    	"net"
    	"net/url"
    	"os"
    	"reflect"
    	"strconv"
    	"sync"
    )
    
    // Distinct string types so that key names and match expressions cannot be
    // swapped by accident in function signatures.
    type (
    	// redisKey is the name of one key read from a Redis server.
    	redisKey string
    	// pattern is the match expression handed to SCAN when listing keys.
    	pattern string
    )
    
    // RedisPipe pairs the two servers being diffed with the settings shared by
    // the comparison workers.
    type RedisPipe struct {
    	// from is the source server; its values are treated as the expected ones.
    	from     *RedisServer
    	// to is the destination server whose values are checked against from.
    	to       *RedisServer
    	// keys holds the key match pattern supplied when the pipe is built.
    	keys     string
    	// shutdown makes CompareKeys workers return once it is closed.
    	shutdown chan bool
    }
    
    // RedisServer bundles a Redis client with the connection settings it was
    // created from (see parseRedisURI).
    type RedisServer struct {
    	// client is the connection used for SCAN and GET calls.
    	client *redis.Client
    	// host, port and db identify the server and logical database.
    	host   string
    	port   int
    	db     int
    	// pass is the password taken from the URI, or "" when none was given.
    	pass   string
    }
    
    // Discrepancy records one key whose value differs between the two servers.
    type Discrepancy struct {
    	// key is the name of the mismatching key.
    	key redisKey
    	// src is the value read from the source server.
    	src interface{}
    	// dst is the value read from the destination server.
    	dst interface{}
    }
    
    // parseRedisURI turns a URI of the form redis://:password@host:port?db=N into
    // a RedisServer with a client already created for it.
    //
    // Components the URI leaves out fall back to localhost, port 6379, database 0
    // and an empty password. Malformed input terminates the process through
    // log.Fatal, so the returned error is always nil; it is kept in the signature
    // so existing callers compile unchanged.
    func parseRedisURI(s string) (server *RedisServer, err error) {
    	// Defaults
    	host := "localhost"
    	password := ""
    	port := 6379
    	db := 0
    
    	u, err := url.Parse(s)
    	if err != nil {
    		log.Fatal(err)
    	}
    	if u.Scheme != "redis" {
    		log.Fatal("Scheme must be redis")
    	}
    
    	if u.User != nil {
    		// Password returns "" when the URI carries no password, which is
    		// already the default, so the "set" flag is not needed.
    		password, _ = u.User.Password()
    	}
    
    	if u.Host != "" {
    		h, p, splitErr := net.SplitHostPort(u.Host)
    		if splitErr != nil {
    			// SplitHostPort rejects a host with no port ("redis://myhost").
    			// Retry with the default port appended so the host name is kept
    			// instead of being silently replaced by an empty string.
    			h, p, splitErr = net.SplitHostPort(u.Host + ":" + strconv.Itoa(port))
    			if splitErr != nil {
    				log.Fatalf("Unable to parse host %s: %s", u.Host, splitErr)
    			}
    		}
    		if h != "" {
    			host = h
    		}
    		if p != "" {
    			port, err = strconv.Atoi(p)
    			if err != nil {
    				log.Fatalf("Unable to convert port to integer for %s", p)
    			}
    		}
    	}
    
    	if dbS := u.Query().Get("db"); dbS != "" {
    		db, err = strconv.Atoi(dbS)
    		if err != nil {
    			log.Fatalf("Unable to convert db to integer for %s", dbS)
    		}
    	}
    
    	client := CreateClient(host, password, port, db)
    	return &RedisServer{client, host, port, db, password}, nil
    }
    
    // scanner lists the keys on s that match the given pattern and streams them
    // on the returned channel, which is closed after the last key.
    //
    // Two goroutines are started, each registered on wg: one issues SCAN calls
    // and hands over each batch of names, the other unpacks the batches into
    // individual keys. A SCAN failure terminates the process.
    func (s *RedisServer) scanner(match pattern, wg *sync.WaitGroup) chan redisKey {
    	out := make(chan redisKey, 1000)
    	batches := make(chan []string)
    
    	// flatten forwards every name of every batch until batches is closed.
    	flatten := func() {
    		defer wg.Done()
    		defer close(out)
    		for batch := range batches {
    			for _, name := range batch {
    				out <- redisKey(name)
    			}
    		}
    	}
    
    	// walk keeps calling SCAN until the server hands back cursor 0.
    	walk := func() {
    		defer wg.Done()
    		var cursor uint64
    		for {
    			// REDIS SCAN
    			// http://redis.io/commands/scan
    			// Preferable because it doesn't lock complete database on larger keysets for 250ms+.
    			batch, next, err := s.client.Scan(cursor, string(match), 1000).Result()
    			if err != nil {
    				log.Fatal("KeysRedis: error obtaining keys list from redis: ", err)
    			}
    			batches <- batch
    
    			cursor = next
    			if cursor != 0 {
    				continue
    			}
    			close(batches)
    			return
    		}
    	}
    
    	wg.Add(1)
    	go flatten()
    
    	wg.Add(1)
    	go walk()
    
    	return out
    }
    
    // compare reads key from src and dst with GET and reports both values plus
    // whether they match.
    //
    // A failed read yields the empty string, so comparing values alone would let
    // a key that could not be read from dst "match" an empty-string value in
    // src. The two reads must therefore also agree on whether they succeeded.
    // Failures on the src side are logged because the key was just listed there.
    func (p *RedisPipe) compare(src, dst *RedisServer, key redisKey) (interface{}, interface{}, bool) {
    	s, srcErr := src.client.Get(string(key)).Result()
    	if srcErr != nil {
    		log.Printf("Unable to get expected key %s from src: %+v: %s", key, src.client, srcErr)
    	}
    	d, dstErr := dst.client.Get(string(key)).Result()
    
    	sameOutcome := (srcErr == nil) == (dstErr == nil)
    	isMatch := sameOutcome && reflect.DeepEqual(s, d)
    	return s, d, isMatch
    }
    
    // CompareKeys starts one worker goroutine, registered on wg, that takes keys
    // from c, compares them between p.from and p.to, and sends a Discrepancy on
    // mismatches for every key whose values differ. The worker returns when c is
    // closed or when p.shutdown is closed.
    func (p *RedisPipe) CompareKeys(c chan redisKey, mismatches chan *Discrepancy, wg *sync.WaitGroup) {
    	worker := func() {
    		defer wg.Done()
    		for {
    			select {
    			case _, open := <-p.shutdown:
    				if open {
    					// A value sent on shutdown is consumed and ignored;
    					// only closing the channel stops the worker.
    					continue
    				}
    				return
    			case key, open := <-c:
    				if !open {
    					return
    				}
    				if want, got, same := p.compare(p.from, p.to, key); !same {
    					mismatches <- &Discrepancy{key, want, got}
    				}
    			}
    		}
    	}
    
    	wg.Add(1)
    	go worker()
    }
    
    // CreateClient returns a Redis client for the server at host:port, using
    // pass as the password and db as the logical database.
    //
    // The address is assembled with net.JoinHostPort so that IPv6 literals are
    // bracketed ("[::1]:6379"); for host names and IPv4 addresses the result is
    // the same "host:port" string as before.
    func CreateClient(host, pass string, port, db int) *redis.Client {
    	return redis.NewClient(&redis.Options{
    		Addr:     net.JoinHostPort(host, strconv.Itoa(port)),
    		Password: pass,
    		DB:       db,
    	})
    }
    
    // writer prints one line per Discrepancy received on c, formatted as key,
    // source value and destination value separated by *del. It returns, and
    // releases wg, once c is closed.
    func writer(c chan *Discrepancy, wg *sync.WaitGroup, del *string) {
    	defer wg.Done()
    	sep := *del
    	for {
    		item, open := <-c
    		if !open {
    			return
    		}
    		fmt.Printf("%s%s%s%s%s\n", item.key, sep, item.src, sep, item.dst)
    	}
    }
    
    // main diffs the string values of two Redis servers: keys matching -keys are
    // scanned from -src, compared against -dst by -parallel workers, and every
    // mismatch is printed as key, source value and destination value separated
    // by -delimiter.
    func main() {
    	src := flag.String("src", "", "Format redis://:password@host:port?db=0")
    	dst := flag.String("dst", "redis://localhost:6379", "redis://:password@host:port?db=0")
    	threads := flag.Int("parallel", 20, "Threading count. Default `20`")
    	match := flag.String("keys", "*", "Match subset of keys `*`")
    	delimiter := flag.String("delimiter", "|", "Delimiter that will be used to separate output")
    	flag.Parse()
    	if *src == "" {
    		flag.Usage()
    		os.Exit(1)
    	}
    	// Without at least one worker nothing drains the key channel: the run
    	// would either hang or finish while silently reporting no differences.
    	if *threads < 1 {
    		log.Fatalf("-parallel must be at least 1, got %d", *threads)
    	}
    
    	from, err := parseRedisURI(*src)
    	if err != nil {
    		log.Fatalf("Invalid -src: %s", err)
    	}
    	to, err := parseRedisURI(*dst)
    	if err != nil {
    		log.Fatalf("Invalid -dst: %s", err)
    	}
    
    	var wg sync.WaitGroup
    	shutdown := make(chan bool, 1)
    	discrepancies := make(chan *Discrepancy)
    	// The pipe is only read by the workers, so one instance is shared.
    	pipe := &RedisPipe{from, to, *match, shutdown}
    
    	// Setup Writer
    	var wgWriter sync.WaitGroup
    	wgWriter.Add(1)
    	go writer(discrepancies, &wgWriter, delimiter)
    
    	keyChan := pipe.from.scanner(pattern(*match), &wg)
    	for i := 0; i < *threads; i++ {
    		pipe.CompareKeys(keyChan, discrepancies, &wg)
    	}
    
    	// Wait for the scanner and all compare workers to complete
    	wg.Wait()
    	// No more mismatches can arrive; let the writer drain and stop
    	close(discrepancies)
    	// Wait for writer to finish printing
    	wgWriter.Wait()
    }
    
  • GH and Mongo archival to S3

    package main
    
    import (
    	"crypto/rand"
    	"flag"
    	"fmt"
    	"github.com/rlmcpherson/s3gof3r"
    	"io"
    	"log"
    	"os"
    	"os/exec"
    	"strings"
    	"sync"
    	"time"
    )
    
    // Command-line flags and the package-level state shared between the backup
    // producer (createBackup) and the uploader (main).
    var (
    	bucketName        = flag.String("bucket", "", "Upload bucket")
    	keyPrefix         = flag.String("prefix", "", "S3 key prefix, eg bucket/prefix/output")
    	mongodump         = flag.String("mongodump", "mongodump", "Mongodump bin name")
    	db                = flag.String("db", "", "db name")
    	username          = flag.String("username", "", "user name")
    	password          = flag.String("password", "", "password")
    	host              = flag.String("host", "", "host:port")
    	excludeCollection = flag.String("excludeCollection", "", "collections to exclude")
    	// pWriter receives mongodump's stdout in createBackup; main reads the
    	// same bytes from pReader and copies them to the upload writer.
    	pReader, pWriter  = io.Pipe()
    
    	// wg is incremented and released inside createBackup and waited on in main.
    	wg sync.WaitGroup
    
    	// NOTE(review): bucket and date are not referenced by the code shown here;
    	// main declares its own local bucket that shadows this one.
    	bucket *s3gof3r.Bucket
    	date   string
    )
    
    // mustGetEnv returns the value of the environment variable key. A variable
    // that is unset or empty terminates the process with a log message.
    func mustGetEnv(key string) string {
    	if value := os.Getenv(key); value != "" {
    		return value
    	}
    	log.Fatalf("Missing ENV %s", key)
    	return ""
    }
    
    // createBackup runs mongodump with --archive and --gzip and streams its
    // stdout into pWriter; stderr is passed through to this process's stderr.
    //
    // It returns the error from running the command. The same error is used to
    // close the pipe, so the reader of pReader sees a failed dump as a read
    // error instead of a clean end of stream. A missing mongodump binary
    // terminates the process.
    func createBackup() (err error) {
    	// Deferred calls run last-in first-out: wg is released first, then the
    	// pipe is closed with the final value of err (nil means plain EOF).
    	defer func() { pWriter.CloseWithError(err) }()
    	defer wg.Done()
    	wg.Add(1)
    	name, err := exec.LookPath(*mongodump)
    	if err != nil {
    		log.Fatalf("Mongodump cannot be found on path")
    	}
    	// TODO: test for newness of mongo Archive requires newish >= 3.1 version of mongodump
    	// 3.0.5 in homebrew is missing --archive
    	// 3.2 is where archive to STDOUT became available
    	args := []string{"--archive", "--db=" + *db, "--username=" + *username, "--password=" + *password, "--host=" + *host}
    	// Only pass the option when it is set; an empty string would reach
    	// mongodump as a stray positional argument. The flag value itself is left
    	// untouched so a second call would not prefix it twice.
    	if *excludeCollection != "" {
    		args = append(args, "--excludeCollection="+*excludeCollection)
    	}
    	args = append(args, "--gzip")
    
    	cmd := exec.Command(name, args...)
    	cmd.Stdout = pWriter
    	cmd.Stderr = os.Stderr
    
    	// Log the command line with the password masked.
    	logged := make([]string, len(cmd.Args))
    	copy(logged, cmd.Args)
    	for i, a := range logged {
    		if strings.HasPrefix(a, "--password=") {
    			logged[i] = "--password=********"
    		}
    	}
    	log.Printf("CMD: $ %s %s", name, strings.Join(logged, " "))
    
    	err = cmd.Run()
    	return err
    }
    
    // pseudo_uuid returns 16 random bytes rendered as upper-case hex in the
    // 8-4-4-4-12 grouping of a UUID. If the random source fails, the error is
    // printed and the empty string is returned.
    func pseudo_uuid() (uuid string) {
    	// Credit: http://stackoverflow.com/a/25736155
    	var raw [16]byte
    	if _, err := rand.Read(raw[:]); err != nil {
    		fmt.Println("Error: ", err)
    		return ""
    	}
    	return fmt.Sprintf("%X-%X-%X-%X-%X", raw[0:4], raw[4:6], raw[6:8], raw[8:10], raw[10:])
    }
    
    // setupFlags parses the command line and checks that every required flag has
    // a non-empty value. Each missing flag is logged; if any is missing the
    // process exits after all of them have been reported.
    func setupFlags() {
    	flag.Parse()
    
    	missing := false
    	for _, name := range [...]string{"bucket", "mongodump", "db", "username", "password", "host"} {
    		fl := flag.Lookup(name)
    		if fl.Value.String() != "" {
    			continue
    		}
    		missing = true
    		log.Printf("Flag missing -%s which requires %s", fl.Name, fl.Usage)
    	}
    
    	if !missing {
    		return
    	}
    	log.Fatal("Exiting because of missing flags.")
    }
    
    // setupS3 returns a handle on the bucket named by the -bucket flag, using
    // credentials read from AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. The
    // process exits if either variable is missing.
    func setupS3() *s3gof3r.Bucket {
    	creds := s3gof3r.Keys{
    		AccessKey: mustGetEnv("AWS_ACCESS_KEY_ID"),
    		SecretKey: mustGetEnv("AWS_SECRET_ACCESS_KEY"),
    	}
    	return s3gof3r.New("", creds).Bucket(*bucketName)
    }
    
    // generateS3Key builds the object key for this run in the form
    // [prefix/]db/YYYY-MM-DD/HH/uuid.tar.gz, where the date and hour come from
    // the current time and the prefix segment is present only when -prefix is
    // set.
    func generateS3Key() string {
    	hourStamp := time.Now().Format("2006-01-02/15")
    
    	var prefix string
    	if p := *keyPrefix; p != "" {
    		prefix = p + "/"
    	}
    
    	return fmt.Sprintf("%s%s/%s/%s.tar.gz", prefix, *db, hourStamp, pseudo_uuid())
    }
    
    // main streams a mongodump archive straight into an S3 object.
    //
    // createBackup runs concurrently and writes into the pipe while this
    // goroutine copies from the pipe to the S3 writer. The upload is completed,
    // and reported as successful, only when the copy, the dump command and the
    // final Close all succeed; any failure exits with a non-zero status.
    func main() {
    	setupFlags()
    	bucket := setupS3()
    
    	// Keep the dump's result instead of discarding it with a bare `go`.
    	backupErr := make(chan error, 1)
    	go func() { backupErr <- createBackup() }()
    
    	s3Key := generateS3Key()
    	output := fmt.Sprintf("s3://%s/%s", *bucketName, s3Key)
    	w, err := bucket.PutWriter(s3Key, nil, nil)
    	if err != nil {
    		log.Fatalf("Error with bucket (%s/%s) PutWriter: %s", *bucketName, s3Key, err)
    	}
    
    	log.Printf("Uploading to %s", output)
    	written, copyErr := io.Copy(w, pReader)
    	if copyErr != nil {
    		// Fail the write side too, so a dump still blocked on the pipe
    		// stops instead of hanging this process.
    		pReader.CloseWithError(copyErr)
    	}
    
    	dumpErr := <-backupErr
    	wg.Wait()
    
    	// NOTE(review): on failure the writer is deliberately not closed so a
    	// truncated archive is not finalized as an object — confirm how the S3
    	// library treats an upload that is never closed.
    	if copyErr != nil {
    		log.Fatalf("Error Uploading to %s, ERROR: %s", output, copyErr)
    	}
    	if dumpErr != nil {
    		log.Fatalf("Backup command failed, not completing upload to %s: %s", output, dumpErr)
    	}
    
    	log.Printf("Attempting to write %d bytes", written)
    	if err := w.Close(); err != nil {
    		log.Fatalf("Error completing upload to %s: %s", output, err)
    	}
    	log.Printf("Successfully uploaded %s", output)
    }