diff --git a/README.md b/README.md
index f250cd2..4474fec 100644
--- a/README.md
+++ b/README.md
@@ -4,14 +4,15 @@ Simple key-value store for single-user applications.
 
 ## Pros
 
-* Simple file structure
-* Internal compression
+* Simple two-file structure (data file and index file)
+* Internal Zstandard compression via [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd)
 * Threadsafe operations through `sync.RWMutex`
 
 ## Cons
 
 * Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`)
-* Need to read the whole file on store open to create file index
+* No transaction system
+* Index file is fully rewritten on every store commit
 * No way to recover disk space from deleted records
 * Write/Delete operations block Read operations and each other
 
@@ -20,7 +21,7 @@ Simple key-value store for single-user applications.
 Create or open existing file:
 
 ```go
-db, err := Open("path to file")
+db, err := zkv.Open("path to file")
 ```
 
 Data operations:
 
@@ -31,12 +32,41 @@ err = db.Set(key, value) // key and value can be of any type
 
 // Read data
 var value ValueType
-err = db.Get(key)
+err = db.Get(key, &value)
 
 // Delete data
 err = db.Delete(key)
 ```
 
+Other methods:
+
+```go
+// Flush data to disk
+err = db.Flush()
+
+// Backup data to another file
+err = db.Backup("new/file/path")
+```
+
+## Store options
+
+```go
+type Options struct {
+    // Maximum number of concurrent reads
+    MaxParallelReads int
+
+    // Compression level
+    CompressionLevel zstd.EncoderLevel
+
+    // Memory write buffer size in bytes
+    MemoryBufferSize int
+
+    // Disk write buffer size in bytes
+    DiskBufferSize int
+}
+
+```
+
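+Fields left at their zero value fall back to the defaults. A usage sketch
+(the option values here are illustrative only):
+
+```go
+db, err := zkv.OpenWithOptions("path to file", zkv.Options{
+    MaxParallelReads: 4,
+    CompressionLevel: zstd.SpeedBestCompression,
+    MemoryBufferSize: 4 * 1024 * 1024,
+    DiskBufferSize:   1 * 1024 * 1024,
+})
+if err != nil {
+    // handle error
+}
+defer db.Close()
+```
+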
 ## File structure
 
 Record is an `encoding/gob` structure:
 
@@ -53,3 +83,26 @@ File is log-structured list of commands:
 | -------| ------------------------ | -------- |
 | Length | Record body bytes length | int64    |
 | Body   | Gob-encoded record       | variable |
+
+Index file is simple gob-encoded map:
+
+```go
+map[string]struct {
+    BlockOffset  int64
+    RecordOffset int64
+}
+```
+
+where the map key is the data key hash and the value holds the record's position in the data file: the offset of the compressed block in the file and the record's offset inside the decompressed block.
+
+## Resource consumption
+
+Store requirements:
+
+* around 300 MB of RAM per 1 million keys
+* around 34 MB of disk space for the index file per 1 million keys
+
+## TODO
+
+- [ ] Add recovery of the previous store file state on write error
+- [ ] Add method for index rebuild
diff --git a/TestDeleteBasic.zkv.idx b/TestDeleteBasic.zkv.idx
new file mode 100644
index 0000000..b341db7
Binary files /dev/null and b/TestDeleteBasic.zkv.idx differ
diff --git a/TestSmallWrites.zkv.idx b/TestSmallWrites.zkv.idx
new file mode 100644
index 0000000..6de70da
Binary files /dev/null and b/TestSmallWrites.zkv.idx differ
diff --git a/defaults.go b/defaults.go
new file mode 100644
index 0000000..cb4a086
--- /dev/null
+++ b/defaults.go
@@ -0,0 +1,17 @@
+package zkv
+
+import (
+    "runtime"
+
+    "github.com/klauspost/compress/zstd"
+)
+
+var defaultOptions = Options{
+    MaxParallelReads: runtime.NumCPU(),
+    CompressionLevel: zstd.SpeedDefault,
+    MemoryBufferSize: 4 * 1024 * 1024,
+    DiskBufferSize:   1 * 1024 * 1024,
+    useIndexFile:     true,
+}
+
+const indexFileExt = ".idx"
diff --git a/go.mod b/go.mod
index 392f6ec..4dc95a4 100644
--- a/go.mod
+++ b/go.mod
@@ -3,8 +3,8 @@ module github.com/nxshock/zkv
 go 1.19
 
 require (
-    github.com/klauspost/compress v1.15.12
-    github.com/stretchr/testify v1.8.1
+    github.com/klauspost/compress v1.16.4
+    github.com/stretchr/testify v1.8.2
 )
 
 require (
diff --git a/go.sum b/go.sum
index 8609118..abc35b1 100644
--- a/go.sum
+++ b/go.sum
@@ -1,8 +1,8 @@
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
-github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
+github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU=
+github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -10,8 +10,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
 github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
-github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/options.go b/options.go
new file mode 100644
index 0000000..148992b
--- /dev/null
+++ b/options.go
@@ -0,0 +1,40 @@
+package zkv
+
+import "github.com/klauspost/compress/zstd"
+
+type Options struct {
+    // Maximum number of concurrent reads
+    MaxParallelReads int
+
+    // Compression level
+    CompressionLevel zstd.EncoderLevel
+
+    // Memory write buffer size in bytes
+    MemoryBufferSize int
+
+    // Disk write buffer size in bytes
+    DiskBufferSize int
+
+    // Use index file
+    useIndexFile bool
+}
+
+func (o *Options) setDefaults() {
+    o.useIndexFile = true // TODO: implement database search without index
+
+    if o.MaxParallelReads == 0 {
+        o.MaxParallelReads = defaultOptions.MaxParallelReads
+    }
+
+    if o.CompressionLevel == 0 {
+        o.CompressionLevel = defaultOptions.CompressionLevel
+    }
+
+    if o.MemoryBufferSize == 0 {
+        o.MemoryBufferSize = defaultOptions.MemoryBufferSize
+    }
+
+    if o.DiskBufferSize == 0 {
+        o.DiskBufferSize = defaultOptions.DiskBufferSize
+    }
+}
diff --git a/record.go b/record.go
index 85a8c2b..0c3c941 100644
--- a/record.go
+++ b/record.go
@@ -2,6 +2,7 @@ package zkv
 
 import (
     "bytes"
+    "crypto/sha256"
     "encoding/binary"
     "encoding/gob"
     "io"
@@ -20,8 +21,17 @@ type Record struct {
     ValueBytes []byte
 }
 
+func newRecordBytes(recordType RecordType, keyHash [sha256.Size224]byte, valueBytes []byte) (*Record, error) {
+    record := &Record{
+        Type:       recordType,
+        KeyHash:    keyHash,
+        ValueBytes: valueBytes}
+
+    return record, nil
+}
+
 func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
-    keyBytes, err := encode(key)
+    keyHash, err := hashInterface(key)
     if err != nil {
         return nil, err
     }
@@ -31,12 +41,7 @@ func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
         return nil, err
     }
 
-    record := &Record{
-        Type:       recordType,
-        KeyHash:    hashBytes(keyBytes),
-        ValueBytes: valueBytes}
-
-    return record, nil
+    return newRecordBytes(recordType, keyHash, valueBytes)
 }
 
 func (r *Record) Marshal() ([]byte, error) {
diff --git a/record_test.go b/record_test.go
new file mode 100644
index 0000000..1e69354
--- /dev/null
+++ b/record_test.go
@@ -0,0 +1,35 @@
+package zkv
+
+import (
+    "bytes"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+)
+
+func TestRecord(t *testing.T) {
+    buf := new(bytes.Buffer)
+
+    var records []Record
+
+    for i := 0; i < 10; i++ {
+        record, err := newRecord(RecordTypeSet, i, i)
+        assert.NoError(t, err)
+
+        records = append(records, *record)
+
+        b, err := record.Marshal()
+        assert.NoError(t, err)
+
+        _, err = buf.Write(b)
+        assert.NoError(t, err)
+    }
+
+    for i := 0; i < 10; i++ {
+        _, record, err := readRecord(buf)
+        assert.NoError(t, err)
+
+        assert.Equal(t, record.KeyHash, records[i].KeyHash)
+        assert.Equal(t, record.ValueBytes, records[i].ValueBytes)
+    }
+}
diff --git a/testdata/TestReadBlock.zkv b/testdata/TestReadBlock.zkv
new file mode 100644
index 0000000..7916c5b
Binary files /dev/null and b/testdata/TestReadBlock.zkv differ
diff --git a/utils.go b/utils.go
index 97d63be..ae570b2 100644
--- a/utils.go
+++ b/utils.go
@@ -4,7 +4,9 @@ import (
     "bytes"
     "crypto/sha256"
     "encoding/gob"
+    "errors"
     "io"
+    "os"
 )
 
 func encode(value interface{}) ([]byte, error) {
@@ -40,3 +42,13 @@ func skip(r io.Reader, count int64) (err error) {
 
     return err
 }
+
+func isFileExists(filePath string) (bool, error) {
+    if _, err := os.Stat(filePath); err == nil {
+        return true, nil
+    } else if errors.Is(err, os.ErrNotExist) {
+        return false, nil
+    } else {
+        return false, err
+    }
+}
diff --git a/zkv.go b/zkv.go
index 4df780b..8a2b7d6 100644
--- a/zkv.go
+++ b/zkv.go
@@ -1,8 +1,11 @@
 package zkv
 
 import (
+    "bufio"
     "bytes"
+    "crypto/sha256"
     "encoding/base64"
+    "encoding/gob"
     "fmt"
     "io"
     "os"
@@ -11,156 +14,88 @@ import (
     "github.com/klauspost/compress/zstd"
 )
 
-type Database struct {
-    dataOffset map[string]int64
-    file       *os.File
-    compressor *zstd.Encoder
-    filePath   string
-    offset     int64
+type Offsets struct {
+    BlockOffset  int64
+    RecordOffset int64
+}
+
+type Store struct {
+    dataOffset map[string]Offsets
+
+    filePath string
+
+    buffer           *bytes.Buffer
+    bufferDataOffset map[string]int64
+
+    options Options
+
+    readOrderChan chan struct{}
 
     mu sync.RWMutex
 }
 
-func (db *Database) Close() error {
-    db.mu.Lock()
-    defer db.mu.Unlock()
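+// OpenWithOptions opens an existing store or creates a new one, loading
+// the key index from the index file when present and otherwise
+// rebuilding it by scanning the data file.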
+func OpenWithOptions(filePath string, options Options) (*Store, error) {
+    options.setDefaults()
 
-    err := db.compressor.Close()
-    if err != nil {
-        return err
+    store := &Store{
+        dataOffset:       make(map[string]Offsets),
+        bufferDataOffset: make(map[string]int64),
+        buffer:           new(bytes.Buffer),
+        filePath:         filePath,
+        options:          options,
+        readOrderChan:    make(chan struct{}, int(options.MaxParallelReads))}
+
+    if options.useIndexFile {
+        idxFile, err := os.Open(filePath + indexFileExt)
+        if err == nil {
+            err = gob.NewDecoder(idxFile).Decode(&store.dataOffset)
+            if err != nil {
+                return nil, err
+            }
+
+            return store, nil
+        }
     }
 
-    return db.file.Close()
-}
-
-func (db *Database) Set(key, value interface{}) error {
-    db.mu.Lock()
-    defer db.mu.Unlock()
-
-    record, err := newRecord(RecordTypeSet, key, value)
+    exists, err := isFileExists(filePath)
     if err != nil {
-        return err
+        return nil, err
     }
 
-    b, err := record.Marshal()
-    if err != nil {
-        return err
-    }
-
-    db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: remove the hash and roll back the write on error
-
-    _, err = db.compressor.Write(b)
-    if err != nil {
-        return err
-    }
-
-    db.offset += int64(len(b)) // TODO: remove the hash and roll back the write on error
-
-    return nil
-}
-
-func (db *Database) Get(key, value interface{}) error {
-    db.mu.RLock()
-    defer db.mu.RUnlock()
-
-    hashToFind, err := hashInterface(key)
-    if err != nil {
-        return err
-    }
-
-    offset, exists := db.dataOffset[string(hashToFind[:])]
     if !exists {
-        return ErrNotExists
+        return store, nil
     }
 
-    readF, err := os.Open(db.filePath)
+    err = store.rebuildIndex()
     if err != nil {
-        return err
-    }
-    defer readF.Close()
-
-    decompressor, err := zstd.NewReader(readF)
-    if err != nil {
-        return err
-    }
-    defer decompressor.Close()
-
-    err = skip(decompressor, offset)
-    if err != nil {
-        return err
+        return nil, err
     }
 
-    _, record, err := readRecord(decompressor)
-    if err != nil {
-        return err
-    }
-
-    if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
-        return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: replace with a constant error
-    }
-
-    return decode(record.ValueBytes, value)
+    return store, nil
 }
 
-func Open(filePath string) (*Database, error) {
-    f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
-    if err != nil {
-        return nil, fmt.Errorf("error opening file for writing: %v", err)
-    }
-
-    compressor, err := zstd.NewWriter(f)
-    if err != nil {
-        return nil, fmt.Errorf("error initializing compressor: %v", err)
-    }
-
-    database := &Database{
-        dataOffset: make(map[string]int64),
-        offset:     0,
-        file:       f,
-        compressor: compressor,
-        filePath:   filePath}
-
-    // restore file data
-    readF, err := os.Open(filePath)
-    if err != nil {
-        f.Close()
-        return nil, fmt.Errorf("error opening file for reading: %v", err)
-    }
-    defer readF.Close()
-
-    decompressor, err := zstd.NewReader(readF)
-    if err != nil {
-        return nil, fmt.Errorf("error initializing decompressor: %v", err)
-    }
-    defer decompressor.Close()
-
-    offset := int64(0)
-    for {
-        n, record, err := readRecord(decompressor)
-        if err == io.EOF {
-            break
-        }
-        if err != nil {
-            f.Close()
-            return nil, fmt.Errorf("error reading record from file: %v", err)
-        }
-
-        switch record.Type {
-        case RecordTypeSet:
-            database.dataOffset[string(record.KeyHash[:])] = offset
-        case RecordTypeDelete:
-            delete(database.dataOffset, string(record.KeyHash[:]))
-        }
-
-        offset += n
-    }
-
-    return database, nil
+func Open(filePath string) (*Store, error) {
+    options := defaultOptions
+    return OpenWithOptions(filePath, options)
 }
 
-func (db *Database) Delete(key interface{}) error {
-    db.mu.Lock()
-    defer db.mu.Unlock()
+func (s *Store) Set(key, value interface{}) error {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    return s.set(key, value)
+}
+
+func (s *Store) Get(key, value interface{}) error {
+    s.mu.RLock()
+    defer s.mu.RUnlock()
+
+    return s.get(key, value)
+}
+
+func (s *Store) Delete(key interface{}) error {
+    s.mu.Lock()
+    defer s.mu.Unlock()
 
     keyHash, err := hashInterface(key)
     if err != nil {
@@ -177,14 +112,397 @@ func (db *Database) Delete(key interface{}) error {
         return err
     }
 
-    delete(db.dataOffset, string(record.KeyHash[:]))
+    delete(s.dataOffset, string(record.KeyHash[:]))
+    delete(s.bufferDataOffset, string(record.KeyHash[:]))
 
-    _, err = db.compressor.Write(b)
+    _, err = s.buffer.Write(b)
     if err != nil {
         return err
     }
 
-    db.offset += int64(len(b))
+    if s.buffer.Len() > s.options.MemoryBufferSize {
+        err = s.flush()
+
+        if err != nil {
+            return err
+        }
+    }
 
     return nil
 }
+
+func (s *Store) Flush() error {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    return s.flush()
+}
+
+func (s *Store) BackupWithOptions(filePath string, newFileOptions Options) error {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    err := s.flush()
+    if err != nil {
+        return err
+    }
+
+    newStore, err := OpenWithOptions(filePath, newFileOptions)
+    if err != nil {
+        return err
+    }
+
+    for keyHashStr := range s.dataOffset {
+        var keyHash [sha256.Size224]byte
+        copy(keyHash[:], keyHashStr)
+
+        valueBytes, err := s.getGobBytes(keyHash)
+        if err != nil {
+            newStore.Close()
+            return err
+        }
+        err = newStore.setBytes(keyHash, valueBytes)
+        if err != nil {
+            newStore.Close()
+            return err
+        }
+    }
+
+    return newStore.Close()
+}
+
+func (s *Store) Backup(filePath string) error {
+    return s.BackupWithOptions(filePath, defaultOptions)
+}
+
+func (s *Store) Close() error {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    err := s.flush()
+    if err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error {
+    record, err := newRecordBytes(RecordTypeSet, keyHash, valueBytes)
+    if err != nil {
+        return err
+    }
+
+    b, err := record.Marshal()
+    if err != nil {
+        return err
+    }
+
+    s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
+
+    _, err = s.buffer.Write(b)
+    if err != nil {
+        return err
+    }
+
+    if s.buffer.Len() > s.options.MemoryBufferSize {
+        err = s.flush()
+
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+func (s *Store) set(key, value interface{}) error {
+    record, err := newRecord(RecordTypeSet, key, value)
+    if err != nil {
+        return err
+    }
+
+    b, err := record.Marshal()
+    if err != nil {
+        return err
+    }
+
+    s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
+
+    _, err = s.buffer.Write(b)
+    if err != nil {
+        return err
+    }
+
+    if s.buffer.Len() > s.options.MemoryBufferSize {
+        err = s.flush()
+
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
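+// getGobBytes returns the raw gob-encoded value for the given key hash,
+// looking in the not-yet-flushed write buffer first and falling back to
+// the data file. readOrderChan acts as a semaphore bounding concurrent
+// reads to MaxParallelReads.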
+func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) {
+    s.readOrderChan <- struct{}{}
+    defer func() { <-s.readOrderChan }()
+
+    offset, exists := s.bufferDataOffset[string(keyHash[:])]
+    if exists {
+        reader := bytes.NewReader(s.buffer.Bytes())
+
+        err := skip(reader, offset)
+        if err != nil {
+            return nil, err
+        }
+
+        _, record, err := readRecord(reader)
+        if err != nil {
+            return nil, err
+        }
+
+        return record.ValueBytes, nil
+    }
+
+    offsets, exists := s.dataOffset[string(keyHash[:])]
+    if !exists {
+        return nil, ErrNotExists
+    }
+
+    readF, err := os.Open(s.filePath)
+    if err != nil {
+        return nil, err
+    }
+    defer readF.Close()
+
+    _, err = readF.Seek(offsets.BlockOffset, io.SeekStart)
+    if err != nil {
+        return nil, err
+    }
+
+    decompressor, err := zstd.NewReader(readF)
+    if err != nil {
+        return nil, err
+    }
+    defer decompressor.Close()
+
+    err = skip(decompressor, offsets.RecordOffset)
+    if err != nil {
+        return nil, err
+    }
+
+    _, record, err := readRecord(decompressor)
+    if err != nil {
+        return nil, err
+    }
+
+    if !bytes.Equal(record.KeyHash[:], keyHash[:]) {
+        expectedHashStr := base64.StdEncoding.EncodeToString(keyHash[:])
+        gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:])
+        return nil, fmt.Errorf("wrong hash at offset %d: expected %s, got %s", offsets.RecordOffset, expectedHashStr, gotHashStr)
+    }
+
+    return record.ValueBytes, nil
+}
+
+func (s *Store) get(key, value interface{}) error {
+    s.readOrderChan <- struct{}{}
+    defer func() { <-s.readOrderChan }()
+
+    hashToFind, err := hashInterface(key)
+    if err != nil {
+        return err
+    }
+
+    b, err := s.getGobBytes(hashToFind)
+    if err != nil {
+        return err
+    }
+
+    return decode(b, value)
+}
+
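+// flush appends the write buffer to the data file as a single compressed
+// zstd block, then points the affected keys at that block's offset in the
+// file and their record offsets inside the decompressed block.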
+func (s *Store) flush() error {
+    l := int64(s.buffer.Len())
+
+    f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+    if err != nil {
+        return fmt.Errorf("open store file: %v", err)
+    }
+    stat, err := f.Stat()
+    if err != nil {
+        f.Close()
+        return fmt.Errorf("stat store file: %v", err)
+    }
+
+    diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize)
+
+    encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel))
+    if err != nil {
+        f.Close()
+        return fmt.Errorf("init encoder: %v", err)
+    }
+
+    _, err = s.buffer.WriteTo(encoder)
+    if err != nil {
+        return err
+    }
+
+    for key, val := range s.bufferDataOffset {
+        s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val}
+    }
+
+    s.bufferDataOffset = make(map[string]int64)
+
+    err = encoder.Close()
+    if err != nil {
+        // TODO: truncate file to previous state
+        return err
+    }
+
+    err = diskWriteBuffer.Flush()
+    if err != nil {
+        // TODO: truncate file to previous state
+        return err
+    }
+
+    err = f.Close()
+    if err != nil {
+        return err
+    }
+
+    // Update index file only on data update
+    if s.options.useIndexFile && l > 0 {
+        err = s.saveIndex()
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
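+// readBlock reads one compressed block from r, using the zstd magic
+// number (0x28 0xb5 0x2f 0xfd) as the delimiter between blocks. The
+// returned n is the block's length in the file, including its own
+// magic-number header.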
+func readBlock(r *bufio.Reader) (line []byte, n int, err error) {
+    delim := []byte{0x28, 0xb5, 0x2f, 0xfd}
+
+    line = make([]byte, len(delim))
+    copy(line, delim)
+
+    for {
+        s, err := r.ReadBytes(delim[len(delim)-1])
+        line = append(line, s...)
+        if err != nil {
+            if bytes.Equal(line, delim) { // contains only magic number
+                return []byte{}, 0, err
+            } else {
+                return line, len(line), err
+            }
+        }
+
+        if bytes.Equal(line, append(delim, delim...)) { // first block
+            line = make([]byte, len(delim))
+            copy(line, delim)
+            continue
+        }
+
+        if bytes.HasSuffix(line, delim) {
+            line = line[:len(line)-len(delim)]
+            return line, len(line), nil
+        }
+    }
+}
+
+// RebuildIndex rebuilds the index from the store file
+func (s *Store) RebuildIndex() error {
+    s.mu.Lock()
+    defer s.mu.Unlock()
+
+    err := s.rebuildIndex()
+    if err != nil {
+        return err
+    }
+
+    if s.options.useIndexFile {
+        return s.saveIndex()
+    }
+
+    return nil
+}
+
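+// rebuildIndex scans the data file block by block, replaying Set and
+// Delete records to reconstruct the in-memory offset index, and writes
+// the result to the index file.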
+func (s *Store) rebuildIndex() error {
+    f, err := os.Open(s.filePath)
+    if err != nil {
+        return err
+    }
+    defer f.Close()
+
+    r := bufio.NewReader(f)
+
+    var blockOffset int64
+
+    s.dataOffset = make(map[string]Offsets)
+
+    for {
+        l, n, err := readBlock(r)
+        if err != nil {
+            if err != io.EOF {
+                return err
+            } else if err == io.EOF && len(l) == 0 {
+                break
+            }
+        }
+
+        dec, err := zstd.NewReader(bytes.NewReader(l))
+        if err != nil {
+            return err
+        }
+
+        var recordOffset int64
+        for {
+            n, record, err := readRecord(dec)
+            if err != nil {
+                if err == io.EOF {
+                    break
+                } else {
+                    return err
+                }
+            }
+
+            switch record.Type {
+            case RecordTypeSet:
+                s.dataOffset[string(record.KeyHash[:])] = Offsets{BlockOffset: blockOffset, RecordOffset: recordOffset}
+            case RecordTypeDelete:
+                delete(s.dataOffset, string(record.KeyHash[:]))
+            }
+            recordOffset += n
+        }
+
+        blockOffset += int64(n)
+    }
+
+    idxBuf := new(bytes.Buffer)
+
+    err = gob.NewEncoder(idxBuf).Encode(s.dataOffset)
+    if err != nil {
+        return err
+    }
+
+    err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644)
+    if err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (s *Store) saveIndex() error {
+    f, err := os.OpenFile(s.filePath+indexFileExt, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+    if err != nil {
+        return err
+    }
+
+    err = gob.NewEncoder(f).Encode(s.dataOffset)
+    if err != nil {
+        return err
+    }
+
+    return f.Close()
+}
diff --git a/zkv_test.go b/zkv_test.go
index 36f6aac..ec2d2de 100644
--- a/zkv_test.go
+++ b/zkv_test.go
@@ -1,6 +1,8 @@
 package zkv
 
 import (
+    "bufio"
+    "io"
     "os"
     "testing"
 
@@ -11,6 +13,7 @@ func TestReadWriteBasic(t *testing.T) {
     const filePath = "TestReadWriteBasic.zkv"
     const recordCount = 100
     defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
 
     db, err := Open(filePath)
     assert.NoError(t, err)
@@ -20,7 +23,16 @@ func TestReadWriteBasic(t *testing.T) {
         assert.NoError(t, err)
     }
 
-    assert.Len(t, db.dataOffset, recordCount)
+    assert.Len(t, db.dataOffset, 0)
+    assert.Len(t, db.bufferDataOffset, recordCount)
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        assert.NoError(t, err)
+        assert.Equal(t, i, gotValue)
+    }
 
     err = db.Close()
     assert.NoError(t, err)
@@ -47,6 +59,7 @@ func TestSmallWrites(t *testing.T) {
     const filePath = "TestSmallWrites.zkv"
     const recordCount = 100
     defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
 
     for i := 1; i <= recordCount; i++ {
         db, err := Open(filePath)
@@ -82,6 +95,7 @@ func TestDeleteBasic(t *testing.T) {
     const filePath = "TestDeleteBasic.zkv"
     const recordCount = 100
     defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
 
     db, err := Open(filePath)
     assert.NoError(t, err)
@@ -91,12 +105,15 @@ func TestDeleteBasic(t *testing.T) {
         assert.NoError(t, err)
     }
 
-    assert.Len(t, db.dataOffset, recordCount)
+    assert.Len(t, db.dataOffset, 0)
+    assert.Len(t, db.bufferDataOffset, recordCount)
 
     err = db.Delete(50)
     assert.NoError(t, err)
 
-    assert.Len(t, db.dataOffset, recordCount-1)
+    assert.Len(t, db.dataOffset, 0)
+    assert.Len(t, db.bufferDataOffset, recordCount-1)
+
     var value int
     err = db.Get(50, &value)
     assert.Equal(t, 0, value)
@@ -110,6 +127,8 @@ func TestDeleteBasic(t *testing.T) {
     assert.NoError(t, err)
 
     assert.Len(t, db.dataOffset, recordCount-1)
+    assert.Len(t, db.bufferDataOffset, 0)
+
     value = 0
     err = db.Get(50, &value)
     assert.Equal(t, 0, value)
@@ -118,3 +137,276 @@ func TestDeleteBasic(t *testing.T) {
     err = db.Close()
     assert.NoError(t, err)
 }
+
+func TestBufferBasic(t *testing.T) {
+    const filePath = "TestBufferBasic.zkv"
+    defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
+
+    db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100})
+    assert.NoError(t, err)
+
+    err = db.Set(1, make([]byte, 100))
+    assert.NoError(t, err)
+
+    assert.NotEqual(t, 0, len(db.dataOffset))
+    assert.Len(t, db.bufferDataOffset, 0)
+    assert.Equal(t, 0, db.buffer.Len())
+
+    var gotValue []byte
+    err = db.Get(1, &gotValue)
+    assert.NoError(t, err)
+
+    assert.Equal(t, make([]byte, 100), gotValue)
+
+    err = db.Close()
+    assert.NoError(t, err)
+}
+
+func TestBufferRead(t *testing.T) {
+    const filePath = "TestBufferRead.zkv"
+    const recordCount = 2
+    defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
+
+    db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100})
+    assert.NoError(t, err)
+
+    for i := 1; i <= recordCount; i++ {
+        err = db.Set(i, i)
+        assert.NoError(t, err)
+    }
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        assert.NoError(t, err)
+        assert.Equal(t, i, gotValue)
+    }
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        assert.NoError(t, err)
+        assert.Equal(t, i, gotValue)
+    }
+
+    err = db.Close()
+    assert.NoError(t, err)
+
+    // try to read
+    db, err = Open(filePath)
+    assert.NoError(t, err)
+
+    assert.Len(t, db.dataOffset, recordCount)
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        assert.NoError(t, err)
+        assert.Equal(t, i, gotValue)
+    }
+
+    err = db.Close()
+    assert.NoError(t, err)
+}
+
+func TestBackupBasic(t *testing.T) {
+    const filePath = "TestBackupBasic.zkv"
+    const newFilePath = "TestBackupBasic2.zkv"
+    const recordCount = 100
+    defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
+    defer os.Remove(newFilePath)
+    defer os.Remove(newFilePath + indexFileExt)
+
+    db, err := Open(filePath)
+    assert.NoError(t, err)
+
+    for i := 1; i <= recordCount; i++ {
+        err = db.Set(i, i)
+        assert.NoError(t, err)
+    }
+
+    err = db.Backup(newFilePath)
+    assert.NoError(t, err)
+
+    err = db.Close()
+    assert.NoError(t, err)
+
+    db, err = Open(newFilePath)
+    assert.NoError(t, err)
+
+    assert.Len(t, db.dataOffset, recordCount)
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        assert.NoError(t, err)
+        assert.Equal(t, i, gotValue)
+    }
+
+    err = db.Close()
+    assert.NoError(t, err)
+}
+
+func TestBackupWithDeletedRecords(t *testing.T) {
+    const filePath = "TestBackupWithDeletedRecords.zkv"
+    const newFilePath = "TestBackupWithDeletedRecords2.zkv"
+    const recordCount = 100
+    defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
+    defer os.Remove(newFilePath)
+    defer os.Remove(newFilePath + indexFileExt)
+
+    db, err := Open(filePath)
+    assert.NoError(t, err)
+
+    for i := 1; i <= recordCount; i++ {
+        err = db.Set(i, i)
+        assert.NoError(t, err)
+    }
+
+    err = db.Flush()
+    assert.NoError(t, err)
+
+    for i := 1; i <= recordCount; i++ {
+        if i%2 == 1 {
+            continue
+        }
+
+        err = db.Delete(i)
+        assert.NoError(t, err)
+    }
+
+    err = db.Backup(newFilePath)
+    assert.NoError(t, err)
+
+    err = db.Close()
+    assert.NoError(t, err)
+
+    db, err = Open(newFilePath)
+    assert.NoError(t, err)
+
+    assert.Len(t, db.dataOffset, recordCount/2)
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        if i%2 == 0 {
+            assert.ErrorIs(t, err, ErrNotExists)
+        } else {
+            assert.NoError(t, err)
+            assert.Equal(t, i, gotValue)
+        }
+    }
+
+    err = db.Close()
+    assert.NoError(t, err)
+}
+
+func TestIndexFileBasic(t *testing.T) {
+    const filePath = "TestIndexFileBasic.zkv"
+    const recordCount = 100
+    defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
+
+    db, err := Open(filePath)
+    assert.NoError(t, err)
+
+    for i := 1; i <= recordCount; i++ {
+        err = db.Set(i, i)
+        assert.NoError(t, err)
+    }
+
+    assert.Len(t, db.dataOffset, 0)
+    assert.Len(t, db.bufferDataOffset, recordCount)
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        assert.NoError(t, err)
+        assert.Equal(t, i, gotValue)
+    }
+
+    err = db.Close()
+    assert.NoError(t, err)
+
+    // try to read
+    db, err = Open(filePath)
+    assert.NoError(t, err)
+
+    assert.Len(t, db.dataOffset, recordCount)
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        assert.NoError(t, err)
+        assert.Equal(t, i, gotValue)
+    }
+
+    err = db.Close()
+    assert.NoError(t, err)
+}
+
+func TestReadBlock(t *testing.T) {
+    file, err := os.Open("testdata/TestReadBlock.zkv")
+    assert.NoError(t, err)
+    defer file.Close()
+
+    r := bufio.NewReader(file)
+
+    line, _, err := readBlock(r)
+    assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x90, 0xff, 0xf4, 0x25, 0x15, 0x70, 0x75, 0x5c, 0xff, 0xf4, 0xff, 0xbc, 0xff, 0xf9, 0xff, 0xde, 0xff, 0x93, 0xff, 0xf8, 0x0d, 0x0e, 0x78, 0x5b, 0xff, 0x81, 0xff, 0x95, 0x6e, 0xff, 0xab, 0x4b, 0xff, 0xe8, 0x37, 0xff, 0x97, 0x68, 0x41, 0x3d, 0x01, 0x04, 0x03, 0x04, 0x00, 0x02, 0x00, 0x25, 0xd5, 0x63, 0x21}, line)
+    line, _, err = readBlock(r)
+    assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x89, 0x04, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x34, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x84, 0xff, 0x84, 0xff, 0xc1, 0x21, 0x02, 0xff, 0x8b, 0xff, 0xd7, 0x6d, 0xff, 0xd0, 0xff, 0xad, 0x1a, 0x55, 0x14, 0x5c, 0xff, 0xb1, 0x04, 0x37, 0x29, 0x2f, 0x78, 0x18, 0xff, 0xb5, 0xff, 0xe4, 0x56, 0x4e, 0xff, 0x8d, 0x19, 0x46, 0x01, 0x04, 0x03, 0x04, 0x00, 0x04, 0x00, 0x0c, 0x3b, 0xbf, 0x39}, line)
+    line, _, err = readBlock(r)
+    assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0x25, 0x79, 0x3e, 0x46, 0x4e, 0xff, 0xac, 0x06, 0x27, 0xff, 0xb1, 0xff, 0xa3, 0xff, 0xaa, 0xff, 0xe3, 0xff, 0xde, 0x37, 0x71, 0x63, 0x72, 0xff, 0x89, 0x0d, 0xff, 0x85, 0x39, 0xff, 0xb5, 0xff, 0xb9, 0xff, 0x8a, 0xff, 0x9e, 0x60, 0xff, 0xad, 0x17, 0x01, 0x04, 0x03, 0x04, 0x00, 0x06, 0x00, 0x52, 0x08, 0x3e, 0x26}, line)
+    line, _, err = readBlock(r)
+    assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0xc9, 0x04, 0x00, 0x91, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x3c, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0xbf, 0x25, 0xff, 0xef, 0xff, 0xc8, 0xff, 0x85, 0x2c, 0xff, 0xbf, 0xff, 0xb5, 0xff, 0xad, 0xff, 0xfa, 0xff, 0xaf, 0x1c, 0xff, 0xe7, 0x71, 0xff, 0xfa, 0x36, 0xff, 0x95, 0x1b, 0xff, 0x91, 0xff, 0xab, 0x36, 0xff, 0xcd, 0x7a, 0x33, 0xff, 0xf7, 0xff, 0xec, 0xff, 0xee, 0xff, 0xc1, 0x01, 0x04, 0x03, 0x04, 0x00, 0x08, 0x00, 0xa5, 0x0e, 0x62, 0x53}, line)
+
+    line, _, err = readBlock(r)
+    assert.Equal(t, line, []byte{})
+    assert.Equal(t, io.EOF, err)
+}
+
+func TestRebuildIndex(t *testing.T) {
+    const filePath = "TestRebuildIndex.zkv"
+    const recordCount = 4
+    defer os.Remove(filePath)
+    defer os.Remove(filePath + indexFileExt)
+
+    for i := 1; i <= recordCount; i++ {
+        db, err := Open(filePath)
+        assert.NoError(t, err)
+
+        err = db.Set(i, i)
+        assert.NoError(t, err)
+
+        err = db.Close()
+        assert.NoError(t, err)
+    }
+
+    db, err := Open(filePath)
+    assert.NoError(t, err)
+
+    err = db.RebuildIndex()
+    assert.NoError(t, err)
+
+    for i := 1; i <= recordCount; i++ {
+        var gotValue int
+
+        err = db.Get(i, &gotValue)
+        assert.NoError(t, err)
+        assert.Equal(t, i, gotValue)
+    }
+
+    err = db.Close()
+    assert.NoError(t, err)
+}