1
0
mirror of https://github.com/nxshock/zkv.git synced 2024-11-27 11:21:02 +05:00

Add Backup() method

This commit is contained in:
nxshock 2022-12-05 21:26:54 +05:00
parent 20a99d9f35
commit 9f116ad35e
4 changed files with 211 additions and 9 deletions

View File

@ -42,6 +42,9 @@ Other methods:
```go ```go
// Flush data to disk // Flush data to disk
err = db.Flush() err = db.Flush()
// Backup data to another file
err = db.Backup("new/file/path")
``` ```
## File structure ## File structure
@ -63,6 +66,6 @@ File is log stuctured list of commands:
## TODO ## TODO
- [ ] Implement `Copy()` method to copy store without deleted records - [ ] Add delete records test for `Backup()` method
- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) - [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go)
- [ ] Implement optional separate index file to speedup store initialization - [ ] Implement optional separate index file to speedup store initialization

View File

@ -2,6 +2,7 @@ package zkv
import ( import (
"bytes" "bytes"
"crypto/sha256"
"encoding/binary" "encoding/binary"
"encoding/gob" "encoding/gob"
"io" "io"
@ -20,8 +21,17 @@ type Record struct {
ValueBytes []byte ValueBytes []byte
} }
// newRecordBytes builds a Record directly from a precomputed key hash and
// gob-encoded value bytes. It never fails; the error return exists only to
// mirror newRecord's signature.
func newRecordBytes(recordType RecordType, keyHash [sha256.Size224]byte, valueBytes []byte) (*Record, error) {
	return &Record{
		Type:       recordType,
		KeyHash:    keyHash,
		ValueBytes: valueBytes,
	}, nil
}
func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
keyBytes, err := encode(key) keyHash, err := hashInterface(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -31,12 +41,7 @@ func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
return nil, err return nil, err
} }
record := &Record{ return newRecordBytes(recordType, keyHash, valueBytes)
Type: recordType,
KeyHash: hashBytes(keyBytes),
ValueBytes: valueBytes}
return record, nil
} }
func (r *Record) Marshal() ([]byte, error) { func (r *Record) Marshal() ([]byte, error) {

129
zkv.go
View File

@ -2,6 +2,7 @@ package zkv
import ( import (
"bytes" "bytes"
"crypto/sha256"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"io" "io"
@ -157,6 +158,43 @@ func (s *Store) Flush() error {
return s.flush() return s.flush()
} }
// BackupWithOptions writes a copy of the store to filePath, creating the
// destination with newFileOptions. The source store is flushed first so the
// on-disk offset index covers every live record.
func (s *Store) BackupWithOptions(filePath string, newFileOptions Options) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Drain the write buffer so dataOffset describes all records.
	if err := s.flush(); err != nil {
		return err
	}

	newStore, err := OpenWithOptions(filePath, newFileOptions)
	if err != nil {
		return err
	}

	for hashStr := range s.dataOffset {
		var hash [sha256.Size224]byte
		copy(hash[:], hashStr)

		valueBytes, err := s.getGobBytes(hash)
		if err != nil {
			newStore.Close() // best effort; the copy error takes priority
			return err
		}

		if err := newStore.setBytes(hash, valueBytes); err != nil {
			newStore.Close() // best effort; the copy error takes priority
			return err
		}
	}

	return newStore.Close()
}
// Backup writes a copy of the store to filePath using defaultOptions.
// See BackupWithOptions to control the destination file's options.
func (s *Store) Backup(filePath string) error {
	return s.BackupWithOptions(filePath, defaultOptions)
}
func (s *Store) Close() error { func (s *Store) Close() error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -174,6 +212,35 @@ func (s *Store) Close() error {
return s.file.Close() return s.file.Close()
} }
// setBytes appends a set-record for an already-hashed key with pre-encoded
// value bytes, recording its buffer offset and flushing once the buffer
// exceeds the configured size.
// NOTE(review): does not take s.mu itself — confirm callers serialize access.
func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error {
	rec, err := newRecordBytes(RecordTypeSet, keyHash, valueBytes)
	if err != nil {
		return err
	}

	data, err := rec.Marshal()
	if err != nil {
		return err
	}

	// Remember where this record starts inside the in-memory buffer.
	s.bufferDataOffset[string(rec.KeyHash[:])] = int64(s.buffer.Len())

	if _, err := s.buffer.Write(data); err != nil {
		return err
	}

	if s.buffer.Len() <= s.options.BufferSize {
		return nil
	}
	return s.flush()
}
func (s *Store) set(key, value interface{}) error { func (s *Store) set(key, value interface{}) error {
record, err := newRecord(RecordTypeSet, key, value) record, err := newRecord(RecordTypeSet, key, value)
if err != nil { if err != nil {
@ -203,6 +270,64 @@ func (s *Store) set(key, value interface{}) error {
return nil return nil
} }
// getGobBytes returns the gob-encoded value bytes stored for keyHash,
// checking the in-memory write buffer first and falling back to the
// compressed on-disk log. Returns ErrNotExists when the key is unknown.
func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) {
	// Acquire a read slot on the ordering channel; the deferred receive
	// releases it on return.
	s.readOrderChan <- struct{}{}
	defer func() { <-s.readOrderChan }()

	// Fast path: the record still sits in the unflushed write buffer.
	offset, exists := s.bufferDataOffset[string(keyHash[:])]
	if exists {
		reader := bytes.NewReader(s.buffer.Bytes())

		err := skip(reader, offset)
		if err != nil {
			return nil, err
		}

		_, record, err := readRecord(reader)
		if err != nil {
			return nil, err
		}

		return record.ValueBytes, nil
	}

	// Slow path: locate the record in the on-disk log.
	offset, exists = s.dataOffset[string(keyHash[:])]
	if !exists {
		return nil, ErrNotExists
	}

	readF, err := os.Open(s.filePath)
	if err != nil {
		return nil, err
	}
	defer readF.Close()

	decompressor, err := zstd.NewReader(readF)
	if err != nil {
		return nil, err
	}
	defer decompressor.Close()

	// Offsets index the decompressed stream, so skip through the
	// decompressor rather than seeking the underlying file.
	err = skip(decompressor, offset)
	if err != nil {
		return nil, err
	}

	_, record, err := readRecord(decompressor)
	if err != nil {
		return nil, err
	}

	// Sanity check: the record found at this offset must carry the
	// requested key hash, otherwise the index is corrupt.
	if !bytes.Equal(record.KeyHash[:], keyHash[:]) {
		expectedHashStr := base64.StdEncoding.EncodeToString(keyHash[:])
		gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:])

		return nil, fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr)
	}

	return record.ValueBytes, nil
}
func (s *Store) get(key, value interface{}) error { func (s *Store) get(key, value interface{}) error {
s.readOrderChan <- struct{}{} s.readOrderChan <- struct{}{}
defer func() { <-s.readOrderChan }() defer func() { <-s.readOrderChan }()
@ -257,7 +382,9 @@ func (s *Store) get(key, value interface{}) error {
} }
if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку expectedHashStr := base64.StdEncoding.EncodeToString(hashToFind[:])
gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:])
return fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr)
} }
return decode(record.ValueBytes, value) return decode(record.ValueBytes, value)

View File

@ -1,12 +1,40 @@
package zkv package zkv
import ( import (
"bytes"
"os" "os"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
// TestRecord round-trips ten records through Marshal and readRecord,
// verifying key hashes and value bytes survive serialization.
func TestRecord(t *testing.T) {
	const count = 10

	var (
		buf      bytes.Buffer
		expected []Record
	)

	for i := 0; i < count; i++ {
		rec, err := newRecord(RecordTypeSet, i, i)
		assert.NoError(t, err)
		expected = append(expected, *rec)

		data, err := rec.Marshal()
		assert.NoError(t, err)

		_, err = buf.Write(data)
		assert.NoError(t, err)
	}

	for _, want := range expected {
		_, got, err := readRecord(&buf)
		assert.NoError(t, err)

		assert.Equal(t, got.KeyHash, want.KeyHash)
		assert.Equal(t, got.ValueBytes, want.ValueBytes)
	}
}
func TestReadWriteBasic(t *testing.T) { func TestReadWriteBasic(t *testing.T) {
const filePath = "TestReadWriteBasic.zkv" const filePath = "TestReadWriteBasic.zkv"
const recordCount = 100 const recordCount = 100
@ -207,3 +235,42 @@ func TestBufferRead(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
} }
// TestBackupBasic fills a store, backs it up to a second file and verifies
// every record is readable from the reopened backup.
func TestBackupBasic(t *testing.T) {
	const (
		filePath    = "TestBackupBasic.zkv"
		newFilePath = "TestBackupBasic2.zkv"
		recordCount = 100
	)
	defer os.Remove(filePath)
	defer os.Remove(newFilePath)

	db, err := Open(filePath)
	assert.NoError(t, err)

	for i := 1; i <= recordCount; i++ {
		assert.NoError(t, db.Set(i, i))
	}

	assert.NoError(t, db.Backup(newFilePath))
	assert.NoError(t, db.Close())

	// Reopen the backup and confirm the full index and all values.
	db, err = Open(newFilePath)
	assert.NoError(t, err)
	assert.Len(t, db.dataOffset, recordCount)

	for i := 1; i <= recordCount; i++ {
		var gotValue int
		assert.NoError(t, db.Get(i, &gotValue))
		assert.Equal(t, i, gotValue)
	}

	assert.NoError(t, db.Close())
}