diff --git a/README.md b/README.md deleted file mode 100644 index 4474fec..0000000 --- a/README.md +++ /dev/null @@ -1,108 +0,0 @@ -# zkv - -Simple key-value store for single-user applications. - -## Pros - -* Simple two file structure (data file and index file) -* Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd) -* Threadsafe operations through `sync.RWMutex` - -## Cons - -* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) -* No transaction system -* Index file is fully rewrited on every store commit -* No way to recover disk space from deleted records -* Write/Delete operations block Read and each other operations - -## Usage - -Create or open existing file: - -```go -db, err := zkv.Open("path to file") -``` - -Data operations: - -```go -// Write data -err = db.Set(key, value) // key and value can be any of type - -// Read data -var value ValueType -err = db.Get(key, &value) - -// Delete data -err = db.Delete(key) -``` - -Other methods: - -```go -// Flush data to disk -err = db.Flush() - -// Backup data to another file -err = db.Backup("new/file/path") -``` - -## Store options - -```go -type Options struct { - // Maximum number of concurrent reads - MaxParallelReads int - - // Compression level - CompressionLevel zstd.EncoderLevel - - // Memory write buffer size in bytes - MemoryBufferSize int - - // Disk write buffer size in bytes - DiskBufferSize int -} - -``` - -## File structure - -Record is `encoding/gob` structure: - -| Field | Description | Size | -| ---------- | ---------------------------------- | -------- | -| Type | Record type | uint8 | -| KeyHash | Key hash | 28 bytes | -| ValueBytes | Value gob-encoded bytes | variable | - -File is log stuctured list of commands: - -| Field | Description | Size | -| -------| ------------------------ | -------- | -| Length | Record body bytes length | int64 | -| Body | Gob-encoded record | variable | - -Index file is simple gob-encoded map: - -```go -map[string]struct { - BlockOffset int64 - RecordOffset int64 -} -``` - -where map key is data key hash and value - data offset in data file. - -## Resource consumption - -Store requirements: - -* around 300 Mb of RAM per 1 million of keys -* around 34 Mb of disk space for index file per 1 million of keys - -## TODO - -- [ ] Add recovery previous state of store file on write error -- [ ] Add method for index rebuild diff --git a/TestDeleteBasic.zkv.idx b/TestDeleteBasic.zkv.idx deleted file mode 100644 index b341db7..0000000 Binary files a/TestDeleteBasic.zkv.idx and /dev/null differ diff --git a/TestSmallWrites.zkv.idx b/TestSmallWrites.zkv.idx deleted file mode 100644 index 6de70da..0000000 Binary files a/TestSmallWrites.zkv.idx and /dev/null differ diff --git a/defaults.go b/defaults.go deleted file mode 100644 index cb4a086..0000000 --- a/defaults.go +++ /dev/null @@ -1,17 +0,0 @@ -package zkv - -import ( - "runtime" - - "github.com/klauspost/compress/zstd" -) - -var defaultOptions = Options{ - MaxParallelReads: runtime.NumCPU(), - CompressionLevel: zstd.SpeedDefault, - MemoryBufferSize: 4 * 1024 * 1024, - DiskBufferSize: 1 * 1024 * 1024, - useIndexFile: true, -} - -const indexFileExt = ".idx" diff --git a/go.mod b/go.mod index 4dc95a4..392f6ec 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/nxshock/zkv go 1.19 require ( - github.com/klauspost/compress v1.16.4 - github.com/stretchr/testify v1.8.2 + github.com/klauspost/compress v1.15.12 + github.com/stretchr/testify v1.8.1 ) require ( diff --git a/go.sum b/go.sum index abc35b1..8609118 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU= -github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM= +github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -10,8 +10,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= -github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/options.go b/options.go deleted file mode 100644 index 148992b..0000000 --- a/options.go +++ /dev/null @@ -1,40 +0,0 @@ -package zkv - -import "github.com/klauspost/compress/zstd" - -type Options struct { - // Maximum number of concurrent reads - MaxParallelReads int - - // Compression level - CompressionLevel zstd.EncoderLevel - - // Memory write buffer size in bytes - MemoryBufferSize int - - // Disk write buffer size in bytes - DiskBufferSize int - - // Use index file - useIndexFile bool -} - -func (o *Options) setDefaults() { - o.useIndexFile = true // TODO: implement database search without index - - if o.MaxParallelReads == 0 { - o.MaxParallelReads = defaultOptions.MaxParallelReads - } - - if o.CompressionLevel == 0 { - o.CompressionLevel = defaultOptions.CompressionLevel - } - - if o.MemoryBufferSize == 0 { - o.MemoryBufferSize = defaultOptions.MemoryBufferSize - } - - if o.DiskBufferSize == 0 { - o.DiskBufferSize = defaultOptions.DiskBufferSize - } -} diff --git a/record.go b/record.go index 0c3c941..eac0325 100644 --- a/record.go +++ b/record.go @@ -2,7 +2,6 @@ package zkv import ( "bytes" - "crypto/sha256" "encoding/binary" "encoding/gob" "io" @@ -17,21 +16,12 @@ const ( type Record struct { Type RecordType - KeyHash [28]byte + KeyHash []byte ValueBytes []byte } -func newRecordBytes(recordType RecordType, keyHash [sha256.Size224]byte, valueBytes []byte) (*Record, error) { - record := &Record{ - Type: recordType, - KeyHash: keyHash, - ValueBytes: valueBytes} - - return record, nil -} - func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { - keyHash, err := hashInterface(key) + keyBytes, err := encode(key) if err != nil { return nil, err } @@ -41,7 +31,12 @@ func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { return nil, err } - return newRecordBytes(recordType, keyHash, valueBytes) + record := &Record{ + Type: recordType, + KeyHash: hashBytes(keyBytes), + ValueBytes: valueBytes} + + return record, nil } func (r *Record) Marshal() ([]byte, error) { diff --git a/record_test.go b/record_test.go deleted file mode 100644 index 1e69354..0000000 --- a/record_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package zkv - -import ( - "bytes" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRecord(t *testing.T) { - buf := new(bytes.Buffer) - - var records []Record - - for i := 0; i < 10; i++ { - record, err := newRecord(RecordTypeSet, i, i) - assert.NoError(t, err) - - records = append(records, *record) - - b, err := record.Marshal() - assert.NoError(t, err) - - _, err = buf.Write(b) - assert.NoError(t, err) - } - - for i := 0; i < 10; i++ { - _, record, err := readRecord(buf) - assert.NoError(t, err) - - assert.Equal(t, record.KeyHash, records[i].KeyHash) - assert.Equal(t, record.ValueBytes, records[i].ValueBytes) - } -} diff --git a/testdata/TestReadBlock.zkv b/testdata/TestReadBlock.zkv deleted file mode 100644 index 7916c5b..0000000 Binary files a/testdata/TestReadBlock.zkv and /dev/null differ diff --git a/utils.go b/utils.go index ae570b2..10e0e15 100644 --- a/utils.go +++ b/utils.go @@ -4,9 +4,7 @@ import ( "bytes" "crypto/sha256" "encoding/gob" - "errors" "io" - "os" ) func encode(value interface{}) ([]byte, error) { @@ -19,17 +17,20 @@ func decode(b []byte, value interface{}) error { return gob.NewDecoder(bytes.NewReader(b)).Decode(value) } -func hashInterface(value interface{}) ([sha256.Size224]byte, error) { +func hashInterface(value interface{}) ([]byte, error) { valueBytes, err := encode(value) if err != nil { - return [sha256.Size224]byte{}, err + return nil, err } return hashBytes(valueBytes), nil } -func hashBytes(b []byte) [sha256.Size224]byte { - return sha256.Sum224(b) +func hashBytes(b []byte) []byte { + bytes := sha256.Sum224(b) + + return bytes[:] + } func skip(r io.Reader, count int64) (err error) { @@ -42,13 +43,3 @@ func skip(r io.Reader, count int64) (err error) { return err } - -func isFileExists(filePath string) (bool, error) { - if _, err := os.Stat(filePath); err == nil { - return true, nil - } else if errors.Is(err, os.ErrNotExist) { - return false, nil - } else { - return false, err - } -} diff --git a/zkv.go b/zkv.go index 8a2b7d6..d8b75fa 100644 --- a/zkv.go +++ b/zkv.go @@ -1,11 +1,8 @@ package zkv import ( - "bufio" "bytes" - "crypto/sha256" "encoding/base64" - "encoding/gob" "fmt" "io" "os" @@ -14,88 +11,156 @@ import ( "github.com/klauspost/compress/zstd" ) -type Offsets struct { - BlockOffset int64 - RecordOffset int64 +type Database struct { + dataOffset map[string]int64 + file *os.File + compressor *zstd.Encoder + filePath string + offset int64 + + mu sync.Mutex } -type Store struct { - dataOffset map[string]Offsets +func (db *Database) Close() error { + db.mu.Lock() + defer db.mu.Unlock() - filePath string - - buffer *bytes.Buffer - bufferDataOffset map[string]int64 - - options Options - - readOrderChan chan struct{} - - mu sync.RWMutex -} - -func OpenWithOptions(filePath string, options Options) (*Store, error) { - options.setDefaults() - - store := &Store{ - dataOffset: make(map[string]Offsets), - bufferDataOffset: make(map[string]int64), - buffer: new(bytes.Buffer), - filePath: filePath, - options: options, - readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} - - if options.useIndexFile { - idxFile, err := os.Open(filePath + indexFileExt) - if err == nil { - err = gob.NewDecoder(idxFile).Decode(&store.dataOffset) - if err != nil { - return nil, err - } - - return store, nil - } - } - - exists, err := isFileExists(filePath) + err := db.compressor.Close() if err != nil { - return nil, err + return err } + return db.file.Close() +} + +func (db *Database) Set(key, value interface{}) error { + db.mu.Lock() + defer db.mu.Unlock() + + record, err := newRecord(RecordTypeSet, key, value) + if err != nil { + return err + } + + b, err := record.Marshal() + if err != nil { + return err + } + + db.dataOffset[string(record.KeyHash)] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки + + _, err = db.compressor.Write(b) + if err != nil { + return err + } + + db.offset += int64(len(b)) // TODO: удалить хеш и откатить запись в случае ошибки + + return nil +} + +func (db *Database) Get(key, value interface{}) error { + db.mu.Lock() + defer db.mu.Unlock() + + hashToFind, err := hashInterface(key) + if err != nil { + return err + } + + offset, exists := db.dataOffset[string(hashToFind)] if !exists { - return store, nil + return ErrNotExists } - err = store.rebuildIndex() + readF, err := os.Open(db.filePath) if err != nil { - return nil, err + return err + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return err + } + defer decompressor.Close() + + err = skip(decompressor, offset) + if err != nil { + return err } - return store, nil + _, record, err := readRecord(decompressor) + if err != nil { + return err + } + + if !bytes.Equal(record.KeyHash, hashToFind) { + return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind), base64.StdEncoding.EncodeToString(record.KeyHash)) // TODO: заменить на константную ошибку + } + + return decode(record.ValueBytes, value) } -func Open(filePath string) (*Store, error) { - options := defaultOptions - return OpenWithOptions(filePath, options) +func Open(filePath string) (*Database, error) { + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) + } + + compressor, err := zstd.NewWriter(f) + if err != nil { + return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) + } + + database := &Database{ + dataOffset: make(map[string]int64), + offset: 0, + file: f, + compressor: compressor, + filePath: filePath} + + // restore file data + readF, err := os.Open(filePath) + if err != nil { + f.Close() + return nil, fmt.Errorf("ошибка при открытии файла для чтения: %v", err) + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) + } + defer decompressor.Close() + + offset := int64(0) + for { + n, record, err := readRecord(decompressor) + if err == io.EOF { + break + } + if err != nil { + f.Close() + return nil, fmt.Errorf("ошибка при чтении записи из файла: %v", err) + } + + switch record.Type { + case RecordTypeSet: + database.dataOffset[string(record.KeyHash)] = offset + case RecordTypeDelete: + delete(database.dataOffset, string(record.KeyHash)) + } + + offset += n + } + + return database, nil } -func (s *Store) Set(key, value interface{}) error { - s.mu.Lock() - defer s.mu.Unlock() - - return s.set(key, value) -} - -func (s *Store) Get(key, value interface{}) error { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.get(key, value) -} - -func (s *Store) Delete(key interface{}) error { - s.mu.Lock() - defer s.mu.Unlock() +func (db *Database) Delete(key interface{}) error { + db.mu.Lock() + defer db.mu.Unlock() keyHash, err := hashInterface(key) if err != nil { @@ -112,397 +177,14 @@ func (s *Store) Delete(key interface{}) error { return err } - delete(s.dataOffset, string(record.KeyHash[:])) - delete(s.bufferDataOffset, string(record.KeyHash[:])) + delete(db.dataOffset, string(record.KeyHash)) - _, err = s.buffer.Write(b) + _, err = db.compressor.Write(b) if err != nil { return err } - if s.buffer.Len() > s.options.MemoryBufferSize { - err = s.flush() - - if err != nil { - return err - } - } + db.offset += int64(len(b)) return nil } - -func (s *Store) Flush() error { - s.mu.Lock() - defer s.mu.Unlock() - - return s.flush() -} - -func (s *Store) BackupWithOptions(filePath string, newFileOptions Options) error { - s.mu.Lock() - defer s.mu.Unlock() - - err := s.flush() - if err != nil { - return err - } - - newStore, err := OpenWithOptions(filePath, newFileOptions) - if err != nil { - return err - } - - for keyHashStr := range s.dataOffset { - var keyHash [sha256.Size224]byte - copy(keyHash[:], keyHashStr) - - valueBytes, err := s.getGobBytes(keyHash) - if err != nil { - newStore.Close() - return err - } - err = newStore.setBytes(keyHash, valueBytes) - if err != nil { - newStore.Close() - return err - } - } - - return newStore.Close() -} - -func (s *Store) Backup(filePath string) error { - return s.BackupWithOptions(filePath, defaultOptions) -} - -func (s *Store) Close() error { - s.mu.Lock() - defer s.mu.Unlock() - - err := s.flush() - if err != nil { - return err - } - - return nil -} - -func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error { - record, err := newRecordBytes(RecordTypeSet, keyHash, valueBytes) - if err != nil { - return err - } - - b, err := record.Marshal() - if err != nil { - return err - } - - s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len()) - - _, err = s.buffer.Write(b) - if err != nil { - return err - } - - if s.buffer.Len() > s.options.MemoryBufferSize { - err = s.flush() - - if err != nil { - return err - } - } - - return nil -} - -func (s *Store) set(key, value interface{}) error { - record, err := newRecord(RecordTypeSet, key, value) - if err != nil { - return err - } - - b, err := record.Marshal() - if err != nil { - return err - } - - s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len()) - - _, err = s.buffer.Write(b) - if err != nil { - return err - } - - if s.buffer.Len() > s.options.MemoryBufferSize { - err = s.flush() - - if err != nil { - return err - } - } - - return nil -} - -func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { - s.readOrderChan <- struct{}{} - defer func() { <-s.readOrderChan }() - - offset, exists := s.bufferDataOffset[string(keyHash[:])] - if exists { - reader := bytes.NewReader(s.buffer.Bytes()) - - err := skip(reader, offset) - if err != nil { - return nil, err - } - - _, record, err := readRecord(reader) - if err != nil { - return nil, err - } - - return record.ValueBytes, nil - } - - offsets, exists := s.dataOffset[string(keyHash[:])] - if !exists { - return nil, ErrNotExists - } - - readF, err := os.Open(s.filePath) - if err != nil { - return nil, err - } - defer readF.Close() - - _, err = readF.Seek(offsets.BlockOffset, io.SeekStart) - if err != nil { - return nil, err - } - - decompressor, err := zstd.NewReader(readF) - if err != nil { - return nil, err - } - defer decompressor.Close() - - err = skip(decompressor, offsets.RecordOffset) - if err != nil { - return nil, err - } - - _, record, err := readRecord(decompressor) - if err != nil { - return nil, err - } - - if !bytes.Equal(record.KeyHash[:], keyHash[:]) { - expectedHashStr := base64.StdEncoding.EncodeToString(keyHash[:]) - gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:]) - return nil, fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr) - } - - return record.ValueBytes, nil -} - -func (s *Store) get(key, value interface{}) error { - s.readOrderChan <- struct{}{} - defer func() { <-s.readOrderChan }() - - hashToFind, err := hashInterface(key) - if err != nil { - return err - } - - b, err := s.getGobBytes(hashToFind) - if err != nil { - return err - } - - return decode(b, value) -} - -func (s *Store) flush() error { - l := int64(s.buffer.Len()) - - f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return fmt.Errorf("open store file: %v", err) - } - stat, err := f.Stat() - if err != nil { - f.Close() - return fmt.Errorf("stat store file: %v", err) - } - - diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize) - - encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel)) - if err != nil { - f.Close() - return fmt.Errorf("init encoder: %v", err) - } - - _, err = s.buffer.WriteTo(encoder) - if err != nil { - return err - } - - for key, val := range s.bufferDataOffset { - s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val} - } - - s.bufferDataOffset = make(map[string]int64) - - err = encoder.Close() - if err != nil { - // TODO: truncate file to previous state - return err - } - - err = diskWriteBuffer.Flush() - if err != nil { - // TODO: truncate file to previous state - return err - } - - err = f.Close() - if err != nil { - return err - } - - // Update index file only on data update - if s.options.useIndexFile && l > 0 { - err = s.saveIndex() - if err != nil { - return err - } - } - - return nil -} - -func readBlock(r *bufio.Reader) (line []byte, n int, err error) { - delim := []byte{0x28, 0xb5, 0x2f, 0xfd} - - line = make([]byte, len(delim)) - copy(line, delim) - - for { - s, err := r.ReadBytes(delim[len(delim)-1]) - line = append(line, []byte(s)...) - if err != nil { - if bytes.Equal(line, delim) { // contains only magic number - return []byte{}, 0, err - } else { - return line, len(s), err - } - } - - if bytes.Equal(line, append(delim, delim...)) { // first block - line = make([]byte, len(delim)) - copy(line, delim) - continue - } - - if bytes.HasSuffix(line, delim) { - return line[:len(line)-len(delim)], len(s), nil - } - } -} - -// RebuildIndex renews index from store file -func (s *Store) RebuildIndex() error { - s.mu.Lock() - defer s.mu.Unlock() - - err := s.rebuildIndex() - if err != nil { - return err - } - - if s.options.useIndexFile { - return s.saveIndex() - } - - return nil -} - -func (s *Store) rebuildIndex() error { - f, err := os.Open(s.filePath) - if err != nil { - return err - } - defer f.Close() - - r := bufio.NewReader(f) - - var blockOffset int64 - - s.dataOffset = make(map[string]Offsets) - - for { - l, n, err := readBlock(r) - if err != nil { - if err != io.EOF { - return err - } else if err == io.EOF && len(l) == 0 { - break - } - } - - dec, err := zstd.NewReader(bytes.NewReader(l)) - - var recordOffset int64 - for { - n, record, err := readRecord(dec) - if err != nil { - if err == io.EOF { - break - } else { - return err - } - } - - switch record.Type { - case RecordTypeSet: - s.dataOffset[string(record.KeyHash[:])] = Offsets{BlockOffset: blockOffset, RecordOffset: recordOffset} - case RecordTypeDelete: - delete(s.dataOffset, string(record.KeyHash[:])) - } - recordOffset += n - } - - blockOffset += int64(n) - } - - idxBuf := new(bytes.Buffer) - - err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) - if err != nil { - return err - } - - err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644) - if err != nil { - return err - } - - return nil -} - -func (s *Store) saveIndex() error { - f, err := os.OpenFile(s.filePath+indexFileExt, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return err - } - - err = gob.NewEncoder(f).Encode(s.dataOffset) - if err != nil { - return err - } - - return f.Close() -} diff --git a/zkv_test.go b/zkv_test.go index ec2d2de..36f6aac 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -1,8 +1,6 @@ package zkv import ( - "bufio" - "io" "os" "testing" @@ -13,7 +11,6 @@ func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -23,16 +20,7 @@ func TestReadWriteBasic(t *testing.T) { assert.NoError(t, err) } - assert.Len(t, db.dataOffset, 0) - assert.Len(t, db.bufferDataOffset, recordCount) - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } + assert.Len(t, db.dataOffset, recordCount) err = db.Close() assert.NoError(t, err) @@ -59,7 +47,6 @@ func TestSmallWrites(t *testing.T) { const filePath = "TestSmallWrites.zkv" const recordCount = 100 defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) for i := 1; i <= recordCount; i++ { db, err := Open(filePath) @@ -95,7 +82,6 @@ func TestDeleteBasic(t *testing.T) { const filePath = "TestDeleteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -105,15 +91,12 @@ func TestDeleteBasic(t *testing.T) { assert.NoError(t, err) } - assert.Len(t, db.dataOffset, 0) - assert.Len(t, db.bufferDataOffset, recordCount) + assert.Len(t, db.dataOffset, recordCount) err = db.Delete(50) assert.NoError(t, err) - assert.Len(t, db.dataOffset, 0) - assert.Len(t, db.bufferDataOffset, recordCount-1) - + assert.Len(t, db.dataOffset, recordCount-1) var value int err = db.Get(50, &value) assert.Equal(t, 0, value) @@ -127,8 +110,6 @@ func TestDeleteBasic(t *testing.T) { assert.NoError(t, err) assert.Len(t, db.dataOffset, recordCount-1) - assert.Len(t, db.bufferDataOffset, 0) - value = 0 err = db.Get(50, &value) assert.Equal(t, 0, value) @@ -137,276 +118,3 @@ func TestDeleteBasic(t *testing.T) { err = db.Close() assert.NoError(t, err) } - -func TestBufferBasic(t *testing.T) { - const filePath = "TestBuffer.zkv" - defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) - - db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) - assert.NoError(t, err) - - err = db.Set(1, make([]byte, 100)) - assert.NoError(t, err) - - assert.NotEqual(t, 0, db.dataOffset) - assert.Len(t, db.bufferDataOffset, 0) - assert.Equal(t, 0, db.buffer.Len()) - - var gotValue []byte - err = db.Get(1, &gotValue) - assert.NoError(t, err) - - assert.Equal(t, make([]byte, 100), gotValue) - - err = db.Close() - assert.NoError(t, err) -} - -func TestBufferRead(t *testing.T) { - const filePath = "TestBufferRead.zkv" - const recordCount = 2 - defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) - - db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) - assert.NoError(t, err) - - for i := 1; i <= recordCount; i++ { - err = db.Set(i, i) - assert.NoError(t, err) - } - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } - - err = db.Close() - assert.NoError(t, err) - - // try to read - db, err = Open(filePath) - assert.NoError(t, err) - - assert.Len(t, db.dataOffset, recordCount) - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } - - err = db.Close() - assert.NoError(t, err) - -} - -func TestBackupBasic(t *testing.T) { - const filePath = "TestBackupBasic.zkv" - const newFilePath = "TestBackupBasic2.zkv" - const recordCount = 100 - defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) - defer os.Remove(newFilePath) - defer os.Remove(newFilePath + indexFileExt) - - db, err := Open(filePath) - assert.NoError(t, err) - - for i := 1; i <= recordCount; i++ { - err = db.Set(i, i) - assert.NoError(t, err) - } - - err = db.Backup(newFilePath) - assert.NoError(t, err) - - err = db.Close() - assert.NoError(t, err) - - db, err = Open(newFilePath) - assert.NoError(t, err) - - assert.Len(t, db.dataOffset, recordCount) - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } - - err = db.Close() - assert.NoError(t, err) - -} - -func TestBackupWithDeletedRecords(t *testing.T) { - const filePath = "TestBackupWithDeletedRecords.zkv" - const newFilePath = "TestBackupWithDeletedRecords2.zkv" - const recordCount = 100 - defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) - defer os.Remove(newFilePath) - defer os.Remove(newFilePath + indexFileExt) - - db, err := Open(filePath) - assert.NoError(t, err) - - for i := 1; i <= recordCount; i++ { - err = db.Set(i, i) - assert.NoError(t, err) - } - - err = db.Flush() - assert.NoError(t, err) - - for i := 1; i <= recordCount; i++ { - if i%2 == 1 { - continue - } - - err = db.Delete(i) - assert.NoError(t, err) - } - - err = db.Backup(newFilePath) - assert.NoError(t, err) - - err = db.Close() - assert.NoError(t, err) - - db, err = Open(newFilePath) - assert.NoError(t, err) - - assert.Len(t, db.dataOffset, recordCount/2) - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - if i%2 == 0 { - assert.ErrorIs(t, err, ErrNotExists) - } else { - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } - } - - err = db.Close() - assert.NoError(t, err) -} - -func TestIndexFileBasic(t *testing.T) { - const filePath = "TestReadWriteBasic.zkv" - const recordCount = 100 - defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) - - db, err := Open(filePath) - assert.NoError(t, err) - - for i := 1; i <= recordCount; i++ { - err = db.Set(i, i) - assert.NoError(t, err) - } - - assert.Len(t, db.dataOffset, 0) - assert.Len(t, db.bufferDataOffset, recordCount) - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } - - err = db.Close() - assert.NoError(t, err) - - // try to read - db, err = Open(filePath) - assert.NoError(t, err) - - assert.Len(t, db.dataOffset, recordCount) - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } - - err = db.Close() - assert.NoError(t, err) -} - -func TestReadBlock(t *testing.T) { - file, err := os.Open("testdata/TestReadBlock.zkv") - assert.NoError(t, err) - defer file.Close() - - r := bufio.NewReader(file) - - line, _, err := readBlock(r) - assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x90, 0xff, 0xf4, 0x25, 0x15, 0x70, 0x75, 0x5c, 0xff, 0xf4, 0xff, 0xbc, 0xff, 0xf9, 0xff, 0xde, 0xff, 0x93, 0xff, 0xf8, 0x0d, 0x0e, 0x78, 0x5b, 0xff, 0x81, 0xff, 0x95, 0x6e, 0xff, 0xab, 0x4b, 0xff, 0xe8, 0x37, 0xff, 0x97, 0x68, 0x41, 0x3d, 0x01, 0x04, 0x03, 0x04, 0x00, 0x02, 0x00, 0x25, 0xd5, 0x63, 0x21}, line) - line, _, err = readBlock(r) - assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x89, 0x04, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x34, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x84, 0xff, 0x84, 0xff, 0xc1, 0x21, 0x02, 0xff, 0x8b, 0xff, 0xd7, 0x6d, 0xff, 0xd0, 0xff, 0xad, 0x1a, 0x55, 0x14, 0x5c, 0xff, 0xb1, 0x04, 0x37, 0x29, 0x2f, 0x78, 0x18, 0xff, 0xb5, 0xff, 0xe4, 0x56, 0x4e, 0xff, 0x8d, 0x19, 0x46, 0x01, 0x04, 0x03, 0x04, 0x00, 0x04, 0x00, 0x0c, 0x3b, 0xbf, 0x39}, line) - line, _, err = readBlock(r) - assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0x25, 0x79, 0x3e, 0x46, 0x4e, 0xff, 0xac, 0x06, 0x27, 0xff, 0xb1, 0xff, 0xa3, 0xff, 0xaa, 0xff, 0xe3, 0xff, 0xde, 0x37, 0x71, 0x63, 0x72, 0xff, 0x89, 0x0d, 0xff, 0x85, 0x39, 0xff, 0xb5, 0xff, 0xb9, 0xff, 0x8a, 0xff, 0x9e, 0x60, 0xff, 0xad, 0x17, 0x01, 0x04, 0x03, 0x04, 0x00, 0x06, 0x00, 0x52, 0x08, 0x3e, 0x26}, line) - line, _, err = readBlock(r) - assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0xc9, 0x04, 0x00, 0x91, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x3c, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0xbf, 0x25, 0xff, 0xef, 0xff, 0xc8, 0xff, 0x85, 0x2c, 0xff, 0xbf, 0xff, 0xb5, 0xff, 0xad, 0xff, 0xfa, 0xff, 0xaf, 0x1c, 0xff, 0xe7, 0x71, 0xff, 0xfa, 0x36, 0xff, 0x95, 0x1b, 0xff, 0x91, 0xff, 0xab, 0x36, 0xff, 0xcd, 0x7a, 0x33, 0xff, 0xf7, 0xff, 0xec, 0xff, 0xee, 0xff, 0xc1, 0x01, 0x04, 0x03, 0x04, 0x00, 0x08, 0x00, 0xa5, 0x0e, 0x62, 0x53}, line) - - line, _, err = readBlock(r) - assert.Equal(t, line, []byte{}) - assert.Equal(t, io.EOF, err) -} - -func TestRebuildIndex(t *testing.T) { - const filePath = "TestRebuiltIndex.zkv" - const recordCount = 4 - defer os.Remove(filePath) - defer os.Remove(filePath + indexFileExt) - - for i := 1; i <= recordCount; i++ { - db, err := Open(filePath) - assert.NoError(t, err) - - err = db.Set(i, i) - assert.NoError(t, err) - - err = db.Close() - assert.NoError(t, err) - } - - db, err := Open(filePath) - assert.NoError(t, err) - - err = db.RebuildIndex() - assert.NoError(t, err) - - for i := 1; i <= recordCount; i++ { - var gotValue int - - err = db.Get(i, &gotValue) - assert.NoError(t, err) - assert.Equal(t, i, gotValue) - } -}