Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize bytes_queue #207

Merged
merged 9 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bigcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestCacheCapacity(t *testing.T) {

// then
assertEqual(t, keys, cache.Len())
assertEqual(t, 81920, cache.Capacity())
assertEqual(t, 40960, cache.Capacity())
}

func TestCacheStats(t *testing.T) {
Expand Down
70 changes: 51 additions & 19 deletions queue/bytes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@ import (
)

const (
// Number of bytes used to keep information about entry size
headerEntrySize = 4
// Number of bytes to encode 0 in uvarint format
minimumHeaderSize = 1
// Bytes before left margin are not used. Zero index means element does not exist in queue, useful while reading slice from index
leftMarginIndex = 1
// Minimum empty blob size in bytes. Empty blob fills space between tail and head in additional memory allocation.
// It keeps entries indexes unchanged
minimumEmptyBlobSize = 32 + headerEntrySize
)

var (
Expand All @@ -25,6 +22,7 @@ var (
// BytesQueue is a non-thread safe queue type of fifo based on bytes array.
// For every push operation index of entry is returned. It can be used to read the entry later
type BytesQueue struct {
full bool
array []byte
capacity int
maxCapacity int
Expand All @@ -41,6 +39,21 @@ type queueError struct {
message string
}

// getUvarintSize returns the number of bytes to encode x in uvarint format
func getUvarintSize(x uint32) int {
if x < 128 {
return 1
} else if x < 16384 {
return 2
} else if x < 2097152 {
return 3
} else if x < 268435456 {
return 4
} else {
return 5
}
}

// NewBytesQueue initialize new bytes queue.
// Initial capacity is used in bytes array allocation
// When verbose flag is set then information about memory allocation are printed
Expand All @@ -49,7 +62,7 @@ func NewBytesQueue(initialCapacity int, maxCapacity int, verbose bool) *BytesQue
array: make([]byte, initialCapacity),
capacity: initialCapacity,
maxCapacity: maxCapacity,
headerBuffer: make([]byte, headerEntrySize),
headerBuffer: make([]byte, binary.MaxVarintLen32),
tail: leftMarginIndex,
head: leftMarginIndex,
rightMargin: leftMarginIndex,
Expand All @@ -71,9 +84,10 @@ func (q *BytesQueue) Reset() {
// Returns index for pushed data or error if maximum size queue limit is reached.
func (q *BytesQueue) Push(data []byte) (int, error) {
dataLen := len(data)
headerEntrySize := getUvarintSize(uint32(dataLen))

if q.availableSpaceAfterTail() < dataLen+headerEntrySize {
if q.availableSpaceBeforeHead() >= dataLen+headerEntrySize {
if !q.canInsertAfterTail(dataLen + headerEntrySize) {
if q.canInsertBeforeHead(dataLen + headerEntrySize) {
q.tail = leftMarginIndex
} else if q.capacity+headerEntrySize+dataLen >= q.maxCapacity && q.maxCapacity > 0 {
return -1, &queueError{"Full queue. Maximum size limit reached."}
Expand Down Expand Up @@ -106,6 +120,7 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
copy(q.array, oldArray[:q.rightMargin])

if q.tail < q.head {
headerEntrySize := getUvarintSize(uint32(q.head - q.tail))
emptyBlobLen := q.head - q.tail - headerEntrySize
q.push(make([]byte, emptyBlobLen), emptyBlobLen)
q.head = leftMarginIndex
Expand All @@ -119,14 +134,17 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
}

func (q *BytesQueue) push(data []byte, len int) {
binary.LittleEndian.PutUint32(q.headerBuffer, uint32(len))
headerEntrySize := binary.PutUvarint(q.headerBuffer, uint64(len))
q.copy(q.headerBuffer, headerEntrySize)

q.copy(data, len)

if q.tail > q.head {
q.rightMargin = q.tail
}
if q.tail == q.head {
q.full = true
}

q.count++
}
Expand All @@ -137,10 +155,11 @@ func (q *BytesQueue) copy(data []byte, len int) {

// Pop reads the oldest entry from queue and moves head pointer to the next one
func (q *BytesQueue) Pop() ([]byte, error) {
data, size, err := q.peek(q.head)
data, headerEntrySize, err := q.peek(q.head)
if err != nil {
return nil, err
}
size := len(data)

q.head += headerEntrySize + size
q.count--
Expand Down Expand Up @@ -199,32 +218,45 @@ func (q *BytesQueue) peekCheckErr(index int) error {
return errInvalidIndex
}

if index+headerEntrySize >= len(q.array) {
if index >= len(q.array) {
return errIndexOutOfBounds
}
return nil
}

// peek returns the data from index and the number of bytes to encode the length of the data in uvarint format
func (q *BytesQueue) peek(index int) ([]byte, int, error) {
err := q.peekCheckErr(index)
if err != nil {
return nil, 0, err
}

blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize]))
return q.array[index+headerEntrySize : index+headerEntrySize+blockSize], blockSize, nil
blockSize, n := binary.Uvarint(q.array[index:])
return q.array[index+n : index+n+int(blockSize)], n, nil
}

func (q *BytesQueue) availableSpaceAfterTail() int {
// canInsertAfterTail returns true if it's possible to insert an entry of size of need after the tail of the queue
func (q *BytesQueue) canInsertAfterTail(need int) bool {
if q.full {
return false
}
if q.tail >= q.head {
return q.capacity - q.tail
return q.capacity-q.tail >= need
}
return q.head - q.tail - minimumEmptyBlobSize
// 1. there is exactly need bytes between head and tail, so we do not need
// to reserve extra space for a potential emtpy entry when re-allco this queeu
// 2. still have unused space between tail and head, then we must reserve
// at least headerEntrySize bytes so we can put an empty entry
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
}

func (q *BytesQueue) availableSpaceBeforeHead() int {
// canInsertBeforeHead returns true if it's possible to insert an entry of size of need before the head of the queue
func (q *BytesQueue) canInsertBeforeHead(need int) bool {
if q.full {
return false
}
if q.tail >= q.head {
return q.head - leftMarginIndex - minimumEmptyBlobSize
return q.head-leftMarginIndex == need || q.head-leftMarginIndex >= need+minimumHeaderSize
}
return q.head - q.tail - minimumEmptyBlobSize
return q.head-q.tail == need || q.head-q.tail >= need+minimumHeaderSize
}
47 changes: 24 additions & 23 deletions queue/bytes_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereHeadIsBef
queue := NewBytesQueue(25, 0, false)

// when
queue.Push(blob('a', 3)) // header + entry + left margin = 8 bytes
queue.Push(blob('b', 6)) // additional 10 bytes
queue.Pop() // space freed, 7 bytes available at the beginning
queue.Push(blob('c', 6)) // 10 bytes needed, 14 available but not in one segment, allocate additional memory
queue.Push(blob('a', 3)) // header + entry + left margin = 5 bytes
queue.Push(blob('b', 6)) // additional 7 bytes
queue.Pop() // space freed, 4 bytes available at the beginning
queue.Push(blob('c', 6)) // 7 bytes needed, 13 bytes available at the tail

// then
assertEqual(t, 50, queue.Capacity())
assertEqual(t, 25, queue.Capacity())
assertEqual(t, blob('b', 6), pop(queue))
assertEqual(t, blob('c', 6), pop(queue))
}
Expand All @@ -163,13 +163,13 @@ func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereHeadIsBefore
queue := NewBytesQueue(25, 0, false)

// when
queue.Push(blob('a', 3)) // header + entry + left margin = 8 bytes
index, _ := queue.Push(blob('b', 6)) // additional 10 bytes
queue.Pop() // space freed, 7 bytes available at the beginning
newestIndex, _ := queue.Push(blob('c', 6)) // 10 bytes needed, 14 available but not in one segment, allocate additional memory
queue.Push(blob('a', 3)) // header + entry + left margin = 5 bytes
index, _ := queue.Push(blob('b', 6)) // additional 7 bytes
queue.Pop() // space freed, 4 bytes available at the beginning
newestIndex, _ := queue.Push(blob('c', 6)) // 7 bytes needed, 13 available at the tail

// then
assertEqual(t, 50, queue.Capacity())
assertEqual(t, 25, queue.Capacity())
assertEqual(t, blob('b', 6), get(queue, index))
assertEqual(t, blob('c', 6), get(queue, newestIndex))
}
Expand All @@ -181,19 +181,19 @@ func TestAllocateAdditionalSpaceForInsufficientFreeFragmentedSpaceWhereTailIsBef
queue := NewBytesQueue(100, 0, false)

// when
queue.Push(blob('a', 70)) // header + entry + left margin = 75 bytes
queue.Push(blob('b', 10)) // 75 + 10 + 4 = 89 bytes
queue.Push(blob('a', 70)) // header + entry + left margin = 72 bytes
queue.Push(blob('b', 10)) // 72 + 10 + 1 = 83 bytes
queue.Pop() // space freed at the beginning
queue.Push(blob('c', 30)) // 34 bytes used at the beginning, tail pointer is before head pointer
queue.Push(blob('d', 40)) // 44 bytes needed but no available in one segment, allocate new memory
queue.Push(blob('c', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer
queue.Push(blob('d', 40)) // 41 bytes needed but no available in one segment, allocate new memory

// then
assertEqual(t, 200, queue.Capacity())
assertEqual(t, blob('c', 30), pop(queue))
// empty blob fills space between tail and head,
// created when additional memory was allocated,
// it keeps current entries indexes unchanged
assertEqual(t, blob(0, 36), pop(queue))
assertEqual(t, blob(0, 39), pop(queue))
assertEqual(t, blob('b', 10), pop(queue))
assertEqual(t, blob('d', 40), pop(queue))
}
Expand All @@ -205,11 +205,11 @@ func TestUnchangedEntriesIndexesAfterAdditionalMemoryAllocationWhereTailIsBefore
queue := NewBytesQueue(100, 0, false)

// when
queue.Push(blob('a', 70)) // header + entry + left margin = 75 bytes
index, _ := queue.Push(blob('b', 10)) // 75 + 10 + 4 = 89 bytes
queue.Push(blob('a', 70)) // header + entry + left margin = 72 bytes
index, _ := queue.Push(blob('b', 10)) // 72 + 10 + 1 = 83 bytes
queue.Pop() // space freed at the beginning
queue.Push(blob('c', 30)) // 34 bytes used at the beginning, tail pointer is before head pointer
newestIndex, _ := queue.Push(blob('d', 40)) // 44 bytes needed but no available in one segment, allocate new memory
queue.Push(blob('c', 30)) // 31 bytes used at the beginning, tail pointer is before head pointer
newestIndex, _ := queue.Push(blob('d', 40)) // 41 bytes needed but no available in one segment, allocate new memory

// then
assertEqual(t, 200, queue.Capacity())
Expand All @@ -225,10 +225,10 @@ func TestAllocateAdditionalSpaceForValueBiggerThanInitQueue(t *testing.T) {

// when
queue.Push(blob('a', 100))

// then
assertEqual(t, blob('a', 100), pop(queue))
assertEqual(t, 230, queue.Capacity())
// 224 = (101 + 11) * 2
assertEqual(t, 224, queue.Capacity())
}

func TestAllocateAdditionalSpaceForValueBiggerThanQueue(t *testing.T) {
Expand All @@ -246,7 +246,8 @@ func TestAllocateAdditionalSpaceForValueBiggerThanQueue(t *testing.T) {
queue.Pop()
queue.Pop()
assertEqual(t, make([]byte, 100), pop(queue))
assertEqual(t, 250, queue.Capacity())
// 244 = (101 + 21) * 2
assertEqual(t, 244, queue.Capacity())
}

func TestPopWholeQueue(t *testing.T) {
Expand Down Expand Up @@ -343,7 +344,7 @@ func TestMaxSizeLimit(t *testing.T) {
queue.Push(blob('a', 25))
queue.Push(blob('b', 5))
capacity := queue.Capacity()
_, err := queue.Push(blob('c', 15))
_, err := queue.Push(blob('c', 20))

// then
assertEqual(t, 50, capacity)
Expand Down