12 Sep 2016

Good Dull Best Practices in Operations

Tonight I read this paper: pdf or archive.

And I was impressed by the boring and solid guidelines therein.

My takeaways were:

  • Immutable/recreatable servers and infrastructure. Poignant because of an EC2 hardware failure today that forced a recovery by snapshotting the root drive and reattaching it to a new server.
  • Instrument all the things (a minimal Go sketch follows this list).
  • Spread testing across unit, integration, and multi-service tests.
  • Gradual deployment of new services/updates. As a colleague and friend would say, “bake it in production for a little while”.
  • “Proven technology is almost always better than operating on the bleeding edge. Stable software is better than an early copy, no matter how valuable the new feature seems.” Bleeding edge is named that way for a reason. This tension between proven technology and the bleeding edge has been on my mind a lot lately.
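
On the instrumentation bullet, here’s a minimal sketch of what that can look like in Go using the standard library’s expvar package; the counter name, handler, and port are placeholders rather than anything from the paper:

package main

import (
	"expvar"
	"fmt"
	"net/http"
)

// requestCount is a hypothetical counter; expvar publishes every created var
// as JSON at /debug/vars on the default mux (alongside cmdline and memstats).
var requestCount = expvar.NewInt("requests_total")

func handler(w http.ResponseWriter, r *http.Request) {
	requestCount.Add(1) // instrument the thing
	fmt.Fprintln(w, "ok")
}

func main() {
	http.HandleFunc("/", handler)
	// curl localhost:8080/debug/vars to read requests_total.
	http.ListenAndServe(":8080", nil)
}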

21 Jul 2016

Dull and Reliable Golang

I’ve been working on systems lately that are suited for Golang:

  • Memory sensitive
  • Performance sensitive
  • Stability sensitive

And I’ve been very happy with the outcomes of developing tools in Golang.

Here’s a set of links to my recent work in Golang (some are my own creations from scratch, while others build on, extend, or remix others’ work):

It’s been productive and performant. I also sense that I could revisit these projects in a year or two and still grok what’s happening. Given how much I need to context switch between languages right now, I appreciate projects that are easily picked back up after a hiatus.

TL;DR - Go’s going well and I tend to reach for it when solving systems issues.

Links and src below

Big thanks to @adarqui for putting their code on GitHub; it helped with a data migration I was doing. And YAY for open source: they merged my updates and improvements (using SCAN instead of KEYS) back into the project :).

package main

import (
	"flag"
	"fmt"
	"gopkg.in/redis.v4"
	"log"
	"net"
	"net/url"
	"os"
	"reflect"
	"strconv"
	"sync"
)

type redisKey string
type pattern string

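// RedisPipe pairs a source and destination server with the key pattern being
// compared; each comparison worker holds one pipe and reads keys off a shared channel.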
type RedisPipe struct {
	from     *RedisServer
	to       *RedisServer
	keys     string
	shutdown chan bool
}

type RedisServer struct {
	client *redis.Client
	host   string
	port   int
	db     int
	pass   string
}

type Discrepancy struct {
	key redisKey
	src interface{}
	dst interface{}
}

func parseRedisURI(s string) (server *RedisServer, err error) {
	// Defaults
	host := "localhost"
	password := ""
	port := 6379
	db := 0

	u, err := url.Parse(s)
	if err != nil {
		log.Fatal(err)
	}
	if u.Scheme != "redis" {
		log.Fatal("Scheme must be redis")
	}
	q := u.Query()
	dbS := q.Get("db")
	if u.User != nil {
		var ok bool
		password, ok = u.User.Password()
		if !ok {
			password = ""
		}
	}

	// SplitHostPort fails when the URI omits a port; keep the defaults in that case.
	if h, p, splitErr := net.SplitHostPort(u.Host); splitErr == nil {
		host = h
		if p != "" {
			port, err = strconv.Atoi(p)
			if err != nil {
				log.Fatalf("Unable to convert port %q to integer: %s", p, err)
			}
		}
	} else if u.Host != "" {
		host = u.Host
	}

	if dbS != "" {
		db, err = strconv.Atoi(dbS)
		if err != nil {
			log.Fatalf("Unable to convert db to integer for %s", dbS)
		}
	}

	client := CreateClient(host, password, port, db)
	return &RedisServer{client, host, port, db, password}, nil
}

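// scanner launches two goroutines: one walks the keyspace in batches with SCAN,
// the other fans each batch out onto a buffered channel of individual keys.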
func (s *RedisServer) scanner(match pattern, wg *sync.WaitGroup) chan redisKey {
	keyChan := make(chan redisKey, 1000)
	split := make(chan []string)

	splitter := func() {
		defer wg.Done()
		defer close(keyChan)
		// Fan each scanned batch out onto the buffered per-key channel.
		for ks := range split {
			for _, k := range ks {
				keyChan <- redisKey(k)
			}
		}
	}

	keyScanner := func() {
		defer wg.Done()
		var cursor uint64
		for {
			var keys []string
			var err error
			// REDIS SCAN
			// http://redis.io/commands/scan
			// Preferable to KEYS because it doesn't lock the complete database on larger keysets for 250ms+.
			keys, cursor, err = s.client.Scan(cursor, string(match), 1000).Result()
			if err != nil {
				log.Fatal("scanner: error obtaining keys list from redis: ", err)
			}
			split <- keys

			if cursor == 0 {
				close(split)
				return
			}
		}
	}

	wg.Add(1)
	go splitter()

	wg.Add(1)
	go keyScanner()

	return keyChan
}

func (p *RedisPipe) compare(src, dst *RedisServer, key redisKey) (interface{}, interface{}, bool) {
	s, err := src.client.Get(string(key)).Result()
	if err != nil {
		log.Printf("Unable to get expected key %s from src: %+v", key, src.client)
	}
	// The destination error is ignored on purpose: a missing key simply shows up as a mismatch.
	d, _ := dst.client.Get(string(key)).Result()
	isMatch := reflect.DeepEqual(s, d)
	return s, d, isMatch
}

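// CompareKeys starts a worker that pulls keys off the channel, GETs each one from
// both servers, and emits a Discrepancy for any values that differ.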
func (p *RedisPipe) CompareKeys(c chan redisKey, mismatches chan *Discrepancy, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case _, ok := <-p.shutdown:
				if !ok {
					return
				}
			case k, ok := <-c:
				if !ok {
					return
				}
				s, d, isMatch := p.compare(p.from, p.to, k)
				if !isMatch {
					mismatches <- &Discrepancy{k, s, d}
				}
			}
		}
	}()
}

func CreateClient(host, pass string, port, db int) *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("%s:%d", host, port),
		Password: pass,
		DB:       db,
	})
}

func writer(c chan *Discrepancy, wg *sync.WaitGroup, del *string) {
	defer wg.Done()
	i := *del
	for d := range c {
		fmt.Printf("%s%s%s%s%s\n", d.key, i, d.src, i, d.dst)
	}
}

func main() {
	src := flag.String("src", "", "Format redis://:password@host:port?db=0")
	dst := flag.String("dst", "redis://localhost:6379", "redis://:password@host:port?db=0")
	threads := flag.Int("parallel", 20, "Threading count. Default `20`")
	match := flag.String("keys", "*", "Match subset of keys `*`")
	delimiter := flag.String("delimiter", "|", "Delimiter that will be used to separate output")
	flag.Parse()
	if *src == "" {
		flag.Usage()
		os.Exit(1)
	}
	from, _ := parseRedisURI(*src)
	to, _ := parseRedisURI(*dst)

	var wg sync.WaitGroup
	shutdown := make(chan bool, 1)
	discrepancies := make(chan *Discrepancy)
	keyChan := from.scanner(pattern(*match), &wg)

	tx := *threads
	for i := 0; i < tx; i++ {
		p := &RedisPipe{from, to, *match, shutdown}
		p.CompareKeys(keyChan, discrepancies, &wg)
	}

	// Setup Writer
	var wgWriter sync.WaitGroup
	wgWriter.Add(1)
	go writer(discrepancies, &wgWriter, delimiter)

	// Wait for threads to complete
	wg.Wait()
	// Start cleanup routine for writer
	close(discrepancies)
	// Wait for writer to close fn
	wgWriter.Wait()
}
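
A hypothetical invocation of this comparison tool (the binary name redis-compare is just a placeholder) looks something like:

	redis-compare -src "redis://:secret@old-host:6379?db=0" -dst "redis://localhost:6379" -keys "user:*" -parallel 20 > mismatches.txt

Only -src is required; -dst defaults to redis://localhost:6379, and every mismatched key is written as key|src-value|dst-value using the -delimiter character.

The second listing streams a mongodump archive straight to S3 with s3gof3r and an io.Pipe, so the dump never touches local disk.
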
package main

import (
	"crypto/rand"
	"flag"
	"fmt"
	"github.com/rlmcpherson/s3gof3r"
	"io"
	"log"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"
)

var (
	bucketName        = flag.String("bucket", "", "Upload bucket")
	keyPrefix         = flag.String("prefix", "", "S3 key prefix, eg bucket/prefix/output")
	mongodump         = flag.String("mongodump", "mongodump", "Mongodump bin name")
	db                = flag.String("db", "", "db name")
	username          = flag.String("username", "", "user name")
	password          = flag.String("password", "", "password")
	host              = flag.String("host", "", "host:port")
	excludeCollection = flag.String("excludeCollection", "", "collections to exclude")
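	// pReader/pWriter form an in-memory pipe: mongodump writes the gzipped archive
	// to pWriter while the S3 uploader reads from pReader, so nothing hits local disk.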
	pReader, pWriter  = io.Pipe()

	wg sync.WaitGroup
)

func mustGetEnv(key string) string {
	s := os.Getenv(key)
	if s == "" {
		log.Fatalf("Missing ENV %s", key)
	}
	return s
}

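// createBackup shells out to mongodump with --archive --gzip and streams the
// archive into the write end of the pipe for the uploader in main to consume.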
func createBackup() error {
	defer pWriter.Close()
	defer wg.Done()
	name, err := exec.LookPath(*mongodump)
	if err != nil {
		log.Fatalf("Mongodump cannot be found on path")
	}
	// TODO: test for newness of mongo Archive requires newish >= 3.1 version of mongodump
	// 3.0.5 in homebrew is missing --archive
	// 3.2 is where archive to STDOUT became available
	args := []string{"--archive", "--db=" + *db, "--username=" + *username, "--password=" + *password, "--host=" + *host, "--gzip"}
	if *excludeCollection != "" {
		// Only append the flag when a collection list was supplied; an empty argument confuses mongodump.
		args = append(args, "--excludeCollection="+*excludeCollection)
	}
	cmd := exec.Command(name, args...)
	cmd.Stdout = pWriter
	cmd.Stderr = os.Stderr
	log.Printf("CMD: $ %s %s", name, strings.Join(cmd.Args, " "))
	err = cmd.Run()
	if err != nil {
		// Log here as well; the returned error is otherwise dropped because createBackup runs as a goroutine.
		log.Printf("mongodump failed: %s", err)
		return err
	}
	return nil
}

func pseudo_uuid() (uuid string) {
	// Credit: http://stackoverflow.com/a/25736155
	b := make([]byte, 16)
	_, err := rand.Read(b)
	if err != nil {
		fmt.Println("Error: ", err)
		return
	}

	uuid = fmt.Sprintf("%X-%X-%X-%X-%X", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])

	return
}

func setupFlags() {
	flag.Parse()
	flags := []string{"bucket", "mongodump", "db", "username", "password", "host"}
	fatal := false
	for _, f := range flags {
		fl := flag.Lookup(f)
		s := fl.Value.String()
		if s == "" {
			fatal = true
			log.Printf("Flag missing -%s which requires %s", fl.Name, fl.Usage)
		}
	}
	if fatal {
		log.Fatal("Exiting because of missing flags.")
	}
}

func setupS3() *s3gof3r.Bucket {
	awsAccessKey := mustGetEnv("AWS_ACCESS_KEY_ID")
	awsSecretKey := mustGetEnv("AWS_SECRET_ACCESS_KEY")
	keys := s3gof3r.Keys{
		AccessKey: awsAccessKey,
		SecretKey: awsSecretKey,
	}
	s3 := s3gof3r.New("", keys)
	return s3.Bucket(*bucketName)
}

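// generateS3Key buckets backups by date and hour, producing keys like
// <prefix>/<db>/2006-01-02/15/<uuid>.tar.gz.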
func generateS3Key() string {
	now := time.Now().Format("2006-01-02/15")
	prefix := ""
	if *keyPrefix != "" {
		prefix = *keyPrefix + "/"
	}
	uuid := pseudo_uuid()
	return fmt.Sprintf("%s%s/%s/%s.tar.gz", prefix, *db, now, uuid)
}

func main() {
	setupFlags()
	bucket := setupS3()

	// Register with the WaitGroup before launching the goroutine so wg.Wait can't race it.
	wg.Add(1)
	go createBackup()

	s3Key := generateS3Key()
	output := fmt.Sprintf("s3://%s/%s", *bucketName, s3Key)
	w, err := bucket.PutWriter(s3Key, nil, nil)
	if err != nil {
		log.Fatalf("Error with bucket (%s/%s) PutWriter: %s", *bucketName, s3Key, err)
	}
	defer func() {
		// Close completes the multipart upload; an error here means the object was not stored.
		if err := w.Close(); err != nil {
			log.Fatalf("Error completing upload to %s: %s", output, err)
		}
		log.Printf("Successfully uploaded %s", output)
	}()

	log.Printf("Uploading to %s", output)
	written, err := io.Copy(w, pReader)
	if err != nil {
		log.Printf("Error Uploading to %s, ERROR: %s", output, err)
	}

	wg.Wait()

	log.Printf("Attempting to write %d bytes", written)
}
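
The backup tool expects AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment and requires the -bucket, -db, -username, -password, and -host flags (-prefix and -excludeCollection are optional). A hypothetical run, with placeholder names, looks something like:

	mongo-s3-backup -bucket my-backups -prefix prod -db appdb -username backup -password secret -host mongo01:27017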