diff --git a/README.md b/README.md
index 0a85958..f250cd2 100644
--- a/README.md
+++ b/README.md
@@ -4,8 +4,8 @@ Simple key-value store for single-user applications.
 
 ## Pros
 
-* Simple one file structure
-* Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd)
+* Simple file structure
+* Internal compression
 * Thread-safe operations through `sync.RWMutex`
 
 ## Cons
@@ -31,19 +31,12 @@ err = db.Set(key, value) // key and value can be of any type
 
 // Read data
 var value ValueType
 err = db.Get(key, &value)
 
 // Delete data
 err = db.Delete(key)
 ```
 
-Other methods:
-
-```go
-// Flush data to disk
-err = db.Flush()
-```
-
 ## File structure
 
 Record is an `encoding/gob` structure:
@@ -60,9 +53,3 @@ File is a log-structured list of commands:
 | -------| ------------------------ | -------- |
 | Length | Record body bytes length | int64 |
 | Body   | Gob-encoded record       | variable |
-
-## TODO
-
-- [ ] Implement `Copy()` method to copy store without deleted records
-- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go)
-- [ ] Implement optional separate index file to speed up store initialization
diff --git a/defaults.go b/defaults.go
deleted file mode 100644
index ad476f8..0000000
--- a/defaults.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package zkv
-
-import (
-	"runtime"
-
-	"github.com/klauspost/compress/zstd"
-)
-
-var defaultOptions = Options{
-	MaxParallelReads: runtime.NumCPU(),
-	CompressionLevel: zstd.SpeedDefault,
-	BufferSize:       4 * 1024 * 1024,
-}
diff --git a/options.go b/options.go
deleted file mode 100644
index 69b3c28..0000000
--- a/options.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package zkv
-
-import "github.com/klauspost/compress/zstd"
-
-type Options struct {
-	// Maximum number of concurrent reads
-	MaxParallelReads int
-
-	// Compression level
-	CompressionLevel zstd.EncoderLevel
-
-	// Write buffer size in bytes
-	BufferSize int
-}
-
-func (o *Options) setDefaults() {
-	if o.MaxParallelReads == 0 {
-		o.MaxParallelReads = defaultOptions.MaxParallelReads
-	}
-
-	if o.CompressionLevel == 0 {
-		o.CompressionLevel = defaultOptions.CompressionLevel
-	}
-}
diff --git a/zkv.go b/zkv.go
index 7cc346d..4df780b 100644
--- a/zkv.go
+++ b/zkv.go
@@ -11,170 +11,32 @@ import (
 	"github.com/klauspost/compress/zstd"
 )
 
-type Store struct {
+type Database struct {
 	dataOffset map[string]int64
-
-	file     *os.File
-	filePath string
-	offset   int64
-	encoder  *zstd.Encoder
-
-	buffer           *bytes.Buffer
-	bufferDataOffset map[string]int64
-
-	options Options
-
-	readOrderChan chan struct{}
+	file       *os.File
+	compressor *zstd.Encoder
+	filePath   string
+	offset     int64
 
 	mu sync.RWMutex
 }
 
-func OpenWithOptions(filePath string, options Options) (*Store, error) {
-	options.setDefaults()
+func (db *Database) Close() error {
+	db.mu.Lock()
+	defer db.mu.Unlock()
 
-	f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
-	if err != nil {
-		f.Close()
-		return nil, fmt.Errorf("error opening file for writing: %v", err)
-	}
-
-	compressor, err := zstd.NewWriter(f)
-	if err != nil {
-		f.Close()
-		return nil, fmt.Errorf("error initializing compressor: %v", err)
-	}
-
-	database := &Store{
-		dataOffset:       make(map[string]int64),
-		bufferDataOffset: make(map[string]int64),
-		offset:           0,
-		file:             f,
-		encoder:          compressor,
-		buffer:           new(bytes.Buffer),
-		filePath:         filePath,
-		options:          options,
-		readOrderChan:    make(chan struct{}, int(options.MaxParallelReads))}
-
-	// restore file data
-	readF, err := os.Open(filePath)
-	if err != nil {
-		f.Close()
-		return nil, fmt.Errorf("error opening file for reading: %v", err)
-	}
-	defer readF.Close()
-
-	decompressor, err := zstd.NewReader(readF)
-	if err != nil {
-		f.Close()
-		return nil, fmt.Errorf("error initializing decompressor: %v", err)
-	}
-	defer decompressor.Close()
-
-	offset := int64(0)
-	for {
-		n, record, err := readRecord(decompressor)
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			f.Close()
-			return nil, fmt.Errorf("error reading record from file: %v", err)
-		}
-
-		switch record.Type {
-		case RecordTypeSet:
-			database.dataOffset[string(record.KeyHash[:])] = offset
-		case RecordTypeDelete:
-			delete(database.dataOffset, string(record.KeyHash[:]))
-		}
-
-		offset += n
-	}
-
-	return database, nil
-}
-
-func Open(filePath string) (*Store, error) {
-	return OpenWithOptions(filePath, defaultOptions)
-}
-
-func (s *Store) Set(key, value interface{}) error {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	return s.set(key, value)
-}
-
-func (s *Store) Get(key, value interface{}) error {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-
-	return s.get(key, value)
-}
-
-func (s *Store) Delete(key interface{}) error {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	keyHash, err := hashInterface(key)
+	err := db.compressor.Close()
 	if err != nil {
 		return err
 	}
 
-	record := &Record{
-		Type:    RecordTypeDelete,
-		KeyHash: keyHash,
-	}
-
-	b, err := record.Marshal()
-	if err != nil {
-		return err
-	}
-
-	delete(s.dataOffset, string(record.KeyHash[:]))
-	delete(s.bufferDataOffset, string(record.KeyHash[:]))
-
-	_, err = s.buffer.Write(b)
-	if err != nil {
-		return err
-	}
-
-	if s.buffer.Len() > s.options.BufferSize {
-		err = s.flush()
-
-		if err != nil {
-			return err
-		}
-	}
-
-	return nil
+	return db.file.Close()
 }
 
-func (s *Store) Flush() error {
-	s.mu.Lock()
-	defer s.mu.Unlock()
+func (db *Database) Set(key, value interface{}) error {
+	db.mu.Lock()
+	defer db.mu.Unlock()
 
-	return s.flush()
-}
-
-func (s *Store) Close() error {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	err := s.flush()
-	if err != nil {
-		return err
-	}
-
-	err = s.encoder.Close()
-	if err != nil {
-		return err
-	}
-
-	return s.file.Close()
-}
-
-func (s *Store) set(key, value interface{}) error {
 	record, err := newRecord(RecordTypeSet, key, value)
 	if err != nil {
 		return err
 	}
@@ -185,56 +47,33 @@ func (s *Store) set(key, value interface{}) error {
 		return err
 	}
 
-	s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
+	db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: remove the hash and roll back the write on error
 
-	_, err = s.buffer.Write(b)
+	_, err = db.compressor.Write(b)
 	if err != nil {
 		return err
 	}
 
-	if s.buffer.Len() > s.options.BufferSize {
-		err = s.flush()
-
-		if err != nil {
-			return err
-		}
-	}
+	db.offset += int64(len(b)) // TODO: remove the hash and roll back the write on error
 
 	return nil
 }
 
-func (s *Store) get(key, value interface{}) error {
-	s.readOrderChan <- struct{}{}
-	defer func() { <-s.readOrderChan }()
+func (db *Database) Get(key, value interface{}) error {
+	db.mu.RLock()
+	defer db.mu.RUnlock()
 
 	hashToFind, err := hashInterface(key)
 	if err != nil {
 		return err
 	}
 
-	offset, exists := s.bufferDataOffset[string(hashToFind[:])]
-	if exists {
-		reader := bytes.NewReader(s.buffer.Bytes())
-
-		err = skip(reader, offset)
-		if err != nil {
-			return err
-		}
-
-		_, record, err := readRecord(reader)
-		if err != nil {
-			return err
-		}
-
-		return decode(record.ValueBytes, value)
-	}
-
-	offset, exists = s.dataOffset[string(hashToFind[:])]
+	offset, exists := db.dataOffset[string(hashToFind[:])]
 	if !exists {
 		return ErrNotExists
 	}
 
-	readF, err := os.Open(s.filePath)
+	readF, err := os.Open(db.filePath)
 	if err != nil {
 		return err
 	}
@@ -263,26 +102,91 @@ func (s *Store) get(key, value interface{}) error {
 	return decode(record.ValueBytes, value)
 }
 
-func (s *Store) flush() error {
-	l := int64(s.buffer.Len())
+func Open(filePath string) (*Database, error) {
+	f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return nil, fmt.Errorf("error opening file for writing: %v", err)
+	}
 
-	_, err := s.buffer.WriteTo(s.encoder)
+	compressor, err := zstd.NewWriter(f)
+	if err != nil {
+		f.Close()
+		return nil, fmt.Errorf("error initializing compressor: %v", err)
+	}
+
+	database := &Database{
+		dataOffset: make(map[string]int64),
+		offset:     0,
+		file:       f,
+		compressor: compressor,
+		filePath:   filePath}
+
+	// restore file data
+	readF, err := os.Open(filePath)
+	if err != nil {
+		f.Close()
+		return nil, fmt.Errorf("error opening file for reading: %v", err)
+	}
+	defer readF.Close()
+
+	decompressor, err := zstd.NewReader(readF)
+	if err != nil {
+		f.Close()
+		return nil, fmt.Errorf("error initializing decompressor: %v", err)
+	}
+	defer decompressor.Close()
+
+	offset := int64(0)
+	for {
+		n, record, err := readRecord(decompressor)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			f.Close()
+			return nil, fmt.Errorf("error reading record from file: %v", err)
+		}
+
+		switch record.Type {
+		case RecordTypeSet:
+			database.dataOffset[string(record.KeyHash[:])] = offset
+		case RecordTypeDelete:
+			delete(database.dataOffset, string(record.KeyHash[:]))
+		}
+
+		offset += n
+	}
+
+	return database, nil
+}
+
+func (db *Database) Delete(key interface{}) error {
+	db.mu.Lock()
+	defer db.mu.Unlock()
+
+	keyHash, err := hashInterface(key)
 	if err != nil {
 		return err
 	}
 
-	for key, val := range s.bufferDataOffset {
-		s.dataOffset[key] = val + s.offset
+	record := &Record{
+		Type:    RecordTypeDelete,
+		KeyHash: keyHash,
 	}
 
-	s.bufferDataOffset = make(map[string]int64)
-
-	s.offset += l
-
-	err = s.encoder.Flush()
+	b, err := record.Marshal()
 	if err != nil {
 		return err
 	}
 
+	delete(db.dataOffset, string(record.KeyHash[:]))
+
+	_, err = db.compressor.Write(b)
+	if err != nil {
+		return err
+	}
+
+	db.offset += int64(len(b))
+
 	return nil
 }
diff --git a/zkv_test.go b/zkv_test.go
index a497a3c..36f6aac 100644
--- a/zkv_test.go
+++ b/zkv_test.go
@@ -20,16 +20,7 @@ func TestReadWriteBasic(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	assert.Len(t, db.dataOffset, 0)
-	assert.Len(t, db.bufferDataOffset, recordCount)
-
-	for i := 1; i <= recordCount; i++ {
-		var gotValue int
-
-		err = db.Get(i, &gotValue)
-		assert.NoError(t, err)
-		assert.Equal(t, i, gotValue)
-	}
+	assert.Len(t, db.dataOffset, recordCount)
 
 	err = db.Close()
 	assert.NoError(t, err)
@@ -100,15 +91,12 @@ func TestDeleteBasic(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	assert.Len(t, db.dataOffset, 0)
-	assert.Len(t, db.bufferDataOffset, recordCount)
+	assert.Len(t, db.dataOffset, recordCount)
 
 	err = db.Delete(50)
 	assert.NoError(t, err)
 
-	assert.Len(t, db.dataOffset, 0)
-	assert.Len(t, db.bufferDataOffset, recordCount-1)
-
+	assert.Len(t, db.dataOffset, recordCount-1)
 	var value int
 	err = db.Get(50, &value)
 	assert.Equal(t, 0, value)
@@ -122,8 +110,6 @@
 	assert.NoError(t, err)
 
 	assert.Len(t, db.dataOffset, recordCount-1)
-	assert.Len(t, db.bufferDataOffset, 0)
-
 	value = 0
 	err = db.Get(50, &value)
 	assert.Equal(t, 0, value)
@@ -132,78 +118,3 @@ func TestDeleteBasic(t *testing.T) {
 	err = db.Close()
 	assert.NoError(t, err)
 }
-
-func TestBufferBasic(t *testing.T) {
-	const filePath = "TestBuffer.zkv"
-	defer os.Remove(filePath)
-
-	db, err := OpenWithOptions(filePath, Options{BufferSize: 100})
-	assert.NoError(t, err)
-
-	err = db.Set(1, make([]byte, 100))
-	assert.NoError(t, err)
-
-	assert.NotEqual(t, 0, db.dataOffset)
-	assert.Len(t, db.bufferDataOffset, 0)
-	assert.Equal(t, 0, db.buffer.Len())
-
-	var gotValue []byte
-	err = db.Get(1, &gotValue)
-	assert.NoError(t, err)
-
-	assert.Equal(t, make([]byte, 100), gotValue)
-
-	err = db.Close()
-	assert.NoError(t, err)
-}
-
-func TestBufferRead(t *testing.T) {
-	const filePath = "TestBufferRead.zkv"
-	const recordCount = 100
-	defer os.Remove(filePath)
-
-	db, err := OpenWithOptions(filePath, Options{BufferSize: 100})
-	assert.NoError(t, err)
-
-	for i := 1; i <= recordCount; i++ {
-		err = db.Set(i, i)
-		assert.NoError(t, err)
-	}
-
-	for i := 1; i <= recordCount; i++ {
-		var gotValue int
-
-		err = db.Get(i, &gotValue)
-		assert.NoError(t, err)
-		assert.Equal(t, i, gotValue)
-	}
-
-	for i := 1; i <= recordCount; i++ {
-		var gotValue int
-
-		err = db.Get(i, &gotValue)
-		assert.NoError(t, err)
-		assert.Equal(t, i, gotValue)
-	}
-
-	err = db.Close()
-	assert.NoError(t, err)
-
-	// try to read
-	db, err = Open(filePath)
-	assert.NoError(t, err)
-
-	assert.Len(t, db.dataOffset, recordCount)
-
-	for i := 1; i <= recordCount; i++ {
-		var gotValue int
-
-		err = db.Get(i, &gotValue)
-		assert.NoError(t, err)
-		assert.Equal(t, i, gotValue)
-	}
-
-	err = db.Close()
-	assert.NoError(t, err)
-
-}
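
A minimal usage sketch of the API after this change. The import path `example.com/zkv` and the file name are placeholders, not the module's real path; note that because `Set` now writes straight to the zstd encoder, data written in a session is only guaranteed readable from disk after `Close`, since this diff removes the buffered `Flush` path.

```go
package main

import (
	"fmt"
	"log"

	zkv "example.com/zkv" // placeholder import path
)

func main() {
	// Write a few records; Close flushes the zstd stream to disk.
	db, err := zkv.Open("example.zkv")
	if err != nil {
		log.Fatal(err)
	}
	for i := 1; i <= 3; i++ {
		if err := db.Set(i, i*i); err != nil {
			log.Fatal(err)
		}
	}
	if err := db.Close(); err != nil {
		log.Fatal(err)
	}

	// Reopen: Open rebuilds the key-to-offset index by replaying the log.
	db, err = zkv.Open("example.zkv")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var got int
	if err := db.Get(2, &got); err != nil {
		log.Fatal(err)
	}
	fmt.Println(got) // 4
}
```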
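For the `Length`/`Body` framing in the README's file-structure table, a sketch of a decoder for a single record follows. `readRecord` itself is not shown in this diff, so the 8-byte big-endian length prefix and the exact `Record` field layout below are assumptions rather than the repository's actual encoding.

```go
// Sketch only: one plausible decoder for the length-prefixed, gob-encoded
// records the README describes; the real readRecord may differ.
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/gob"
	"fmt"
	"io"
	"os"

	"github.com/klauspost/compress/zstd"
)

type Record struct {
	Type       uint8    // RecordTypeSet / RecordTypeDelete
	KeyHash    [32]byte // hash size is an assumption
	ValueBytes []byte
}

// readRecordSketch reads one record and returns the total bytes consumed
// (prefix plus body), mirroring how Open advances its offset counter
// while replaying the log.
func readRecordSketch(r io.Reader) (int64, *Record, error) {
	var length int64
	if err := binary.Read(r, binary.BigEndian, &length); err != nil {
		return 0, nil, err // io.EOF here is a clean end of the log
	}
	body := make([]byte, length)
	if _, err := io.ReadFull(r, body); err != nil {
		return 0, nil, err
	}
	var rec Record
	if err := gob.NewDecoder(bytes.NewReader(body)).Decode(&rec); err != nil {
		return 0, nil, err
	}
	return 8 + length, &rec, nil // 8-byte prefix plus the body
}

func main() {
	f, err := os.Open("example.zkv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	dec, err := zstd.NewReader(f) // records sit inside one zstd stream
	if err != nil {
		panic(err)
	}
	defer dec.Close()

	for {
		n, rec, err := readRecordSketch(dec)
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		fmt.Printf("record type=%d, %d bytes\n", rec.Type, n)
	}
}
```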