1
0
mirror of https://github.com/nxshock/zkv.git synced 2025-04-20 09:21:50 +05:00

Compare commits

..

2 Commits

Author SHA1 Message Date
e166e07daa Do not block file for writes 2022-12-07 21:06:36 +05:00
80540a662a Add test for backup with deleted records 2022-12-07 19:20:38 +05:00
3 changed files with 80 additions and 29 deletions

View File

@ -66,6 +66,6 @@ File is log stuctured list of commands:
## TODO ## TODO
- [ ] Add delete records test for `Backup()` method
- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) - [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go)
- [ ] Implement optional separate index file to speedup store initialization - [ ] Implement optional separate index file to speedup store initialization
- [ ] Add recovery previous state of store file on write error

52
zkv.go
View File

@ -15,10 +15,8 @@ import (
type Store struct { type Store struct {
dataOffset map[string]int64 dataOffset map[string]int64
file *os.File
filePath string filePath string
offset int64 offset int64
encoder *zstd.Encoder
buffer *bytes.Buffer buffer *bytes.Buffer
bufferDataOffset map[string]int64 bufferDataOffset map[string]int64
@ -33,24 +31,10 @@ type Store struct {
func OpenWithOptions(filePath string, options Options) (*Store, error) { func OpenWithOptions(filePath string, options Options) (*Store, error) {
options.setDefaults() options.setDefaults()
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
f.Close()
return nil, fmt.Errorf("open store file: %v", err)
}
compressor, err := zstd.NewWriter(f)
if err != nil {
f.Close()
return nil, fmt.Errorf("compressor initialization: %v", err)
}
database := &Store{ database := &Store{
dataOffset: make(map[string]int64), dataOffset: make(map[string]int64),
bufferDataOffset: make(map[string]int64), bufferDataOffset: make(map[string]int64),
offset: 0, offset: 0,
file: f,
encoder: compressor,
buffer: new(bytes.Buffer), buffer: new(bytes.Buffer),
filePath: filePath, filePath: filePath,
options: options, options: options,
@ -58,15 +42,16 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) {
// restore file data // restore file data
readF, err := os.Open(filePath) readF, err := os.Open(filePath)
if err != nil { if os.IsNotExist(err) {
f.Close() // Empty datebase
return database, nil
} else if err != nil {
return nil, fmt.Errorf("open file for indexing: %v", err) return nil, fmt.Errorf("open file for indexing: %v", err)
} }
defer readF.Close() defer readF.Close()
decompressor, err := zstd.NewReader(readF) decompressor, err := zstd.NewReader(readF)
if err != nil { if err != nil {
f.Close()
return nil, fmt.Errorf("decompressor initialization: %v", err) return nil, fmt.Errorf("decompressor initialization: %v", err)
} }
defer decompressor.Close() defer decompressor.Close()
@ -78,7 +63,6 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) {
break break
} }
if err != nil { if err != nil {
f.Close()
return nil, fmt.Errorf("read record error: %v", err) return nil, fmt.Errorf("read record error: %v", err)
} }
@ -204,12 +188,7 @@ func (s *Store) Close() error {
return err return err
} }
err = s.encoder.Close() return nil
if err != nil {
return err
}
return s.file.Close()
} }
func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error { func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error {
@ -393,7 +372,18 @@ func (s *Store) get(key, value interface{}) error {
func (s *Store) flush() error { func (s *Store) flush() error {
l := int64(s.buffer.Len()) l := int64(s.buffer.Len())
_, err := s.buffer.WriteTo(s.encoder) f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open store file: %v", err)
}
encoder, err := zstd.NewWriter(f, zstd.WithEncoderLevel(s.options.CompressionLevel))
if err != nil {
f.Close()
return fmt.Errorf("open store file: %v", err)
}
_, err = s.buffer.WriteTo(encoder)
if err != nil { if err != nil {
return err return err
} }
@ -406,7 +396,13 @@ func (s *Store) flush() error {
s.offset += l s.offset += l
err = s.encoder.Flush() err = encoder.Close()
if err != nil {
// TODO: truncate file to previous state
return err
}
err = f.Close()
if err != nil { if err != nil {
return err return err
} }

View File

@ -274,3 +274,58 @@ func TestBackupBasic(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
} }
func TestBackupWithDeletedRecords(t *testing.T) {
const filePath = "TestBackupWithDeletedRecords.zkv"
const newFilePath = "TestBackupWithDeletedRecords2.zkv"
const recordCount = 100
defer os.Remove(filePath)
defer os.Remove(newFilePath)
db, err := Open(filePath)
assert.NoError(t, err)
for i := 1; i <= recordCount; i++ {
err = db.Set(i, i)
assert.NoError(t, err)
}
err = db.Flush()
assert.NoError(t, err)
for i := 1; i <= recordCount; i++ {
if i%2 == 1 {
continue
}
err = db.Delete(i)
assert.NoError(t, err)
}
err = db.Backup(newFilePath)
assert.NoError(t, err)
err = db.Close()
assert.NoError(t, err)
db, err = Open(newFilePath)
assert.NoError(t, err)
assert.Len(t, db.dataOffset, recordCount/2)
for i := 1; i <= recordCount; i++ {
var gotValue int
err = db.Get(i, &gotValue)
if i%2 == 0 {
assert.ErrorIs(t, err, ErrNotExists)
} else {
assert.NoError(t, err)
assert.Equal(t, i, gotValue)
}
}
err = db.Close()
assert.NoError(t, err)
}