Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using JSON line format for QueryCapture #193

Merged
merged 4 commits into from
Jul 5, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions acra-censor/acra-censor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func TestSerialization(t *testing.T) {
t.Fatal("Expected: " + strings.Join(testQueries, " | ") + "\nGot: " + strings.Join(handler.GetAllInputQueries(), " | "))
}

err = handler.Serialize()
err = handler.DumpAllQueriesToFile()
if err != nil {
t.Fatal(err)
}
Expand All @@ -553,7 +553,7 @@ func TestSerialization(t *testing.T) {
t.Fatal("Expected no queries \nGot: " + strings.Join(handler.GetAllInputQueries(), " | "))
}

err = handler.Deserialize()
err = handler.ReadAllQueriesFromFile()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -630,7 +630,7 @@ func TestLogging(t *testing.T) {
loggingHandler.MarkQueryAsForbidden(testQueries[0])
loggingHandler.MarkQueryAsForbidden(testQueries[1])
loggingHandler.MarkQueryAsForbidden(testQueries[2])
loggingHandler.Serialize()
loggingHandler.DumpAllQueriesToFile()

err = blacklist.AddQueries(loggingHandler.GetForbiddenQueries())
if err != nil {
Expand Down Expand Up @@ -697,7 +697,10 @@ func TestQueryCapture(t *testing.T) {
}
}

expected := "[{\"RawQuery\":\"SELECT Student_ID FROM STUDENT;\",\"IsForbidden\":false},{\"RawQuery\":\"SELECT * FROM STUDENT;\",\"IsForbidden\":false},{\"RawQuery\":\"SELECT * FROM X;\",\"IsForbidden\":false},{\"RawQuery\":\"SELECT * FROM Y;\",\"IsForbidden\":false}]"
expected := "{\"RawQuery\":\"SELECT Student_ID FROM STUDENT;\",\"IsForbidden\":false}\n" +
"{\"RawQuery\":\"SELECT * FROM STUDENT;\",\"IsForbidden\":false}\n" +
"{\"RawQuery\":\"SELECT * FROM X;\",\"IsForbidden\":false}\n" +
"{\"RawQuery\":\"SELECT * FROM Y;\",\"IsForbidden\":false}\n"

defaultTimeout := handler.GetSerializationTimeout()
handler.SetSerializationTimeout(50 * time.Millisecond)
Expand All @@ -719,7 +722,11 @@ func TestQueryCapture(t *testing.T) {
t.Fatal(err)
}

expected = "[{\"RawQuery\":\"SELECT Student_ID FROM STUDENT;\",\"IsForbidden\":false},{\"RawQuery\":\"SELECT * FROM STUDENT;\",\"IsForbidden\":false},{\"RawQuery\":\"SELECT * FROM X;\",\"IsForbidden\":false},{\"RawQuery\":\"SELECT * FROM Y;\",\"IsForbidden\":false},{\"RawQuery\":\"SELECT * FROM Z;\",\"IsForbidden\":false}]"
expected = "{\"RawQuery\":\"SELECT Student_ID FROM STUDENT;\",\"IsForbidden\":false}\n" +
"{\"RawQuery\":\"SELECT * FROM STUDENT;\",\"IsForbidden\":false}\n" +
"{\"RawQuery\":\"SELECT * FROM X;\",\"IsForbidden\":false}\n" +
"{\"RawQuery\":\"SELECT * FROM Y;\",\"IsForbidden\":false}\n" +
"{\"RawQuery\":\"SELECT * FROM Z;\",\"IsForbidden\":false}\n"
time.Sleep(handler.GetSerializationTimeout() + extraWaitTime)

result, err = ioutil.ReadFile(tmpFile.Name())
Expand Down
2 changes: 2 additions & 0 deletions acra-censor/handlers/handlers_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var ErrStructureSyntaxError = errors.New("fail to parse specified structure")

var ErrComplexSerializationError = errors.New("can't perform complex serialization of queries")
var ErrSingleQueryCaptureError = errors.New("can't capture single query")
var ErrCantOpenFileError = errors.New("can't open file to write queries")
var ErrCantReadQueriesFromFileError = errors.New("can't read queries from file")
var ErrUnexpectedCaptureChannelClose = errors.New("unexpected channel closing while query logging")

var ErrUnexpectedTypeError = errors.New("should never appear")
Expand Down
201 changes: 125 additions & 76 deletions acra-censor/handlers/querycapture_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"syscall"
"time"

"bytes"
"github.com/cossacklabs/acra/logging"
log "github.com/sirupsen/logrus"
)

const MaxQueriesInChannel = 10
const DefaultSerializationTimeout = time.Second
const LogQueryLength = 100

func trimToN(query string, n int) string {
if len(query) <= n {
Expand All @@ -25,9 +24,9 @@ func trimToN(query string, n int) string {
}

type QueryCaptureHandler struct {
Queries []QueryInfo
Queries []*QueryInfo
BufferedQueries []*QueryInfo
filePath string
logChannel chan QueryInfo
signalBackgroundExit chan bool
serializationTimeout time.Duration
serializationTicker *time.Ticker
Expand All @@ -38,52 +37,31 @@ type QueryInfo struct {
}

func NewQueryCaptureHandler(filePath string) (*QueryCaptureHandler, error) {
var _, err = os.Stat(filePath)

// create file if not exists
if err != nil {
if os.IsNotExist(err) {
var file, err = os.Create(filePath)
if err != nil {
return nil, err
}
defer file.Close()
} else {
return nil, err
}
}

bufferBytes, err := ioutil.ReadFile(filePath)
// open or create file, APPEND MODE
openedFile, err := os.OpenFile(filePath, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
log.WithError(ErrCantReadQueriesFromFileError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorIOError)
return nil, err
}

var queries []QueryInfo

if len(bufferBytes) != 0 {
if err = json.Unmarshal(bufferBytes, &queries); err != nil {
return nil, err
}
}

logChannel := make(chan QueryInfo, MaxQueriesInChannel)

// signals
signalShutdown := make(chan os.Signal, 2)
signal.Notify(signalShutdown, os.Interrupt, syscall.SIGTERM)

signalBackgroundExit := make(chan bool)

// create handler
handler := &QueryCaptureHandler{}
handler.filePath = filePath
handler.Queries = queries
handler.logChannel = logChannel
handler.signalBackgroundExit = signalBackgroundExit
handler.serializationTimeout = DefaultSerializationTimeout
handler.serializationTicker = time.NewTicker(DefaultSerializationTimeout)

f, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0600)
// read existing queries from file
err = handler.ReadAllQueriesFromFile()
if err != nil {
log.WithError(ErrSingleQueryCaptureError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorSecurityError)
log.WithError(ErrCantReadQueriesFromFileError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorIOError)
openedFile.Close()
return nil, err
}

Expand All @@ -92,44 +70,24 @@ func NewQueryCaptureHandler(filePath string) (*QueryCaptureHandler, error) {
for {
select {
case <-handler.serializationTicker.C:
err := handler.Serialize()
err := handler.DumpBufferedQueriesToFile(openedFile)
if err != nil {
log.WithError(ErrComplexSerializationError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorSecurityError)
log.WithError(ErrComplexSerializationError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorIOError)
}
handler.serializationTicker.Stop()
handler.serializationTicker = time.NewTicker(handler.serializationTimeout)

case queryInfo, ok := <-handler.logChannel:
if ok {
bytes, err := json.Marshal(queryInfo)
if err != nil {
log.WithError(ErrSingleQueryCaptureError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorSecurityError)
}

if _, err = f.WriteString("\n"); err != nil {
log.WithError(ErrSingleQueryCaptureError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorSecurityError)
}

if _, err = f.Write(bytes); err != nil {
log.WithError(ErrSingleQueryCaptureError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorSecurityError)
}

} else {
//channel is unexpectedly closed
log.WithError(ErrUnexpectedCaptureChannelClose).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorSecurityError)
}

case <-signalBackgroundExit:
handler.serializationTicker.Stop()
f.Close()
err := handler.FinishAndCloseFile(openedFile)
if err != nil {
log.WithError(ErrComplexSerializationError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorIOError)
}
return

case <-signalShutdown:
handler.serializationTicker.Stop()
f.Close()
err := handler.Serialize()
err := handler.FinishAndCloseFile(openedFile)
if err != nil {
log.WithError(ErrComplexSerializationError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorSecurityError)
log.WithError(ErrComplexSerializationError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorIOError)
}
return

Expand All @@ -141,6 +99,7 @@ func NewQueryCaptureHandler(filePath string) (*QueryCaptureHandler, error) {

return handler, nil
}

func (handler *QueryCaptureHandler) CheckQuery(query string) (bool, error) {
//skip already captured queries
for _, queryInfo := range handler.Queries {
Expand All @@ -151,24 +110,30 @@ func (handler *QueryCaptureHandler) CheckQuery(query string) (bool, error) {
queryInfo := &QueryInfo{}
queryInfo.RawQuery = query
queryInfo.IsForbidden = false
handler.Queries = append(handler.Queries, *queryInfo)

select {
case handler.logChannel <- *queryInfo: // channel is ok
default: //channel is full
log.Errorf("Can't process too many queries. Skip query: %s", trimToN(query, LogQueryLength))
}
handler.Queries = append(handler.Queries, queryInfo)
handler.BufferedQueries = append(handler.BufferedQueries, queryInfo)

return true, nil
}
func (handler *QueryCaptureHandler) Reset() {
handler.Queries = nil
handler.BufferedQueries = nil
}

func (handler *QueryCaptureHandler) Release() {
handler.Reset()
handler.signalBackgroundExit <- true
}

func (handler *QueryCaptureHandler) FinishAndCloseFile(openedFile *os.File) error {
handler.serializationTicker.Stop()
err := handler.DumpAllQueriesToFile()
if err != nil {
return err
}
return openedFile.Close()
}

func (handler *QueryCaptureHandler) GetAllInputQueries() []string {
var queries []string
for _, queryInfo := range handler.Queries {
Expand All @@ -192,25 +157,109 @@ func (handler *QueryCaptureHandler) GetForbiddenQueries() []string {
}
return forbiddenQueries
}

func (handler *QueryCaptureHandler) SetSerializationTimeout(timeout time.Duration) {
handler.serializationTimeout = timeout
}

func (handler *QueryCaptureHandler) GetSerializationTimeout() time.Duration {
return handler.serializationTimeout
}

func (handler *QueryCaptureHandler) Serialize() error {
jsonFile, err := json.Marshal(handler.Queries)
func (handler *QueryCaptureHandler) DumpAllQueriesToFile() error {
// open or create file, NO APPEND
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about to truncate file and add O_TRUNC flag too?

f, err := os.OpenFile(handler.filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
log.WithError(ErrCantOpenFileError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorIOError)
return err
}

// write all queries
return AppendQueries(handler.Queries, f)
}

func (handler *QueryCaptureHandler) DumpBufferedQueriesToFile(openedFile *os.File) error {
// nothing to dump
if len(handler.BufferedQueries) == 0 {
return nil
}

err := AppendQueries(handler.BufferedQueries, openedFile)
if err != nil {
return err
}

// clean buffered queries only after successful write
handler.BufferedQueries = nil

return nil
}

func (handler *QueryCaptureHandler) ReadAllQueriesFromFile() error {
q, err := ReadQueries(handler.filePath)
if err != nil {
log.WithError(ErrCantReadQueriesFromFileError).WithField(logging.FieldKeyEventCode, logging.EventCodeErrorCensorIOError)
return err
}
return ioutil.WriteFile(handler.filePath, jsonFile, 0600)

// read existing queries from file
handler.Queries = q
return nil
}
func (handler *QueryCaptureHandler) Deserialize() error {
var bufferBytes []byte
bufferBytes, err := ioutil.ReadFile(handler.filePath)

func AppendQueries(queries []*QueryInfo, openedFile *os.File) error {
if len(queries) == 0 {
return nil
}

lines, err := SerializeQueries(queries)
if err != nil {
return err
}
return json.Unmarshal(bufferBytes, &handler.Queries)

if _, err := openedFile.Write(lines); err != nil {
return err
}

return nil
}


func SerializeQueries(queries []*QueryInfo) ([]byte, error) {
var linesToAppend []byte
for _, queryInfo := range queries {
jsonQueryInfo, err := json.Marshal(queryInfo)
if err != nil {
return nil, err
}
if len(jsonQueryInfo) > 0 {
jsonQueryInfo = append(jsonQueryInfo, '\n')
linesToAppend = append(linesToAppend, jsonQueryInfo...)
}
}
return linesToAppend, nil

}

func ReadQueries(filePath string) ([]*QueryInfo, error) {
bufferBytes, err := ioutil.ReadFile(filePath)
if err != nil {
return nil, err
}

var queries []*QueryInfo

if len(bufferBytes) != 0 {
for _, line := range bytes.Split(bufferBytes, []byte{'\n'}) {
if len(line) == 0 {
continue
}
var oneQuery QueryInfo
if err = json.Unmarshal(line, &oneQuery); err != nil {
return nil, err
}
queries = append(queries, &oneQuery)
}
}
return queries, nil
}
1 change: 1 addition & 0 deletions logging/event_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const (
EventCodeErrorCensorSetupError = 561
EventCodeErrorCensorSecurityError = 562
EventCodeErrorCensorQueryParseError = 563
EventCodeErrorCensorIOError = 564

// response connector
EventCodeErrorResponseConnectorCantWriteToDB = 570
Expand Down