Skip to content

Commit

Permalink
Merge pull request #111 from hellofresh/hotfix/init-config-read
Browse files Browse the repository at this point in the history
PT-6909 Extracted spec loading to config and added some tests
  • Loading branch information
vgarvardt authored Apr 2, 2020
2 parents a750a29 + 0216803 commit f81f7a5
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 127 deletions.
4 changes: 1 addition & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@ klepto
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
/dist
debug
.klepto.toml
/*.klepto.toml
coverage.txt

# Vendor stuff
.glide/
vendor/

*.coverprofile
13 changes: 8 additions & 5 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,23 @@ func NewInitCmd() *cobra.Command {

// RunInit runs the init command
func RunInit() error {
log.Infof("Initializing %s", configFileName)
log.Infof("Initializing %s", config.DefaultConfigFileName)

_, err := os.Stat(configFileName)
_, err := os.Stat(config.DefaultConfigFileName)
if !os.IsNotExist(err) {
log.Fatal("Config file already exists, refusing to overwrite")
}

f, err := os.Create(configFileName)
f, err := os.Create(config.DefaultConfigFileName)
if err != nil {
return wErrors.Wrap(err, "could not create file")
}

e := toml.NewEncoder(bufio.NewWriter(f))
err = e.Encode(config.Spec{
Matchers: map[string]string{
"ActiveUsers": "users.active = TRUE",
},
Tables: []*config.Table{
{
Name: "users",
Expand All @@ -54,7 +57,7 @@ func RunInit() error {
{
Name: "orders",
Filter: config.Filter{
Match: "users.active = TRUE",
Match: "ActiveUsers",
Limit: 10,
},
Relationships: []*config.Relationship{
Expand All @@ -75,7 +78,7 @@ func RunInit() error {
return wErrors.Wrap(err, "could not encode config")
}

log.Infof("Created %s!", configFileName)
log.Infof("Created %s!", config.DefaultConfigFileName)

return nil
}
53 changes: 9 additions & 44 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@ package cmd
import (
"os"

"github.com/hellofresh/klepto/pkg/config"
"github.com/hellofresh/klepto/pkg/formatter"
wErrors "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/hellofresh/klepto/pkg/formatter"
)

var (
globalConfig *config.Spec
configFile string
configFileName = ".klepto.toml"
verbose bool
verbose bool

// RootCmd steals and anonymises databases
RootCmd = &cobra.Command{
Expand All @@ -32,48 +27,18 @@ var (
)

func init() {
RootCmd.PersistentFlags().StringVarP(&configFile, "config", "c", "", "Path to config file (default is ./.klepto)")
RootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Make the operation more talkative")
RootCmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if verbose {
log.SetLevel(log.DebugLevel)
}
}

RootCmd.AddCommand(NewStealCmd())
RootCmd.AddCommand(NewVersionCmd())
RootCmd.AddCommand(NewUpdateCmd())
RootCmd.AddCommand(NewInitCmd())
RootCmd.AddCommand(NewStealCmd())

log.SetOutput(os.Stderr)
log.SetFormatter(&formatter.CliFormatter{})
}

// initConfig is a cobra pre-run hook: it raises the log level to debug when
// verbose is set, then reads the klepto configuration through viper and
// unmarshals it into the package-level globalConfig.
//
// When configFile is non-empty that exact file is used; otherwise viper
// searches for a config named ".klepto" in the current working directory.
// The c and args parameters are not used by the body.
//
// It returns a wrapped error when the working directory cannot be determined,
// when the config cannot be read, or when it cannot be unmarshalled.
func initConfig(c *cobra.Command, args []string) error {
if verbose {
log.SetLevel(log.DebugLevel)
}

// NOTE(review): this always logs configFileName, even when configFile below
// points viper at a different path — confirm whether that is intended.
log.Debugf("Reading config from %s...", configFileName)

if configFile != "" {
// Use config file from the flag.
viper.SetConfigFile(configFile)
} else {
cwd, err := os.Getwd()
if err != nil {
return wErrors.Wrap(err, "can't find current working directory")
}

// No explicit file given: search by base name, registering both the
// resolved working directory and "." as search paths.
viper.SetConfigName(".klepto")
viper.AddConfigPath(cwd)
viper.AddConfigPath(".")
}

err := viper.ReadInConfig()
if err != nil {
return wErrors.Wrap(err, "could not read configurations")
}

// Decode the loaded settings into the shared spec read by other commands.
err = viper.Unmarshal(&globalConfig)
if err != nil {
return wErrors.Wrap(err, "could not unmarshal config file")
}

return nil
}
99 changes: 52 additions & 47 deletions cmd/steal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"runtime"
"time"

"github.com/hellofresh/klepto/pkg/anonymiser"
"github.com/hellofresh/klepto/pkg/dumper"
"github.com/hellofresh/klepto/pkg/reader"
wErrors "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/hellofresh/klepto/pkg/anonymiser"
"github.com/hellofresh/klepto/pkg/config"
"github.com/hellofresh/klepto/pkg/dumper"
"github.com/hellofresh/klepto/pkg/reader"

// imports dumpers and readers
_ "github.com/hellofresh/klepto/pkg/dumper/mysql"
_ "github.com/hellofresh/klepto/pkg/dumper/postgres"
Expand All @@ -22,15 +24,18 @@ import (
type (
// StealOptions represents the command options
StealOptions struct {
configPath string
cfgSpec *config.Spec

from string
to string
concurrency int
readOpts connOpts
writeOpts connOpts
}
connOpts struct {
timeout string
maxConnLifetime string
timeout time.Duration
maxConnLifetime time.Duration
maxConns int
maxIdleConns int
}
Expand All @@ -40,81 +45,81 @@ type (
func NewStealCmd() *cobra.Command {
opts := new(StealOptions)
cmd := &cobra.Command{
Use: "steal",
Short: "Steals and anonymises databases",
PreRunE: initConfig,
Use: "steal",
Short: "Steals and anonymises databases",
PreRunE: func(cmd *cobra.Command, args []string) error {
var err error
opts.cfgSpec, err = config.LoadSpecFromFile(opts.configPath)
if err != nil {
return err
}

return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
return RunSteal(opts)
},
}

cmd.PersistentFlags().StringVarP(&opts.from, "from", "f", "root:root@tcp(localhost:3306)/klepto", "Database dsn to steal from")
cmd.PersistentFlags().StringVarP(&opts.to, "to", "t", "os://stdout/", "Database to output to (default writes to stdOut)")
cmd.PersistentFlags().IntVar(&opts.concurrency, "concurrency", runtime.NumCPU(), "Sets the amount of dumps to be performed concurrently")
cmd.PersistentFlags().StringVar(&opts.readOpts.timeout, "read-timeout", "5m", "Sets the timeout for read operations")
cmd.PersistentFlags().StringVar(&opts.writeOpts.timeout, "write-timeout", "30s", "Sets the timeout for write operations")
cmd.PersistentFlags().StringVar(&opts.readOpts.maxConnLifetime, "read-conn-lifetime", "0", "Sets the maximum amount of time a connection may be reused on the read database")
cmd.PersistentFlags().IntVar(&opts.readOpts.maxConns, "read-max-conns", 5, "Sets the maximum number of open connections to the read database")
cmd.PersistentFlags().IntVar(&opts.readOpts.maxIdleConns, "read-max-idle-conns", 0, "Sets the maximum number of connections in the idle connection pool for the read database")
cmd.PersistentFlags().StringVar(&opts.writeOpts.maxConnLifetime, "write-conn-lifetime", "0", "Sets the maximum amount of time a connection may be reused on the write database")
cmd.PersistentFlags().IntVar(&opts.writeOpts.maxConns, "write-max-conns", 5, "Sets the maximum number of open connections to the write database")
cmd.PersistentFlags().IntVar(&opts.writeOpts.maxIdleConns, "write-max-idle-conns", 0, "Sets the maximum number of connections in the idle connection pool for the write database")
persistentFlags := cmd.PersistentFlags()
persistentFlags.StringVarP(&opts.configPath, "config", "c", config.DefaultConfigFileName, "Path to config file")
persistentFlags.StringVarP(&opts.from, "from", "f", "mysql://root:root@tcp(localhost:3306)/klepto", "Database dsn to steal from")
persistentFlags.StringVarP(&opts.to, "to", "t", "os://stdout/", "Database to output to (default writes to stdOut)")
persistentFlags.IntVar(&opts.concurrency, "concurrency", runtime.NumCPU(), "Sets the amount of dumps to be performed concurrently")
persistentFlags.DurationVar(&opts.readOpts.timeout, "read-timeout", 5*time.Minute, "Sets the timeout for read operations")
persistentFlags.DurationVar(&opts.readOpts.maxConnLifetime, "read-conn-lifetime", 0, "Sets the maximum amount of time a connection may be reused on the read database")
persistentFlags.IntVar(&opts.readOpts.maxConns, "read-max-conns", 5, "Sets the maximum number of open connections to the read database")
persistentFlags.IntVar(&opts.readOpts.maxIdleConns, "read-max-idle-conns", 0, "Sets the maximum number of connections in the idle connection pool for the read database")
persistentFlags.DurationVar(&opts.writeOpts.timeout, "write-timeout", 30*time.Second, "Sets the timeout for write operations")
persistentFlags.DurationVar(&opts.writeOpts.maxConnLifetime, "write-conn-lifetime", 0, "Sets the maximum amount of time a connection may be reused on the write database")
persistentFlags.IntVar(&opts.writeOpts.maxConns, "write-max-conns", 5, "Sets the maximum number of open connections to the write database")
persistentFlags.IntVar(&opts.writeOpts.maxIdleConns, "write-max-idle-conns", 0, "Sets the maximum number of connections in the idle connection pool for the write database")

return cmd
}

// RunSteal is the handler for the rootCmd.
func RunSteal(opts *StealOptions) (err error) {
readTimeout, err := time.ParseDuration(opts.readOpts.timeout)
if err != nil {
return wErrors.Wrap(err, "Failed to parse read timeout duration")
}

writeTimeout, err := time.ParseDuration(opts.readOpts.timeout)
if err != nil {
return wErrors.Wrap(err, "Failed to parse write timeout duration")
}

readMaxConnLifetime, err := time.ParseDuration(opts.readOpts.maxConnLifetime)
if err != nil {
return wErrors.Wrap(err, "Failed to parse write timeout duration")
}

writeMaxConnLifetime, err := time.ParseDuration(opts.writeOpts.maxConnLifetime)
if err != nil {
return wErrors.Wrap(err, "Failed to parse the timeout duration")
}

source, err := reader.Connect(reader.ConnOpts{
DSN: opts.from,
Timeout: readTimeout,
MaxConnLifetime: readMaxConnLifetime,
Timeout: opts.readOpts.timeout,
MaxConnLifetime: opts.readOpts.maxConnLifetime,
MaxConns: opts.readOpts.maxConns,
MaxIdleConns: opts.readOpts.maxIdleConns,
})
if err != nil {
return wErrors.Wrap(err, "Could not connecting to reader")
}
defer source.Close()
defer func() {
if err := source.Close(); err != nil {
log.WithError(err).Error("Something is not ok with closing source connection")
}
}()

source = anonymiser.NewAnonymiser(source, globalConfig.Tables)
source = anonymiser.NewAnonymiser(source, opts.cfgSpec.Tables)
target, err := dumper.NewDumper(dumper.ConnOpts{
DSN: opts.to,
Timeout: writeTimeout,
MaxConnLifetime: writeMaxConnLifetime,
Timeout: opts.writeOpts.timeout,
MaxConnLifetime: opts.writeOpts.maxConnLifetime,
MaxConns: opts.writeOpts.maxConns,
MaxIdleConns: opts.writeOpts.maxIdleConns,
}, source)
if err != nil {
return wErrors.Wrap(err, "Error creating dumper")
}
defer target.Close()
defer func() {
if err := target.Close(); err != nil {
log.WithError(err).Error("Something is not ok with closing target connection")
}
}()

log.Info("Stealing...")

done := make(chan struct{})
defer close(done)

start := time.Now()
if err := target.Dump(done, globalConfig, opts.concurrency); err != nil {
if err := target.Dump(done, opts.cfgSpec, opts.concurrency); err != nil {
return wErrors.Wrap(err, "Error while dumping")
}

Expand Down
34 changes: 34 additions & 0 deletions fixtures/.klepto.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[Matchers]
ActiveUsers = "users.active = TRUE"

[[Tables]]
Name = "users"
IgnoreData = false
[Tables.Filter]
Match = "users.active = TRUE"
Limit = 100
[Tables.Filter.Sorts]
"user.id" = "asc"
[Tables.Anonymise]
email = "EmailAddress"
firstName = "FirstName"

[[Tables]]
Name = "orders"
IgnoreData = false
[Tables.Filter]
Match = "ActiveUsers"
Limit = 10

[[Tables.Relationships]]
Table = ""
ForeignKey = "user_id"
ReferencedTable = "users"
ReferencedKey = "id"

[[Tables]]
Name = "logs"
IgnoreData = true
[Tables.Filter]
Match = ""
Limit = 0
6 changes: 3 additions & 3 deletions pkg/anonymiser/anonymiser.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func NewAnonymiser(source reader.Reader, tables config.Tables) reader.Reader {
func (a *anonymiser) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt, matchers config.Matchers) error {
logger := log.WithField("table", tableName)
logger.Debug("Loading anonymiser config")
table, err := a.tables.FindByName(tableName)
if err != nil {
logger.WithError(err).Debug("the table is not configured to be anonymised")
table := a.tables.FindByName(tableName)
if table == nil {
logger.Debug("the table is not configured to be anonymised")
return a.Reader.ReadTable(tableName, rowChan, opts, matchers)
}

Expand Down
Loading

0 comments on commit f81f7a5

Please sign in to comment.