From d40b88eebb260d98ad8e93cc2073e7bba49443cd Mon Sep 17 00:00:00 2001 From: nxshock Date: Fri, 2 Dec 2022 21:37:15 +0500 Subject: [PATCH 01/28] Replace Mutex with RWMutex --- zkv.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zkv.go b/zkv.go index d8b75fa..25fbb94 100644 --- a/zkv.go +++ b/zkv.go @@ -18,7 +18,7 @@ type Database struct { filePath string offset int64 - mu sync.Mutex + mu sync.RWMutex } func (db *Database) Close() error { @@ -60,8 +60,8 @@ func (db *Database) Set(key, value interface{}) error { } func (db *Database) Get(key, value interface{}) error { - db.mu.Lock() - defer db.mu.Unlock() + db.mu.RLock() + defer db.mu.RUnlock() hashToFind, err := hashInterface(key) if err != nil { From db258424297ec2efc22822fb0a4a603b5aae2bb9 Mon Sep 17 00:00:00 2001 From: nxshock Date: Fri, 2 Dec 2022 21:37:34 +0500 Subject: [PATCH 02/28] Make constant key length --- record.go | 2 +- utils.go | 11 ++++------- zkv.go | 14 +++++++------- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/record.go b/record.go index eac0325..85a8c2b 100644 --- a/record.go +++ b/record.go @@ -16,7 +16,7 @@ const ( type Record struct { Type RecordType - KeyHash []byte + KeyHash [28]byte ValueBytes []byte } diff --git a/utils.go b/utils.go index 10e0e15..97d63be 100644 --- a/utils.go +++ b/utils.go @@ -17,20 +17,17 @@ func decode(b []byte, value interface{}) error { return gob.NewDecoder(bytes.NewReader(b)).Decode(value) } -func hashInterface(value interface{}) ([]byte, error) { +func hashInterface(value interface{}) ([sha256.Size224]byte, error) { valueBytes, err := encode(value) if err != nil { - return nil, err + return [sha256.Size224]byte{}, err } return hashBytes(valueBytes), nil } -func hashBytes(b []byte) []byte { - bytes := sha256.Sum224(b) - - return bytes[:] - +func hashBytes(b []byte) [sha256.Size224]byte { + return sha256.Sum224(b) } func skip(r io.Reader, count int64) (err error) { diff --git a/zkv.go b/zkv.go index 25fbb94..4df780b 100644 --- a/zkv.go +++ b/zkv.go @@ -47,7 +47,7 @@ func (db *Database) Set(key, value interface{}) error { return err } - db.dataOffset[string(record.KeyHash)] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки + db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки _, err = db.compressor.Write(b) if err != nil { @@ -68,7 +68,7 @@ func (db *Database) Get(key, value interface{}) error { return err } - offset, exists := db.dataOffset[string(hashToFind)] + offset, exists := db.dataOffset[string(hashToFind[:])] if !exists { return ErrNotExists } @@ -95,8 +95,8 @@ func (db *Database) Get(key, value interface{}) error { return err } - if !bytes.Equal(record.KeyHash, hashToFind) { - return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind), base64.StdEncoding.EncodeToString(record.KeyHash)) // TODO: заменить на константную ошибку + if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { + return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку } return decode(record.ValueBytes, value) @@ -147,9 +147,9 @@ func Open(filePath string) (*Database, error) { switch record.Type { case RecordTypeSet: - database.dataOffset[string(record.KeyHash)] = offset + database.dataOffset[string(record.KeyHash[:])] = offset case RecordTypeDelete: - delete(database.dataOffset, 
string(record.KeyHash)) + delete(database.dataOffset, string(record.KeyHash[:])) } offset += n @@ -177,7 +177,7 @@ func (db *Database) Delete(key interface{}) error { return err } - delete(db.dataOffset, string(record.KeyHash)) + delete(db.dataOffset, string(record.KeyHash[:])) _, err = db.compressor.Write(b) if err != nil { From b5043e1319d4c1cac99975825bc6f483508fb1b9 Mon Sep 17 00:00:00 2001 From: nxshock Date: Fri, 2 Dec 2022 21:37:56 +0500 Subject: [PATCH 03/28] Add README with small amount of documentation --- README.md | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..f250cd2 --- /dev/null +++ b/README.md @@ -0,0 +1,55 @@ +# zkv + +Simple key-value store for single-user applications. + +## Pros + +* Simple file structure +* Internal compression +* Threadsafe operations through `sync.RWMutex` + +## Cons + +* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) +* Need to read the whole file on store open to create file index +* No way to recover disk space from deleted records +* Write/Delete operations block Read and each other operations + +## Usage + +Create or open existing file: + +```go +db, err := Open("path to file") +``` + +Data operations: + +```go +// Write data +err = db.Set(key, value) // key and value can be any of type + +// Read data +var value ValueType +err = db.Get(key) + +// Delete data +err = db.Delete(key) +``` + +## File structure + +Record is `encoding/gob` structure: + +| Field | Description | Size | +| ---------- | ---------------------------------- | -------- | +| Type | Record type | uint8 | +| KeyHash | Key hash | 28 bytes | +| ValueBytes | Value gob-encoded bytes | variable | + +File is log stuctured list of commands: + +| Field | Description | Size | +| -------| ------------------------ | -------- | +| Length | Record body bytes length | int64 | +| Body | Gob-encoded record | variable | From 350634de38c1afd514292b89be1e8037da3e5cea Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 12:40:36 +0500 Subject: [PATCH 04/28] Add option for limit a number of parallel reads --- defaults.go | 5 +++++ options.go | 13 +++++++++++++ zkv.go | 32 ++++++++++++++++++++++++++------ 3 files changed, 44 insertions(+), 6 deletions(-) create mode 100644 defaults.go create mode 100644 options.go diff --git a/defaults.go b/defaults.go new file mode 100644 index 0000000..2e3437f --- /dev/null +++ b/defaults.go @@ -0,0 +1,5 @@ +package zkv + +var defaultOptions = Options{ + MaxParallelReads: 64, +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..7105683 --- /dev/null +++ b/options.go @@ -0,0 +1,13 @@ +package zkv + +type Options struct { + MaxParallelReads uint +} + +func (o *Options) Validate() error { + if o.MaxParallelReads == 0 { + o.MaxParallelReads = defaultOptions.MaxParallelReads + } + + return nil +} diff --git a/zkv.go b/zkv.go index 4df780b..f12364c 100644 --- a/zkv.go +++ b/zkv.go @@ -18,6 +18,10 @@ type Database struct { filePath string offset int64 + options Options + + readOrderChan chan struct{} + mu sync.RWMutex } @@ -63,6 +67,9 @@ func (db *Database) Get(key, value interface{}) error { db.mu.RLock() defer db.mu.RUnlock() + db.readOrderChan <- struct{}{} + defer func() { <-db.readOrderChan }() + hashToFind, err := hashInterface(key) if err != nil { return err @@ -102,23 +109,31 @@ func (db *Database) Get(key, value interface{}) error { return decode(record.ValueBytes, 
value) } -func Open(filePath string) (*Database, error) { +func OpenWithOptions(filePath string, options Options) (*Database, error) { f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { + f.Close() return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) } + if options.Validate() != nil { + return nil, err + } + compressor, err := zstd.NewWriter(f) if err != nil { + f.Close() return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) } database := &Database{ - dataOffset: make(map[string]int64), - offset: 0, - file: f, - compressor: compressor, - filePath: filePath} + dataOffset: make(map[string]int64), + offset: 0, + file: f, + compressor: compressor, + filePath: filePath, + options: options, + readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} // restore file data readF, err := os.Open(filePath) @@ -130,6 +145,7 @@ func Open(filePath string) (*Database, error) { decompressor, err := zstd.NewReader(readF) if err != nil { + f.Close() return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) } defer decompressor.Close() @@ -158,6 +174,10 @@ func Open(filePath string) (*Database, error) { return database, nil } +func Open(filePath string) (*Database, error) { + return OpenWithOptions(filePath, defaultOptions) +} + func (db *Database) Delete(key interface{}) error { db.mu.Lock() defer db.mu.Unlock() From 06a429ae4c1940093982360b4d9f86026bd831f8 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 12:41:11 +0500 Subject: [PATCH 05/28] Add some notes --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f250cd2..9b64c19 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,8 @@ Simple key-value store for single-user applications. 
## Pros -* Simple file structure -* Internal compression +* Simple one file structure +* Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd) * Threadsafe operations through `sync.RWMutex` ## Cons From 8ab9e96ef662addb87522840fb1d2481bac3a754 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 12:55:42 +0500 Subject: [PATCH 06/28] Add compression level option --- defaults.go | 3 +++ options.go | 12 ++++++++++-- zkv.go | 6 ++---- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/defaults.go b/defaults.go index 2e3437f..959736c 100644 --- a/defaults.go +++ b/defaults.go @@ -1,5 +1,8 @@ package zkv +import "github.com/klauspost/compress/zstd" + var defaultOptions = Options{ MaxParallelReads: 64, + CompressionLevel: zstd.SpeedDefault, } diff --git a/options.go b/options.go index 7105683..dbbd27c 100644 --- a/options.go +++ b/options.go @@ -1,13 +1,21 @@ package zkv +import "github.com/klauspost/compress/zstd" + type Options struct { + // Maximum number of concurrent reads MaxParallelReads uint + + // Compression level + CompressionLevel zstd.EncoderLevel } -func (o *Options) Validate() error { +func (o *Options) setDefaults() { if o.MaxParallelReads == 0 { o.MaxParallelReads = defaultOptions.MaxParallelReads } - return nil + if o.CompressionLevel == 0 { + o.CompressionLevel = defaultOptions.CompressionLevel + } } diff --git a/zkv.go b/zkv.go index f12364c..1dbc52b 100644 --- a/zkv.go +++ b/zkv.go @@ -110,16 +110,14 @@ func (db *Database) Get(key, value interface{}) error { } func OpenWithOptions(filePath string, options Options) (*Database, error) { + options.setDefaults() + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { f.Close() return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) } - if options.Validate() != nil { - return nil, err - } - compressor, err := zstd.NewWriter(f) if err != nil { f.Close() From 8ef0fd240b740c355c13337a5d158bded0f37ee1 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 20:57:50 +0500 Subject: [PATCH 07/28] Update README.md --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9b64c19..0d39f1e 100644 --- a/README.md +++ b/README.md @@ -31,12 +31,19 @@ err = db.Set(key, value) // key and value can be any of type // Read data var value ValueType -err = db.Get(key) +err = db.Get(key, &value) // Delete data err = db.Delete(key) ``` +Other methods: + +```go +// Flush data to disk +err = db.Flush() +``` + ## File structure Record is `encoding/gob` structure: From 56c9dcce00d88bc0bd78ebaa70e038036e0c567c Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 20:59:17 +0500 Subject: [PATCH 08/28] Implement write buffer --- defaults.go | 9 +- options.go | 5 +- zkv.go | 290 +++++++++++++++++++++++++++++++++------------------- zkv_test.go | 95 ++++++++++++++++- 4 files changed, 288 insertions(+), 111 deletions(-) diff --git a/defaults.go b/defaults.go index 959736c..ad476f8 100644 --- a/defaults.go +++ b/defaults.go @@ -1,8 +1,13 @@ package zkv -import "github.com/klauspost/compress/zstd" +import ( + "runtime" + + "github.com/klauspost/compress/zstd" +) var defaultOptions = Options{ - MaxParallelReads: 64, + MaxParallelReads: runtime.NumCPU(), CompressionLevel: zstd.SpeedDefault, + BufferSize: 4 * 1024 * 1024, } diff --git a/options.go b/options.go index dbbd27c..69b3c28 100644 --- a/options.go +++ b/options.go @@ -4,10 +4,13 @@ import 
"github.com/klauspost/compress/zstd" type Options struct { // Maximum number of concurrent reads - MaxParallelReads uint + MaxParallelReads int // Compression level CompressionLevel zstd.EncoderLevel + + // Write buffer size in bytes + BufferSize int } func (o *Options) setDefaults() { diff --git a/zkv.go b/zkv.go index 1dbc52b..7cc346d 100644 --- a/zkv.go +++ b/zkv.go @@ -11,12 +11,16 @@ import ( "github.com/klauspost/compress/zstd" ) -type Database struct { +type Store struct { dataOffset map[string]int64 - file *os.File - compressor *zstd.Encoder - filePath string - offset int64 + + file *os.File + filePath string + offset int64 + encoder *zstd.Encoder + + buffer *bytes.Buffer + bufferDataOffset map[string]int64 options Options @@ -25,91 +29,7 @@ type Database struct { mu sync.RWMutex } -func (db *Database) Close() error { - db.mu.Lock() - defer db.mu.Unlock() - - err := db.compressor.Close() - if err != nil { - return err - } - - return db.file.Close() -} - -func (db *Database) Set(key, value interface{}) error { - db.mu.Lock() - defer db.mu.Unlock() - - record, err := newRecord(RecordTypeSet, key, value) - if err != nil { - return err - } - - b, err := record.Marshal() - if err != nil { - return err - } - - db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки - - _, err = db.compressor.Write(b) - if err != nil { - return err - } - - db.offset += int64(len(b)) // TODO: удалить хеш и откатить запись в случае ошибки - - return nil -} - -func (db *Database) Get(key, value interface{}) error { - db.mu.RLock() - defer db.mu.RUnlock() - - db.readOrderChan <- struct{}{} - defer func() { <-db.readOrderChan }() - - hashToFind, err := hashInterface(key) - if err != nil { - return err - } - - offset, exists := db.dataOffset[string(hashToFind[:])] - if !exists { - return ErrNotExists - } - - readF, err := os.Open(db.filePath) - if err != nil { - return err - } - defer readF.Close() - - decompressor, err := zstd.NewReader(readF) - if err != nil { - return err - } - defer decompressor.Close() - - err = skip(decompressor, offset) - if err != nil { - return err - } - - _, record, err := readRecord(decompressor) - if err != nil { - return err - } - - if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { - return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку - } - - return decode(record.ValueBytes, value) -} - -func OpenWithOptions(filePath string, options Options) (*Database, error) { +func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) @@ -124,14 +44,16 @@ func OpenWithOptions(filePath string, options Options) (*Database, error) { return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) } - database := &Database{ - dataOffset: make(map[string]int64), - offset: 0, - file: f, - compressor: compressor, - filePath: filePath, - options: options, - readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} + database := &Store{ + dataOffset: make(map[string]int64), + bufferDataOffset: make(map[string]int64), + offset: 0, + file: f, + encoder: compressor, + buffer: new(bytes.Buffer), + filePath: filePath, + options: options, + readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} // restore file data readF, err := os.Open(filePath) @@ -172,13 +94,27 
@@ func OpenWithOptions(filePath string, options Options) (*Database, error) { return database, nil } -func Open(filePath string) (*Database, error) { +func Open(filePath string) (*Store, error) { return OpenWithOptions(filePath, defaultOptions) } -func (db *Database) Delete(key interface{}) error { - db.mu.Lock() - defer db.mu.Unlock() +func (s *Store) Set(key, value interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.set(key, value) +} + +func (s *Store) Get(key, value interface{}) error { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.get(key, value) +} + +func (s *Store) Delete(key interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() keyHash, err := hashInterface(key) if err != nil { @@ -195,14 +131,158 @@ func (db *Database) Delete(key interface{}) error { return err } - delete(db.dataOffset, string(record.KeyHash[:])) + delete(s.dataOffset, string(record.KeyHash[:])) + delete(s.bufferDataOffset, string(record.KeyHash[:])) - _, err = db.compressor.Write(b) + _, err = s.buffer.Write(b) if err != nil { return err } - db.offset += int64(len(b)) + if s.buffer.Len() > s.options.BufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil +} + +func (s *Store) Flush() error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.flush() +} + +func (s *Store) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.flush() + if err != nil { + return err + } + + err = s.encoder.Close() + if err != nil { + return err + } + + return s.file.Close() +} + +func (s *Store) set(key, value interface{}) error { + record, err := newRecord(RecordTypeSet, key, value) + if err != nil { + return err + } + + b, err := record.Marshal() + if err != nil { + return err + } + + s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len()) + + _, err = s.buffer.Write(b) + if err != nil { + return err + } + + if s.buffer.Len() > s.options.BufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil +} + +func (s *Store) get(key, value interface{}) error { + s.readOrderChan <- struct{}{} + defer func() { <-s.readOrderChan }() + + hashToFind, err := hashInterface(key) + if err != nil { + return err + } + + offset, exists := s.bufferDataOffset[string(hashToFind[:])] + if exists { + reader := bytes.NewReader(s.buffer.Bytes()) + + err = skip(reader, offset) + if err != nil { + return err + } + + _, record, err := readRecord(reader) + if err != nil { + return err + } + + return decode(record.ValueBytes, value) + } + + offset, exists = s.dataOffset[string(hashToFind[:])] + if !exists { + return ErrNotExists + } + + readF, err := os.Open(s.filePath) + if err != nil { + return err + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return err + } + defer decompressor.Close() + + err = skip(decompressor, offset) + if err != nil { + return err + } + + _, record, err := readRecord(decompressor) + if err != nil { + return err + } + + if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { + return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку + } + + return decode(record.ValueBytes, value) +} + +func (s *Store) flush() error { + l := int64(s.buffer.Len()) + + _, err := s.buffer.WriteTo(s.encoder) + if err != nil { + return err + } + + for key, val := range s.bufferDataOffset { + s.dataOffset[key] = val + s.offset + } + + s.bufferDataOffset = 
make(map[string]int64) + + s.offset += l + + err = s.encoder.Flush() + if err != nil { + return err + } return nil } diff --git a/zkv_test.go b/zkv_test.go index 36f6aac..a497a3c 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -20,7 +20,16 @@ func TestReadWriteBasic(t *testing.T) { assert.NoError(t, err) } - assert.Len(t, db.dataOffset, recordCount) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } err = db.Close() assert.NoError(t, err) @@ -91,12 +100,15 @@ func TestDeleteBasic(t *testing.T) { assert.NoError(t, err) } - assert.Len(t, db.dataOffset, recordCount) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount) err = db.Delete(50) assert.NoError(t, err) - assert.Len(t, db.dataOffset, recordCount-1) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount-1) + var value int err = db.Get(50, &value) assert.Equal(t, 0, value) @@ -110,6 +122,8 @@ func TestDeleteBasic(t *testing.T) { assert.NoError(t, err) assert.Len(t, db.dataOffset, recordCount-1) + assert.Len(t, db.bufferDataOffset, 0) + value = 0 err = db.Get(50, &value) assert.Equal(t, 0, value) @@ -118,3 +132,78 @@ func TestDeleteBasic(t *testing.T) { err = db.Close() assert.NoError(t, err) } + +func TestBufferBasic(t *testing.T) { + const filePath = "TestBuffer.zkv" + defer os.Remove(filePath) + + db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + assert.NoError(t, err) + + err = db.Set(1, make([]byte, 100)) + assert.NoError(t, err) + + assert.NotEqual(t, 0, db.dataOffset) + assert.Len(t, db.bufferDataOffset, 0) + assert.Equal(t, 0, db.buffer.Len()) + + var gotValue []byte + err = db.Get(1, &gotValue) + assert.NoError(t, err) + + assert.Equal(t, make([]byte, 100), gotValue) + + err = db.Close() + assert.NoError(t, err) +} + +func TestBufferRead(t *testing.T) { + const filePath = "TestBufferRead.zkv" + const recordCount = 100 + defer os.Remove(filePath) + + db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + + // try to read + db, err = Open(filePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + +} From aa1a2edda6ed00969769d91397c022cae71c6f23 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 21:03:27 +0500 Subject: [PATCH 09/28] Add TODOs --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 0d39f1e..0a85958 100644 --- a/README.md +++ b/README.md @@ -60,3 +60,9 @@ File is log stuctured list of commands: | -------| ------------------------ | -------- | | Length | Record body bytes length | int64 | | Body | Gob-encoded record | variable | + +## TODO + +- [ ] Implement `Copy()` method to copy store without deleted records +- [ ] Test [seekable zstd 
streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) +- [ ] Implement optional separate index file to speedup store initialization From 20a99d9f35355b79daef99dfa68ec63a8ca7f9f2 Mon Sep 17 00:00:00 2001 From: nxshock Date: Mon, 5 Dec 2022 21:24:47 +0500 Subject: [PATCH 10/28] Translate messages to english --- zkv.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zkv.go b/zkv.go index 7cc346d..e207e3b 100644 --- a/zkv.go +++ b/zkv.go @@ -35,13 +35,13 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { f.Close() - return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) + return nil, fmt.Errorf("open store file: %v", err) } compressor, err := zstd.NewWriter(f) if err != nil { f.Close() - return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) + return nil, fmt.Errorf("compressor initialization: %v", err) } database := &Store{ @@ -59,14 +59,14 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { readF, err := os.Open(filePath) if err != nil { f.Close() - return nil, fmt.Errorf("ошибка при открытии файла для чтения: %v", err) + return nil, fmt.Errorf("open file for indexing: %v", err) } defer readF.Close() decompressor, err := zstd.NewReader(readF) if err != nil { f.Close() - return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) + return nil, fmt.Errorf("decompressor initialization: %v", err) } defer decompressor.Close() @@ -78,7 +78,7 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { } if err != nil { f.Close() - return nil, fmt.Errorf("ошибка при чтении записи из файла: %v", err) + return nil, fmt.Errorf("read record error: %v", err) } switch record.Type { From 9f116ad35eaf060c6c0c870a2a3f72a34830bdfe Mon Sep 17 00:00:00 2001 From: nxshock Date: Mon, 5 Dec 2022 21:26:54 +0500 Subject: [PATCH 11/28] Add Backup() method --- README.md | 5 +- record.go | 19 +++++--- zkv.go | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++- zkv_test.go | 67 +++++++++++++++++++++++++++ 4 files changed, 211 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 0a85958..12eed2b 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,9 @@ Other methods: ```go // Flush data to disk err = db.Flush() + +// Backup data to another file +err = db.Backup("new/file/path") ``` ## File structure @@ -63,6 +66,6 @@ File is log stuctured list of commands: ## TODO -- [ ] Implement `Copy()` method to copy store without deleted records +- [ ] Add delete records test for `Backup()` method - [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) - [ ] Implement optional separate index file to speedup store initialization diff --git a/record.go b/record.go index 85a8c2b..0c3c941 100644 --- a/record.go +++ b/record.go @@ -2,6 +2,7 @@ package zkv import ( "bytes" + "crypto/sha256" "encoding/binary" "encoding/gob" "io" @@ -20,8 +21,17 @@ type Record struct { ValueBytes []byte } +func newRecordBytes(recordType RecordType, keyHash [sha256.Size224]byte, valueBytes []byte) (*Record, error) { + record := &Record{ + Type: recordType, + KeyHash: keyHash, + ValueBytes: valueBytes} + + return record, nil +} + func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { - keyBytes, err := encode(key) + keyHash, err := hashInterface(key) if err != nil { return nil, err } @@ -31,12 +41,7 @@ func newRecord(recordType 
RecordType, key, value interface{}) (*Record, error) { return nil, err } - record := &Record{ - Type: recordType, - KeyHash: hashBytes(keyBytes), - ValueBytes: valueBytes} - - return record, nil + return newRecordBytes(recordType, keyHash, valueBytes) } func (r *Record) Marshal() ([]byte, error) { diff --git a/zkv.go b/zkv.go index e207e3b..d1bedae 100644 --- a/zkv.go +++ b/zkv.go @@ -2,6 +2,7 @@ package zkv import ( "bytes" + "crypto/sha256" "encoding/base64" "fmt" "io" @@ -157,6 +158,43 @@ func (s *Store) Flush() error { return s.flush() } +func (s *Store) BackupWithOptions(filePath string, newFileOptions Options) error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.flush() + if err != nil { + return err + } + + newStore, err := OpenWithOptions(filePath, newFileOptions) + if err != nil { + return err + } + + for keyHashStr := range s.dataOffset { + var keyHash [sha256.Size224]byte + copy(keyHash[:], keyHashStr) + + valueBytes, err := s.getGobBytes(keyHash) + if err != nil { + newStore.Close() + return err + } + err = newStore.setBytes(keyHash, valueBytes) + if err != nil { + newStore.Close() + return err + } + } + + return newStore.Close() +} + +func (s *Store) Backup(filePath string) error { + return s.BackupWithOptions(filePath, defaultOptions) +} + func (s *Store) Close() error { s.mu.Lock() defer s.mu.Unlock() @@ -174,6 +212,35 @@ func (s *Store) Close() error { return s.file.Close() } +func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error { + record, err := newRecordBytes(RecordTypeSet, keyHash, valueBytes) + if err != nil { + return err + } + + b, err := record.Marshal() + if err != nil { + return err + } + + s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len()) + + _, err = s.buffer.Write(b) + if err != nil { + return err + } + + if s.buffer.Len() > s.options.BufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil +} + func (s *Store) set(key, value interface{}) error { record, err := newRecord(RecordTypeSet, key, value) if err != nil { @@ -203,6 +270,64 @@ func (s *Store) set(key, value interface{}) error { return nil } +func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { + s.readOrderChan <- struct{}{} + defer func() { <-s.readOrderChan }() + + offset, exists := s.bufferDataOffset[string(keyHash[:])] + if exists { + reader := bytes.NewReader(s.buffer.Bytes()) + + err := skip(reader, offset) + if err != nil { + return nil, err + } + + _, record, err := readRecord(reader) + if err != nil { + return nil, err + } + + return record.ValueBytes, nil + } + + offset, exists = s.dataOffset[string(keyHash[:])] + if !exists { + return nil, ErrNotExists + } + + readF, err := os.Open(s.filePath) + if err != nil { + return nil, err + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return nil, err + } + defer decompressor.Close() + + err = skip(decompressor, offset) + if err != nil { + return nil, err + } + + _, record, err := readRecord(decompressor) + if err != nil { + return nil, err + } + + if !bytes.Equal(record.KeyHash[:], keyHash[:]) { + expectedHashStr := base64.StdEncoding.EncodeToString(keyHash[:]) + gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:]) + return nil, fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr) + } + + return record.ValueBytes, nil + +} + func (s *Store) get(key, value interface{}) error { s.readOrderChan <- struct{}{} defer func() { <-s.readOrderChan }() @@ 
-257,7 +382,9 @@ func (s *Store) get(key, value interface{}) error { } if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { - return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку + expectedHashStr := base64.StdEncoding.EncodeToString(hashToFind[:]) + gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:]) + return fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr) } return decode(record.ValueBytes, value) diff --git a/zkv_test.go b/zkv_test.go index a497a3c..1fc3983 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -1,12 +1,40 @@ package zkv import ( + "bytes" "os" "testing" "github.com/stretchr/testify/assert" ) +func TestRecord(t *testing.T) { + buf := new(bytes.Buffer) + + var records []Record + + for i := 0; i < 10; i++ { + record, err := newRecord(RecordTypeSet, i, i) + assert.NoError(t, err) + + records = append(records, *record) + + b, err := record.Marshal() + assert.NoError(t, err) + + _, err = buf.Write(b) + assert.NoError(t, err) + } + + for i := 0; i < 10; i++ { + _, record, err := readRecord(buf) + assert.NoError(t, err) + + assert.Equal(t, record.KeyHash, records[i].KeyHash) + assert.Equal(t, record.ValueBytes, records[i].ValueBytes) + } +} + func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 @@ -207,3 +235,42 @@ func TestBufferRead(t *testing.T) { assert.NoError(t, err) } + +func TestBackupBasic(t *testing.T) { + const filePath = "TestBackupBasic.zkv" + const newFilePath = "TestBackupBasic2.zkv" + const recordCount = 100 + defer os.Remove(filePath) + defer os.Remove(newFilePath) + + db, err := Open(filePath) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + err = db.Backup(newFilePath) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + + db, err = Open(newFilePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + +} From 80540a662a218299ec74bba7212f69428a7f1504 Mon Sep 17 00:00:00 2001 From: nxshock Date: Wed, 7 Dec 2022 19:20:38 +0500 Subject: [PATCH 12/28] Add test for backup with deleted records --- README.md | 1 - zkv_test.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 12eed2b..b66f815 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,5 @@ File is log stuctured list of commands: ## TODO -- [ ] Add delete records test for `Backup()` method - [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) - [ ] Implement optional separate index file to speedup store initialization diff --git a/zkv_test.go b/zkv_test.go index 1fc3983..a252f9d 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -274,3 +274,58 @@ func TestBackupBasic(t *testing.T) { assert.NoError(t, err) } + +func TestBackupWithDeletedRecords(t *testing.T) { + const filePath = "TestBackupWithDeletedRecords.zkv" + const newFilePath = "TestBackupWithDeletedRecords2.zkv" + const recordCount = 100 + defer os.Remove(filePath) + defer os.Remove(newFilePath) + + db, err := Open(filePath) + assert.NoError(t, err) + + for i := 
1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + err = db.Flush() + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + if i%2 == 1 { + continue + } + + err = db.Delete(i) + assert.NoError(t, err) + } + + err = db.Backup(newFilePath) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + + db, err = Open(newFilePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount/2) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + if i%2 == 0 { + assert.ErrorIs(t, err, ErrNotExists) + } else { + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + } + + err = db.Close() + assert.NoError(t, err) + +} From e166e07daaa21ced22f35fcb387dc8fd6bc13d63 Mon Sep 17 00:00:00 2001 From: nxshock Date: Wed, 7 Dec 2022 21:06:36 +0500 Subject: [PATCH 13/28] Do not block file for writes --- README.md | 1 + zkv.go | 52 ++++++++++++++++++++++++---------------------------- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index b66f815..0622264 100644 --- a/README.md +++ b/README.md @@ -68,3 +68,4 @@ File is log stuctured list of commands: - [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) - [ ] Implement optional separate index file to speedup store initialization +- [ ] Add recovery previous state of store file on write error diff --git a/zkv.go b/zkv.go index d1bedae..1bbd4b4 100644 --- a/zkv.go +++ b/zkv.go @@ -15,10 +15,8 @@ import ( type Store struct { dataOffset map[string]int64 - file *os.File filePath string offset int64 - encoder *zstd.Encoder buffer *bytes.Buffer bufferDataOffset map[string]int64 @@ -33,24 +31,10 @@ type Store struct { func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() - f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - f.Close() - return nil, fmt.Errorf("open store file: %v", err) - } - - compressor, err := zstd.NewWriter(f) - if err != nil { - f.Close() - return nil, fmt.Errorf("compressor initialization: %v", err) - } - database := &Store{ dataOffset: make(map[string]int64), bufferDataOffset: make(map[string]int64), offset: 0, - file: f, - encoder: compressor, buffer: new(bytes.Buffer), filePath: filePath, options: options, @@ -58,15 +42,16 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { // restore file data readF, err := os.Open(filePath) - if err != nil { - f.Close() + if os.IsNotExist(err) { + // Empty datebase + return database, nil + } else if err != nil { return nil, fmt.Errorf("open file for indexing: %v", err) } defer readF.Close() decompressor, err := zstd.NewReader(readF) if err != nil { - f.Close() return nil, fmt.Errorf("decompressor initialization: %v", err) } defer decompressor.Close() @@ -78,7 +63,6 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { break } if err != nil { - f.Close() return nil, fmt.Errorf("read record error: %v", err) } @@ -204,12 +188,7 @@ func (s *Store) Close() error { return err } - err = s.encoder.Close() - if err != nil { - return err - } - - return s.file.Close() + return nil } func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error { @@ -393,7 +372,18 @@ func (s *Store) get(key, value interface{}) error { func (s *Store) flush() error { l := int64(s.buffer.Len()) - _, err := s.buffer.WriteTo(s.encoder) + f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + 
if err != nil { + return fmt.Errorf("open store file: %v", err) + } + + encoder, err := zstd.NewWriter(f, zstd.WithEncoderLevel(s.options.CompressionLevel)) + if err != nil { + f.Close() + return fmt.Errorf("open store file: %v", err) + } + + _, err = s.buffer.WriteTo(encoder) if err != nil { return err } @@ -406,7 +396,13 @@ func (s *Store) flush() error { s.offset += l - err = s.encoder.Flush() + err = encoder.Close() + if err != nil { + // TODO: truncate file to previous state + return err + } + + err = f.Close() if err != nil { return err } From 8dfc73af1d58f2a9e3fecf6e2a5623b78f6b68e2 Mon Sep 17 00:00:00 2001 From: nxshock Date: Fri, 9 Dec 2022 20:05:30 +0500 Subject: [PATCH 14/28] Speedup small writes by using write buffer --- defaults.go | 3 ++- options.go | 7 +++++-- zkv.go | 17 +++++++++++++---- zkv_test.go | 4 ++-- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/defaults.go b/defaults.go index ad476f8..f22add0 100644 --- a/defaults.go +++ b/defaults.go @@ -9,5 +9,6 @@ import ( var defaultOptions = Options{ MaxParallelReads: runtime.NumCPU(), CompressionLevel: zstd.SpeedDefault, - BufferSize: 4 * 1024 * 1024, + MemoryBufferSize: 4 * 1024 * 1024, + DiskBufferSize: 1 * 1024 * 1024, } diff --git a/options.go b/options.go index 69b3c28..1471014 100644 --- a/options.go +++ b/options.go @@ -9,8 +9,11 @@ type Options struct { // Compression level CompressionLevel zstd.EncoderLevel - // Write buffer size in bytes - BufferSize int + // Memory write buffer size in bytes + MemoryBufferSize int + + // Diwk write buffer size in bytes + DiskBufferSize int } func (o *Options) setDefaults() { diff --git a/zkv.go b/zkv.go index 1bbd4b4..11d2cee 100644 --- a/zkv.go +++ b/zkv.go @@ -1,6 +1,7 @@ package zkv import ( + "bufio" "bytes" "crypto/sha256" "encoding/base64" @@ -124,7 +125,7 @@ func (s *Store) Delete(key interface{}) error { return err } - if s.buffer.Len() > s.options.BufferSize { + if s.buffer.Len() > s.options.MemoryBufferSize { err = s.flush() if err != nil { @@ -209,7 +210,7 @@ func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error return err } - if s.buffer.Len() > s.options.BufferSize { + if s.buffer.Len() > s.options.MemoryBufferSize { err = s.flush() if err != nil { @@ -238,7 +239,7 @@ func (s *Store) set(key, value interface{}) error { return err } - if s.buffer.Len() > s.options.BufferSize { + if s.buffer.Len() > s.options.MemoryBufferSize { err = s.flush() if err != nil { @@ -377,7 +378,9 @@ func (s *Store) flush() error { return fmt.Errorf("open store file: %v", err) } - encoder, err := zstd.NewWriter(f, zstd.WithEncoderLevel(s.options.CompressionLevel)) + diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize) + + encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel)) if err != nil { f.Close() return fmt.Errorf("open store file: %v", err) @@ -402,6 +405,12 @@ func (s *Store) flush() error { return err } + err = diskWriteBuffer.Flush() + if err != nil { + // TODO: truncate file to previous state + return err + } + err = f.Close() if err != nil { return err diff --git a/zkv_test.go b/zkv_test.go index a252f9d..2b8005f 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -165,7 +165,7 @@ func TestBufferBasic(t *testing.T) { const filePath = "TestBuffer.zkv" defer os.Remove(filePath) - db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) err = db.Set(1, make([]byte, 100)) @@ -190,7 +190,7 @@ 
func TestBufferRead(t *testing.T) { const recordCount = 100 defer os.Remove(filePath) - db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) for i := 1; i <= recordCount; i++ { From 23ee15dc23b58bb149b05641a3a1f6aa6eacca80 Mon Sep 17 00:00:00 2001 From: nxshock Date: Fri, 9 Dec 2022 20:05:57 +0500 Subject: [PATCH 15/28] Fix usage example --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0622264..efeaeeb 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Simple key-value store for single-user applications. Create or open existing file: ```go -db, err := Open("path to file") +db, err := zkv.Open("path to file") ``` Data operations: From f093b7feed36a3d267233d7af9da79aca57420dc Mon Sep 17 00:00:00 2001 From: nxshock Date: Fri, 9 Dec 2022 20:06:52 +0500 Subject: [PATCH 16/28] Remove seekable zstd streams note Looks like this library does not support chunked zstd streams --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index efeaeeb..416d4d0 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,5 @@ File is log stuctured list of commands: ## TODO -- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) - [ ] Implement optional separate index file to speedup store initialization - [ ] Add recovery previous state of store file on write error From fe90a5532291bfe6b9f83c3b86c6a032bc74dc16 Mon Sep 17 00:00:00 2001 From: nxshock Date: Fri, 9 Dec 2022 20:28:24 +0500 Subject: [PATCH 17/28] Add note about memory consumption --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 416d4d0..4db5c20 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Simple key-value store for single-user applications. ## Cons -* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) +* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) - average 200-250 Mb of RAM per 1M keys * Need to read the whole file on store open to create file index * No way to recover disk space from deleted records * Write/Delete operations block Read and each other operations From 533eddaed4a2504a0a68387d8df296598008e1b1 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 10 Dec 2022 21:34:16 +0500 Subject: [PATCH 18/28] Fix typo --- options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/options.go b/options.go index 1471014..103fe4e 100644 --- a/options.go +++ b/options.go @@ -12,7 +12,7 @@ type Options struct { // Memory write buffer size in bytes MemoryBufferSize int - // Diwk write buffer size in bytes + // Disk write buffer size in bytes DiskBufferSize int } From 82a36a1b9eb7aa87926df69b6a88bf7360804d94 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 10 Dec 2022 21:39:24 +0500 Subject: [PATCH 19/28] Add separate index file option --- README.md | 25 +++++++++++++++++++++++-- defaults.go | 3 +++ options.go | 11 +++++++++++ zkv.go | 29 ++++++++++++++++++++++++++++- zkv_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 111 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4db5c20..f27c5e2 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Simple key-value store for single-user applications. 
## Cons * Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) - average 200-250 Mb of RAM per 1M keys -* Need to read the whole file on store open to create file index +* Need to read the whole file on store open to create file index (you can use index file options to avoid this) * No way to recover disk space from deleted records * Write/Delete operations block Read and each other operations @@ -47,6 +47,28 @@ err = db.Flush() err = db.Backup("new/file/path") ``` +## Store options + +```go +type Options struct { + // Maximum number of concurrent reads + MaxParallelReads int + + // Compression level + CompressionLevel zstd.EncoderLevel + + // Memory write buffer size in bytes + MemoryBufferSize int + + // Disk write buffer size in bytes + DiskBufferSize int + + // Use index file + UseIndexFile bool +} + +``` + ## File structure Record is `encoding/gob` structure: @@ -66,5 +88,4 @@ File is log stuctured list of commands: ## TODO -- [ ] Implement optional separate index file to speedup store initialization - [ ] Add recovery previous state of store file on write error diff --git a/defaults.go b/defaults.go index f22add0..d263685 100644 --- a/defaults.go +++ b/defaults.go @@ -11,4 +11,7 @@ var defaultOptions = Options{ CompressionLevel: zstd.SpeedDefault, MemoryBufferSize: 4 * 1024 * 1024, DiskBufferSize: 1 * 1024 * 1024, + UseIndexFile: false, } + +const indexFileExt = ".idx" diff --git a/options.go b/options.go index 103fe4e..f42f14f 100644 --- a/options.go +++ b/options.go @@ -14,6 +14,9 @@ type Options struct { // Disk write buffer size in bytes DiskBufferSize int + + // Use index file + UseIndexFile bool } func (o *Options) setDefaults() { @@ -24,4 +27,12 @@ func (o *Options) setDefaults() { if o.CompressionLevel == 0 { o.CompressionLevel = defaultOptions.CompressionLevel } + + if o.MemoryBufferSize == 0 { + o.MemoryBufferSize = defaultOptions.MemoryBufferSize + } + + if o.DiskBufferSize == 0 { + o.DiskBufferSize = defaultOptions.DiskBufferSize + } } diff --git a/zkv.go b/zkv.go index 11d2cee..4706c28 100644 --- a/zkv.go +++ b/zkv.go @@ -5,6 +5,7 @@ import ( "bytes" "crypto/sha256" "encoding/base64" + "encoding/gob" "fmt" "io" "os" @@ -41,6 +42,16 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { options: options, readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} + if options.UseIndexFile { + idxFile, err := os.Open(filePath + indexFileExt) + if err == nil { + err = gob.NewDecoder(idxFile).Decode(&database.dataOffset) + if err == nil { + return database, nil + } + } + } + // restore file data readF, err := os.Open(filePath) if os.IsNotExist(err) { @@ -81,7 +92,8 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { } func Open(filePath string) (*Store, error) { - return OpenWithOptions(filePath, defaultOptions) + options := defaultOptions + return OpenWithOptions(filePath, options) } func (s *Store) Set(key, value interface{}) error { @@ -416,5 +428,20 @@ func (s *Store) flush() error { return err } + // Update index file only on data update + if s.options.UseIndexFile && l > 0 { + idxBuf := new(bytes.Buffer) + + err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) + if err != nil { + return err + } + + err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644) + if err != nil { + return err + } + } + return nil } diff --git a/zkv_test.go b/zkv_test.go index 2b8005f..b10a2a2 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -327,5 +327,50 @@ func TestBackupWithDeletedRecords(t *testing.T) { err = 
db.Close() assert.NoError(t, err) - +} + +func TestIndexFileBasic(t *testing.T) { + const filePath = "TestReadWriteBasic.zkv" + const recordCount = 100 + defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) + + db, err := OpenWithOptions(filePath, Options{UseIndexFile: true}) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + + // try to read + db, err = OpenWithOptions(filePath, Options{UseIndexFile: true}) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) } From 28f43e56d5abc320dfeaf00294bd6433fdfe6133 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 10 Dec 2022 22:00:08 +0500 Subject: [PATCH 20/28] Add resource consumption block --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index f27c5e2..cfa473f 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,13 @@ File is log stuctured list of commands: | Length | Record body bytes length | int64 | | Body | Gob-encoded record | variable | +## Resource consumption + +Store requirements: + +* around 300 Mb of RAM per 1 million of keys +* around 34 Mb of disk space for index file per 1 million of keys + ## TODO - [ ] Add recovery previous state of store file on write error From 0458ac515222c3c841b3370dff03e89565042a18 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 10 Dec 2022 22:00:42 +0500 Subject: [PATCH 21/28] Add info about read value issue --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index cfa473f..5ed2b58 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,11 @@ Simple key-value store for single-user applications. 
## Cons -* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) - average 200-250 Mb of RAM per 1M keys +* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) * Need to read the whole file on store open to create file index (you can use index file options to avoid this) * No way to recover disk space from deleted records * Write/Delete operations block Read and each other operations +* Need to decode whole file until stored value ## Usage @@ -96,3 +97,4 @@ Store requirements: ## TODO - [ ] Add recovery previous state of store file on write error +- [ ] Add fast file seek to value (add compressed block start position) From 412ddb11a8f7be09e4f09f920312a8283d3e5414 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 11 Dec 2022 18:16:23 +0500 Subject: [PATCH 22/28] Remove duplicated code --- zkv.go | 49 ++----------------------------------------------- 1 file changed, 2 insertions(+), 47 deletions(-) diff --git a/zkv.go b/zkv.go index 4706c28..de1263d 100644 --- a/zkv.go +++ b/zkv.go @@ -329,57 +329,12 @@ func (s *Store) get(key, value interface{}) error { return err } - offset, exists := s.bufferDataOffset[string(hashToFind[:])] - if exists { - reader := bytes.NewReader(s.buffer.Bytes()) - - err = skip(reader, offset) - if err != nil { - return err - } - - _, record, err := readRecord(reader) - if err != nil { - return err - } - - return decode(record.ValueBytes, value) - } - - offset, exists = s.dataOffset[string(hashToFind[:])] - if !exists { - return ErrNotExists - } - - readF, err := os.Open(s.filePath) - if err != nil { - return err - } - defer readF.Close() - - decompressor, err := zstd.NewReader(readF) - if err != nil { - return err - } - defer decompressor.Close() - - err = skip(decompressor, offset) + b, err := s.getGobBytes(hashToFind) if err != nil { return err } - _, record, err := readRecord(decompressor) - if err != nil { - return err - } - - if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { - expectedHashStr := base64.StdEncoding.EncodeToString(hashToFind[:]) - gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:]) - return fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr) - } - - return decode(record.ValueBytes, value) + return decode(b, value) } func (s *Store) flush() error { From 5f0d33828f6b611cb8bcdfbfcabedb0981937ef7 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 11 Dec 2022 21:00:36 +0500 Subject: [PATCH 23/28] Improve store read speed by skipping store blocks --- TestBufferRead.zkv.idx | Bin 0 -> 131 bytes TestSmallWrites.zkv.idx | Bin 0 -> 3465 bytes defaults.go | 2 +- options.go | 4 +++- zkv.go | 38 ++++++++++++++++++++++++-------------- zkv_test.go | 15 ++++++++++++--- 6 files changed, 40 insertions(+), 19 deletions(-) create mode 100644 TestBufferRead.zkv.idx create mode 100644 TestSmallWrites.zkv.idx diff --git a/TestBufferRead.zkv.idx b/TestBufferRead.zkv.idx new file mode 100644 index 0000000000000000000000000000000000000000..ced4e784f1da329060e7333b77539b4bdd131895 GIT binary patch literal 131 zcmd=8-_F9w^uL3Fk%#er8v}#x|5j#3rvGgWj7*H&PC5C>+5Ty1#i=EXEDVf1L8;04 zMJaFr1_qb^9Slq|6TYa57L>+(+4J+>Y^Xrv#WBkYHzyJUezb&5t literal 0 HcmV?d00001 diff --git a/TestSmallWrites.zkv.idx b/TestSmallWrites.zkv.idx new file mode 100644 index 0000000000000000000000000000000000000000..15a6d389fca5b0bafaacc87682cfb2e099a9f8ad GIT binary patch literal 3465 zcmd=8-_F9w^uL3Fk%#er8v}#x|5j#3rvGgWj7*H&PC5C>+5Ty1#i=EXEDVf1L8;04 zMJaFr28MsU9{)QSQe^V#`j-nt@beyD^5?)k^EZoISG`s^z~|w(VEWV|#(x=47-W>L 
zTKuZdbq{~?aBhJ{SVMNP>}|nK$4uvb$+S7m_^+UeLB^lse|ONP^bIG|?>_y`knwM= z81E~Y2vupV*v~5&|CKf{$n<)+Mf}vA7bc*!beG-g3(q?*s9OG+@$669AtMpSe*zf{ zGX6>3A4<|E|4)@kF1mlqy)*T5Wm=QZzYB>G$w7?&?9VaC1ess?HM9Omr27r!1ob!j zSL-Yiy!z73(r+&FZ3)JIybl;;7AIvsOkolVT(>ge^b6g|FDCbtAIc7!X&1Nh-9E;D zWrrAK@)9RR7r%6yV|QeA!7F*rPW9Yu1HtIBIhu@q#~A-9Y+#VFd31Z-v`s(q_IFp8 zJl@ct9#H@JQPzg4%nshYIgI~u)-lKwm*-x5*?y}2s@12UkNUsmWJRWEi94-#JnU{f zf$?AZ5(XKSD`KtuXN%k|HY{-~7QE8+yrJNvhFib#U(@nj#(xzY3^GBc3&hL|cLi?M zd|{<^(YEAUBD;_4f#rfNfjbv6{xdzsAT!G)txWdR@AFH)Ol@DzWw~nV$HJSlPo3_4 z>TU1B_|N|ygN#?P+GM?s+KNm!e#ig%PmXyhND9s=a=r8Mm-7wAf2z|MWKyFg&TehC z-5xzjF7ZgN(e2Eee^M;9W27yHS|1cWfD{PnkE)> z&&+JIIL7!db{2z-X6bS5g2j3feA4bG=HyOTaJ_biBT=o|N=iNrmBK#_@;CUw?Yf_%Gjz zK}NClgWxqzofmx)sgBRK2o#`T^s=m=Fe;V_Q5I{a7v)_}k(3&B?8er*}L$U827C zKhJaF4q3*37Jdve@+q2+WY51io*^eAc*EpL&W@wW7ux2v#TOlC3ugSM>Bk_mhvCrf zr$46s?=mo260h})KYNkxvkT8l)EPsBR2l!J8ZpScjoM=Uw^;f_l34fatg8adO!pR- zil(F{v3jUFGXBdhVUS^zU-Mh&y;t&FC;9&KdB1msM89EOm)~Q`oOfvlpsYEcy zOuc#ke7V~C$&R+PEG(z-Kx(*o6)IUl~)#?E21S5d zTYQh3H0487wEioWFvfpYrx;|mN%(y{`CsecKDp^-iQX=Y?{Du}|N7Ro6^wr+dKmv1 zaWTlq_i^6H?T;+K#=UMncZbub7ZNAZY+B~>&D?+HDC55b2?m+`@-CB%1&be^I=xJN z;g0@IJa3m7?C0#iwt{W)KgNIDaSSrjDiva4vrep(o|U!f?`j!-w%iTrcDAh-UN_y@ z%J|QQhe5_Jl-n)yF&obnMQIz|`<+v^-RWO#Z6vVh5}o^fhw9QxH<=mtiZ&bWN%v>`=edDFM!PlhnW9do zsCHc<@07bwW~fZbxq2tDXydWAy-bY%#Gf(9@V;cYw8~61tH~jQO=-(g&rADGFS9ts zki9ePhcn|pzc~yt?bGxUmYti+P|Bgn!ho`~Fn(?2U4uj0rhnsmLeh53a+pO<+dh@cASNq!C zF$PEbGym$pXZ**nz##Ljd234O=@rb?QtQvA%P3peZQijv_3?`rlbZI{GX9fFVUStz z^P-sVtFL;NxjQR8{aBt)S-Weg+uEaniJvYnXZ#m=hCxPx>#z8{I|)tO;tKj!#AsGM zeY)Oh;?B3>zGn9G82|CiV~~k|8a(+0cN)(LdoBNjK)vs$_CL&DU~5vdGQC}n@t>py zgG`}~r>R)qbV&)8?KzT_-|r}v$Gwid{D&dMNBalkzn}~T86F+|p#7o(?n~G67^iLc zv1;0#;O{H6kG4d2Fw9~6SAC8_M#9=7{k@#0<2~yphf>CW z24@&#T)&-I8I>yOyY!5j%%S`G?^JdwSS3!p>a;u0T88mo+zkeqPn(_2MT(p$yzd~; z_NqkYqf~c?!EK{wPc+Xw?qmEHyof>O&v%X8+K-p^Z$ExVPh+kT!)L+Ek9Y1fTmJv7 zhAQJfuQv=bjWXee7gsR9oa23HPjB?J;PdNO=)clF`1+}1$Y;iXfpQEoY?JQ({qsY$ zuJ8PIE|2=RUmtut!QL~;eWzr$L-jQ{j|7-VK2EG(8uT-|RHJ!`^|_DZ9UYO75T zJoRU2aV%fN_)l;RgG_VF$}{D6N>m`e0FVZ`ZUpx+o#OqpLd+`U$h8= z%$-?BnR#`-8c&oxrpcOImA|Q?u%*y^1*hx>O(({Gg@+hqCSLO1b70+?Nz2uZ_NPjQ zzw7eQFTC%&qFJncZyu_un}-?|e$p z+4s`gRuiMP*e}lvKau%4k@25%6@v^HgTy80l3$s%Y2Vi0y&U8GBj<(PhY=JNDY9rKN>eIGx2etX~YJN3fNI-5()jQ^xpG02oNT`zaa?hiWi zx%hE)b(im=@*+a}ou45^6+@O=G?s4bx>rDq4|5^4h$Xuvc$@^{E zq@;%@*^Q+3Je?WOv{^}|U;XTHc!CT57% z_Ihe}H5`c7SityC=pBPh$?x*$1=kNKyzSaoVeZh_arwsj_at`A^b0!9v z3rzlN&9?qHwaWa>9EWEo57fxbn9OMMoip;>|38fXWS%g{ggToyCOO#WeLE)Ko$&1Y zGpm;l-OCcV%KvW(d&u}t@eYGbRL$A8E23ur79=ur$+RwS3Q&!=@~9T+&m`7cu@5kztUzyk%YbkMOH;aqHt8 zss;R=*4~x*&#Zd=-0Z2kdW`?Lcjnt~NVcW&HikyMv7XnD;OM0LXbj0{{R3 literal 0 HcmV?d00001 diff --git a/defaults.go b/defaults.go index d263685..cb4a086 100644 --- a/defaults.go +++ b/defaults.go @@ -11,7 +11,7 @@ var defaultOptions = Options{ CompressionLevel: zstd.SpeedDefault, MemoryBufferSize: 4 * 1024 * 1024, DiskBufferSize: 1 * 1024 * 1024, - UseIndexFile: false, + useIndexFile: true, } const indexFileExt = ".idx" diff --git a/options.go b/options.go index f42f14f..148992b 100644 --- a/options.go +++ b/options.go @@ -16,10 +16,12 @@ type Options struct { DiskBufferSize int // Use index file - UseIndexFile bool + useIndexFile bool } func (o *Options) setDefaults() { + o.useIndexFile = true // TODO: implement database search without index + if o.MaxParallelReads == 0 { o.MaxParallelReads = defaultOptions.MaxParallelReads } diff --git a/zkv.go b/zkv.go index de1263d..1c4d543 100644 --- a/zkv.go +++ b/zkv.go @@ -14,11 +14,15 @@ import ( 
"github.com/klauspost/compress/zstd" ) +type Offsets struct { + BlockOffset int64 + RecordOffset int64 +} + type Store struct { - dataOffset map[string]int64 + dataOffset map[string]Offsets filePath string - offset int64 buffer *bytes.Buffer bufferDataOffset map[string]int64 @@ -34,15 +38,14 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() database := &Store{ - dataOffset: make(map[string]int64), + dataOffset: make(map[string]Offsets), bufferDataOffset: make(map[string]int64), - offset: 0, buffer: new(bytes.Buffer), filePath: filePath, options: options, readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} - if options.UseIndexFile { + if options.useIndexFile { idxFile, err := os.Open(filePath + indexFileExt) if err == nil { err = gob.NewDecoder(idxFile).Decode(&database.dataOffset) @@ -80,7 +83,7 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { switch record.Type { case RecordTypeSet: - database.dataOffset[string(record.KeyHash[:])] = offset + database.dataOffset[string(record.KeyHash[:])] = Offsets{} // offset case RecordTypeDelete: delete(database.dataOffset, string(record.KeyHash[:])) } @@ -283,7 +286,7 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { return record.ValueBytes, nil } - offset, exists = s.dataOffset[string(keyHash[:])] + offsets, exists := s.dataOffset[string(keyHash[:])] if !exists { return nil, ErrNotExists } @@ -294,13 +297,18 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { } defer readF.Close() + _, err = readF.Seek(offsets.BlockOffset, io.SeekStart) + if err != nil { + return nil, err + } + decompressor, err := zstd.NewReader(readF) if err != nil { return nil, err } defer decompressor.Close() - err = skip(decompressor, offset) + err = skip(decompressor, offsets.RecordOffset) if err != nil { return nil, err } @@ -317,7 +325,6 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { } return record.ValueBytes, nil - } func (s *Store) get(key, value interface{}) error { @@ -344,13 +351,18 @@ func (s *Store) flush() error { if err != nil { return fmt.Errorf("open store file: %v", err) } + stat, err := f.Stat() + if err != nil { + f.Close() + return fmt.Errorf("stat store file: %v", err) + } diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize) encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel)) if err != nil { f.Close() - return fmt.Errorf("open store file: %v", err) + return fmt.Errorf("init encoder: %v", err) } _, err = s.buffer.WriteTo(encoder) @@ -359,13 +371,11 @@ func (s *Store) flush() error { } for key, val := range s.bufferDataOffset { - s.dataOffset[key] = val + s.offset + s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val} } s.bufferDataOffset = make(map[string]int64) - s.offset += l - err = encoder.Close() if err != nil { // TODO: truncate file to previous state @@ -384,7 +394,7 @@ func (s *Store) flush() error { } // Update index file only on data update - if s.options.UseIndexFile && l > 0 { + if s.options.useIndexFile && l > 0 { idxBuf := new(bytes.Buffer) err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) diff --git a/zkv_test.go b/zkv_test.go index b10a2a2..4ec0d41 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -39,6 +39,7 @@ func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err 
:= Open(filePath) assert.NoError(t, err) @@ -84,6 +85,7 @@ func TestSmallWrites(t *testing.T) { const filePath = "TestSmallWrites.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) for i := 1; i <= recordCount; i++ { db, err := Open(filePath) @@ -119,6 +121,7 @@ func TestDeleteBasic(t *testing.T) { const filePath = "TestDeleteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -164,6 +167,7 @@ func TestDeleteBasic(t *testing.T) { func TestBufferBasic(t *testing.T) { const filePath = "TestBuffer.zkv" defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) @@ -187,8 +191,9 @@ func TestBufferBasic(t *testing.T) { func TestBufferRead(t *testing.T) { const filePath = "TestBufferRead.zkv" - const recordCount = 100 + const recordCount = 2 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) @@ -241,7 +246,9 @@ func TestBackupBasic(t *testing.T) { const newFilePath = "TestBackupBasic2.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -280,7 +287,9 @@ func TestBackupWithDeletedRecords(t *testing.T) { const newFilePath = "TestBackupWithDeletedRecords2.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -335,7 +344,7 @@ func TestIndexFileBasic(t *testing.T) { defer os.Remove(filePath) defer os.Remove(filePath + indexFileExt) - db, err := OpenWithOptions(filePath, Options{UseIndexFile: true}) + db, err := Open(filePath) assert.NoError(t, err) for i := 1; i <= recordCount; i++ { @@ -358,7 +367,7 @@ func TestIndexFileBasic(t *testing.T) { assert.NoError(t, err) // try to read - db, err = OpenWithOptions(filePath, Options{UseIndexFile: true}) + db, err = Open(filePath) assert.NoError(t, err) assert.Len(t, db.dataOffset, recordCount) From d950b6546c1fba17c5ee980e4235f3d424b7ac98 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 11 Dec 2022 21:33:51 +0500 Subject: [PATCH 24/28] Add notes about current store state --- README.md | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 5ed2b58..4474fec 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,17 @@ Simple key-value store for single-user applications. 
## Pros -* Simple one file structure +* Simple two file structure (data file and index file) * Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd) * Threadsafe operations through `sync.RWMutex` ## Cons * Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) -* Need to read the whole file on store open to create file index (you can use index file options to avoid this) +* No transaction system +* Index file is fully rewrited on every store commit * No way to recover disk space from deleted records * Write/Delete operations block Read and each other operations -* Need to decode whole file until stored value ## Usage @@ -63,9 +63,6 @@ type Options struct { // Disk write buffer size in bytes DiskBufferSize int - - // Use index file - UseIndexFile bool } ``` @@ -87,6 +84,17 @@ File is log stuctured list of commands: | Length | Record body bytes length | int64 | | Body | Gob-encoded record | variable | +Index file is simple gob-encoded map: + +```go +map[string]struct { + BlockOffset int64 + RecordOffset int64 +} +``` + +where map key is data key hash and value - data offset in data file. + ## Resource consumption Store requirements: @@ -97,4 +105,4 @@ Store requirements: ## TODO - [ ] Add recovery previous state of store file on write error -- [ ] Add fast file seek to value (add compressed block start position) +- [ ] Add method for index rebuild From 2791b39d486aff9cdf93c83244fe01e19e8696e8 Mon Sep 17 00:00:00 2001 From: nxshock Date: Mon, 12 Dec 2022 21:01:00 +0500 Subject: [PATCH 25/28] Delete test files --- TestBufferRead.zkv.idx | Bin 131 -> 0 bytes TestSmallWrites.zkv.idx | Bin 3465 -> 0 bytes 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 TestBufferRead.zkv.idx delete mode 100644 TestSmallWrites.zkv.idx diff --git a/TestBufferRead.zkv.idx b/TestBufferRead.zkv.idx deleted file mode 100644 index ced4e784f1da329060e7333b77539b4bdd131895..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 131 zcmd=8-_F9w^uL3Fk%#er8v}#x|5j#3rvGgWj7*H&PC5C>+5Ty1#i=EXEDVf1L8;04 zMJaFr1_qb^9Slq|6TYa57L>+(+4J+>Y^Xrv#WBkYHzyJUezb&5t diff --git a/TestSmallWrites.zkv.idx b/TestSmallWrites.zkv.idx deleted file mode 100644 index 15a6d389fca5b0bafaacc87682cfb2e099a9f8ad..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3465 zcmd=8-_F9w^uL3Fk%#er8v}#x|5j#3rvGgWj7*H&PC5C>+5Ty1#i=EXEDVf1L8;04 zMJaFr28MsU9{)QSQe^V#`j-nt@beyD^5?)k^EZoISG`s^z~|w(VEWV|#(x=47-W>L zTKuZdbq{~?aBhJ{SVMNP>}|nK$4uvb$+S7m_^+UeLB^lse|ONP^bIG|?>_y`knwM= z81E~Y2vupV*v~5&|CKf{$n<)+Mf}vA7bc*!beG-g3(q?*s9OG+@$669AtMpSe*zf{ zGX6>3A4<|E|4)@kF1mlqy)*T5Wm=QZzYB>G$w7?&?9VaC1ess?HM9Omr27r!1ob!j zSL-Yiy!z73(r+&FZ3)JIybl;;7AIvsOkolVT(>ge^b6g|FDCbtAIc7!X&1Nh-9E;D zWrrAK@)9RR7r%6yV|QeA!7F*rPW9Yu1HtIBIhu@q#~A-9Y+#VFd31Z-v`s(q_IFp8 zJl@ct9#H@JQPzg4%nshYIgI~u)-lKwm*-x5*?y}2s@12UkNUsmWJRWEi94-#JnU{f zf$?AZ5(XKSD`KtuXN%k|HY{-~7QE8+yrJNvhFib#U(@nj#(xzY3^GBc3&hL|cLi?M zd|{<^(YEAUBD;_4f#rfNfjbv6{xdzsAT!G)txWdR@AFH)Ol@DzWw~nV$HJSlPo3_4 z>TU1B_|N|ygN#?P+GM?s+KNm!e#ig%PmXyhND9s=a=r8Mm-7wAf2z|MWKyFg&TehC z-5xzjF7ZgN(e2Eee^M;9W27yHS|1cWfD{PnkE)> z&&+JIIL7!db{2z-X6bS5g2j3feA4bG=HyOTaJ_biBT=o|N=iNrmBK#_@;CUw?Yf_%Gjz zK}NClgWxqzofmx)sgBRK2o#`T^s=m=Fe;V_Q5I{a7v)_}k(3&B?8er*}L$U827C zKhJaF4q3*37Jdve@+q2+WY51io*^eAc*EpL&W@wW7ux2v#TOlC3ugSM>Bk_mhvCrf zr$46s?=mo260h})KYNkxvkT8l)EPsBR2l!J8ZpScjoM=Uw^;f_l34fatg8adO!pR- zil(F{v3jUFGXBdhVUS^zU-Mh&y;t&FC;9&KdB1msM89EOm)~Q`oOfvlpsYEcy 
zOuc#ke7V~C$&R+PEG(z-Kx(*o6)IUl~)#?E21S5d zTYQh3H0487wEioWFvfpYrx;|mN%(y{`CsecKDp^-iQX=Y?{Du}|N7Ro6^wr+dKmv1 zaWTlq_i^6H?T;+K#=UMncZbub7ZNAZY+B~>&D?+HDC55b2?m+`@-CB%1&be^I=xJN z;g0@IJa3m7?C0#iwt{W)KgNIDaSSrjDiva4vrep(o|U!f?`j!-w%iTrcDAh-UN_y@ z%J|QQhe5_Jl-n)yF&obnMQIz|`<+v^-RWO#Z6vVh5}o^fhw9QxH<=mtiZ&bWN%v>`=edDFM!PlhnW9do zsCHc<@07bwW~fZbxq2tDXydWAy-bY%#Gf(9@V;cYw8~61tH~jQO=-(g&rADGFS9ts zki9ePhcn|pzc~yt?bGxUmYti+P|Bgn!ho`~Fn(?2U4uj0rhnsmLeh53a+pO<+dh@cASNq!C zF$PEbGym$pXZ**nz##Ljd234O=@rb?QtQvA%P3peZQijv_3?`rlbZI{GX9fFVUStz z^P-sVtFL;NxjQR8{aBt)S-Weg+uEaniJvYnXZ#m=hCxPx>#z8{I|)tO;tKj!#AsGM zeY)Oh;?B3>zGn9G82|CiV~~k|8a(+0cN)(LdoBNjK)vs$_CL&DU~5vdGQC}n@t>py zgG`}~r>R)qbV&)8?KzT_-|r}v$Gwid{D&dMNBalkzn}~T86F+|p#7o(?n~G67^iLc zv1;0#;O{H6kG4d2Fw9~6SAC8_M#9=7{k@#0<2~yphf>CW z24@&#T)&-I8I>yOyY!5j%%S`G?^JdwSS3!p>a;u0T88mo+zkeqPn(_2MT(p$yzd~; z_NqkYqf~c?!EK{wPc+Xw?qmEHyof>O&v%X8+K-p^Z$ExVPh+kT!)L+Ek9Y1fTmJv7 zhAQJfuQv=bjWXee7gsR9oa23HPjB?J;PdNO=)clF`1+}1$Y;iXfpQEoY?JQ({qsY$ zuJ8PIE|2=RUmtut!QL~;eWzr$L-jQ{j|7-VK2EG(8uT-|RHJ!`^|_DZ9UYO75T zJoRU2aV%fN_)l;RgG_VF$}{D6N>m`e0FVZ`ZUpx+o#OqpLd+`U$h8= z%$-?BnR#`-8c&oxrpcOImA|Q?u%*y^1*hx>O(({Gg@+hqCSLO1b70+?Nz2uZ_NPjQ zzw7eQFTC%&qFJncZyu_un}-?|e$p z+4s`gRuiMP*e}lvKau%4k@25%6@v^HgTy80l3$s%Y2Vi0y&U8GBj<(PhY=JNDY9rKN>eIGx2etX~YJN3fNI-5()jQ^xpG02oNT`zaa?hiWi zx%hE)b(im=@*+a}ou45^6+@O=G?s4bx>rDq4|5^4h$Xuvc$@^{E zq@;%@*^Q+3Je?WOv{^}|U;XTHc!CT57% z_Ihe}H5`c7SityC=pBPh$?x*$1=kNKyzSaoVeZh_arwsj_at`A^b0!9v z3rzlN&9?qHwaWa>9EWEo57fxbn9OMMoip;>|38fXWS%g{ggToyCOO#WeLE)Ko$&1Y zGpm;l-OCcV%KvW(d&u}t@eYGbRL$A8E23ur79=ur$+RwS3Q&!=@~9T+&m`7cu@5kztUzyk%YbkMOH;aqHt8 zss;R=*4~x*&#Zd=-0Z2kdW`?Lcjnt~NVcW&HikyMv7XnD;OM0LXbj0{{R3 From 822504e8a01b5e62cf72733ffb488b514a78c106 Mon Sep 17 00:00:00 2001 From: nxshock Date: Wed, 5 Apr 2023 21:17:38 +0500 Subject: [PATCH 26/28] Update deps --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 392f6ec..4dc95a4 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/nxshock/zkv go 1.19 require ( - github.com/klauspost/compress v1.15.12 - github.com/stretchr/testify v1.8.1 + github.com/klauspost/compress v1.16.4 + github.com/stretchr/testify v1.8.2 ) require ( diff --git a/go.sum b/go.sum index 8609118..abc35b1 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM= -github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU= +github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -10,8 +10,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 832d12c43bcbd80b0ec8975318eb67d4e36105c1 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 15 Apr 2023 22:33:37 +0500 Subject: [PATCH 27/28] Move record tests to separate file --- record_test.go | 35 +++++++++++++++++++++++++++++++++++ zkv_test.go | 28 ---------------------------- 2 files changed, 35 insertions(+), 28 deletions(-) create mode 100644 record_test.go diff --git a/record_test.go b/record_test.go new file mode 100644 index 0000000..1e69354 --- /dev/null +++ b/record_test.go @@ -0,0 +1,35 @@ +package zkv + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRecord(t *testing.T) { + buf := new(bytes.Buffer) + + var records []Record + + for i := 0; i < 10; i++ { + record, err := newRecord(RecordTypeSet, i, i) + assert.NoError(t, err) + + records = append(records, *record) + + b, err := record.Marshal() + assert.NoError(t, err) + + _, err = buf.Write(b) + assert.NoError(t, err) + } + + for i := 0; i < 10; i++ { + _, record, err := readRecord(buf) + assert.NoError(t, err) + + assert.Equal(t, record.KeyHash, records[i].KeyHash) + assert.Equal(t, record.ValueBytes, records[i].ValueBytes) + } +} diff --git a/zkv_test.go b/zkv_test.go index 4ec0d41..5229178 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -1,40 +1,12 @@ package zkv import ( - "bytes" "os" "testing" "github.com/stretchr/testify/assert" ) -func TestRecord(t *testing.T) { - buf := new(bytes.Buffer) - - var records []Record - - for i := 0; i < 10; i++ { - record, err := newRecord(RecordTypeSet, i, i) - assert.NoError(t, err) - - records = append(records, *record) - - b, err := record.Marshal() - assert.NoError(t, err) - - _, err = buf.Write(b) - assert.NoError(t, err) - } - - for i := 0; i < 10; i++ { - _, record, err := readRecord(buf) - assert.NoError(t, err) - - assert.Equal(t, record.KeyHash, records[i].KeyHash) - assert.Equal(t, record.ValueBytes, records[i].ValueBytes) - } -} - func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 From 583f60da9f9ffd00701a3ebbbbad8ef17f232a90 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 16 Apr 2023 10:32:58 +0500 Subject: [PATCH 28/28] Add RebuildIndex() func --- TestDeleteBasic.zkv.idx | Bin 0 -> 3431 bytes TestSmallWrites.zkv.idx | Bin 0 -> 3465 bytes testdata/TestReadBlock.zkv | Bin 0 -> 644 bytes utils.go | 12 +++ zkv.go | 188 ++++++++++++++++++++++++++++--------- zkv_test.go | 55 +++++++++++ 6 files changed, 209 insertions(+), 46 deletions(-) create mode 100644 TestDeleteBasic.zkv.idx create mode 100644 TestSmallWrites.zkv.idx create mode 100644 testdata/TestReadBlock.zkv diff --git a/TestDeleteBasic.zkv.idx b/TestDeleteBasic.zkv.idx new file mode 100644 index 0000000000000000000000000000000000000000..b341db72544cb67bd4fd560f43e99a37bd67a577 GIT binary patch literal 3431 
zcmd=8-_F9w^uL3Fk%#er8v}#x|5j#3rvGgWj7*H&PC5C>+5Ty1#i=EXEDVf1L8;04 zMJaFr28MsUYX3VJl4Z6&+{_#CL)f|9W_`!go0pxu+Sl%mF*w?v`B(ov(?8xX3^KD^ z(#m8{{XW0+%hdMeT$Zb*ek{B>`_$>)r{4B1O#i$%7-ZDb7`A+`dOah({Qku840ex;`8n#G;ND3=vxt^ zS@rbkdZ&py--i2|+0SG8$Gwa}#`W8Yl~JjZzDv)j$sD?`|4wD6f>q+gt4_P~tYw(~ zh5uoYxxnPV)@)D1%yD>j@<5H;jLD29-#H`C{r|)CPihN;%**P>yqxaMGLuw; zzPsLvwo}RE=$Phpr=E4v){9L46qhi_s8-s!`K@77-?(_y<9p_X$wi&Kt(IGNcFjvz zE6((f$%8>=#m|dkzOTOOS?2Dn^z>tSK4tB$rEY7F1}1*GyqxJ@un&XG;-t)nDNI6v z>sBV5exW=0#pIsyL)l?7?c!Fx+sE`TPliF}?Tt60D;KLQ^Ejg8wlGu4_Q*YbFSk>N z?p(XxtHAV+WdVcCu`Qm9ek_*?{Oxf2=H%AK(>orWE>Yk6pXa%7hb+@S6CMVcmX?Ex zOx@RWFRYac6^Yr%Vy>xQA+hyIm|w4?8`D2VH3k{Q)(?W$ICWn1Nu)YH+agrBGW$)$ zgoIzL9S7!rXZq*2gF%M(CBvmvW~y0D4jF7pTb6oW+IM=H#VLmDonb$mnf`e^V30AF zkes49JHh|fsm~gE9)FP&T zi2)2UZ=<$Y|1Fk2ktEjrI_s(cGt<4rrJ^aRNvs~Kj!gfO*D%Ozy`$c!r*mT7RY%95 z*AeV#*JP)<9E;@Hc4=Q`4%5HF84NP>qbA<^*wu7VWZ&C;)!%mBmEq>G?%ce^|6Wjh zF4MoL00x;06)SnaO`DYT@FcsD)Sjm^<3LJrhl0&3^KXTqt&usX?Dr&UM1h`>Hbyn-*g*J$-F~d zOl?P){^@5h$S}&U`7QL`D|xPyeE<2p-@8Jh->|OB?=fZ0yR?JppZ^mEnYB}Xn6_Kq z;J*_jsnqX$^4Ybu>C;3%Zl5xbf8KGXe<4*2GOu3m{%69w<duXxvZdb!c( zg~k4wAAkO5`j`5PL8jz)dGvzo2Nd3R?W-_%XzaLrWBu}v1wjkHPdGV;>7UUp2AMzK zHFj%1UfRF?_#Hisxke111us9|xzB9*|FasZO#fVy7-al8{&xp$O5boY{qEE63>p8{ zit)aZiBOf+iv7Hj>0izl1{swrVy*mVi`*?XEO9Fqywdf&q2Q#3Tfg#O)AC%VfB7B^ zGO=5WKUo^aPM>!^rReN?X>F^CQCsYnXNI51{G7=2&n}BW=F?`UbCDuv3hz4zw7n{k z`6$)hVQ|~%*%Qq(kNcSZ`F1hLXqFz=E?BG=!6)s0VovUa1=nkLNUq_mk#qf#>%sId zW)p*qv`U4T*sK#PrDtVr`ny_&pDlMox}9z7h1X4Ywle)Q?_!W?pQe|v?A%<2QVtDo zi+)1`t-7$A`9&6SnmY|vF#XeLVUUrq_6Rwr|HI0-_{6g}ypR8`?n-|z=jnLQy2+uG z>7T9$gG`X=0x|Q#U4dIQUs!2fv@Q9T$nN8MV7XvR;Le3i{|t99$b4(wni6_?1#`93 z`m^aW$`*E;ckE7m{Nlx=roFXH|0H4n{xC${luC-+SLw zVmg88pYT2g8CRD1EBN-@vz#WN!^6U;6!|9X&;<|oCx1UqS*5`A&(?)OX7<6tVwuF% z{U*_~CLC$6H0r3f+VsFve})#v@l>bcnlg3)DjG#UMlG5wP>Vvw;5<#x+_ z%*Jy?QQAiLe&>{JcluXbo5@~T#Mdj$^v_0tK}NeZ^O>Sfr>J&aBJY&DPiCl0$+>zb zv1sG5w!KVD|HOnCWHOR+Og}wydHSi2`9{{hkDooiy>I!Qdf{fB&823hf07*xGC^-| z7=`?ZzY|?~M`?qe;-gcCH$QyAdWZ4pK1)HSe{ug9WbU}0yKZ$rS2DZV{O`k~3oiEL z{jio}VLQB;N%0)hKZ8C7nafIYGVwX{6|Q^h?Qz`h#-+INS$;)wvh=Z&zTcVt1!*zJ zT)8l_X~K<*yQ=?(r>TWH-*~w;$>gMVfdJF{NeJ~pWZY5OE+MUIc%`xXthAF;kw0j&n2gFGFIzqJZ*ZOwo}?QM4aiL z`#uJl{i^RzwCe2Ny7t$4ndgPS%%)0DTy1u?%J}=6cL$mNG3PPJyiVNhyiKpHbLGZ| zzvgXau1ac%&e{=nH74@VZ6Bt8svHb5Hji$vo3`mk-u~|DlE)hw)C1~2Kg!xrmD$0& zH;3t8${7Zk_xq0bDxWy9vEu<_P{W4b{r?237M|3Xbb5U6`3$Cij`tX30;i~ZFUkF( zw!81gt*3=&f3>JS?2VGV>=Y~C&CB$U^$df|#7o|L4y;=O(h4+28 z+)KL3*~IiuJ%T~znuVr#Q}ptbXZq*jz#wz^7UXG1{temj>({oAh(KAvFjndEXMK|*B<(?5kW2ASrTm1oNDl&Cz)cFk;OeWhr0 z?Ng%vOtYH`clN0={VT|0ka_gj=Be-xH%*3j_kEYTpL*sWe_Kud&=HC0FZR?j{o_z! 
zkO_4*Z%lHq&-->vzB}RB_h(iw9lDn#aFzex684bkpL_s=OjOO;w&VOe)|$5eshIp^ z0r!Q3>BhlbFHaZG$}DF3C-RIzX8o2QC#U$GYFv`KcUsoDP;0l+vw5GC&5s&zwl^{T zbDP8Ao_%e$i~UnMXEL3si?jOt zu9xYb8i!A`ds($ zClBWqXoNLn7t7uj+;q%z?w3rP(@g&|;uvI@+jE_?BwPD#+4^g|OPu_}F(=`v>L021 zm*1YWV*01Kh(V^fJon6k(+-bezVRz#RO#c$J7-Z!8IB(?k zN0wjXUN@h+!|Brti4$oyEpz#1?mu&s>0hK0gG~I>;K?tz(|AtUYxySx>U}@8|6%?D zTa%iV>Fsh%|HQ2rWF~x36)h-@`LgHdy~#g#`6{9tr{=Bpeqla6!_k&O#yscLT3w@) zm$_PVypCJ5PPZu`H(TE0yGwBIqva4|?+axVFmS)2B4{oTtk&QG$grvHrz zkZVtR%*XUEYZ-%#*wbkjG*%p8_PZRoEB(xhdW-LIlcs!#iq?O{62|n;EQvv;oauVG zQ+9vQna{Wm>ms!ac4uQ132 znP2%ev;Ih=`wis;^*8%h>nswy`qItPZ!Ys~38sHM=NM%Cle#~Yq)+~zDwAAv|CW1a z>gUR|CZB&75+jm>nEqM)W01)&?=s0)u=wGr)63Ks?&#md^LClRe$M`DE7&IgWBSKc z!XUFv!tdkB|5^w4$xSay^mbW%e|yjR*SD^%VEil5!}L$jf_kOG$viTWOZ#{%<-Vda-L>f_mh52 zX8M;@#~|~fSlL+OXWrczoBbB8U|s0-g!5Aw`~NRIJPk(HO#h@GG04bra*5U+@z@mH z^xNZId1GRRXl<{jc2~oJc#Q>2{{%}IWaLvcAIYA7aXdp#M(~Eolbjt#lP|Q*Yl|;B M&KAt{PtAw{00RXvpa1{> literal 0 HcmV?d00001 diff --git a/TestSmallWrites.zkv.idx b/TestSmallWrites.zkv.idx new file mode 100644 index 0000000000000000000000000000000000000000..6de70daa2dcb36824aeffb528ea00ae58a9cd592 GIT binary patch literal 3465 zcmd=8-_F9w^uL3Fk%#er8v}#x|5j#3rvGgWj7*H&PC5C>+5Ty1#i=EXEDVf1L8;04 zMJaFr28MsU9{)QSQe;B!Xm+)4l|1=CW74HdR@e5$91nUd=V``uKk3(G#(&ug3^MDt z{5Uzq?^NTG)V z$nd^oxU|YlHLJ-XgH376QqN2KPA{`K#gM%-?1wYsKYup{8M{z!x6H?EJXaK@ZFKK< zPT6*+f3>xl?3G1)z0!>T95WbX9zC{sD*VGuli}Tc-=*%Sp83b$R+B$;L}L1jJ++Mg zIBqe>#6Jz5{DM1;=Y+kMe?p+%_fz{H<}a`{sacucF30#!(uqN4;wA4r2iC2bv|Qb2 zf2w5oyDks?!u!5k?j>F2Y-0SUb%sGk!rCL`oc<3hY<&{u*DLA9_>a+nL1yb6^+r9N6Z5V*ItIOtU{AXyJJsb_B+s@> z`!aJF|5f)e$jGN?K9W8E;&_IfjNlEECpkNgCSPcq*A`!NoGqB~pJoq(jCvZwmhV-s zXQY?kUs-jIS-#!8-tq9V?(?-(QnwlZ>2)#4C|$MqRiEn~{^a4@0*$bS>|)v5f}4(+ z&i#^UbDHsAVH1OlX6bS5g2j3feA4bG=HyOTaJ_bibxflh3~ki4n;`jQ<>T7-WLp-Y^RJ5q~GT@{ZC5J;g_- z4sU+=g7psL(|wkLjQ`Su7-U@gb{%`qQmSaK!apg#?77(bj!PGJv{ru!JQsh5@t^!8 z2ANqdX=SpfexG0ZWor9!F3VL@KNjAced=`YQ*V10#(x1F3^J+F5@)wI+is7ZB$s$3 z*XVZU%|9uY+A-1=U&4zS|Ecpa$b76SUwCB(;|A8n?h}?~TCA4uxpLT)MUG2)s`(1ky>yeAaj$5z z;huDV#(!RR3^K*#xffrypQ^uV^(pA1{%<*1kttf@PU{^HyBkkn{FgCCn9_ zfvf!gmavD6|CIO`WOAQJt7X5^?2_BPO1|0C{j21^={B5_d55}~+Kw>(GoHjCqgrX_ z=C_7TedFR)kMEfmCKq+`wpwo8*)=a=tvKU9rWytr9v%Il{h|WyOV{%lr)~JLYTBLP z?<=&AwnTR@%whajbB;mA=F#nS(>DFc+uvPX@_0jodO-c>M_C)HGCO$p<}m)tUBn<$ z^1D2G!Sw?QZ@czYm^(CfT)we>`Nx8wh2JNfoWuCfJcvQ2oauVGQ+9vQna{^hCr)ha zc)%Fcu;F+AKf$VnC-o(r9^ZRDgYlpH3I-W}j{n_3o6<7?mR5gdMuz;r`_B$0@5682>pfW02XPWVmM2 z&sfo{n4SA>uubObxgW%O-E^JNqv_HQ82`l@Fv#R3PKYjk={Cpi$m)Vu@|>OOx!DGS z(PeWq8U2ni{!_TdAd``lWBTct%hOMF%r~<3ef;eC?S0Gd)C)K3Y%Vo3{*yk!Ak*Bk z@=W=i5|u~Uu9@wuuM~~0eMRBgky~y}aWg3G_qhG}p>1U^>mhtzziD90U^jS%T;bX?}hs$4ode8W; zz=}bJPcrk)2?iF`1U-}dwEgiN0dicM*KGSZ`O2|fyo~>B*D%PK=bT!rYjpB5S8I;f zach>GCjN{w&ez^;|94o+m+_zeDFzv7l?pMjStnLX&&t~LceM;ZTkeK*JKNR^ubb{{ zW&CH8#UNuYAvr~Jc7p${Q=c{TJZfbUQ}~)D7Ie?dY_mAV_%E)ELB{pliIq{QlDhU|HW@%knt*3o2=JSTaoF;@AzN;$uTblNx?Zqu6I8E za=yX%PxT6eO#3vwgk|UEGL&*?cw6)v8fevp-OMkth|}C@u!8ZQju3+kqx_oRLhrqj z=Q_#vpU?ZfD$?0NQ|7!&I~f0kJ!6pBuloK(tIqzdYk#emd0zO-Y^wCc)n;d_ zjK9BmcaZTP^BxA7*#`@YWfE8Sn?%o=aHPG`sH56y(*sZa8Co367cu@5+`%9dWV%4i zyl_|GR?QbyS{H3gz9q8zxE@$8*b=yNA>%(YF$S4!5`G^~{?|IVPi}fyqPNT9``dfg zzrJ;C1>;|d9>#x09t<*XqqbQ8EtWo!B-Z^p>#6`V)4j!|qA96KtRAY4jQ?^n7-X2+ zbDgv#Tl;R=`fI#PoczNvC*i5;AF213-=4H${HN>1AT#ym{qyB&>nBhCx>!->0!!=8 z=uHb$W9EGLUK=}y@n3QqgA5mg#3kpFUzxRO-`3x~9OL{X>uUPnm;kxae;OVPGJGE#zi3aL)_&MHU$5jO z^ThW`s~%iQIxepG;nrftf37MFGGb4sUC>x@gxT+M;I8yDE9x!2$4#2@Au3w`6-yZ7 zKWhyJnadxS+_dv+S+c`@{?`)G=eu&8W=Py^eRWjB;EfdHKaXz=GQA#d5kGb3g$Zaa 
z-DS7>!t>4xs+NCdJo}S&$Vi0opFjzN4BMo;fB*bYt?N6#oy(*C?bin%Pq6n)a=DTq zp|XYXpRyl=jLH?UR{pa^?iL%CxD^Xt>3ZH!a8kpqU-_?Tc`oC>N)84Y#numk*En@v z^hu;TKHDNxxH9`q#Ds)jtQ`mDe`ov`x`07u#m|dkzOTOOS?2Dn^z>tSK4tB$rEY7F z1}1*Gyqxi0)Eowx`B4*Zee7zwD6;SEzUpr~@5*rVSa)vT;(sqFK9})dk`05*x8|)W zp{G|cS4*uwn=YelVYhk5?$pOGUQBA*Tg&)Qs*XYCj_bMWRtIz?vzyKTK0LbMVo%-= zYdIFS!<(5D&oTZpxx^q-Xya)r);C>Jf@OP-Waaleisf;yV=w<yJ~b%-_s$cy{tYjoggMj3(bXBhUT+!}w2@ zi$UhF!IGoZ0>Oss7S}zOoXW{qt*i00>3P~tY1a^O#(#cX3^H3EZsv{nA?)04v%cf$ z&C5<+?Q3_(7#!`-{Hy<-@gKhqgG_#Tmr2Hg#Sc%NUZ%cqNB<_Cx62InbM{|b!8Z9H z<3H{=2ARuBax(Eb^A)ap>+Nye?#89K@mYRFa|3xM+$jEYXiPj$R*c9CK z+v8n%V`7GAZLgge^b6g|FDCbtAIc7!X&1Nh-9E;D<%bvmQ0PA= literal 0 HcmV?d00001 diff --git a/testdata/TestReadBlock.zkv b/testdata/TestReadBlock.zkv new file mode 100644 index 0000000000000000000000000000000000000000..7916c5b25ea46be9f90ae34ec57b2de949cc318b GIT binary patch literal 644 zcmdPcs{faTVI~VhHvSvWMm6UP0lY$Vf^33z{t$V5>i=^%E-pR$nKq5>5*8R z!T7(0fsrdLF{d=usj?)sn30QtfkE0mSgW3NkMn*=N z{}cXyQ57vHjrsrO|DOLp|KIyR`Tq}IzKZDojsK_S{a@|<|AqPg=^2i;j4aG73``8F zSCbVX?(1Y>=!7_rffnvFfx55df6M=aicJ5z|6kAjf8qaHsZf!a{~KA%HT5eb{%`&N zB+T!BucRB;bu0`#*843XuA@)rsaD##`TbwRrv889|Hc1T{eS%bo_S$%(f>}~|E-q) zxBlPxzw7_Jg#T;B!R}*Y2;#6)gShV`3&TVjhMo;H^!BU%fB*l)|5ly<`~PqKzxMyH s|LbM`KQH|M%k2MD>Hib|uQvOCw#xYb_y2GHzx#g>>`D%XrF==j0L|{;qyPW_ literal 0 HcmV?d00001 diff --git a/utils.go b/utils.go index 97d63be..ae570b2 100644 --- a/utils.go +++ b/utils.go @@ -4,7 +4,9 @@ import ( "bytes" "crypto/sha256" "encoding/gob" + "errors" "io" + "os" ) func encode(value interface{}) ([]byte, error) { @@ -40,3 +42,13 @@ func skip(r io.Reader, count int64) (err error) { return err } + +func isFileExists(filePath string) (bool, error) { + if _, err := os.Stat(filePath); err == nil { + return true, nil + } else if errors.Is(err, os.ErrNotExist) { + return false, nil + } else { + return false, err + } +} diff --git a/zkv.go b/zkv.go index 1c4d543..8a2b7d6 100644 --- a/zkv.go +++ b/zkv.go @@ -37,7 +37,7 @@ type Store struct { func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() - database := &Store{ + store := &Store{ dataOffset: make(map[string]Offsets), bufferDataOffset: make(map[string]int64), buffer: new(bytes.Buffer), @@ -48,50 +48,30 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { if options.useIndexFile { idxFile, err := os.Open(filePath + indexFileExt) if err == nil { - err = gob.NewDecoder(idxFile).Decode(&database.dataOffset) - if err == nil { - return database, nil + err = gob.NewDecoder(idxFile).Decode(&store.dataOffset) + if err != nil { + return nil, err } + + return store, nil } } - // restore file data - readF, err := os.Open(filePath) - if os.IsNotExist(err) { - // Empty datebase - return database, nil - } else if err != nil { - return nil, fmt.Errorf("open file for indexing: %v", err) - } - defer readF.Close() - - decompressor, err := zstd.NewReader(readF) + exists, err := isFileExists(filePath) if err != nil { - return nil, fmt.Errorf("decompressor initialization: %v", err) - } - defer decompressor.Close() - - offset := int64(0) - for { - n, record, err := readRecord(decompressor) - if err == io.EOF { - break - } - if err != nil { - return nil, fmt.Errorf("read record error: %v", err) - } - - switch record.Type { - case RecordTypeSet: - database.dataOffset[string(record.KeyHash[:])] = Offsets{} // offset - case RecordTypeDelete: - delete(database.dataOffset, string(record.KeyHash[:])) - } - - offset += n + return nil, err } - return database, nil + if 
!exists { + return store, nil + } + + err = store.rebuildIndex() + if err != nil { + return nil, err + } + + return store, nil } func Open(filePath string) (*Store, error) { @@ -395,14 +375,7 @@ func (s *Store) flush() error { // Update index file only on data update if s.options.useIndexFile && l > 0 { - idxBuf := new(bytes.Buffer) - - err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) - if err != nil { - return err - } - - err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644) + err = s.saveIndex() if err != nil { return err } @@ -410,3 +383,126 @@ func (s *Store) flush() error { return nil } + +func readBlock(r *bufio.Reader) (line []byte, n int, err error) { + delim := []byte{0x28, 0xb5, 0x2f, 0xfd} + + line = make([]byte, len(delim)) + copy(line, delim) + + for { + s, err := r.ReadBytes(delim[len(delim)-1]) + line = append(line, []byte(s)...) + if err != nil { + if bytes.Equal(line, delim) { // contains only magic number + return []byte{}, 0, err + } else { + return line, len(s), err + } + } + + if bytes.Equal(line, append(delim, delim...)) { // first block + line = make([]byte, len(delim)) + copy(line, delim) + continue + } + + if bytes.HasSuffix(line, delim) { + return line[:len(line)-len(delim)], len(s), nil + } + } +} + +// RebuildIndex renews index from store file +func (s *Store) RebuildIndex() error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.rebuildIndex() + if err != nil { + return err + } + + if s.options.useIndexFile { + return s.saveIndex() + } + + return nil +} + +func (s *Store) rebuildIndex() error { + f, err := os.Open(s.filePath) + if err != nil { + return err + } + defer f.Close() + + r := bufio.NewReader(f) + + var blockOffset int64 + + s.dataOffset = make(map[string]Offsets) + + for { + l, n, err := readBlock(r) + if err != nil { + if err != io.EOF { + return err + } else if err == io.EOF && len(l) == 0 { + break + } + } + + dec, err := zstd.NewReader(bytes.NewReader(l)) + + var recordOffset int64 + for { + n, record, err := readRecord(dec) + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + switch record.Type { + case RecordTypeSet: + s.dataOffset[string(record.KeyHash[:])] = Offsets{BlockOffset: blockOffset, RecordOffset: recordOffset} + case RecordTypeDelete: + delete(s.dataOffset, string(record.KeyHash[:])) + } + recordOffset += n + } + + blockOffset += int64(n) + } + + idxBuf := new(bytes.Buffer) + + err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) + if err != nil { + return err + } + + err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644) + if err != nil { + return err + } + + return nil +} + +func (s *Store) saveIndex() error { + f, err := os.OpenFile(s.filePath+indexFileExt, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return err + } + + err = gob.NewEncoder(f).Encode(s.dataOffset) + if err != nil { + return err + } + + return f.Close() +} diff --git a/zkv_test.go b/zkv_test.go index 5229178..ec2d2de 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -1,6 +1,8 @@ package zkv import ( + "bufio" + "io" "os" "testing" @@ -355,3 +357,56 @@ func TestIndexFileBasic(t *testing.T) { err = db.Close() assert.NoError(t, err) } + +func TestReadBlock(t *testing.T) { + file, err := os.Open("testdata/TestReadBlock.zkv") + assert.NoError(t, err) + defer file.Close() + + r := bufio.NewReader(file) + + line, _, err := readBlock(r) + assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 
0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x90, 0xff, 0xf4, 0x25, 0x15, 0x70, 0x75, 0x5c, 0xff, 0xf4, 0xff, 0xbc, 0xff, 0xf9, 0xff, 0xde, 0xff, 0x93, 0xff, 0xf8, 0x0d, 0x0e, 0x78, 0x5b, 0xff, 0x81, 0xff, 0x95, 0x6e, 0xff, 0xab, 0x4b, 0xff, 0xe8, 0x37, 0xff, 0x97, 0x68, 0x41, 0x3d, 0x01, 0x04, 0x03, 0x04, 0x00, 0x02, 0x00, 0x25, 0xd5, 0x63, 0x21}, line) + line, _, err = readBlock(r) + assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x89, 0x04, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x34, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x84, 0xff, 0x84, 0xff, 0xc1, 0x21, 0x02, 0xff, 0x8b, 0xff, 0xd7, 0x6d, 0xff, 0xd0, 0xff, 0xad, 0x1a, 0x55, 0x14, 0x5c, 0xff, 0xb1, 0x04, 0x37, 0x29, 0x2f, 0x78, 0x18, 0xff, 0xb5, 0xff, 0xe4, 0x56, 0x4e, 0xff, 0x8d, 0x19, 0x46, 0x01, 0x04, 0x03, 0x04, 0x00, 0x04, 0x00, 0x0c, 0x3b, 0xbf, 0x39}, line) + line, _, err = readBlock(r) + assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0x25, 0x79, 0x3e, 0x46, 0x4e, 0xff, 0xac, 0x06, 0x27, 0xff, 0xb1, 0xff, 0xa3, 0xff, 0xaa, 0xff, 0xe3, 0xff, 0xde, 0x37, 0x71, 0x63, 0x72, 0xff, 0x89, 0x0d, 0xff, 0x85, 0x39, 0xff, 0xb5, 0xff, 0xb9, 0xff, 0x8a, 0xff, 0x9e, 0x60, 0xff, 0xad, 0x17, 0x01, 0x04, 0x03, 0x04, 0x00, 0x06, 0x00, 0x52, 0x08, 0x3e, 0x26}, line) + line, _, err = readBlock(r) + assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0xc9, 0x04, 0x00, 0x91, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x3c, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 
0xff, 0xbf, 0x25, 0xff, 0xef, 0xff, 0xc8, 0xff, 0x85, 0x2c, 0xff, 0xbf, 0xff, 0xb5, 0xff, 0xad, 0xff, 0xfa, 0xff, 0xaf, 0x1c, 0xff, 0xe7, 0x71, 0xff, 0xfa, 0x36, 0xff, 0x95, 0x1b, 0xff, 0x91, 0xff, 0xab, 0x36, 0xff, 0xcd, 0x7a, 0x33, 0xff, 0xf7, 0xff, 0xec, 0xff, 0xee, 0xff, 0xc1, 0x01, 0x04, 0x03, 0x04, 0x00, 0x08, 0x00, 0xa5, 0x0e, 0x62, 0x53}, line) + + line, _, err = readBlock(r) + assert.Equal(t, line, []byte{}) + assert.Equal(t, io.EOF, err) +} + +func TestRebuildIndex(t *testing.T) { + const filePath = "TestRebuiltIndex.zkv" + const recordCount = 4 + defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) + + for i := 1; i <= recordCount; i++ { + db, err := Open(filePath) + assert.NoError(t, err) + + err = db.Set(i, i) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + } + + db, err := Open(filePath) + assert.NoError(t, err) + + err = db.RebuildIndex() + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } +}
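
A minimal end-to-end usage sketch of the API as it stands after this series (Open/Set/Close, then reopen, RebuildIndex and Get), mirroring the flow of TestRebuildIndex in the final patch. The import path comes from go.mod; the file name `example.zkv` is a placeholder, and this is an illustration alongside the patches rather than part of them:

```go
package main

import (
	"log"

	"github.com/nxshock/zkv"
)

func main() {
	// Write a few records; Close flushes the memory buffer to the data file
	// and writes the gob-encoded index next to it (example.zkv.idx).
	db, err := zkv.Open("example.zkv")
	if err != nil {
		log.Fatal(err)
	}
	for i := 1; i <= 4; i++ {
		if err := db.Set(i, i); err != nil {
			log.Fatal(err)
		}
	}
	if err := db.Close(); err != nil {
		log.Fatal(err)
	}

	// Reopen the store. If the index file was lost or is suspected to be
	// stale, RebuildIndex recreates it by scanning the data file blocks.
	db, err = zkv.Open("example.zkv")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.RebuildIndex(); err != nil {
		log.Fatal(err)
	}

	var got int
	if err := db.Get(2, &got); err != nil {
		log.Fatal(err)
	}
	log.Printf("got %d", got) // got 2
}
```

Keys and values may be any gob-encodable type, and Get takes a pointer to the destination value, as in the tests above.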