diff --git a/README.md b/README.md
index f250cd2..0a85958 100644
--- a/README.md
+++ b/README.md
@@ -4,8 +4,8 @@ Simple key-value store for single-user applications.
 
 ## Pros
 
-* Simple file structure
-* Internal compression
+* Simple single-file structure
+* Internal Zstandard compression via [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd)
 * Threadsafe operations through `sync.RWMutex`
 
 ## Cons
@@ -31,12 +31,29 @@ err = db.Set(key, value) // key and value can be any of type
 
 // Read data
 var value ValueType
-err = db.Get(key)
+err = db.Get(key, &value)
 
 // Delete data
 err = db.Delete(key)
 ```
 
+Other methods:
+
+```go
+// Flush data to disk
+err = db.Flush()
+```
+
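+Tuning options via `OpenWithOptions` (a minimal sketch; the values shown are the package defaults):
+
+```go
+db, err := zkv.OpenWithOptions(filePath, zkv.Options{
+	MaxParallelReads: runtime.NumCPU(),  // limit on concurrent Get calls
+	CompressionLevel: zstd.SpeedDefault, // zstd encoder level
+	BufferSize:       4 * 1024 * 1024,   // flush write buffer after this many bytes
+})
+```
+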
 ## File structure
 
 Record is `encoding/gob` structure:
@@ -53,3 +70,9 @@ File is log structured list of commands:
 | -------| ------------------------ | -------- |
 | Length | Record body bytes length | int64 |
 | Body   | Gob-encoded record       | variable |
+
+## TODO
+
+- [ ] Implement `Copy()` method to copy the store without deleted records
+- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go)
+- [ ] Implement optional separate index file to speed up store initialization
diff --git a/defaults.go b/defaults.go
new file mode 100644
index 0000000..ad476f8
--- /dev/null
+++ b/defaults.go
@@ -0,0 +1,13 @@
+package zkv
+
+import (
+	"runtime"
+
+	"github.com/klauspost/compress/zstd"
+)
+
+var defaultOptions = Options{
+	MaxParallelReads: runtime.NumCPU(),
+	CompressionLevel: zstd.SpeedDefault,
+	BufferSize:       4 * 1024 * 1024,
+}
diff --git a/options.go b/options.go
new file mode 100644
index 0000000..69b3c28
--- /dev/null
+++ b/options.go
@@ -0,0 +1,28 @@
+package zkv
+
+import "github.com/klauspost/compress/zstd"
+
+type Options struct {
+	// Maximum number of concurrent reads
+	MaxParallelReads int
+
+	// Compression level
+	CompressionLevel zstd.EncoderLevel
+
+	// Write buffer size in bytes
+	BufferSize int
+}
+
+func (o *Options) setDefaults() {
+	if o.MaxParallelReads == 0 {
+		o.MaxParallelReads = defaultOptions.MaxParallelReads
+	}
+
+	if o.CompressionLevel == 0 {
+		o.CompressionLevel = defaultOptions.CompressionLevel
+	}
+
+	if o.BufferSize == 0 {
+		o.BufferSize = defaultOptions.BufferSize
+	}
+}
diff --git a/zkv.go b/zkv.go
index 4df780b..7cc346d 100644
--- a/zkv.go
+++ b/zkv.go
@@ -11,114 +11,48 @@ import (
 	"github.com/klauspost/compress/zstd"
 )
 
-type Database struct {
+type Store struct {
 	dataOffset map[string]int64
-	file       *os.File
-	compressor *zstd.Encoder
-	filePath   string
-	offset     int64
+
+	file     *os.File
+	filePath string
+	offset   int64
+	encoder  *zstd.Encoder
+
+	buffer           *bytes.Buffer
+	bufferDataOffset map[string]int64
+
+	options Options
+
+	readOrderChan chan struct{}
 
 	mu sync.RWMutex
 }
 
-func (db *Database) Close() error {
-	db.mu.Lock()
-	defer db.mu.Unlock()
+func OpenWithOptions(filePath string, options Options) (*Store, error) {
+	options.setDefaults()
 
-	err := db.compressor.Close()
-	if err != nil {
-		return err
-	}
-
-	return db.file.Close()
-}
-
-func (db *Database) Set(key, value interface{}) error {
-	db.mu.Lock()
-	defer db.mu.Unlock()
-
-	record, err := newRecord(RecordTypeSet, key, value)
-	if err != nil {
-		return err
-	}
-
-	b, err := record.Marshal()
-	if err != nil {
-		return err
-	}
-
-	db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: remove the hash and roll back the write on error
-
-	_, err = db.compressor.Write(b)
-	if err != nil {
-		return err
-	}
-
-	db.offset += int64(len(b)) // TODO: remove the hash and roll back the write on error
-
-	return nil
-}
-
-func (db *Database) Get(key, value interface{}) error {
-	db.mu.RLock()
-	defer db.mu.RUnlock()
-
-	hashToFind, err := hashInterface(key)
-	if err != nil {
-		return err
-	}
-
-	offset, exists := db.dataOffset[string(hashToFind[:])]
-	if !exists {
-		return ErrNotExists
-	}
-
-	readF, err := os.Open(db.filePath)
-	if err != nil {
-		return err
-	}
-	defer readF.Close()
-
-	decompressor, err := zstd.NewReader(readF)
-	if err != nil {
-		return err
-	}
-	defer decompressor.Close()
-
-	err = skip(decompressor, offset)
-	if err != nil {
-		return err
-	}
-
-	_, record, err := readRecord(decompressor)
-	if err != nil {
-		return err
-	}
-
-	if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
-		return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: replace with a sentinel error
-	}
-
-	return decode(record.ValueBytes, value)
-}
-
-func Open(filePath string) (*Database, error) {
 	f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
 	if err != nil {
 		return nil, fmt.Errorf("error opening file for writing: %v", err)
 	}
 
 	compressor, err := zstd.NewWriter(f)
 	if err != nil {
+		f.Close()
 		return nil, fmt.Errorf("error initializing compressor: %v", err)
 	}
 
-	database := &Database{
-		dataOffset: make(map[string]int64),
-		offset:     0,
-		file:       f,
-		compressor: compressor,
-		filePath:   filePath}
+	database := &Store{
+		dataOffset:       make(map[string]int64),
+		bufferDataOffset: make(map[string]int64),
+		offset:           0,
+		file:             f,
+		encoder:          compressor,
+		buffer:           new(bytes.Buffer),
+		filePath:         filePath,
+		options:          options,
+		readOrderChan:    make(chan struct{}, int(options.MaxParallelReads))}
 
 	// restore file data
 	readF, err := os.Open(filePath)
@@ -130,6 +64,7 @@ func Open(filePath string) (*Database, error) {
 
 	decompressor, err := zstd.NewReader(readF)
 	if err != nil {
+		f.Close()
 		return nil, fmt.Errorf("error initializing decompressor: %v", err)
 	}
 	defer decompressor.Close()
@@ -158,9 +93,27 @@ func Open(filePath string) (*Database, error) {
 	return database, nil
 }
 
-func (db *Database) Delete(key interface{}) error {
-	db.mu.Lock()
-	defer db.mu.Unlock()
+func Open(filePath string) (*Store, error) {
+	return OpenWithOptions(filePath, defaultOptions)
+}
+
+func (s *Store) Set(key, value interface{}) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.set(key, value)
+}
+
+func (s *Store) Get(key, value interface{}) error {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	return s.get(key, value)
+}
+
+func (s *Store) Delete(key interface{}) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 
 	keyHash, err := hashInterface(key)
 	if err != nil {
@@ -177,14 +130,162 @@ func (db *Database) Delete(key interface{}) error {
 		return err
 	}
 
-	delete(db.dataOffset, string(record.KeyHash[:]))
+	delete(s.dataOffset, string(record.KeyHash[:]))
+	delete(s.bufferDataOffset, string(record.KeyHash[:]))
 
-	_, err = db.compressor.Write(b)
+	_, err = s.buffer.Write(b)
 	if err != nil {
 		return err
 	}
 
-	db.offset += int64(len(b))
+	if s.buffer.Len() > s.options.BufferSize {
+		err = s.flush()
+
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *Store) Flush() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.flush()
+}
+
+func (s *Store) Close() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	err := s.flush()
+	if err != nil {
+		return err
+	}
+
+	err = s.encoder.Close()
+	if err != nil {
+		return err
+	}
+
+	return s.file.Close()
+}
+
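+// set appends the record to the in-memory write buffer and triggers a
+// flush once the buffer grows beyond Options.BufferSize.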
+func (s *Store) set(key, value interface{}) error {
+	record, err := newRecord(RecordTypeSet, key, value)
+	if err != nil {
+		return err
+	}
+
+	b, err := record.Marshal()
+	if err != nil {
+		return err
+	}
+
+	s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
+
+	_, err = s.buffer.Write(b)
+	if err != nil {
+		return err
+	}
+
+	if s.buffer.Len() > s.options.BufferSize {
+		err = s.flush()
+
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *Store) get(key, value interface{}) error {
+	s.readOrderChan <- struct{}{}
+	defer func() { <-s.readOrderChan }()
+
+	hashToFind, err := hashInterface(key)
+	if err != nil {
+		return err
+	}
+
+	offset, exists := s.bufferDataOffset[string(hashToFind[:])]
+	if exists {
+		reader := bytes.NewReader(s.buffer.Bytes())
+
+		err = skip(reader, offset)
+		if err != nil {
+			return err
+		}
+
+		_, record, err := readRecord(reader)
+		if err != nil {
+			return err
+		}
+
+		return decode(record.ValueBytes, value)
+	}
+
+	offset, exists = s.dataOffset[string(hashToFind[:])]
+	if !exists {
+		return ErrNotExists
+	}
+
+	readF, err := os.Open(s.filePath)
+	if err != nil {
+		return err
+	}
+	defer readF.Close()
+
+	decompressor, err := zstd.NewReader(readF)
+	if err != nil {
+		return err
+	}
+	defer decompressor.Close()
+
+	err = skip(decompressor, offset)
+	if err != nil {
+		return err
+	}
+
+	_, record, err := readRecord(decompressor)
+	if err != nil {
+		return err
+	}
+
+	if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
+		return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: replace with a sentinel error
+	}
+
+	return decode(record.ValueBytes, value)
+}
+
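+// flush moves the write buffer to the zstd encoder, rebases the buffered
+// key offsets onto the on-disk offset, and resets the buffer state.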
+func (s *Store) flush() error {
+	l := int64(s.buffer.Len())
+
+	_, err := s.buffer.WriteTo(s.encoder)
+	if err != nil {
+		return err
+	}
+
+	for key, val := range s.bufferDataOffset {
+		s.dataOffset[key] = val + s.offset
+	}
+
+	s.bufferDataOffset = make(map[string]int64)
+
+	s.offset += l
+
+	err = s.encoder.Flush()
+	if err != nil {
+		return err
+	}
 
 	return nil
 }
diff --git a/zkv_test.go b/zkv_test.go
index 36f6aac..a497a3c 100644
--- a/zkv_test.go
+++ b/zkv_test.go
@@ -20,7 +20,16 @@ func TestReadWriteBasic(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	assert.Len(t, db.dataOffset, recordCount)
+	assert.Len(t, db.dataOffset, 0)
+	assert.Len(t, db.bufferDataOffset, recordCount)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
 
 	err = db.Close()
 	assert.NoError(t, err)
@@ -91,12 +100,15 @@ func TestDeleteBasic(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	assert.Len(t, db.dataOffset, recordCount)
+	assert.Len(t, db.dataOffset, 0)
+	assert.Len(t, db.bufferDataOffset, recordCount)
 
 	err = db.Delete(50)
 	assert.NoError(t, err)
 
-	assert.Len(t, db.dataOffset, recordCount-1)
+	assert.Len(t, db.dataOffset, 0)
+	assert.Len(t, db.bufferDataOffset, recordCount-1)
+
 	var value int
 	err = db.Get(50, &value)
 	assert.Equal(t, 0, value)
@@ -110,6 +122,8 @@ func TestDeleteBasic(t *testing.T) {
 	assert.NoError(t, err)
 
 	assert.Len(t, db.dataOffset, recordCount-1)
+	assert.Len(t, db.bufferDataOffset, 0)
+
 	value = 0
 	err = db.Get(50, &value)
 	assert.Equal(t, 0, value)
@@ -118,3 +132,102 @@ func TestDeleteBasic(t *testing.T) {
 	err = db.Close()
 	assert.NoError(t, err)
 }
+
+func TestBufferBasic(t *testing.T) {
+	const filePath = "TestBuffer.zkv"
+	defer os.Remove(filePath)
+
+	db, err := OpenWithOptions(filePath, Options{BufferSize: 100})
+	assert.NoError(t, err)
+
+	err = db.Set(1, make([]byte, 100))
+	assert.NoError(t, err)
+
+	assert.Len(t, db.dataOffset, 1)
+	assert.Len(t, db.bufferDataOffset, 0)
+	assert.Equal(t, 0, db.buffer.Len())
+
+	var gotValue []byte
+	err = db.Get(1, &gotValue)
+	assert.NoError(t, err)
+
+	assert.Equal(t, make([]byte, 100), gotValue)
+
+	err = db.Close()
+	assert.NoError(t, err)
+}
+
+func TestBufferRead(t *testing.T) {
+	const filePath = "TestBufferRead.zkv"
+	const recordCount = 100
+	defer os.Remove(filePath)
+
+	db, err := OpenWithOptions(filePath, Options{BufferSize: 100})
+	assert.NoError(t, err)
+
+	for i := 1; i <= recordCount; i++ {
+		err = db.Set(i, i)
+		assert.NoError(t, err)
+	}
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	// read again to check repeated reads from the same offsets
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	err = db.Close()
+	assert.NoError(t, err)
+
+	// reopen the store and read everything back from disk
+	db, err = Open(filePath)
+	assert.NoError(t, err)
+
+	assert.Len(t, db.dataOffset, recordCount)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	err = db.Close()
+	assert.NoError(t, err)
+}
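+
+// TestFlush is a minimal sketch of the Flush contract: buffered offsets
+// should move into dataOffset and the write buffer should be left empty.
+func TestFlush(t *testing.T) {
+	const filePath = "TestFlush.zkv"
+	defer os.Remove(filePath)
+
+	db, err := Open(filePath)
+	assert.NoError(t, err)
+
+	err = db.Set(1, 1)
+	assert.NoError(t, err)
+	assert.Len(t, db.bufferDataOffset, 1)
+
+	err = db.Flush()
+	assert.NoError(t, err)
+
+	assert.Len(t, db.bufferDataOffset, 0)
+	assert.Len(t, db.dataOffset, 1)
+	assert.Equal(t, 0, db.buffer.Len())
+
+	err = db.Close()
+	assert.NoError(t, err)
+}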