diff --git a/README.md b/README.md index 0d39f1e..4474fec 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,15 @@ Simple key-value store for single-user applications. ## Pros -* Simple one file structure +* Simple two file structure (data file and index file) * Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd) * Threadsafe operations through `sync.RWMutex` ## Cons * Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) -* Need to read the whole file on store open to create file index +* No transaction system +* Index file is fully rewritten on every store commit * No way to recover disk space from deleted records * Write/Delete operations block Read and each other operations @@ -20,7 +21,7 @@ Simple key-value store for single-user applications. Create or open existing file: ```go -db, err := Open("path to file") +db, err := zkv.Open("path to file") ``` Data operations: @@ -42,6 +43,28 @@ Other methods: ```go // Flush data to disk err = db.Flush() + +// Backup data to another file +err = db.Backup("new/file/path") +``` + +## Store options + +```go +type Options struct { + // Maximum number of concurrent reads + MaxParallelReads int + + // Compression level + CompressionLevel zstd.EncoderLevel + + // Memory write buffer size in bytes + MemoryBufferSize int + + // Disk write buffer size in bytes + DiskBufferSize int +} + ``` ## File structure @@ -60,3 +83,26 @@ File is log stuctured list of commands: | -------| ------------------------ | -------- | | Length | Record body bytes length | int64 | | Body | Gob-encoded record | variable | + +Index file is simple gob-encoded map: + +```go +map[string]struct { + BlockOffset int64 + RecordOffset int64 +} +``` + +where map key is data key hash and value is data offset in data file. 
+ +## Resource consumption + +Store requirements: + +* around 300 Mb of RAM per 1 million keys +* around 34 Mb of disk space for index file per 1 million keys + +## TODO + +- [ ] Add recovery of previous state of store file on write error +- [ ] Add method for index rebuild diff --git a/TestDeleteBasic.zkv.idx b/TestDeleteBasic.zkv.idx new file mode 100644 index 0000000..b341db7 Binary files /dev/null and b/TestDeleteBasic.zkv.idx differ diff --git a/TestSmallWrites.zkv.idx b/TestSmallWrites.zkv.idx new file mode 100644 index 0000000..6de70da Binary files /dev/null and b/TestSmallWrites.zkv.idx differ diff --git a/defaults.go b/defaults.go index ad476f8..cb4a086 100644 --- a/defaults.go +++ b/defaults.go @@ -9,5 +9,9 @@ import ( var defaultOptions = Options{ MaxParallelReads: runtime.NumCPU(), CompressionLevel: zstd.SpeedDefault, - BufferSize: 4 * 1024 * 1024, + MemoryBufferSize: 4 * 1024 * 1024, + DiskBufferSize: 1 * 1024 * 1024, + useIndexFile: true, } + +const indexFileExt = ".idx" diff --git a/go.mod b/go.mod index 392f6ec..4dc95a4 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/nxshock/zkv go 1.19 require ( - github.com/klauspost/compress v1.15.12 - github.com/stretchr/testify v1.8.1 + github.com/klauspost/compress v1.16.4 + github.com/stretchr/testify v1.8.2 ) require ( diff --git a/go.sum b/go.sum index 8609118..abc35b1 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM= -github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU= +github.com/klauspost/compress v1.16.4/go.mod 
h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -10,8 +10,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/options.go b/options.go index 69b3c28..148992b 100644 --- a/options.go +++ b/options.go @@ -9,11 +9,19 @@ type Options struct { // Compression level CompressionLevel zstd.EncoderLevel - // Write buffer size in bytes - BufferSize int + // Memory write buffer size in bytes + MemoryBufferSize int + + // Disk write buffer size in bytes + DiskBufferSize int + + // Use index file + useIndexFile bool } func (o *Options) setDefaults() { + o.useIndexFile = true // TODO: implement database search without index + if o.MaxParallelReads == 0 { o.MaxParallelReads = defaultOptions.MaxParallelReads } @@ -21,4 +29,12 @@ func (o *Options) setDefaults() 
{ if o.CompressionLevel == 0 { o.CompressionLevel = defaultOptions.CompressionLevel } + + if o.MemoryBufferSize == 0 { + o.MemoryBufferSize = defaultOptions.MemoryBufferSize + } + + if o.DiskBufferSize == 0 { + o.DiskBufferSize = defaultOptions.DiskBufferSize + } } diff --git a/record.go b/record.go index 85a8c2b..0c3c941 100644 --- a/record.go +++ b/record.go @@ -2,6 +2,7 @@ package zkv import ( "bytes" + "crypto/sha256" "encoding/binary" "encoding/gob" "io" @@ -20,8 +21,17 @@ type Record struct { ValueBytes []byte } +func newRecordBytes(recordType RecordType, keyHash [sha256.Size224]byte, valueBytes []byte) (*Record, error) { + record := &Record{ + Type: recordType, + KeyHash: keyHash, + ValueBytes: valueBytes} + + return record, nil +} + func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { - keyBytes, err := encode(key) + keyHash, err := hashInterface(key) if err != nil { return nil, err } @@ -31,12 +41,7 @@ func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { return nil, err } - record := &Record{ - Type: recordType, - KeyHash: hashBytes(keyBytes), - ValueBytes: valueBytes} - - return record, nil + return newRecordBytes(recordType, keyHash, valueBytes) } func (r *Record) Marshal() ([]byte, error) { diff --git a/record_test.go b/record_test.go new file mode 100644 index 0000000..1e69354 --- /dev/null +++ b/record_test.go @@ -0,0 +1,35 @@ +package zkv + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRecord(t *testing.T) { + buf := new(bytes.Buffer) + + var records []Record + + for i := 0; i < 10; i++ { + record, err := newRecord(RecordTypeSet, i, i) + assert.NoError(t, err) + + records = append(records, *record) + + b, err := record.Marshal() + assert.NoError(t, err) + + _, err = buf.Write(b) + assert.NoError(t, err) + } + + for i := 0; i < 10; i++ { + _, record, err := readRecord(buf) + assert.NoError(t, err) + + assert.Equal(t, record.KeyHash, records[i].KeyHash) + 
assert.Equal(t, record.ValueBytes, records[i].ValueBytes) + } +} diff --git a/testdata/TestReadBlock.zkv b/testdata/TestReadBlock.zkv new file mode 100644 index 0000000..7916c5b Binary files /dev/null and b/testdata/TestReadBlock.zkv differ diff --git a/utils.go b/utils.go index 97d63be..ae570b2 100644 --- a/utils.go +++ b/utils.go @@ -4,7 +4,9 @@ import ( "bytes" "crypto/sha256" "encoding/gob" + "errors" "io" + "os" ) func encode(value interface{}) ([]byte, error) { @@ -40,3 +42,13 @@ func skip(r io.Reader, count int64) (err error) { return err } + +func isFileExists(filePath string) (bool, error) { + if _, err := os.Stat(filePath); err == nil { + return true, nil + } else if errors.Is(err, os.ErrNotExist) { + return false, nil + } else { + return false, err + } +} diff --git a/zkv.go b/zkv.go index 7cc346d..8a2b7d6 100644 --- a/zkv.go +++ b/zkv.go @@ -1,8 +1,11 @@ package zkv import ( + "bufio" "bytes" + "crypto/sha256" "encoding/base64" + "encoding/gob" "fmt" "io" "os" @@ -11,13 +14,15 @@ import ( "github.com/klauspost/compress/zstd" ) -type Store struct { - dataOffset map[string]int64 +type Offsets struct { + BlockOffset int64 + RecordOffset int64 +} + +type Store struct { + dataOffset map[string]Offsets - file *os.File filePath string - offset int64 - encoder *zstd.Encoder buffer *bytes.Buffer bufferDataOffset map[string]int64 @@ -32,70 +37,46 @@ type Store struct { func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() - f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - f.Close() - return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) - } - - compressor, err := zstd.NewWriter(f) - if err != nil { - f.Close() - return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) - } - - database := &Store{ - dataOffset: make(map[string]int64), + store := &Store{ + dataOffset: make(map[string]Offsets), bufferDataOffset: make(map[string]int64), - offset: 0, 
- file: f, - encoder: compressor, buffer: new(bytes.Buffer), filePath: filePath, options: options, readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} - // restore file data - readF, err := os.Open(filePath) + if options.useIndexFile { + idxFile, err := os.Open(filePath + indexFileExt) + if err == nil { + err = gob.NewDecoder(idxFile).Decode(&store.dataOffset) + if err != nil { + return nil, err + } + + return store, nil + } + } + + exists, err := isFileExists(filePath) if err != nil { - f.Close() - return nil, fmt.Errorf("ошибка при открытии файла для чтения: %v", err) + return nil, err } - defer readF.Close() - decompressor, err := zstd.NewReader(readF) + if !exists { + return store, nil + } + + err = store.rebuildIndex() if err != nil { - f.Close() - return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) - } - defer decompressor.Close() - - offset := int64(0) - for { - n, record, err := readRecord(decompressor) - if err == io.EOF { - break - } - if err != nil { - f.Close() - return nil, fmt.Errorf("ошибка при чтении записи из файла: %v", err) - } - - switch record.Type { - case RecordTypeSet: - database.dataOffset[string(record.KeyHash[:])] = offset - case RecordTypeDelete: - delete(database.dataOffset, string(record.KeyHash[:])) - } - - offset += n + return nil, err } - return database, nil + return store, nil } func Open(filePath string) (*Store, error) { - return OpenWithOptions(filePath, defaultOptions) + options := defaultOptions + return OpenWithOptions(filePath, options) } func (s *Store) Set(key, value interface{}) error { @@ -139,7 +120,7 @@ func (s *Store) Delete(key interface{}) error { return err } - if s.buffer.Len() > s.options.BufferSize { + if s.buffer.Len() > s.options.MemoryBufferSize { err = s.flush() if err != nil { @@ -157,6 +138,43 @@ func (s *Store) Flush() error { return s.flush() } +func (s *Store) BackupWithOptions(filePath string, newFileOptions Options) error { + s.mu.Lock() + defer s.mu.Unlock() + + 
err := s.flush() + if err != nil { + return err + } + + newStore, err := OpenWithOptions(filePath, newFileOptions) + if err != nil { + return err + } + + for keyHashStr := range s.dataOffset { + var keyHash [sha256.Size224]byte + copy(keyHash[:], keyHashStr) + + valueBytes, err := s.getGobBytes(keyHash) + if err != nil { + newStore.Close() + return err + } + err = newStore.setBytes(keyHash, valueBytes) + if err != nil { + newStore.Close() + return err + } + } + + return newStore.Close() +} + +func (s *Store) Backup(filePath string) error { + return s.BackupWithOptions(filePath, defaultOptions) +} + func (s *Store) Close() error { s.mu.Lock() defer s.mu.Unlock() @@ -166,12 +184,36 @@ func (s *Store) Close() error { return err } - err = s.encoder.Close() + return nil +} + +func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error { + record, err := newRecordBytes(RecordTypeSet, keyHash, valueBytes) if err != nil { return err } - return s.file.Close() + b, err := record.Marshal() + if err != nil { + return err + } + + s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len()) + + _, err = s.buffer.Write(b) + if err != nil { + return err + } + + if s.buffer.Len() > s.options.MemoryBufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil } func (s *Store) set(key, value interface{}) error { @@ -192,7 +234,7 @@ func (s *Store) set(key, value interface{}) error { return err } - if s.buffer.Len() > s.options.BufferSize { + if s.buffer.Len() > s.options.MemoryBufferSize { err = s.flush() if err != nil { @@ -203,6 +245,68 @@ func (s *Store) set(key, value interface{}) error { return nil } +func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { + s.readOrderChan <- struct{}{} + defer func() { <-s.readOrderChan }() + + offset, exists := s.bufferDataOffset[string(keyHash[:])] + if exists { + reader := bytes.NewReader(s.buffer.Bytes()) + + err := skip(reader, offset) + if err != nil { + return 
nil, err + } + + _, record, err := readRecord(reader) + if err != nil { + return nil, err + } + + return record.ValueBytes, nil + } + + offsets, exists := s.dataOffset[string(keyHash[:])] + if !exists { + return nil, ErrNotExists + } + + readF, err := os.Open(s.filePath) + if err != nil { + return nil, err + } + defer readF.Close() + + _, err = readF.Seek(offsets.BlockOffset, io.SeekStart) + if err != nil { + return nil, err + } + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return nil, err + } + defer decompressor.Close() + + err = skip(decompressor, offsets.RecordOffset) + if err != nil { + return nil, err + } + + _, record, err := readRecord(decompressor) + if err != nil { + return nil, err + } + + if !bytes.Equal(record.KeyHash[:], keyHash[:]) { + expectedHashStr := base64.StdEncoding.EncodeToString(keyHash[:]) + gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:]) + return nil, fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr) + } + + return record.ValueBytes, nil +} + func (s *Store) get(key, value interface{}) error { s.readOrderChan <- struct{}{} defer func() { <-s.readOrderChan }() @@ -212,77 +316,193 @@ func (s *Store) get(key, value interface{}) error { return err } - offset, exists := s.bufferDataOffset[string(hashToFind[:])] - if exists { - reader := bytes.NewReader(s.buffer.Bytes()) - - err = skip(reader, offset) - if err != nil { - return err - } - - _, record, err := readRecord(reader) - if err != nil { - return err - } - - return decode(record.ValueBytes, value) - } - - offset, exists = s.dataOffset[string(hashToFind[:])] - if !exists { - return ErrNotExists - } - - readF, err := os.Open(s.filePath) - if err != nil { - return err - } - defer readF.Close() - - decompressor, err := zstd.NewReader(readF) - if err != nil { - return err - } - defer decompressor.Close() - - err = skip(decompressor, offset) + b, err := s.getGobBytes(hashToFind) if err != nil { return err } - 
_, record, err := readRecord(decompressor) - if err != nil { - return err - } - - if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { - return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку - } - - return decode(record.ValueBytes, value) + return decode(b, value) } func (s *Store) flush() error { l := int64(s.buffer.Len()) - _, err := s.buffer.WriteTo(s.encoder) + f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("open store file: %v", err) + } + stat, err := f.Stat() + if err != nil { + f.Close() + return fmt.Errorf("stat store file: %v", err) + } + + diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize) + + encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel)) + if err != nil { + f.Close() + return fmt.Errorf("init encoder: %v", err) + } + + _, err = s.buffer.WriteTo(encoder) if err != nil { return err } for key, val := range s.bufferDataOffset { - s.dataOffset[key] = val + s.offset + s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val} } s.bufferDataOffset = make(map[string]int64) - s.offset += l + err = encoder.Close() + if err != nil { + // TODO: truncate file to previous state + return err + } - err = s.encoder.Flush() + err = diskWriteBuffer.Flush() + if err != nil { + // TODO: truncate file to previous state + return err + } + + err = f.Close() + if err != nil { + return err + } + + // Update index file only on data update + if s.options.useIndexFile && l > 0 { + err = s.saveIndex() + if err != nil { + return err + } + } + + return nil +} + +func readBlock(r *bufio.Reader) (line []byte, n int, err error) { + delim := []byte{0x28, 0xb5, 0x2f, 0xfd} + + line = make([]byte, len(delim)) + copy(line, delim) + + for { + s, err := r.ReadBytes(delim[len(delim)-1]) + 
line = append(line, []byte(s)...) + if err != nil { + if bytes.Equal(line, delim) { // contains only magic number + return []byte{}, 0, err + } else { + return line, len(s), err + } + } + + if bytes.Equal(line, append(delim, delim...)) { // first block + line = make([]byte, len(delim)) + copy(line, delim) + continue + } + + if bytes.HasSuffix(line, delim) { + return line[:len(line)-len(delim)], len(s), nil + } + } +} + +// RebuildIndex renews index from store file +func (s *Store) RebuildIndex() error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.rebuildIndex() + if err != nil { + return err + } + + if s.options.useIndexFile { + return s.saveIndex() + } + + return nil +} + +func (s *Store) rebuildIndex() error { + f, err := os.Open(s.filePath) + if err != nil { + return err + } + defer f.Close() + + r := bufio.NewReader(f) + + var blockOffset int64 + + s.dataOffset = make(map[string]Offsets) + + for { + l, n, err := readBlock(r) + if err != nil { + if err != io.EOF { + return err + } else if err == io.EOF && len(l) == 0 { + break + } + } + + dec, err := zstd.NewReader(bytes.NewReader(l)) + + var recordOffset int64 + for { + n, record, err := readRecord(dec) + if err != nil { + if err == io.EOF { + break + } else { + return err + } + } + + switch record.Type { + case RecordTypeSet: + s.dataOffset[string(record.KeyHash[:])] = Offsets{BlockOffset: blockOffset, RecordOffset: recordOffset} + case RecordTypeDelete: + delete(s.dataOffset, string(record.KeyHash[:])) + } + recordOffset += n + } + + blockOffset += int64(n) + } + + idxBuf := new(bytes.Buffer) + + err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) + if err != nil { + return err + } + + err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644) if err != nil { return err } return nil } + +func (s *Store) saveIndex() error { + f, err := os.OpenFile(s.filePath+indexFileExt, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return err + } + + err = gob.NewEncoder(f).Encode(s.dataOffset) + if 
err != nil { + return err + } + + return f.Close() +} diff --git a/zkv_test.go b/zkv_test.go index a497a3c..ec2d2de 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -1,6 +1,8 @@ package zkv import ( + "bufio" + "io" "os" "testing" @@ -11,6 +13,7 @@ func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -56,6 +59,7 @@ func TestSmallWrites(t *testing.T) { const filePath = "TestSmallWrites.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) for i := 1; i <= recordCount; i++ { db, err := Open(filePath) @@ -91,6 +95,7 @@ func TestDeleteBasic(t *testing.T) { const filePath = "TestDeleteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -136,8 +141,9 @@ func TestDeleteBasic(t *testing.T) { func TestBufferBasic(t *testing.T) { const filePath = "TestBuffer.zkv" defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) - db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) err = db.Set(1, make([]byte, 100)) @@ -159,10 +165,11 @@ func TestBufferBasic(t *testing.T) { func TestBufferRead(t *testing.T) { const filePath = "TestBufferRead.zkv" - const recordCount = 100 + const recordCount = 2 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) - db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) for i := 1; i <= recordCount; i++ { @@ -207,3 +214,199 @@ func TestBufferRead(t *testing.T) { assert.NoError(t, err) } + +func TestBackupBasic(t *testing.T) { + const filePath = "TestBackupBasic.zkv" + const newFilePath = 
"TestBackupBasic2.zkv" + const recordCount = 100 + defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) + defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) + + db, err := Open(filePath) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + err = db.Backup(newFilePath) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + + db, err = Open(newFilePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + +} + +func TestBackupWithDeletedRecords(t *testing.T) { + const filePath = "TestBackupWithDeletedRecords.zkv" + const newFilePath = "TestBackupWithDeletedRecords2.zkv" + const recordCount = 100 + defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) + defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) + + db, err := Open(filePath) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + err = db.Flush() + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + if i%2 == 1 { + continue + } + + err = db.Delete(i) + assert.NoError(t, err) + } + + err = db.Backup(newFilePath) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + + db, err = Open(newFilePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount/2) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + if i%2 == 0 { + assert.ErrorIs(t, err, ErrNotExists) + } else { + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + } + + err = db.Close() + assert.NoError(t, err) +} + +func TestIndexFileBasic(t *testing.T) { + const filePath = "TestReadWriteBasic.zkv" + const recordCount = 100 + 
defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) + + db, err := Open(filePath) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + + // try to read + db, err = Open(filePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) +} + +func TestReadBlock(t *testing.T) { + file, err := os.Open("testdata/TestReadBlock.zkv") + assert.NoError(t, err) + defer file.Close() + + r := bufio.NewReader(file) + + line, _, err := readBlock(r) + assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x90, 0xff, 0xf4, 0x25, 0x15, 0x70, 0x75, 0x5c, 0xff, 0xf4, 0xff, 0xbc, 0xff, 0xf9, 0xff, 0xde, 0xff, 0x93, 0xff, 0xf8, 0x0d, 0x0e, 0x78, 0x5b, 0xff, 0x81, 0xff, 0x95, 0x6e, 0xff, 0xab, 0x4b, 0xff, 0xe8, 0x37, 0xff, 0x97, 0x68, 0x41, 0x3d, 0x01, 0x04, 0x03, 0x04, 0x00, 0x02, 0x00, 0x25, 0xd5, 0x63, 0x21}, 
line) + line, _, err = readBlock(r) + assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x89, 0x04, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x34, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x84, 0xff, 0x84, 0xff, 0xc1, 0x21, 0x02, 0xff, 0x8b, 0xff, 0xd7, 0x6d, 0xff, 0xd0, 0xff, 0xad, 0x1a, 0x55, 0x14, 0x5c, 0xff, 0xb1, 0x04, 0x37, 0x29, 0x2f, 0x78, 0x18, 0xff, 0xb5, 0xff, 0xe4, 0x56, 0x4e, 0xff, 0x8d, 0x19, 0x46, 0x01, 0x04, 0x03, 0x04, 0x00, 0x04, 0x00, 0x0c, 0x3b, 0xbf, 0x39}, line) + line, _, err = readBlock(r) + assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0x25, 0x79, 0x3e, 0x46, 0x4e, 0xff, 0xac, 0x06, 0x27, 0xff, 0xb1, 0xff, 0xa3, 0xff, 0xaa, 0xff, 0xe3, 0xff, 0xde, 0x37, 0x71, 0x63, 0x72, 0xff, 0x89, 0x0d, 0xff, 0x85, 0x39, 0xff, 0xb5, 0xff, 0xb9, 0xff, 0x8a, 0xff, 0x9e, 0x60, 0xff, 0xad, 0x17, 0x01, 0x04, 0x03, 0x04, 0x00, 
0x06, 0x00, 0x52, 0x08, 0x3e, 0x26}, line) + line, _, err = readBlock(r) + assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0xc9, 0x04, 0x00, 0x91, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x3c, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0xbf, 0x25, 0xff, 0xef, 0xff, 0xc8, 0xff, 0x85, 0x2c, 0xff, 0xbf, 0xff, 0xb5, 0xff, 0xad, 0xff, 0xfa, 0xff, 0xaf, 0x1c, 0xff, 0xe7, 0x71, 0xff, 0xfa, 0x36, 0xff, 0x95, 0x1b, 0xff, 0x91, 0xff, 0xab, 0x36, 0xff, 0xcd, 0x7a, 0x33, 0xff, 0xf7, 0xff, 0xec, 0xff, 0xee, 0xff, 0xc1, 0x01, 0x04, 0x03, 0x04, 0x00, 0x08, 0x00, 0xa5, 0x0e, 0x62, 0x53}, line) + + line, _, err = readBlock(r) + assert.Equal(t, line, []byte{}) + assert.Equal(t, io.EOF, err) +} + +func TestRebuildIndex(t *testing.T) { + const filePath = "TestRebuiltIndex.zkv" + const recordCount = 4 + defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) + + for i := 1; i <= recordCount; i++ { + db, err := Open(filePath) + assert.NoError(t, err) + + err = db.Set(i, i) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + } + + db, err := Open(filePath) + assert.NoError(t, err) + + err = db.RebuildIndex() + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } +}