Skip to content

Commit

Permalink
Apply matcher aliases to tables on config load
Browse files Browse the repository at this point in the history
  • Loading branch information
vgarvardt committed Apr 3, 2020
1 parent ffb0bf8 commit 9398461
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 67 deletions.
8 changes: 4 additions & 4 deletions cmd/steal.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type (
// StealOptions represents the command options
StealOptions struct {
configPath string
cfgSpec *config.Spec
cfgTables config.Tables

from string
to string
Expand All @@ -49,7 +49,7 @@ func NewStealCmd() *cobra.Command {
Short: "Steals and anonymises databases",
PreRunE: func(cmd *cobra.Command, args []string) error {
var err error
opts.cfgSpec, err = config.LoadSpecFromFile(opts.configPath)
opts.cfgTables, err = config.LoadSpecFromFile(opts.configPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -96,7 +96,7 @@ func RunSteal(opts *StealOptions) (err error) {
}
}()

source = anonymiser.NewAnonymiser(source, opts.cfgSpec.Tables)
source = anonymiser.NewAnonymiser(source, opts.cfgTables)
target, err := dumper.NewDumper(dumper.ConnOpts{
DSN: opts.to,
Timeout: opts.writeOpts.timeout,
Expand All @@ -119,7 +119,7 @@ func RunSteal(opts *StealOptions) (err error) {
defer close(done)

start := time.Now()
if err := target.Dump(done, opts.cfgSpec, opts.concurrency); err != nil {
if err := target.Dump(done, opts.cfgTables, opts.concurrency); err != nil {
return wErrors.Wrap(err, "Error while dumping")
}

Expand Down
2 changes: 1 addition & 1 deletion features/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *MysqlTestSuite) TestExample() {

done := make(chan struct{})
defer close(done)
s.Require().NoError(dmp.Dump(done, new(config.Spec), 4), "Failed to dump")
s.Require().NoError(dmp.Dump(done, config.Tables{}, 4), "Failed to dump")

<-done

Expand Down
2 changes: 1 addition & 1 deletion features/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *PostgresTestSuite) TestExample() {

done := make(chan struct{})
defer close(done)
s.Require().NoError(dmp.Dump(done, new(config.Spec), 4), "Failed to dump")
s.Require().NoError(dmp.Dump(done, config.Tables{}, 4), "Failed to dump")

<-done

Expand Down
8 changes: 4 additions & 4 deletions pkg/anonymiser/anonymiser.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ func NewAnonymiser(source reader.Reader, tables config.Tables) reader.Reader {
}

// ReadTable decorates reader.ReadTable method for anonymising rows published from the reader.Reader
func (a *anonymiser) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt, matchers config.Matchers) error {
func (a *anonymiser) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt) error {
logger := log.WithField("table", tableName)
logger.Debug("Loading anonymiser config")
table := a.tables.FindByName(tableName)
if table == nil {
logger.Debug("the table is not configured to be anonymised")
return a.Reader.ReadTable(tableName, rowChan, opts, matchers)
return a.Reader.ReadTable(tableName, rowChan, opts)
}

if len(table.Anonymise) == 0 {
logger.Debug("Skipping anonymiser")
return a.Reader.ReadTable(tableName, rowChan, opts, matchers)
return a.Reader.ReadTable(tableName, rowChan, opts)
}

// Create read/write chanel
Expand Down Expand Up @@ -92,7 +92,7 @@ func (a *anonymiser) ReadTable(tableName string, rowChan chan<- database.Row, op
}
}(rowChan, rawChan, table)

if err := a.Reader.ReadTable(tableName, rawChan, opts, matchers); err != nil {
if err := a.Reader.ReadTable(tableName, rawChan, opts); err != nil {
return errors.Wrap(err, "anonymiser: error while reading table")
}

Expand Down
27 changes: 11 additions & 16 deletions pkg/anonymiser/anonymiser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,75 +17,70 @@ func TestReadTable(t *testing.T) {

tests := []struct {
scenario string
function func(*testing.T, reader.ReadTableOpt, config.Tables, config.Matchers)
function func(*testing.T, reader.ReadTableOpt, config.Tables)
opts reader.ReadTableOpt
matchers config.Matchers
config config.Tables
}{
{
scenario: "when anonymiser is not initialized",
function: testWhenAnonymiserIsNotInitialized,
opts: reader.ReadTableOpt{},
config: config.Tables{{Name: "test"}},
matchers: make(config.Matchers),
},
{
scenario: "when table is not set in the config",
function: testWhenTableIsNotSetInConfig,
opts: reader.ReadTableOpt{},
config: config.Tables{{Name: "test"}},
matchers: make(config.Matchers),
},
{
scenario: "when column is anonymised",
function: testWhenColumnIsAnonymised,
opts: reader.ReadTableOpt{},
config: config.Tables{{Name: "test", Anonymise: map[string]string{"column_test": "FirstName"}}},
matchers: make(config.Matchers),
},
{
scenario: "when column is anonymised with literal",
function: testWhenColumnIsAnonymisedWithLiteral,
opts: reader.ReadTableOpt{},
config: config.Tables{{Name: "test", Anonymise: map[string]string{"column_test": "literal:Hello"}}},
matchers: make(config.Matchers),
},
}

for _, test := range tests {
t.Run(test.scenario, func(t *testing.T) {
test.function(t, test.opts, test.config, test.matchers)
test.function(t, test.opts, test.config)
})
}
}

func testWhenAnonymiserIsNotInitialized(t *testing.T, opts reader.ReadTableOpt, tables config.Tables, matchers config.Matchers) {
func testWhenAnonymiserIsNotInitialized(t *testing.T, opts reader.ReadTableOpt, tables config.Tables) {
anonymiser := NewAnonymiser(&mockReader{}, tables)

rowChan := make(chan database.Row, 1)
defer close(rowChan)

err := anonymiser.ReadTable("test", rowChan, opts, matchers)
err := anonymiser.ReadTable("test", rowChan, opts)
require.NoError(t, err)
}

func testWhenTableIsNotSetInConfig(t *testing.T, opts reader.ReadTableOpt, tables config.Tables, matchers config.Matchers) {
func testWhenTableIsNotSetInConfig(t *testing.T, opts reader.ReadTableOpt, tables config.Tables) {
anonymiser := NewAnonymiser(&mockReader{}, tables)

rowChan := make(chan database.Row, 1)
defer close(rowChan)

err := anonymiser.ReadTable("other_table", rowChan, opts, matchers)
err := anonymiser.ReadTable("other_table", rowChan, opts)
require.NoError(t, err)
}

func testWhenColumnIsAnonymised(t *testing.T, opts reader.ReadTableOpt, tables config.Tables, matchers config.Matchers) {
func testWhenColumnIsAnonymised(t *testing.T, opts reader.ReadTableOpt, tables config.Tables) {
anonymiser := NewAnonymiser(&mockReader{}, tables)

rowChan := make(chan database.Row)
defer close(rowChan)

err := anonymiser.ReadTable("test", rowChan, opts, matchers)
err := anonymiser.ReadTable("test", rowChan, opts)
require.NoError(t, err)

for {
Expand All @@ -95,13 +90,13 @@ func testWhenColumnIsAnonymised(t *testing.T, opts reader.ReadTableOpt, tables c
}
}

func testWhenColumnIsAnonymisedWithLiteral(t *testing.T, opts reader.ReadTableOpt, tables config.Tables, matchers config.Matchers) {
func testWhenColumnIsAnonymisedWithLiteral(t *testing.T, opts reader.ReadTableOpt, tables config.Tables) {
anonymiser := NewAnonymiser(&mockReader{}, tables)

rowChan := make(chan database.Row)
defer close(rowChan)

err := anonymiser.ReadTable("test", rowChan, opts, matchers)
err := anonymiser.ReadTable("test", rowChan, opts)
require.NoError(t, err)

for {
Expand All @@ -121,7 +116,7 @@ func (m *mockReader) Close() error { return nil }
func (m *mockReader) FormatColumn(tbl string, col string) string {
return fmt.Sprintf("%s.%s", strconv.Quote(tbl), strconv.Quote(col))
}
func (m *mockReader) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt, matchers config.Matchers) error {
func (m *mockReader) ReadTable(tableName string, rowChan chan<- database.Row, opts reader.ReadTableOpt) error {
row := make(database.Row)
row["column_test"] = "to_be_anonimised"
rowChan <- row
Expand Down
24 changes: 22 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package config

import (
"strings"

wErrors "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand Down Expand Up @@ -74,7 +76,7 @@ func (t Tables) FindByName(name string) *Table {
}

// LoadSpecFromFile loads klepto spec from file
func LoadSpecFromFile(configPath string) (*Spec, error) {
func LoadSpecFromFile(configPath string) (Tables, error) {
if configPath == "" {
return nil, wErrors.New("config file path can not be empty")
}
Expand All @@ -93,5 +95,23 @@ func LoadSpecFromFile(configPath string) (*Spec, error) {
return nil, wErrors.Wrap(err, "could not unmarshal config file")
}

return cfgSpec, nil
// replace matchers aliases in tables with matchers expressions
for i, t := range cfgSpec.Tables {
if t.Filter.Match == "" {
continue
}

if m, ok := cfgSpec.Matchers[t.Filter.Match]; ok {
cfgSpec.Tables[i].Filter.Match = m
continue
}

// matcher keys can be lower-cased by the parser - check this case as well
if m, ok := cfgSpec.Matchers[strings.ToLower(t.Filter.Match)]; ok {
cfgSpec.Tables[i].Filter.Match = m
continue
}
}

return cfgSpec.Tables, nil
}
6 changes: 2 additions & 4 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ func TestLoadSpecFromFile(t *testing.T) {
// klepto/pkg/config/../../fixtures/.klepto.toml
configPath := filepath.Join(cwd, "..", "..", "fixtures", ".klepto.toml")

spec, err := LoadSpecFromFile(configPath)
cfgTables, err := LoadSpecFromFile(configPath)
require.NoError(t, err)

cfgTables := spec.Tables
require.Len(t, cfgTables, 3)

users := cfgTables.FindByName("users")
Expand All @@ -31,5 +29,5 @@ func TestLoadSpecFromFile(t *testing.T) {

orders := cfgTables.FindByName("orders")
require.NotNil(t, orders)
assert.Equal(t, "ActiveUsers", orders.Filter.Match)
assert.Equal(t, "users.active = TRUE", orders.Filter.Match)
}
2 changes: 1 addition & 1 deletion pkg/dumper/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type (
// A Dumper writes a database's structure to the provided stream.
Dumper interface {
// Dump executes the dump process.
Dump(chan<- struct{}, *config.Spec, int) error
Dump(chan<- struct{}, config.Tables, int) error
// Close closes the dumper resources and releases them.
Close() error
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/dumper/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ func New(rdr reader.Reader, dumper Dumper) dumper.Dumper {
}

// Dump executes the dump process.
func (e *Engine) Dump(done chan<- struct{}, spec *config.Spec, concurrency int) error {
func (e *Engine) Dump(done chan<- struct{}, cfgTables config.Tables, concurrency int) error {
if err := e.readAndDumpStructure(); err != nil {
return err
}

return e.readAndDumpTables(done, spec, concurrency)
return e.readAndDumpTables(done, cfgTables, concurrency)
}

func (e *Engine) readAndDumpStructure() error {
Expand All @@ -70,7 +70,7 @@ func (e *Engine) readAndDumpStructure() error {
return nil
}

func (e *Engine) readAndDumpTables(done chan<- struct{}, spec *config.Spec, concurrency int) error {
func (e *Engine) readAndDumpTables(done chan<- struct{}, cfgTables config.Tables, concurrency int) error {
tables, err := e.reader.GetTables()
if err != nil {
return wErrors.Wrap(err, "failed to read and dump tables")
Expand All @@ -87,7 +87,7 @@ func (e *Engine) readAndDumpTables(done chan<- struct{}, spec *config.Spec, conc
var wg sync.WaitGroup
for _, tbl := range tables {
logger := log.WithField("table", tbl)
tableConfig := spec.Tables.FindByName(tbl)
tableConfig := cfgTables.FindByName(tbl)
if tableConfig == nil {
logger.Debug("no configuration found for table")
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func (e *Engine) readAndDumpTables(done chan<- struct{}, spec *config.Spec, conc
}(tbl, rowChan, logger)

go func(tableName string, opts reader.ReadTableOpt, rowChan chan<- database.Row, logger *log.Entry) {
if err := e.reader.ReadTable(tableName, rowChan, opts, spec.Matchers); err != nil {
if err := e.reader.ReadTable(tableName, rowChan, opts); err != nil {
logger.WithError(err).Error("Failed to read table")
}
}(tbl, opts, rowChan, logger)
Expand Down
27 changes: 19 additions & 8 deletions pkg/dumper/query/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewDumper(output io.Writer, rdr reader.Reader) dumper.Dumper {
}

// Dump executes the dump stream process.
func (d *textDumper) Dump(done chan<- struct{}, spec *config.Spec, concurrency int) error {
func (d *textDumper) Dump(done chan<- struct{}, cfgTables config.Tables, concurrency int) error {
tables, err := d.reader.GetTables()
if err != nil {
return wErrors.Wrap(err, "failed to get tables")
Expand All @@ -42,15 +42,22 @@ func (d *textDumper) Dump(done chan<- struct{}, spec *config.Spec, concurrency i
if err != nil {
return wErrors.Wrap(err, "could not get database structure")
}
io.WriteString(d.output, structure)
if _, err := io.WriteString(d.output, structure); err != nil {
return wErrors.Wrap(err, "could not write structure to output")
}

for _, tbl := range tables {
var opts reader.ReadTableOpt
logger := log.WithField("table", tbl)

tableConfig := spec.Tables.FindByName(tbl)
tableConfig := cfgTables.FindByName(tbl)
if tableConfig == nil {
log.WithField("table", tbl).Debug("no configuration found for table")
logger.Debug("no configuration found for table")
} else {
if tableConfig.IgnoreData {
logger.Debug("ignoring data to dump")
continue
}
opts = reader.NewReadTableOpt(tableConfig)
}

Expand All @@ -67,16 +74,20 @@ func (d *textDumper) Dump(done chan<- struct{}, spec *config.Spec, concurrency i

columnMap, err := d.toSQLColumnMap(row)
if err != nil {
log.WithError(err).Fatal("could not convert value to string")
logger.WithError(err).Fatal("could not convert value to string")
}

insert := sq.Insert(tableName).SetMap(columnMap)
io.WriteString(d.output, sq.DebugSqlizer(insert))
io.WriteString(d.output, "\n")
if _, err := io.WriteString(d.output, sq.DebugSqlizer(insert)); err != nil {
logger.WithError(err).Error("could not write insert statement to output")
}
if _, err := io.WriteString(d.output, "\n"); err != nil {
logger.WithError(err).Error("could not write new line to output")
}
}
}(tbl)

if err := d.reader.ReadTable(tbl, rowChan, opts, spec.Matchers); err != nil {
if err := d.reader.ReadTable(tbl, rowChan, opts); err != nil {
log.WithError(err).WithField("table", tbl).Error("error while reading table")
}
}
Expand Down
Loading

0 comments on commit 9398461

Please sign in to comment.