diff --git a/TestBufferRead.zkv.idx b/TestBufferRead.zkv.idx new file mode 100644 index 0000000..ced4e78 Binary files /dev/null and b/TestBufferRead.zkv.idx differ diff --git a/TestSmallWrites.zkv.idx b/TestSmallWrites.zkv.idx new file mode 100644 index 0000000..15a6d38 Binary files /dev/null and b/TestSmallWrites.zkv.idx differ diff --git a/defaults.go b/defaults.go index d263685..cb4a086 100644 --- a/defaults.go +++ b/defaults.go @@ -11,7 +11,7 @@ var defaultOptions = Options{ CompressionLevel: zstd.SpeedDefault, MemoryBufferSize: 4 * 1024 * 1024, DiskBufferSize: 1 * 1024 * 1024, - UseIndexFile: false, + useIndexFile: true, } const indexFileExt = ".idx" diff --git a/options.go b/options.go index f42f14f..148992b 100644 --- a/options.go +++ b/options.go @@ -16,10 +16,12 @@ type Options struct { DiskBufferSize int // Use index file - UseIndexFile bool + useIndexFile bool } func (o *Options) setDefaults() { + o.useIndexFile = true // TODO: implement database search without index + if o.MaxParallelReads == 0 { o.MaxParallelReads = defaultOptions.MaxParallelReads } diff --git a/zkv.go b/zkv.go index de1263d..1c4d543 100644 --- a/zkv.go +++ b/zkv.go @@ -14,11 +14,15 @@ import ( "github.com/klauspost/compress/zstd" ) +type Offsets struct { + BlockOffset int64 + RecordOffset int64 +} + type Store struct { - dataOffset map[string]int64 + dataOffset map[string]Offsets filePath string - offset int64 buffer *bytes.Buffer bufferDataOffset map[string]int64 @@ -34,15 +38,14 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() database := &Store{ - dataOffset: make(map[string]int64), + dataOffset: make(map[string]Offsets), bufferDataOffset: make(map[string]int64), - offset: 0, buffer: new(bytes.Buffer), filePath: filePath, options: options, readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} - if options.UseIndexFile { + if options.useIndexFile { idxFile, err := os.Open(filePath + indexFileExt) if err == nil { err = gob.NewDecoder(idxFile).Decode(&database.dataOffset) @@ -80,7 +83,7 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { switch record.Type { case RecordTypeSet: - database.dataOffset[string(record.KeyHash[:])] = offset + database.dataOffset[string(record.KeyHash[:])] = Offsets{} // offset case RecordTypeDelete: delete(database.dataOffset, string(record.KeyHash[:])) } @@ -283,7 +286,7 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { return record.ValueBytes, nil } - offset, exists = s.dataOffset[string(keyHash[:])] + offsets, exists := s.dataOffset[string(keyHash[:])] if !exists { return nil, ErrNotExists } @@ -294,13 +297,18 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { } defer readF.Close() + _, err = readF.Seek(offsets.BlockOffset, io.SeekStart) + if err != nil { + return nil, err + } + decompressor, err := zstd.NewReader(readF) if err != nil { return nil, err } defer decompressor.Close() - err = skip(decompressor, offset) + err = skip(decompressor, offsets.RecordOffset) if err != nil { return nil, err } @@ -317,7 +325,6 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { } return record.ValueBytes, nil - } func (s *Store) get(key, value interface{}) error { @@ -344,13 +351,18 @@ func (s *Store) flush() error { if err != nil { return fmt.Errorf("open store file: %v", err) } + stat, err := f.Stat() + if err != nil { + f.Close() + return fmt.Errorf("stat store file: %v", err) + } diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize) encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel)) if err != nil { f.Close() - return fmt.Errorf("open store file: %v", err) + return fmt.Errorf("init encoder: %v", err) } _, err = s.buffer.WriteTo(encoder) @@ -359,13 +371,11 @@ func (s *Store) flush() error { } for key, val := range s.bufferDataOffset { - s.dataOffset[key] = val + s.offset + s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val} } s.bufferDataOffset = make(map[string]int64) - s.offset += l - err = encoder.Close() if err != nil { // TODO: truncate file to previous state @@ -384,7 +394,7 @@ func (s *Store) flush() error { } // Update index file only on data update - if s.options.UseIndexFile && l > 0 { + if s.options.useIndexFile && l > 0 { idxBuf := new(bytes.Buffer) err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) diff --git a/zkv_test.go b/zkv_test.go index b10a2a2..4ec0d41 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -39,6 +39,7 @@ func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -84,6 +85,7 @@ func TestSmallWrites(t *testing.T) { const filePath = "TestSmallWrites.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) for i := 1; i <= recordCount; i++ { db, err := Open(filePath) @@ -119,6 +121,7 @@ func TestDeleteBasic(t *testing.T) { const filePath = "TestDeleteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -164,6 +167,7 @@ func TestDeleteBasic(t *testing.T) { func TestBufferBasic(t *testing.T) { const filePath = "TestBuffer.zkv" defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) @@ -187,8 +191,9 @@ func TestBufferBasic(t *testing.T) { func TestBufferRead(t *testing.T) { const filePath = "TestBufferRead.zkv" - const recordCount = 100 + const recordCount = 2 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) @@ -241,7 +246,9 @@ func TestBackupBasic(t *testing.T) { const newFilePath = "TestBackupBasic2.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -280,7 +287,9 @@ func TestBackupWithDeletedRecords(t *testing.T) { const newFilePath = "TestBackupWithDeletedRecords2.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -335,7 +344,7 @@ func TestIndexFileBasic(t *testing.T) { defer os.Remove(filePath) defer os.Remove(filePath + indexFileExt) - db, err := OpenWithOptions(filePath, Options{UseIndexFile: true}) + db, err := Open(filePath) assert.NoError(t, err) for i := 1; i <= recordCount; i++ { @@ -358,7 +367,7 @@ func TestIndexFileBasic(t *testing.T) { assert.NoError(t, err) // try to read - db, err = OpenWithOptions(filePath, Options{UseIndexFile: true}) + db, err = Open(filePath) assert.NoError(t, err) assert.Len(t, db.dataOffset, recordCount)