mirror of
https://github.com/nxshock/zkv.git
synced 2025-07-01 00:13:37 +05:00
Add RebuildIndex() func
This commit is contained in:
parent
832d12c43b
commit
583f60da9f
6 changed files with 209 additions and 46 deletions
BIN
TestDeleteBasic.zkv.idx
Normal file
BIN
TestDeleteBasic.zkv.idx
Normal file
Binary file not shown.
BIN
TestSmallWrites.zkv.idx
Normal file
BIN
TestSmallWrites.zkv.idx
Normal file
Binary file not shown.
BIN
testdata/TestReadBlock.zkv
vendored
Normal file
BIN
testdata/TestReadBlock.zkv
vendored
Normal file
Binary file not shown.
12
utils.go
12
utils.go
|
@ -4,7 +4,9 @@ import (
|
|||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
func encode(value interface{}) ([]byte, error) {
|
||||
|
@ -40,3 +42,13 @@ func skip(r io.Reader, count int64) (err error) {
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
// isFileExists reports whether something exists at filePath.
//
// It returns (true, nil) when os.Stat succeeds, (false, nil) when os.Stat
// fails with an error matching os.ErrNotExist, and (false, err) for any other
// os.Stat failure.
func isFileExists(filePath string) (bool, error) {
	_, statErr := os.Stat(filePath)

	switch {
	case statErr == nil:
		return true, nil
	case errors.Is(statErr, os.ErrNotExist):
		return false, nil
	default:
		return false, statErr
	}
}
|
||||
|
|
188
zkv.go
188
zkv.go
|
@ -37,7 +37,7 @@ type Store struct {
|
|||
func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
||||
options.setDefaults()
|
||||
|
||||
database := &Store{
|
||||
store := &Store{
|
||||
dataOffset: make(map[string]Offsets),
|
||||
bufferDataOffset: make(map[string]int64),
|
||||
buffer: new(bytes.Buffer),
|
||||
|
@ -48,50 +48,30 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
|||
if options.useIndexFile {
|
||||
idxFile, err := os.Open(filePath + indexFileExt)
|
||||
if err == nil {
|
||||
err = gob.NewDecoder(idxFile).Decode(&database.dataOffset)
|
||||
if err == nil {
|
||||
return database, nil
|
||||
err = gob.NewDecoder(idxFile).Decode(&store.dataOffset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return store, nil
|
||||
}
|
||||
}
|
||||
|
||||
// restore file data
|
||||
readF, err := os.Open(filePath)
|
||||
if os.IsNotExist(err) {
|
||||
// Empty database
|
||||
return database, nil
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("open file for indexing: %v", err)
|
||||
}
|
||||
defer readF.Close()
|
||||
|
||||
decompressor, err := zstd.NewReader(readF)
|
||||
exists, err := isFileExists(filePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decompressor initialization: %v", err)
|
||||
}
|
||||
defer decompressor.Close()
|
||||
|
||||
offset := int64(0)
|
||||
for {
|
||||
n, record, err := readRecord(decompressor)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read record error: %v", err)
|
||||
}
|
||||
|
||||
switch record.Type {
|
||||
case RecordTypeSet:
|
||||
database.dataOffset[string(record.KeyHash[:])] = Offsets{} // offset
|
||||
case RecordTypeDelete:
|
||||
delete(database.dataOffset, string(record.KeyHash[:]))
|
||||
}
|
||||
|
||||
offset += n
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return database, nil
|
||||
if !exists {
|
||||
return store, nil
|
||||
}
|
||||
|
||||
err = store.rebuildIndex()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
func Open(filePath string) (*Store, error) {
|
||||
|
@ -395,14 +375,7 @@ func (s *Store) flush() error {
|
|||
|
||||
// Update index file only on data update
|
||||
if s.options.useIndexFile && l > 0 {
|
||||
idxBuf := new(bytes.Buffer)
|
||||
|
||||
err = gob.NewEncoder(idxBuf).Encode(s.dataOffset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644)
|
||||
err = s.saveIndex()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -410,3 +383,126 @@ func (s *Store) flush() error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// readBlock reads the next block from r. Blocks are delimited by the 4-byte
// magic number 0x28 0xb5 0x2f 0xfd, which every returned block starts with.
//
// The magic number that terminates a block is consumed from r but not
// returned; it is prepended to the block returned by the following call.
//
// Return values:
//   - line: the block including its leading magic number, or an empty slice
//     when nothing but the magic number remains;
//   - n: the length of line in bytes, so for a stream that starts with the
//     magic number the sum of n over successive calls is the offset of the
//     next block;
//   - err: the error reported by r. The last block of a stream is returned
//     together with io.EOF; a following call returns an empty line and
//     io.EOF.
//
// NOTE(review): a block whose payload happens to contain the magic number is
// split at that position — confirm this cannot happen for the data written.
func readBlock(r *bufio.Reader) (line []byte, n int, err error) {
	delim := []byte{0x28, 0xb5, 0x2f, 0xfd}
	lastByte := delim[len(delim)-1]

	// Two consecutive magic numbers: what line looks like right after the
	// leading magic number of the stream has been read.
	doubled := append(append(make([]byte, 0, 2*len(delim)), delim...), delim...)

	line = append(make([]byte, 0, len(delim)), delim...)

	for {
		chunk, readErr := r.ReadBytes(lastByte)
		line = append(line, chunk...)

		if readErr != nil {
			if bytes.Equal(line, delim) { // contains only magic number
				return []byte{}, 0, readErr
			}

			return line, len(line), readErr
		}

		if bytes.Equal(line, doubled) { // first block: drop the duplicated magic number
			line = line[:len(delim)]
			continue
		}

		// A chunk ends at every 0xfd byte, not only at the magic number, so
		// the block is complete only once the whole magic number is seen.
		if bytes.HasSuffix(line, delim) {
			block := line[:len(line)-len(delim)]

			// Report the size of the whole block rather than the size of
			// the last chunk: they differ when the payload contains 0xfd.
			return block, len(block), nil
		}
	}
}
|
||||
|
||||
// RebuildIndex renews index from store file
|
||||
func (s *Store) RebuildIndex() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
err := s.rebuildIndex()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.options.useIndexFile {
|
||||
return s.saveIndex()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) rebuildIndex() error {
|
||||
f, err := os.Open(s.filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
r := bufio.NewReader(f)
|
||||
|
||||
var blockOffset int64
|
||||
|
||||
s.dataOffset = make(map[string]Offsets)
|
||||
|
||||
for {
|
||||
l, n, err := readBlock(r)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
} else if err == io.EOF && len(l) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
dec, err := zstd.NewReader(bytes.NewReader(l))
|
||||
|
||||
var recordOffset int64
|
||||
for {
|
||||
n, record, err := readRecord(dec)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
switch record.Type {
|
||||
case RecordTypeSet:
|
||||
s.dataOffset[string(record.KeyHash[:])] = Offsets{BlockOffset: blockOffset, RecordOffset: recordOffset}
|
||||
case RecordTypeDelete:
|
||||
delete(s.dataOffset, string(record.KeyHash[:]))
|
||||
}
|
||||
recordOffset += n
|
||||
}
|
||||
|
||||
blockOffset += int64(n)
|
||||
}
|
||||
|
||||
idxBuf := new(bytes.Buffer)
|
||||
|
||||
err = gob.NewEncoder(idxBuf).Encode(s.dataOffset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) saveIndex() error {
|
||||
f, err := os.OpenFile(s.filePath+indexFileExt, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = gob.NewEncoder(f).Encode(s.dataOffset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return f.Close()
|
||||
}
|
||||
|
|
55
zkv_test.go
55
zkv_test.go
|
@ -1,6 +1,8 @@
|
|||
package zkv
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
|
@ -355,3 +357,56 @@ func TestIndexFileBasic(t *testing.T) {
|
|||
err = db.Close()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestReadBlock(t *testing.T) {
|
||||
file, err := os.Open("testdata/TestReadBlock.zkv")
|
||||
assert.NoError(t, err)
|
||||
defer file.Close()
|
||||
|
||||
r := bufio.NewReader(file)
|
||||
|
||||
line, _, err := readBlock(r)
|
||||
assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x90, 0xff, 0xf4, 0x25, 0x15, 0x70, 0x75, 0x5c, 0xff, 0xf4, 0xff, 0xbc, 0xff, 0xf9, 0xff, 0xde, 0xff, 0x93, 0xff, 0xf8, 0x0d, 0x0e, 0x78, 0x5b, 0xff, 0x81, 0xff, 0x95, 0x6e, 0xff, 0xab, 0x4b, 0xff, 0xe8, 0x37, 0xff, 0x97, 0x68, 0x41, 0x3d, 0x01, 0x04, 0x03, 0x04, 0x00, 0x02, 0x00, 0x25, 0xd5, 0x63, 0x21}, line)
|
||||
line, _, err = readBlock(r)
|
||||
assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x89, 0x04, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x34, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x84, 0xff, 0x84, 0xff, 0xc1, 0x21, 0x02, 0xff, 0x8b, 0xff, 0xd7, 0x6d, 0xff, 0xd0, 0xff, 0xad, 0x1a, 0x55, 0x14, 0x5c, 0xff, 0xb1, 0x04, 0x37, 0x29, 0x2f, 0x78, 0x18, 0xff, 0xb5, 0xff, 0xe4, 0x56, 0x4e, 0xff, 0x8d, 0x19, 0x46, 0x01, 0x04, 0x03, 0x04, 0x00, 0x04, 0x00, 0x0c, 0x3b, 0xbf, 0x39}, line)
|
||||
line, _, err = readBlock(r)
|
||||
assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0x25, 0x79, 0x3e, 0x46, 0x4e, 0xff, 0xac, 0x06, 0x27, 0xff, 0xb1, 0xff, 0xa3, 0xff, 0xaa, 0xff, 0xe3, 0xff, 0xde, 0x37, 0x71, 0x63, 0x72, 0xff, 0x89, 0x0d, 0xff, 0x85, 0x39, 0xff, 0xb5, 0xff, 0xb9, 0xff, 0x8a, 0xff, 0x9e, 0x60, 0xff, 0xad, 0x17, 0x01, 0x04, 0x03, 0x04, 0x00, 0x06, 0x00, 0x52, 0x08, 0x3e, 0x26}, line)
|
||||
line, _, err = readBlock(r)
|
||||
assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0xc9, 0x04, 0x00, 0x91, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x3c, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0xbf, 0x25, 0xff, 0xef, 0xff, 0xc8, 0xff, 0x85, 0x2c, 0xff, 0xbf, 0xff, 0xb5, 0xff, 0xad, 0xff, 0xfa, 0xff, 0xaf, 0x1c, 0xff, 0xe7, 0x71, 0xff, 0xfa, 0x36, 0xff, 0x95, 0x1b, 0xff, 0x91, 0xff, 0xab, 0x36, 0xff, 0xcd, 0x7a, 0x33, 0xff, 0xf7, 0xff, 0xec, 0xff, 0xee, 0xff, 0xc1, 0x01, 0x04, 0x03, 0x04, 0x00, 0x08, 0x00, 0xa5, 0x0e, 0x62, 0x53}, line)
|
||||
|
||||
line, _, err = readBlock(r)
|
||||
assert.Equal(t, line, []byte{})
|
||||
assert.Equal(t, io.EOF, err)
|
||||
}
|
||||
|
||||
func TestRebuildIndex(t *testing.T) {
|
||||
const filePath = "TestRebuiltIndex.zkv"
|
||||
const recordCount = 4
|
||||
defer os.Remove(filePath)
|
||||
defer os.Remove(filePath + indexFileExt)
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
db, err := Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = db.Set(i, i)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = db.Close()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
db, err := Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = db.RebuildIndex()
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
var gotValue int
|
||||
|
||||
err = db.Get(i, &gotValue)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, i, gotValue)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue