diff --git a/README.md b/README.md index 0a85958..12eed2b 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,9 @@ Other methods: ```go // Flush data to disk err = db.Flush() + +// Backup data to another file +err = db.Backup("new/file/path") ``` ## File structure @@ -63,6 +66,6 @@ File is log stuctured list of commands: ## TODO -- [ ] Implement `Copy()` method to copy store without deleted records +- [ ] Add delete records test for `Backup()` method - [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) - [ ] Implement optional separate index file to speedup store initialization diff --git a/record.go b/record.go index 85a8c2b..0c3c941 100644 --- a/record.go +++ b/record.go @@ -2,6 +2,7 @@ package zkv import ( "bytes" + "crypto/sha256" "encoding/binary" "encoding/gob" "io" @@ -20,8 +21,17 @@ type Record struct { ValueBytes []byte } +func newRecordBytes(recordType RecordType, keyHash [sha256.Size224]byte, valueBytes []byte) (*Record, error) { + record := &Record{ + Type: recordType, + KeyHash: keyHash, + ValueBytes: valueBytes} + + return record, nil +} + func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { - keyBytes, err := encode(key) + keyHash, err := hashInterface(key) if err != nil { return nil, err } @@ -31,12 +41,7 @@ func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { return nil, err } - record := &Record{ - Type: recordType, - KeyHash: hashBytes(keyBytes), - ValueBytes: valueBytes} - - return record, nil + return newRecordBytes(recordType, keyHash, valueBytes) } func (r *Record) Marshal() ([]byte, error) { diff --git a/zkv.go b/zkv.go index e207e3b..d1bedae 100644 --- a/zkv.go +++ b/zkv.go @@ -2,6 +2,7 @@ package zkv import ( "bytes" + "crypto/sha256" "encoding/base64" "fmt" "io" @@ -157,6 +158,43 @@ func (s *Store) Flush() error { return s.flush() } +func (s *Store) BackupWithOptions(filePath string, newFileOptions Options) error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.flush() + if err != nil { + return err + } + + newStore, err := OpenWithOptions(filePath, newFileOptions) + if err != nil { + return err + } + + for keyHashStr := range s.dataOffset { + var keyHash [sha256.Size224]byte + copy(keyHash[:], keyHashStr) + + valueBytes, err := s.getGobBytes(keyHash) + if err != nil { + newStore.Close() + return err + } + err = newStore.setBytes(keyHash, valueBytes) + if err != nil { + newStore.Close() + return err + } + } + + return newStore.Close() +} + +func (s *Store) Backup(filePath string) error { + return s.BackupWithOptions(filePath, defaultOptions) +} + func (s *Store) Close() error { s.mu.Lock() defer s.mu.Unlock() @@ -174,6 +212,35 @@ func (s *Store) Close() error { return s.file.Close() } +func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error { + record, err := newRecordBytes(RecordTypeSet, keyHash, valueBytes) + if err != nil { + return err + } + + b, err := record.Marshal() + if err != nil { + return err + } + + s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len()) + + _, err = s.buffer.Write(b) + if err != nil { + return err + } + + if s.buffer.Len() > s.options.BufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil +} + func (s *Store) set(key, value interface{}) error { record, err := newRecord(RecordTypeSet, key, value) if err != nil { @@ -203,6 +270,64 @@ func (s *Store) set(key, value interface{}) error { return nil } +func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { + s.readOrderChan <- struct{}{} + defer func() { <-s.readOrderChan }() + + offset, exists := s.bufferDataOffset[string(keyHash[:])] + if exists { + reader := bytes.NewReader(s.buffer.Bytes()) + + err := skip(reader, offset) + if err != nil { + return nil, err + } + + _, record, err := readRecord(reader) + if err != nil { + return nil, err + } + + return record.ValueBytes, nil + } + + offset, exists = s.dataOffset[string(keyHash[:])] + if !exists { + return nil, ErrNotExists + } + + readF, err := os.Open(s.filePath) + if err != nil { + return nil, err + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return nil, err + } + defer decompressor.Close() + + err = skip(decompressor, offset) + if err != nil { + return nil, err + } + + _, record, err := readRecord(decompressor) + if err != nil { + return nil, err + } + + if !bytes.Equal(record.KeyHash[:], keyHash[:]) { + expectedHashStr := base64.StdEncoding.EncodeToString(keyHash[:]) + gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:]) + return nil, fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr) + } + + return record.ValueBytes, nil + +} + func (s *Store) get(key, value interface{}) error { s.readOrderChan <- struct{}{} defer func() { <-s.readOrderChan }() @@ -257,7 +382,9 @@ func (s *Store) get(key, value interface{}) error { } if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { - return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку + expectedHashStr := base64.StdEncoding.EncodeToString(hashToFind[:]) + gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:]) + return fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr) } return decode(record.ValueBytes, value) diff --git a/zkv_test.go b/zkv_test.go index a497a3c..1fc3983 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -1,12 +1,40 @@ package zkv import ( + "bytes" "os" "testing" "github.com/stretchr/testify/assert" ) +func TestRecord(t *testing.T) { + buf := new(bytes.Buffer) + + var records []Record + + for i := 0; i < 10; i++ { + record, err := newRecord(RecordTypeSet, i, i) + assert.NoError(t, err) + + records = append(records, *record) + + b, err := record.Marshal() + assert.NoError(t, err) + + _, err = buf.Write(b) + assert.NoError(t, err) + } + + for i := 0; i < 10; i++ { + _, record, err := readRecord(buf) + assert.NoError(t, err) + + assert.Equal(t, record.KeyHash, records[i].KeyHash) + assert.Equal(t, record.ValueBytes, records[i].ValueBytes) + } +} + func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 @@ -207,3 +235,42 @@ func TestBufferRead(t *testing.T) { assert.NoError(t, err) } + +func TestBackupBasic(t *testing.T) { + const filePath = "TestBackupBasic.zkv" + const newFilePath = "TestBackupBasic2.zkv" + const recordCount = 100 + defer os.Remove(filePath) + defer os.Remove(newFilePath) + + db, err := Open(filePath) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + err = db.Backup(newFilePath) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + + db, err = Open(newFilePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + +}