mirror of
https://github.com/nxshock/zkv.git
synced 2025-04-20 09:21:50 +05:00
Compare commits
2 Commits
9f116ad35e
...
e166e07daa
Author | SHA1 | Date | |
---|---|---|---|
e166e07daa | |||
80540a662a |
@ -66,6 +66,6 @@ File is log stuctured list of commands:
|
|||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
- [ ] Add delete records test for `Backup()` method
|
|
||||||
- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go)
|
- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go)
|
||||||
- [ ] Implement optional separate index file to speedup store initialization
|
- [ ] Implement optional separate index file to speedup store initialization
|
||||||
|
- [ ] Add recovery previous state of store file on write error
|
||||||
|
52
zkv.go
52
zkv.go
@ -15,10 +15,8 @@ import (
|
|||||||
type Store struct {
|
type Store struct {
|
||||||
dataOffset map[string]int64
|
dataOffset map[string]int64
|
||||||
|
|
||||||
file *os.File
|
|
||||||
filePath string
|
filePath string
|
||||||
offset int64
|
offset int64
|
||||||
encoder *zstd.Encoder
|
|
||||||
|
|
||||||
buffer *bytes.Buffer
|
buffer *bytes.Buffer
|
||||||
bufferDataOffset map[string]int64
|
bufferDataOffset map[string]int64
|
||||||
@ -33,24 +31,10 @@ type Store struct {
|
|||||||
func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
||||||
options.setDefaults()
|
options.setDefaults()
|
||||||
|
|
||||||
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
|
||||||
if err != nil {
|
|
||||||
f.Close()
|
|
||||||
return nil, fmt.Errorf("open store file: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
compressor, err := zstd.NewWriter(f)
|
|
||||||
if err != nil {
|
|
||||||
f.Close()
|
|
||||||
return nil, fmt.Errorf("compressor initialization: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
database := &Store{
|
database := &Store{
|
||||||
dataOffset: make(map[string]int64),
|
dataOffset: make(map[string]int64),
|
||||||
bufferDataOffset: make(map[string]int64),
|
bufferDataOffset: make(map[string]int64),
|
||||||
offset: 0,
|
offset: 0,
|
||||||
file: f,
|
|
||||||
encoder: compressor,
|
|
||||||
buffer: new(bytes.Buffer),
|
buffer: new(bytes.Buffer),
|
||||||
filePath: filePath,
|
filePath: filePath,
|
||||||
options: options,
|
options: options,
|
||||||
@ -58,15 +42,16 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
|||||||
|
|
||||||
// restore file data
|
// restore file data
|
||||||
readF, err := os.Open(filePath)
|
readF, err := os.Open(filePath)
|
||||||
if err != nil {
|
if os.IsNotExist(err) {
|
||||||
f.Close()
|
// Empty datebase
|
||||||
|
return database, nil
|
||||||
|
} else if err != nil {
|
||||||
return nil, fmt.Errorf("open file for indexing: %v", err)
|
return nil, fmt.Errorf("open file for indexing: %v", err)
|
||||||
}
|
}
|
||||||
defer readF.Close()
|
defer readF.Close()
|
||||||
|
|
||||||
decompressor, err := zstd.NewReader(readF)
|
decompressor, err := zstd.NewReader(readF)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.Close()
|
|
||||||
return nil, fmt.Errorf("decompressor initialization: %v", err)
|
return nil, fmt.Errorf("decompressor initialization: %v", err)
|
||||||
}
|
}
|
||||||
defer decompressor.Close()
|
defer decompressor.Close()
|
||||||
@ -78,7 +63,6 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.Close()
|
|
||||||
return nil, fmt.Errorf("read record error: %v", err)
|
return nil, fmt.Errorf("read record error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,12 +188,7 @@ func (s *Store) Close() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.encoder.Close()
|
return nil
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.file.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error {
|
func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error {
|
||||||
@ -393,7 +372,18 @@ func (s *Store) get(key, value interface{}) error {
|
|||||||
func (s *Store) flush() error {
|
func (s *Store) flush() error {
|
||||||
l := int64(s.buffer.Len())
|
l := int64(s.buffer.Len())
|
||||||
|
|
||||||
_, err := s.buffer.WriteTo(s.encoder)
|
f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open store file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
encoder, err := zstd.NewWriter(f, zstd.WithEncoderLevel(s.options.CompressionLevel))
|
||||||
|
if err != nil {
|
||||||
|
f.Close()
|
||||||
|
return fmt.Errorf("open store file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.buffer.WriteTo(encoder)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -406,7 +396,13 @@ func (s *Store) flush() error {
|
|||||||
|
|
||||||
s.offset += l
|
s.offset += l
|
||||||
|
|
||||||
err = s.encoder.Flush()
|
err = encoder.Close()
|
||||||
|
if err != nil {
|
||||||
|
// TODO: truncate file to previous state
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = f.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
55
zkv_test.go
55
zkv_test.go
@ -274,3 +274,58 @@ func TestBackupBasic(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBackupWithDeletedRecords(t *testing.T) {
|
||||||
|
const filePath = "TestBackupWithDeletedRecords.zkv"
|
||||||
|
const newFilePath = "TestBackupWithDeletedRecords2.zkv"
|
||||||
|
const recordCount = 100
|
||||||
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(newFilePath)
|
||||||
|
|
||||||
|
db, err := Open(filePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
err = db.Set(i, i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Flush()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
if i%2 == 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Delete(i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Backup(newFilePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
db, err = Open(newFilePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Len(t, db.dataOffset, recordCount/2)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
if i%2 == 0 {
|
||||||
|
assert.ErrorIs(t, err, ErrNotExists)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user