From 350634de38c1afd514292b89be1e8037da3e5cea Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 12:40:36 +0500 Subject: [PATCH 1/6] Add option for limit a number of parallel reads --- defaults.go | 5 +++++ options.go | 13 +++++++++++++ zkv.go | 32 ++++++++++++++++++++++++++------ 3 files changed, 44 insertions(+), 6 deletions(-) create mode 100644 defaults.go create mode 100644 options.go diff --git a/defaults.go b/defaults.go new file mode 100644 index 0000000..2e3437f --- /dev/null +++ b/defaults.go @@ -0,0 +1,5 @@ +package zkv + +var defaultOptions = Options{ + MaxParallelReads: 64, +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..7105683 --- /dev/null +++ b/options.go @@ -0,0 +1,13 @@ +package zkv + +type Options struct { + MaxParallelReads uint +} + +func (o *Options) Validate() error { + if o.MaxParallelReads == 0 { + o.MaxParallelReads = defaultOptions.MaxParallelReads + } + + return nil +} diff --git a/zkv.go b/zkv.go index 4df780b..f12364c 100644 --- a/zkv.go +++ b/zkv.go @@ -18,6 +18,10 @@ type Database struct { filePath string offset int64 + options Options + + readOrderChan chan struct{} + mu sync.RWMutex } @@ -63,6 +67,9 @@ func (db *Database) Get(key, value interface{}) error { db.mu.RLock() defer db.mu.RUnlock() + db.readOrderChan <- struct{}{} + defer func() { <-db.readOrderChan }() + hashToFind, err := hashInterface(key) if err != nil { return err @@ -102,23 +109,31 @@ func (db *Database) Get(key, value interface{}) error { return decode(record.ValueBytes, value) } -func Open(filePath string) (*Database, error) { +func OpenWithOptions(filePath string, options Options) (*Database, error) { f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { + f.Close() return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) } + if options.Validate() != nil { + return nil, err + } + compressor, err := zstd.NewWriter(f) if err != nil { + f.Close() return nil, 
fmt.Errorf("ошибка при инициализации компрессора: %v", err) } database := &Database{ - dataOffset: make(map[string]int64), - offset: 0, - file: f, - compressor: compressor, - filePath: filePath} + dataOffset: make(map[string]int64), + offset: 0, + file: f, + compressor: compressor, + filePath: filePath, + options: options, + readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} // restore file data readF, err := os.Open(filePath) @@ -130,6 +145,7 @@ func Open(filePath string) (*Database, error) { decompressor, err := zstd.NewReader(readF) if err != nil { + f.Close() return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) } defer decompressor.Close() @@ -158,6 +174,10 @@ func Open(filePath string) (*Database, error) { return database, nil } +func Open(filePath string) (*Database, error) { + return OpenWithOptions(filePath, defaultOptions) +} + func (db *Database) Delete(key interface{}) error { db.mu.Lock() defer db.mu.Unlock() From 06a429ae4c1940093982360b4d9f86026bd831f8 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 12:41:11 +0500 Subject: [PATCH 2/6] Add some notes --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f250cd2..9b64c19 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,8 @@ Simple key-value store for single-user applications. 
## Pros -* Simple file structure -* Internal compression +* Simple one file structure +* Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd) * Threadsafe operations through `sync.RWMutex` ## Cons From 8ab9e96ef662addb87522840fb1d2481bac3a754 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 12:55:42 +0500 Subject: [PATCH 3/6] Add compression level option --- defaults.go | 3 +++ options.go | 12 ++++++++++-- zkv.go | 6 ++---- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/defaults.go b/defaults.go index 2e3437f..959736c 100644 --- a/defaults.go +++ b/defaults.go @@ -1,5 +1,8 @@ package zkv +import "github.com/klauspost/compress/zstd" + var defaultOptions = Options{ MaxParallelReads: 64, + CompressionLevel: zstd.SpeedDefault, } diff --git a/options.go b/options.go index 7105683..dbbd27c 100644 --- a/options.go +++ b/options.go @@ -1,13 +1,21 @@ package zkv +import "github.com/klauspost/compress/zstd" + type Options struct { + // Maximum number of concurrent reads MaxParallelReads uint + + // Compression level + CompressionLevel zstd.EncoderLevel } -func (o *Options) Validate() error { +func (o *Options) setDefaults() { if o.MaxParallelReads == 0 { o.MaxParallelReads = defaultOptions.MaxParallelReads } - return nil + if o.CompressionLevel == 0 { + o.CompressionLevel = defaultOptions.CompressionLevel + } } diff --git a/zkv.go b/zkv.go index f12364c..1dbc52b 100644 --- a/zkv.go +++ b/zkv.go @@ -110,16 +110,14 @@ func (db *Database) Get(key, value interface{}) error { } func OpenWithOptions(filePath string, options Options) (*Database, error) { + options.setDefaults() + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { f.Close() return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) } - if options.Validate() != nil { - return nil, err - } - compressor, err := zstd.NewWriter(f) if err != nil { f.Close() From 
8ef0fd240b740c355c13337a5d158bded0f37ee1 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 20:57:50 +0500 Subject: [PATCH 4/6] Update README.md --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 9b64c19..0d39f1e 100644 --- a/README.md +++ b/README.md @@ -31,12 +31,19 @@ err = db.Set(key, value) // key and value can be any of type // Read data var value ValueType -err = db.Get(key) +err = db.Get(key, &value) // Delete data err = db.Delete(key) ``` +Other methods: + +```go +// Flush data to disk +err = db.Flush() +``` + ## File structure Record is `encoding/gob` structure: From 56c9dcce00d88bc0bd78ebaa70e038036e0c567c Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 20:59:17 +0500 Subject: [PATCH 5/6] Implement write buffer --- defaults.go | 9 +- options.go | 5 +- zkv.go | 290 +++++++++++++++++++++++++++++++++------------------- zkv_test.go | 95 ++++++++++++++++- 4 files changed, 288 insertions(+), 111 deletions(-) diff --git a/defaults.go b/defaults.go index 959736c..ad476f8 100644 --- a/defaults.go +++ b/defaults.go @@ -1,8 +1,13 @@ package zkv -import "github.com/klauspost/compress/zstd" +import ( + "runtime" + + "github.com/klauspost/compress/zstd" +) var defaultOptions = Options{ - MaxParallelReads: 64, + MaxParallelReads: runtime.NumCPU(), CompressionLevel: zstd.SpeedDefault, + BufferSize: 4 * 1024 * 1024, } diff --git a/options.go b/options.go index dbbd27c..69b3c28 100644 --- a/options.go +++ b/options.go @@ -4,10 +4,13 @@ import "github.com/klauspost/compress/zstd" type Options struct { // Maximum number of concurrent reads - MaxParallelReads uint + MaxParallelReads int // Compression level CompressionLevel zstd.EncoderLevel + + // Write buffer size in bytes + BufferSize int } func (o *Options) setDefaults() { diff --git a/zkv.go b/zkv.go index 1dbc52b..7cc346d 100644 --- a/zkv.go +++ b/zkv.go @@ -11,12 +11,16 @@ import ( "github.com/klauspost/compress/zstd" ) -type 
Database struct { +type Store struct { dataOffset map[string]int64 - file *os.File - compressor *zstd.Encoder - filePath string - offset int64 + + file *os.File + filePath string + offset int64 + encoder *zstd.Encoder + + buffer *bytes.Buffer + bufferDataOffset map[string]int64 options Options @@ -25,91 +29,7 @@ type Database struct { mu sync.RWMutex } -func (db *Database) Close() error { - db.mu.Lock() - defer db.mu.Unlock() - - err := db.compressor.Close() - if err != nil { - return err - } - - return db.file.Close() -} - -func (db *Database) Set(key, value interface{}) error { - db.mu.Lock() - defer db.mu.Unlock() - - record, err := newRecord(RecordTypeSet, key, value) - if err != nil { - return err - } - - b, err := record.Marshal() - if err != nil { - return err - } - - db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки - - _, err = db.compressor.Write(b) - if err != nil { - return err - } - - db.offset += int64(len(b)) // TODO: удалить хеш и откатить запись в случае ошибки - - return nil -} - -func (db *Database) Get(key, value interface{}) error { - db.mu.RLock() - defer db.mu.RUnlock() - - db.readOrderChan <- struct{}{} - defer func() { <-db.readOrderChan }() - - hashToFind, err := hashInterface(key) - if err != nil { - return err - } - - offset, exists := db.dataOffset[string(hashToFind[:])] - if !exists { - return ErrNotExists - } - - readF, err := os.Open(db.filePath) - if err != nil { - return err - } - defer readF.Close() - - decompressor, err := zstd.NewReader(readF) - if err != nil { - return err - } - defer decompressor.Close() - - err = skip(decompressor, offset) - if err != nil { - return err - } - - _, record, err := readRecord(decompressor) - if err != nil { - return err - } - - if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { - return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), 
base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку - } - - return decode(record.ValueBytes, value) -} - -func OpenWithOptions(filePath string, options Options) (*Database, error) { +func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) @@ -124,14 +44,16 @@ func OpenWithOptions(filePath string, options Options) (*Database, error) { return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) } - database := &Database{ - dataOffset: make(map[string]int64), - offset: 0, - file: f, - compressor: compressor, - filePath: filePath, - options: options, - readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} + database := &Store{ + dataOffset: make(map[string]int64), + bufferDataOffset: make(map[string]int64), + offset: 0, + file: f, + encoder: compressor, + buffer: new(bytes.Buffer), + filePath: filePath, + options: options, + readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} // restore file data readF, err := os.Open(filePath) @@ -172,13 +94,27 @@ func OpenWithOptions(filePath string, options Options) (*Database, error) { return database, nil } -func Open(filePath string) (*Database, error) { +func Open(filePath string) (*Store, error) { return OpenWithOptions(filePath, defaultOptions) } -func (db *Database) Delete(key interface{}) error { - db.mu.Lock() - defer db.mu.Unlock() +func (s *Store) Set(key, value interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.set(key, value) +} + +func (s *Store) Get(key, value interface{}) error { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.get(key, value) +} + +func (s *Store) Delete(key interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() keyHash, err := hashInterface(key) if err != nil { @@ -195,14 +131,158 @@ func (db *Database) Delete(key interface{}) error { return err } - delete(db.dataOffset, 
string(record.KeyHash[:])) + delete(s.dataOffset, string(record.KeyHash[:])) + delete(s.bufferDataOffset, string(record.KeyHash[:])) - _, err = db.compressor.Write(b) + _, err = s.buffer.Write(b) if err != nil { return err } - db.offset += int64(len(b)) + if s.buffer.Len() > s.options.BufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil +} + +func (s *Store) Flush() error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.flush() +} + +func (s *Store) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.flush() + if err != nil { + return err + } + + err = s.encoder.Close() + if err != nil { + return err + } + + return s.file.Close() +} + +func (s *Store) set(key, value interface{}) error { + record, err := newRecord(RecordTypeSet, key, value) + if err != nil { + return err + } + + b, err := record.Marshal() + if err != nil { + return err + } + + s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len()) + + _, err = s.buffer.Write(b) + if err != nil { + return err + } + + if s.buffer.Len() > s.options.BufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil +} + +func (s *Store) get(key, value interface{}) error { + s.readOrderChan <- struct{}{} + defer func() { <-s.readOrderChan }() + + hashToFind, err := hashInterface(key) + if err != nil { + return err + } + + offset, exists := s.bufferDataOffset[string(hashToFind[:])] + if exists { + reader := bytes.NewReader(s.buffer.Bytes()) + + err = skip(reader, offset) + if err != nil { + return err + } + + _, record, err := readRecord(reader) + if err != nil { + return err + } + + return decode(record.ValueBytes, value) + } + + offset, exists = s.dataOffset[string(hashToFind[:])] + if !exists { + return ErrNotExists + } + + readF, err := os.Open(s.filePath) + if err != nil { + return err + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return err + } + defer decompressor.Close() + + err = 
skip(decompressor, offset) + if err != nil { + return err + } + + _, record, err := readRecord(decompressor) + if err != nil { + return err + } + + if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { + return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку + } + + return decode(record.ValueBytes, value) +} + +func (s *Store) flush() error { + l := int64(s.buffer.Len()) + + _, err := s.buffer.WriteTo(s.encoder) + if err != nil { + return err + } + + for key, val := range s.bufferDataOffset { + s.dataOffset[key] = val + s.offset + } + + s.bufferDataOffset = make(map[string]int64) + + s.offset += l + + err = s.encoder.Flush() + if err != nil { + return err + } return nil } diff --git a/zkv_test.go b/zkv_test.go index 36f6aac..a497a3c 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -20,7 +20,16 @@ func TestReadWriteBasic(t *testing.T) { assert.NoError(t, err) } - assert.Len(t, db.dataOffset, recordCount) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } err = db.Close() assert.NoError(t, err) @@ -91,12 +100,15 @@ func TestDeleteBasic(t *testing.T) { assert.NoError(t, err) } - assert.Len(t, db.dataOffset, recordCount) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount) err = db.Delete(50) assert.NoError(t, err) - assert.Len(t, db.dataOffset, recordCount-1) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount-1) + var value int err = db.Get(50, &value) assert.Equal(t, 0, value) @@ -110,6 +122,8 @@ func TestDeleteBasic(t *testing.T) { assert.NoError(t, err) assert.Len(t, db.dataOffset, recordCount-1) + assert.Len(t, db.bufferDataOffset, 0) + value = 0 err = db.Get(50, &value) 
assert.Equal(t, 0, value) @@ -118,3 +132,78 @@ func TestDeleteBasic(t *testing.T) { err = db.Close() assert.NoError(t, err) } + +func TestBufferBasic(t *testing.T) { + const filePath = "TestBuffer.zkv" + defer os.Remove(filePath) + + db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + assert.NoError(t, err) + + err = db.Set(1, make([]byte, 100)) + assert.NoError(t, err) + + assert.NotEqual(t, 0, db.dataOffset) + assert.Len(t, db.bufferDataOffset, 0) + assert.Equal(t, 0, db.buffer.Len()) + + var gotValue []byte + err = db.Get(1, &gotValue) + assert.NoError(t, err) + + assert.Equal(t, make([]byte, 100), gotValue) + + err = db.Close() + assert.NoError(t, err) +} + +func TestBufferRead(t *testing.T) { + const filePath = "TestBufferRead.zkv" + const recordCount = 100 + defer os.Remove(filePath) + + db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + + // try to read + db, err = Open(filePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + +} From aa1a2edda6ed00969769d91397c022cae71c6f23 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 21:03:27 +0500 Subject: [PATCH 6/6] Add TODOs --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 0d39f1e..0a85958 100644 --- a/README.md +++ b/README.md @@ -60,3 +60,9 @@ File is log 
structured list of commands: | -------| ------------------------ | -------- | | Length | Record body bytes length | int64 | | Body | Gob-encoded record | variable | + +## TODO + +- [ ] Implement `Copy()` method to copy store without deleted records +- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) +- [ ] Implement optional separate index file to speed up store initialization