I’ve been working on systems lately that are suited for Golang:
- Memory sensitive
- Performance sensitive
- Stability sensitive
I have been very happy with the outcomes of developing tools in Golang.
Here’s a set of links to my recent work in Golang (some are entirely my own creations, while others build on, extend, or remix other people’s work):
It’s been productive and performant. I also sense that I could revisit these projects in a year or two and still grok what’s happening. Given how much I need to context switch between languages right now, I appreciate projects that are easily picked back up after a hiatus.
TL;DR - Go’s going well and I tend to reach for it when solving systems issues.
Links and src below
Big thanks to @adarqui for putting their code on GitHub. It helped with a data migration I was doing. And also YAY to open source, since they merged my updates and improvements (using SCAN instead of KEYS) back into the project :).
package main
import (
"flag"
"fmt"
"gopkg.in/redis.v4"
"log"
"net"
"net/url"
"os"
"reflect"
"strconv"
"sync"
)
// redisKey is the name of a single key, as sent over the scanner's key channel.
type redisKey string
// pattern is the match expression handed to SCAN to select a subset of keys.
type pattern string
// RedisPipe pairs the two servers whose keys are compared against each other.
type RedisPipe struct {
// from is passed to compare as the source server.
from *RedisServer
// to is passed to compare as the destination server.
to *RedisServer
// keys holds the key match expression given on the command line; it is not
// read by any method visible in this file.
keys string
// shutdown stops CompareKeys workers once it is closed.
shutdown chan bool
}
// RedisServer bundles a connected client with the settings it was built from.
type RedisServer struct {
// client is the connection created by CreateClient.
client *redis.Client
// host, port, db and pass are the values parsed from the redis:// URI.
host string
port int
db int
pass string
}
// Discrepancy records one key whose value differs between the two servers.
type Discrepancy struct {
// key is the key that was compared.
key redisKey
// src is the value read from the source server.
src interface{}
// dst is the value read from the destination server.
dst interface{}
}
// parseRedisURI turns a URI of the form redis://:password@host:port?db=0 into
// a connected RedisServer.
//
// Missing parts fall back to localhost, port 6379, database 0 and an empty
// password. Invalid input (unparsable URI, wrong scheme, non-numeric port or
// db) terminates the process via log.Fatal, so the returned error is always
// nil; it is kept in the signature for callers that already expect it.
func parseRedisURI(s string) (server *RedisServer, err error) {
	// Defaults
	host := "localhost"
	password := ""
	port := 6379
	db := 0

	u, err := url.Parse(s)
	if err != nil {
		log.Fatal(err)
	}
	if u.Scheme != "redis" {
		log.Fatal("Scheme must be redis")
	}

	if u.User != nil {
		// Password returns "" when none is set, which is already the default.
		password, _ = u.User.Password()
	}

	h, p, splitErr := net.SplitHostPort(u.Host)
	if splitErr != nil {
		// SplitHostPort fails (and returns empty strings) when the URI has no
		// port, e.g. redis://localhost. Treat the whole authority as the host
		// and keep the default port instead of silently losing the host.
		h, p = u.Host, ""
		if n := len(h); n >= 2 && h[0] == '[' && h[n-1] == ']' {
			// Bare IPv6 literal: store it without brackets, matching what
			// SplitHostPort yields when a port is present.
			h = h[1 : n-1]
		}
	}
	if h != "" {
		host = h
	}
	if p != "" {
		port, err = strconv.Atoi(p)
		if err != nil {
			log.Fatalf("Unable to convert port to integer for %s", p)
		}
	}

	if dbS := u.Query().Get("db"); dbS != "" {
		db, err = strconv.Atoi(dbS)
		if err != nil {
			log.Fatalf("Unable to convert db to integer for %s", dbS)
		}
	}

	client := CreateClient(host, password, port, db)
	return &RedisServer{client, host, port, db, password}, nil
}
// scanner walks the keyspace of s with SCAN and streams every key matching
// match onto the returned channel, which is closed once the scan completes.
//
// Two goroutines are started and registered on wg: one issues the SCAN calls
// and hands over each batch, the other flattens batches into single keys.
// A SCAN failure terminates the process.
func (s *RedisServer) scanner(match pattern, wg *sync.WaitGroup) chan redisKey {
	keyChan := make(chan redisKey, 1000)
	batches := make(chan []string)

	// Flattener: turns each batch into individual keys.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(keyChan)
		for batch := range batches {
			for _, name := range batch {
				keyChan <- redisKey(name)
			}
		}
	}()

	// Producer: pages through the keyspace until the cursor wraps back to 0.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(batches)
		var cursor uint64
		for {
			// REDIS SCAN
			// http://redis.io/commands/scan
			// Preferable because it doesn't lock complete database on larger keysets for 250ms+.
			page, next, err := s.client.Scan(cursor, string(match), 1000).Result()
			if err != nil {
				log.Fatal("KeysRedis: error obtaining keys list from redis: ", err)
			}
			batches <- page
			if next == 0 {
				return
			}
			cursor = next
		}
	}()

	return keyChan
}
// compare reads key from src and dst with GET and reports whether the two
// servers agree on it.
//
// It returns the source value, the destination value and a match flag. The
// client reports a missing or unreadable key as an error with an empty value,
// so when exactly one side fails the key is reported as a mismatch rather
// than being compared as "". When both sides fail the (empty) values are
// compared as before.
func (p *RedisPipe) compare(src, dst *RedisServer, key redisKey) (interface{}, interface{}, bool) {
	s, srcErr := src.client.Get(string(key)).Result()
	if srcErr != nil {
		log.Printf("Unable to get expected key %s from src: %+v", key, src.client)
	}
	d, dstErr := dst.client.Get(string(key)).Result()
	if dstErr != nil {
		log.Printf("Unable to get key %s from dst: %s", key, dstErr)
	}
	if (srcErr == nil) != (dstErr == nil) {
		// Readable on one server only: never a match, even if the readable
		// value happens to be the empty string.
		return s, d, false
	}
	isMatch := reflect.DeepEqual(s, d)
	return s, d, isMatch
}
// CompareKeys starts one background worker, registered on wg, that compares
// every key received from c between p.from and p.to and sends a Discrepancy
// to mismatches for each key that differs.
//
// The worker exits when c is closed or when p.shutdown is closed; a value
// received on p.shutdown is ignored.
func (p *RedisPipe) CompareKeys(c chan redisKey, mismatches chan *Discrepancy, wg *sync.WaitGroup) {
	worker := func() {
		defer wg.Done()
		for {
			select {
			case _, open := <-p.shutdown:
				if open {
					continue
				}
				return
			case key, open := <-c:
				if !open {
					return
				}
				if srcVal, dstVal, same := p.compare(p.from, p.to, key); !same {
					mismatches <- &Discrepancy{key, srcVal, dstVal}
				}
			}
		}
	}

	wg.Add(1)
	go worker()
}
// CreateClient builds a redis client for host:port, authenticating with pass
// (empty for none) and selecting database db.
//
// The address is assembled with net.JoinHostPort so that IPv6 hosts are
// bracketed correctly ("[::1]:6379" instead of the invalid "::1:6379").
func CreateClient(host, pass string, port, db int) *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:     net.JoinHostPort(host, strconv.Itoa(port)),
		Password: pass,
		DB:       db,
	})
}
// writer prints one line per Discrepancy received from c to standard output,
// as key, source value and destination value separated by *del. It returns,
// marking wg done, once c has been closed and drained.
func writer(c chan *Discrepancy, wg *sync.WaitGroup, del *string) {
	defer wg.Done()
	sep := *del
	for {
		item, open := <-c
		if !open {
			return
		}
		fmt.Printf("%s%s%s%s%s\n", item.key, sep, item.src, sep, item.dst)
	}
}
// main compares the keys of a source redis server against a destination
// server and prints every key whose value differs.
//
// Keys are produced by a SCAN of the source, compared by -parallel workers,
// and mismatches are printed by a single writer goroutine.
func main() {
	src := flag.String("src", "", "Format redis://:password@host:port?db=0")
	dst := flag.String("dst", "redis://localhost:6379", "redis://:password@host:port?db=0")
	threads := flag.Int("parallel", 20, "Threading count. Default `20`")
	match := flag.String("keys", "*", "Match subset of keys `*`")
	delimiter := flag.String("delimiter", "|", "Delimiter that will be used to separate output")
	flag.Parse()

	if *src == "" {
		flag.Usage()
		os.Exit(1)
	}
	if *threads < 1 {
		// With no workers nothing drains the key channel, so the scanner
		// would block forever once its buffer fills.
		log.Fatalf("-parallel must be at least 1, got %d", *threads)
	}

	// The URIs may contain passwords, so only the error is logged.
	from, err := parseRedisURI(*src)
	if err != nil {
		log.Fatalf("Unable to parse -src: %s", err)
	}
	to, err := parseRedisURI(*dst)
	if err != nil {
		log.Fatalf("Unable to parse -dst: %s", err)
	}

	var wg sync.WaitGroup
	shutdown := make(chan bool, 1)
	discrepancies := make(chan *Discrepancy)

	keyChan := from.scanner(pattern(*match), &wg)
	for i := 0; i < *threads; i++ {
		p := &RedisPipe{from, to, *match, shutdown}
		p.CompareKeys(keyChan, discrepancies, &wg)
	}

	// Setup Writer
	var wgWriter sync.WaitGroup
	wgWriter.Add(1)
	go writer(discrepancies, &wgWriter, delimiter)

	// Wait for the scanner and all comparers to complete
	wg.Wait()
	// No more senders: let the writer drain and stop
	close(discrepancies)
	// Wait for the writer to finish printing
	wgWriter.Wait()
}
package main
import (
"crypto/rand"
"flag"
"fmt"
"github.com/rlmcpherson/s3gof3r"
"io"
"log"
"os"
"os/exec"
"strings"
"sync"
"time"
)
// Command-line flags and process-wide state shared by the backup routines.
var (
// Flags; setupFlags requires bucket, mongodump, db, username, password and
// host to be non-empty.
bucketName = flag.String("bucket", "", "Upload bucket")
keyPrefix = flag.String("prefix", "", "S3 key prefix, eg bucket/prefix/output")
mongodump = flag.String("mongodump", "mongodump", "Mongodump bin name")
db = flag.String("db", "", "db name")
username = flag.String("username", "", "user name")
password = flag.String("password", "", "password")
host = flag.String("host", "", "host:port")
excludeCollection = flag.String("excludeCollection", "", "collections to exclude")
// In-memory pipe: createBackup writes mongodump's stdout to pWriter and
// main copies from pReader into the S3 writer.
pReader, pWriter = io.Pipe()
// wg tracks the createBackup goroutine.
wg sync.WaitGroup
// NOTE(review): bucket is never assigned in the visible code; main declares
// its own local variable of the same name.
bucket *s3gof3r.Bucket
// NOTE(review): date is not referenced anywhere in the visible code.
date string
)
// mustGetEnv returns the value of the environment variable key and
// terminates the process when it is unset or empty.
func mustGetEnv(key string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	log.Fatalf("Missing ENV %s", key)
	return ""
}
func createBackup() error {
defer pWriter.Close()
defer wg.Done()
wg.Add(1)
name, err := exec.LookPath(*mongodump)
if err != nil {
log.Fatalf("Mongodump cannot be found on path")
}
// TODO: test for newness of mongo Archive requires newish >= 3.1 version of mongodump
// 3.0.5 in homebrew is missing --archive
// 3.2 is where archive to STDOUT became available
if *excludeCollection != "" {
*excludeCollection = "--excludeCollection=" + *excludeCollection
}
args := []string{"--archive", "--db=" + *db, "--username=" + *username, "--password=" + *password, "--host=" + *host, *excludeCollection, "--gzip"}
cmd := exec.Command(name, args...)
cmd.Stdout = pWriter
cmd.Stderr = os.Stderr
log.Printf("CMD: $ %s %s", name, strings.Join(cmd.Args, " "))
err = cmd.Run()
if err != nil {
return err
}
return nil
}
// pseudo_uuid returns 16 random bytes rendered as upper-case hex in the
// familiar 8-4-4-4-12 grouping. It is not an RFC 4122 UUID (no version or
// variant bits are set). If the random source fails, the error is printed
// and the empty string is returned.
func pseudo_uuid() (uuid string) {
	// Credit: http://stackoverflow.com/a/25736155
	var raw [16]byte
	if _, err := rand.Read(raw[:]); err != nil {
		fmt.Println("Error: ", err)
		return ""
	}
	return fmt.Sprintf("%X-%X-%X-%X-%X", raw[:4], raw[4:6], raw[6:8], raw[8:10], raw[10:])
}
// setupFlags parses the command line and verifies that every required flag
// has a non-empty value. Each missing flag is logged, and the process is
// terminated afterwards if any was missing.
func setupFlags() {
	flag.Parse()

	missing := false
	for _, name := range []string{"bucket", "mongodump", "db", "username", "password", "host"} {
		f := flag.Lookup(name)
		if f.Value.String() != "" {
			continue
		}
		missing = true
		log.Printf("Flag missing -%s which requires %s", f.Name, f.Usage)
	}

	if missing {
		log.Fatal("Exiting because of missing flags.")
	}
}
// setupS3 returns a handle for the bucket named by -bucket, using the
// credentials in AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. The process is
// terminated when either variable is missing.
func setupS3() *s3gof3r.Bucket {
	creds := s3gof3r.Keys{
		AccessKey: mustGetEnv("AWS_ACCESS_KEY_ID"),
		SecretKey: mustGetEnv("AWS_SECRET_ACCESS_KEY"),
	}
	return s3gof3r.New("", creds).Bucket(*bucketName)
}
// generateS3Key builds the object key for a new backup:
// [prefix/]<db>/<YYYY-MM-DD>/<HH>/<random id>.tar.gz
// where the prefix segment is present only when -prefix is set.
func generateS3Key() string {
	stamp := time.Now().Format("2006-01-02/15")

	var prefix string
	if *keyPrefix != "" {
		prefix = *keyPrefix + "/"
	}

	return fmt.Sprintf("%s%s/%s/%s.tar.gz", prefix, *db, stamp, pseudo_uuid())
}
// main streams a mongodump archive straight into an S3 object.
//
// createBackup produces the dump on a background goroutine and writes it to
// the shared pipe; main copies the pipe into the S3 writer. Success is only
// reported after both the copy and the final Close of the S3 writer succeed;
// any failure exits non-zero.
func main() {
	setupFlags()
	bucket := setupS3()

	go createBackup()

	s3Key := generateS3Key()
	output := fmt.Sprintf("s3://%s/%s", *bucketName, s3Key)
	w, err := bucket.PutWriter(s3Key, nil, nil)
	if err != nil {
		log.Fatalf("Error with bucket (%s/%s) PutWriter: %s", *bucketName, s3Key, err)
	}

	log.Printf("Uploading to %s", output)
	written, err := io.Copy(w, pReader)
	if err != nil {
		// Exit right away: w is deliberately not closed so a partial dump is
		// not finalised as a complete object, and waiting on the backup
		// goroutine could block forever because nothing drains the pipe.
		log.Fatalf("Error Uploading to %s, ERROR: %s", output, err)
	}

	// The copy only ends once the backup goroutine has closed the pipe.
	wg.Wait()

	// Close flushes the remaining data and completes the upload.
	if err := w.Close(); err != nil {
		log.Fatalf("Error completing upload to %s, ERROR: %s", output, err)
	}
	log.Printf("Successfully uploaded %s (%d bytes)", output, written)
}