mirror of
https://github.com/nxshock/zkv.git
synced 2025-07-01 00:13:37 +05:00
Improve store read speed
by skipping store blocks
This commit is contained in:
parent
412ddb11a8
commit
5f0d33828f
6 changed files with 40 additions and 19 deletions
BIN
TestBufferRead.zkv.idx
Normal file
BIN
TestBufferRead.zkv.idx
Normal file
Binary file not shown.
BIN
TestSmallWrites.zkv.idx
Normal file
BIN
TestSmallWrites.zkv.idx
Normal file
Binary file not shown.
|
@ -11,7 +11,7 @@ var defaultOptions = Options{
|
|||
CompressionLevel: zstd.SpeedDefault,
|
||||
MemoryBufferSize: 4 * 1024 * 1024,
|
||||
DiskBufferSize: 1 * 1024 * 1024,
|
||||
UseIndexFile: false,
|
||||
useIndexFile: true,
|
||||
}
|
||||
|
||||
const indexFileExt = ".idx"
|
||||
|
|
|
@ -16,10 +16,12 @@ type Options struct {
|
|||
DiskBufferSize int
|
||||
|
||||
// Use index file
|
||||
UseIndexFile bool
|
||||
useIndexFile bool
|
||||
}
|
||||
|
||||
func (o *Options) setDefaults() {
|
||||
o.useIndexFile = true // TODO: implement database search without index
|
||||
|
||||
if o.MaxParallelReads == 0 {
|
||||
o.MaxParallelReads = defaultOptions.MaxParallelReads
|
||||
}
|
||||
|
|
38
zkv.go
38
zkv.go
|
@ -14,11 +14,15 @@ import (
|
|||
"github.com/klauspost/compress/zstd"
|
||||
)
|
||||
|
||||
type Offsets struct {
|
||||
BlockOffset int64
|
||||
RecordOffset int64
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
dataOffset map[string]int64
|
||||
dataOffset map[string]Offsets
|
||||
|
||||
filePath string
|
||||
offset int64
|
||||
|
||||
buffer *bytes.Buffer
|
||||
bufferDataOffset map[string]int64
|
||||
|
@ -34,15 +38,14 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
|||
options.setDefaults()
|
||||
|
||||
database := &Store{
|
||||
dataOffset: make(map[string]int64),
|
||||
dataOffset: make(map[string]Offsets),
|
||||
bufferDataOffset: make(map[string]int64),
|
||||
offset: 0,
|
||||
buffer: new(bytes.Buffer),
|
||||
filePath: filePath,
|
||||
options: options,
|
||||
readOrderChan: make(chan struct{}, int(options.MaxParallelReads))}
|
||||
|
||||
if options.UseIndexFile {
|
||||
if options.useIndexFile {
|
||||
idxFile, err := os.Open(filePath + indexFileExt)
|
||||
if err == nil {
|
||||
err = gob.NewDecoder(idxFile).Decode(&database.dataOffset)
|
||||
|
@ -80,7 +83,7 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
|||
|
||||
switch record.Type {
|
||||
case RecordTypeSet:
|
||||
database.dataOffset[string(record.KeyHash[:])] = offset
|
||||
database.dataOffset[string(record.KeyHash[:])] = Offsets{} // offset
|
||||
case RecordTypeDelete:
|
||||
delete(database.dataOffset, string(record.KeyHash[:]))
|
||||
}
|
||||
|
@ -283,7 +286,7 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) {
|
|||
return record.ValueBytes, nil
|
||||
}
|
||||
|
||||
offset, exists = s.dataOffset[string(keyHash[:])]
|
||||
offsets, exists := s.dataOffset[string(keyHash[:])]
|
||||
if !exists {
|
||||
return nil, ErrNotExists
|
||||
}
|
||||
|
@ -294,13 +297,18 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) {
|
|||
}
|
||||
defer readF.Close()
|
||||
|
||||
_, err = readF.Seek(offsets.BlockOffset, io.SeekStart)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
decompressor, err := zstd.NewReader(readF)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer decompressor.Close()
|
||||
|
||||
err = skip(decompressor, offset)
|
||||
err = skip(decompressor, offsets.RecordOffset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -317,7 +325,6 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) {
|
|||
}
|
||||
|
||||
return record.ValueBytes, nil
|
||||
|
||||
}
|
||||
|
||||
func (s *Store) get(key, value interface{}) error {
|
||||
|
@ -344,13 +351,18 @@ func (s *Store) flush() error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("open store file: %v", err)
|
||||
}
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return fmt.Errorf("stat store file: %v", err)
|
||||
}
|
||||
|
||||
diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize)
|
||||
|
||||
encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel))
|
||||
if err != nil {
|
||||
f.Close()
|
||||
return fmt.Errorf("open store file: %v", err)
|
||||
return fmt.Errorf("init encoder: %v", err)
|
||||
}
|
||||
|
||||
_, err = s.buffer.WriteTo(encoder)
|
||||
|
@ -359,13 +371,11 @@ func (s *Store) flush() error {
|
|||
}
|
||||
|
||||
for key, val := range s.bufferDataOffset {
|
||||
s.dataOffset[key] = val + s.offset
|
||||
s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val}
|
||||
}
|
||||
|
||||
s.bufferDataOffset = make(map[string]int64)
|
||||
|
||||
s.offset += l
|
||||
|
||||
err = encoder.Close()
|
||||
if err != nil {
|
||||
// TODO: truncate file to previous state
|
||||
|
@ -384,7 +394,7 @@ func (s *Store) flush() error {
|
|||
}
|
||||
|
||||
// Update index file only on data update
|
||||
if s.options.UseIndexFile && l > 0 {
|
||||
if s.options.useIndexFile && l > 0 {
|
||||
idxBuf := new(bytes.Buffer)
|
||||
|
||||
err = gob.NewEncoder(idxBuf).Encode(s.dataOffset)
|
||||
|
|
15
zkv_test.go
15
zkv_test.go
|
@ -39,6 +39,7 @@ func TestReadWriteBasic(t *testing.T) {
|
|||
const filePath = "TestReadWriteBasic.zkv"
|
||||
const recordCount = 100
|
||||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
|
||||
db, err := Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
@ -84,6 +85,7 @@ func TestSmallWrites(t *testing.T) {
|
|||
const filePath = "TestSmallWrites.zkv"
|
||||
const recordCount = 100
|
||||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
db, err := Open(filePath)
|
||||
|
@ -119,6 +121,7 @@ func TestDeleteBasic(t *testing.T) {
|
|||
const filePath = "TestDeleteBasic.zkv"
|
||||
const recordCount = 100
|
||||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
|
||||
db, err := Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
@ -164,6 +167,7 @@ func TestDeleteBasic(t *testing.T) {
|
|||
func TestBufferBasic(t *testing.T) {
|
||||
const filePath = "TestBuffer.zkv"
|
||||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
|
||||
db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100})
|
||||
assert.NoError(t, err)
|
||||
|
@ -187,8 +191,9 @@ func TestBufferBasic(t *testing.T) {
|
|||
|
||||
func TestBufferRead(t *testing.T) {
|
||||
const filePath = "TestBufferRead.zkv"
|
||||
const recordCount = 100
|
||||
const recordCount = 2
|
||||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
|
||||
db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100})
|
||||
assert.NoError(t, err)
|
||||
|
@ -241,7 +246,9 @@ func TestBackupBasic(t *testing.T) {
|
|||
const newFilePath = "TestBackupBasic2.zkv"
|
||||
const recordCount = 100
|
||||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
defer os.Remove(newFilePath)
|
||||
defer os.Remove(newFilePath + indexFileExt)
|
||||
|
||||
db, err := Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
@ -280,7 +287,9 @@ func TestBackupWithDeletedRecords(t *testing.T) {
|
|||
const newFilePath = "TestBackupWithDeletedRecords2.zkv"
|
||||
const recordCount = 100
|
||||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
defer os.Remove(newFilePath)
|
||||
defer os.Remove(newFilePath + indexFileExt)
|
||||
|
||||
db, err := Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
@ -335,7 +344,7 @@ func TestIndexFileBasic(t *testing.T) {
|
|||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
|
||||
db, err := OpenWithOptions(filePath, Options{UseIndexFile: true})
|
||||
db, err := Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
|
@ -358,7 +367,7 @@ func TestIndexFileBasic(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
// try to read
|
||||
db, err = OpenWithOptions(filePath, Options{UseIndexFile: true})
|
||||
db, err = Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, db.dataOffset, recordCount)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue