Mirror of https://github.com/nxshock/zkv.git (synced 2025-04-19 09:11:51 +05:00)
Compare commits
29 Commits
583f60da9f
832d12c43b
822504e8a0
2791b39d48
d950b6546c
5f0d33828f
412ddb11a8
0458ac5152
28f43e56d5
82a36a1b9e
533eddaed4
fe90a55322
f093b7feed
23ee15dc23
8dfc73af1d
e166e07daa
80540a662a
9f116ad35e
20a99d9f35
aa1a2edda6
56c9dcce00
8ef0fd240b
8ab9e96ef6
06a429ae4c
350634de38
b5043e1319
db25842429
d40b88eebb
4ec53665af
README.md (new file, 108 lines)
@@ -0,0 +1,108 @@
# zkv

Simple key-value store for single-user applications.

## Pros

* Simple two-file structure (data file and index file)
* Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd)
* Thread-safe operations through `sync.RWMutex`

## Cons

* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`)
* No transaction system
* Index file is fully rewritten on every store commit
* No way to recover disk space from deleted records
* Write/Delete operations block Reads and each other

## Usage

Create or open an existing file:

```go
db, err := zkv.Open("path to file")
```

Data operations:

```go
// Write data
err = db.Set(key, value) // key and value can be of any type

// Read data
var value ValueType
err = db.Get(key, &value)

// Delete data
err = db.Delete(key)
```

Other methods:

```go
// Flush data to disk
err = db.Flush()

// Backup data to another file
err = db.Backup("new/file/path")
```

## Store options

```go
type Options struct {
	// Maximum number of concurrent reads
	MaxParallelReads int

	// Compression level
	CompressionLevel zstd.EncoderLevel

	// Memory write buffer size in bytes
	MemoryBufferSize int

	// Disk write buffer size in bytes
	DiskBufferSize int
}
```
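
Fields left at their zero value fall back to the defaults in `defaults.go` (`runtime.NumCPU()` parallel reads, `zstd.SpeedDefault` compression, 4 MiB memory buffer, 1 MiB disk buffer). A minimal sketch of opening a store with custom options; the concrete values here are illustrative only:

```go
db, err := zkv.OpenWithOptions("path to file", zkv.Options{
	MaxParallelReads: 4,                         // limit concurrent readers
	CompressionLevel: zstd.SpeedBestCompression, // trade write speed for size
	MemoryBufferSize: 16 * 1024 * 1024,          // flush to disk after 16 MiB
})
if err != nil {
	log.Fatal(err)
}
defer db.Close()
```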

## File structure

Record is an `encoding/gob` structure:

| Field      | Description             | Size     |
| ---------- | ----------------------- | -------- |
| Type       | Record type             | uint8    |
| KeyHash    | Key hash                | 28 bytes |
| ValueBytes | Value gob-encoded bytes | variable |

File is a log-structured list of commands:

| Field  | Description              | Size     |
| ------ | ------------------------ | -------- |
| Length | Record body bytes length | int64    |
| Body   | Gob-encoded record       | variable |
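
A sketch of reading one command back under this layout. It assumes the `Length` prefix is written with `encoding/binary` in big-endian order (the store's actual reader is `readRecord` in `record.go`, which may differ in detail):

```go
// readOneRecord is a hypothetical illustration of the layout above:
// an int64 length prefix followed by a gob-encoded Record body.
func readOneRecord(r io.Reader) (*Record, error) {
	var length int64
	if err := binary.Read(r, binary.BigEndian, &length); err != nil {
		return nil, err
	}

	body := make([]byte, length)
	if _, err := io.ReadFull(r, body); err != nil {
		return nil, err
	}

	var rec Record
	if err := gob.NewDecoder(bytes.NewReader(body)).Decode(&rec); err != nil {
		return nil, err
	}

	return &rec, nil
}
```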

Index file is a simple gob-encoded map:

```go
map[string]struct {
	BlockOffset  int64
	RecordOffset int64
}
```

where the map key is the data key hash and the value is the data offset in the data file.
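
Loading the index is therefore a single gob decode, which is what `Open` does internally when the `.idx` file is present; a minimal sketch:

```go
f, err := os.Open("path to file" + ".idx")
if err != nil {
	log.Fatal(err)
}
defer f.Close()

// gob matches struct fields by name, so an equivalent anonymous
// struct works here in place of the store's internal Offsets type.
var index map[string]struct{ BlockOffset, RecordOffset int64 }
if err := gob.NewDecoder(f).Decode(&index); err != nil {
	log.Fatal(err)
}
```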

## Resource consumption

Store requirements:

* around 300 MB of RAM per 1 million keys (roughly 300 bytes per key for the in-memory index)
* around 34 MB of disk space for the index file per 1 million keys (roughly 34 bytes per key); a rough extrapolation helper is sketched after this list
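
A back-of-the-envelope extrapolation from the figures above (illustrative only; actual usage depends on the Go runtime and gob overhead):

```go
// estimateIndexSize scales the measured figures linearly:
// ~300 bytes of RAM and ~34 bytes of index file per key.
func estimateIndexSize(keys int64) (ramBytes, diskBytes int64) {
	return keys * 300, keys * 34
}
```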

## TODO

- [ ] Add recovery of the previous store file state on write error
- [ ] Add method for index rebuild
BIN	TestDeleteBasic.zkv.idx (new file; binary file not shown)
BIN	TestSmallWrites.zkv.idx (new file; binary file not shown)
defaults.go (new file, 17 lines)
@@ -0,0 +1,17 @@
package zkv

import (
	"runtime"

	"github.com/klauspost/compress/zstd"
)

var defaultOptions = Options{
	MaxParallelReads: runtime.NumCPU(),
	CompressionLevel: zstd.SpeedDefault,
	MemoryBufferSize: 4 * 1024 * 1024,
	DiskBufferSize:   1 * 1024 * 1024,
	useIndexFile:     true,
}

const indexFileExt = ".idx"
errors.go (new file, 5 lines)
@@ -0,0 +1,5 @@
package zkv

import "errors"

var ErrNotExists = errors.New("not exists")
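
Callers can detect a missing key with `errors.Is`, which is how the tests in this change assert it (`assert.ErrorIs`); a minimal usage sketch:

```go
var value int
if err := db.Get(key, &value); errors.Is(err, zkv.ErrNotExists) {
	// the key is not present in the store
}
```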
go.mod (10 lines changed)
@@ -1,14 +1,14 @@
 module github.com/nxshock/zkv
 
-go 1.17
+go 1.19
 
 require (
-	github.com/klauspost/compress v1.14.2
-	github.com/stretchr/testify v1.7.0
+	github.com/klauspost/compress v1.16.4
+	github.com/stretchr/testify v1.8.2
 )
 
 require (
-	github.com/davecgh/go-spew v1.1.0 // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
go.sum (18 lines changed)
@@ -1,13 +1,19 @@
-github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
-github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
-github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU=
+github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
-github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
options.go (new file, 40 lines)
@@ -0,0 +1,40 @@
package zkv

import "github.com/klauspost/compress/zstd"

type Options struct {
	// Maximum number of concurrent reads
	MaxParallelReads int

	// Compression level
	CompressionLevel zstd.EncoderLevel

	// Memory write buffer size in bytes
	MemoryBufferSize int

	// Disk write buffer size in bytes
	DiskBufferSize int

	// Use index file
	useIndexFile bool
}

func (o *Options) setDefaults() {
	o.useIndexFile = true // TODO: implement database search without index

	if o.MaxParallelReads == 0 {
		o.MaxParallelReads = defaultOptions.MaxParallelReads
	}

	if o.CompressionLevel == 0 {
		o.CompressionLevel = defaultOptions.CompressionLevel
	}

	if o.MemoryBufferSize == 0 {
		o.MemoryBufferSize = defaultOptions.MemoryBufferSize
	}

	if o.DiskBufferSize == 0 {
		o.DiskBufferSize = defaultOptions.DiskBufferSize
	}
}
record.go (27 lines changed)
@@ -2,6 +2,7 @@ package zkv
 
 import (
 	"bytes"
+	"crypto/sha256"
 	"encoding/binary"
 	"encoding/gob"
 	"io"
@@ -15,14 +16,22 @@
 )
 
 type Record struct {
-	Type       RecordType `json:"t"`
-	KeyHash    []byte     `json:"h"`
-	KeyBytes   []byte     `json:"k,omitempty"` // optional
-	ValueBytes []byte     `json:"v"`
+	Type       RecordType
+	KeyHash    [28]byte
+	ValueBytes []byte
 }
 
+func newRecordBytes(recordType RecordType, keyHash [sha256.Size224]byte, valueBytes []byte) (*Record, error) {
+	record := &Record{
+		Type:       recordType,
+		KeyHash:    keyHash,
+		ValueBytes: valueBytes}
+
+	return record, nil
+}
+
 func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
-	keyBytes, err := encode(key)
+	keyHash, err := hashInterface(key)
 	if err != nil {
 		return nil, err
 	}
@@ -32,13 +41,7 @@ func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
 		return nil, err
 	}
 
-	record := &Record{
-		Type:       recordType,
-		KeyHash:    hashBytes(keyBytes),
-		KeyBytes:   keyBytes,
-		ValueBytes: valueBytes}
-
-	return record, nil
+	return newRecordBytes(recordType, keyHash, valueBytes)
 }
 
 func (r *Record) Marshal() ([]byte, error) {
record_test.go (new file, 35 lines)
@@ -0,0 +1,35 @@
package zkv

import (
	"bytes"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestRecord(t *testing.T) {
	buf := new(bytes.Buffer)

	var records []Record

	for i := 0; i < 10; i++ {
		record, err := newRecord(RecordTypeSet, i, i)
		assert.NoError(t, err)

		records = append(records, *record)

		b, err := record.Marshal()
		assert.NoError(t, err)

		_, err = buf.Write(b)
		assert.NoError(t, err)
	}

	for i := 0; i < 10; i++ {
		_, record, err := readRecord(buf)
		assert.NoError(t, err)

		assert.Equal(t, record.KeyHash, records[i].KeyHash)
		assert.Equal(t, record.ValueBytes, records[i].ValueBytes)
	}
}
BIN	testdata/TestReadBlock.zkv (new file, vendored; binary file not shown)
utils.go (26 lines changed)
@@ -4,8 +4,9 @@ import (
 	"bytes"
 	"crypto/sha256"
 	"encoding/gob"
+	"errors"
 	"io"
-	"io/ioutil"
+	"os"
 )
 
 func encode(value interface{}) ([]byte, error) {
@@ -18,20 +19,17 @@ func decode(b []byte, value interface{}) error {
 	return gob.NewDecoder(bytes.NewReader(b)).Decode(value)
 }
 
-func hashInterface(value interface{}) ([]byte, error) {
+func hashInterface(value interface{}) ([sha256.Size224]byte, error) {
 	valueBytes, err := encode(value)
 	if err != nil {
-		return nil, err
+		return [sha256.Size224]byte{}, err
 	}
 
 	return hashBytes(valueBytes), nil
 }
 
-func hashBytes(b []byte) []byte {
-	bytes := sha256.Sum224(b)
-
-	return bytes[:]
+func hashBytes(b []byte) [sha256.Size224]byte {
+	return sha256.Sum224(b)
 }
 
 func skip(r io.Reader, count int64) (err error) {
@@ -39,8 +37,18 @@ func skip(r io.Reader, count int64) (err error) {
 	case io.Seeker:
 		_, err = r.Seek(count, io.SeekCurrent)
 	default:
-		_, err = io.CopyN(ioutil.Discard, r, count)
+		_, err = io.CopyN(io.Discard, r, count)
 	}
 
 	return err
 }
+
+func isFileExists(filePath string) (bool, error) {
+	if _, err := os.Stat(filePath); err == nil {
+		return true, nil
+	} else if errors.Is(err, os.ErrNotExist) {
+		return false, nil
+	} else {
+		return false, err
+	}
+}
zkv.go (524 lines changed)
@@ -1,9 +1,11 @@
 package zkv
 
 import (
+	"bufio"
 	"bytes"
+	"crypto/sha256"
 	"encoding/base64"
-	"errors"
+	"encoding/gob"
 	"fmt"
 	"io"
 	"os"
@@ -12,45 +14,97 @@
 	"github.com/klauspost/compress/zstd"
 )
 
-type Options struct {
-	SaveKeys bool
+type Offsets struct {
+	BlockOffset  int64
+	RecordOffset int64
 }
 
-type Database struct {
-	dataOffset map[string]int64
-	file       *os.File
-	compressor *zstd.Encoder
+type Store struct {
+	dataOffset map[string]Offsets
+
 	filePath string
-	offset   int64
+
+	buffer           *bytes.Buffer
+	bufferDataOffset map[string]int64
 
 	options Options
 
-	mu sync.Mutex
+	readOrderChan chan struct{}
+
+	mu sync.RWMutex
 }
 
-func (db *Database) Close() error {
-	db.mu.Lock()
-	defer db.mu.Unlock()
+func OpenWithOptions(filePath string, options Options) (*Store, error) {
+	options.setDefaults()
 
-	err := db.compressor.Close()
+	store := &Store{
+		dataOffset:       make(map[string]Offsets),
+		bufferDataOffset: make(map[string]int64),
+		buffer:           new(bytes.Buffer),
+		filePath:         filePath,
+		options:          options,
+		readOrderChan:    make(chan struct{}, int(options.MaxParallelReads))}
+
+	if options.useIndexFile {
+		idxFile, err := os.Open(filePath + indexFileExt)
+		if err == nil {
+			err = gob.NewDecoder(idxFile).Decode(&store.dataOffset)
+			if err != nil {
+				return nil, err
+			}
+
+			return store, nil
+		}
+	}
+
+	exists, err := isFileExists(filePath)
+	if err != nil {
+		return nil, err
+	}
+
+	if !exists {
+		return store, nil
+	}
+
+	err = store.rebuildIndex()
+	if err != nil {
+		return nil, err
+	}
+
+	return store, nil
+}
+
+func Open(filePath string) (*Store, error) {
+	options := defaultOptions
+	return OpenWithOptions(filePath, options)
+}
+
+func (s *Store) Set(key, value interface{}) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.set(key, value)
+}
+
+func (s *Store) Get(key, value interface{}) error {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	return s.get(key, value)
+}
+
+func (s *Store) Delete(key interface{}) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	keyHash, err := hashInterface(key)
 	if err != nil {
 		return err
 	}
 
-	return db.file.Close()
-}
-
-func (db *Database) Set(key, value interface{}) error {
-	db.mu.Lock()
-	defer db.mu.Unlock()
-
-	record, err := newRecord(RecordTypeSet, key, value)
-	if err != nil {
-		return err
-	}
-
-	if !db.options.SaveKeys {
-		record.KeyBytes = nil
+	record := &Record{
+		Type:    RecordTypeDelete,
+		KeyHash: keyHash,
 	}
 
 	b, err := record.Marshal()
@@ -58,113 +112,397 @@ func (db *Database) Set(key, value interface{}) error {
 		return err
 	}
 
-	db.dataOffset[string(record.KeyHash)] = db.offset // TODO: remove the hash and roll back the write on error
+	delete(s.dataOffset, string(record.KeyHash[:]))
+	delete(s.bufferDataOffset, string(record.KeyHash[:]))
 
-	_, err = db.compressor.Write(b)
+	_, err = s.buffer.Write(b)
 	if err != nil {
 		return err
 	}
 
-	db.offset += int64(len(b)) // TODO: remove the hash and roll back the write on error
+	if s.buffer.Len() > s.options.MemoryBufferSize {
+		err = s.flush()
+
+		if err != nil {
+			return err
+		}
+	}
 
 	return nil
 }
 
-func (db *Database) Get(key, value interface{}) error {
-	db.mu.Lock()
-	defer db.mu.Unlock()
+func (s *Store) Flush() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.flush()
+}
+
+func (s *Store) BackupWithOptions(filePath string, newFileOptions Options) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	err := s.flush()
+	if err != nil {
+		return err
+	}
+
+	newStore, err := OpenWithOptions(filePath, newFileOptions)
+	if err != nil {
+		return err
+	}
+
+	for keyHashStr := range s.dataOffset {
+		var keyHash [sha256.Size224]byte
+		copy(keyHash[:], keyHashStr)
+
+		valueBytes, err := s.getGobBytes(keyHash)
+		if err != nil {
+			newStore.Close()
+			return err
+		}
+		err = newStore.setBytes(keyHash, valueBytes)
+		if err != nil {
+			newStore.Close()
+			return err
+		}
+	}
+
+	return newStore.Close()
+}
+
+func (s *Store) Backup(filePath string) error {
+	return s.BackupWithOptions(filePath, defaultOptions)
+}
+
+func (s *Store) Close() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	err := s.flush()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error {
+	record, err := newRecordBytes(RecordTypeSet, keyHash, valueBytes)
+	if err != nil {
+		return err
+	}
+
+	b, err := record.Marshal()
+	if err != nil {
+		return err
+	}
+
+	s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
+
+	_, err = s.buffer.Write(b)
+	if err != nil {
+		return err
+	}
+
+	if s.buffer.Len() > s.options.MemoryBufferSize {
+		err = s.flush()
+
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *Store) set(key, value interface{}) error {
+	record, err := newRecord(RecordTypeSet, key, value)
+	if err != nil {
+		return err
+	}
+
+	b, err := record.Marshal()
+	if err != nil {
+		return err
+	}
+
+	s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
+
+	_, err = s.buffer.Write(b)
+	if err != nil {
+		return err
+	}
+
+	if s.buffer.Len() > s.options.MemoryBufferSize {
+		err = s.flush()
+
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) {
+	s.readOrderChan <- struct{}{}
+	defer func() { <-s.readOrderChan }()
+
+	offset, exists := s.bufferDataOffset[string(keyHash[:])]
+	if exists {
+		reader := bytes.NewReader(s.buffer.Bytes())
+
+		err := skip(reader, offset)
+		if err != nil {
+			return nil, err
+		}
+
+		_, record, err := readRecord(reader)
+		if err != nil {
+			return nil, err
+		}
+
+		return record.ValueBytes, nil
+	}
+
+	offsets, exists := s.dataOffset[string(keyHash[:])]
+	if !exists {
+		return nil, ErrNotExists
+	}
+
+	readF, err := os.Open(s.filePath)
+	if err != nil {
+		return nil, err
+	}
+	defer readF.Close()
+
+	_, err = readF.Seek(offsets.BlockOffset, io.SeekStart)
+	if err != nil {
+		return nil, err
+	}
+
+	decompressor, err := zstd.NewReader(readF)
+	if err != nil {
+		return nil, err
+	}
+	defer decompressor.Close()
+
+	err = skip(decompressor, offsets.RecordOffset)
+	if err != nil {
+		return nil, err
+	}
+
+	_, record, err := readRecord(decompressor)
+	if err != nil {
+		return nil, err
+	}
+
+	if !bytes.Equal(record.KeyHash[:], keyHash[:]) {
+		expectedHashStr := base64.StdEncoding.EncodeToString(keyHash[:])
+		gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:])
+		return nil, fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr)
+	}
+
+	return record.ValueBytes, nil
+}
+
+func (s *Store) get(key, value interface{}) error {
+	s.readOrderChan <- struct{}{}
+	defer func() { <-s.readOrderChan }()
 
 	hashToFind, err := hashInterface(key)
 	if err != nil {
 		return err
 	}
 
-	offset, exists := db.dataOffset[string(hashToFind)]
-	if !exists {
-		return errors.New("not exists") // TODO: replace with a constant error
-	}
-
-	readF, err := os.Open(db.filePath)
-	if err != nil {
-		return err
-	}
-	defer readF.Close()
-
-	decompressor, err := zstd.NewReader(readF)
-	if err != nil {
-		return err
-	}
-	defer decompressor.Close()
-
-	err = skip(decompressor, offset)
+	b, err := s.getGobBytes(hashToFind)
 	if err != nil {
 		return err
 	}
 
-	_, record, err := readRecord(decompressor)
-	if err != nil {
-		return err
-	}
-
-	if bytes.Compare(record.KeyHash, hashToFind) != 0 {
-		return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind), base64.StdEncoding.EncodeToString(record.KeyHash)) // TODO: replace with a constant error
-	}
-
-	return decode(record.ValueBytes, value)
+	return decode(b, value)
 }
 
-func Open(filePath string) (*Database, error) {
-	f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+func (s *Store) flush() error {
+	l := int64(s.buffer.Len())
+
+	f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
 	if err != nil {
-		return nil, fmt.Errorf("error opening file for writing: %v", err)
+		return fmt.Errorf("open store file: %v", err)
 	}
 
-	compressor, err := zstd.NewWriter(f)
-	if err != nil {
-		return nil, fmt.Errorf("error initializing compressor: %v", err)
-	}
-
-	database := &Database{
-		dataOffset: make(map[string]int64),
-		offset:     0,
-		file:       f,
-		compressor: compressor,
-		filePath:   filePath}
-
-	// restore file data
-	readF, err := os.Open(filePath)
+	stat, err := f.Stat()
 	if err != nil {
 		f.Close()
-		return nil, fmt.Errorf("error opening file for reading: %v", err)
+		return fmt.Errorf("stat store file: %v", err)
 	}
-	defer readF.Close()
 
-	decompressor, err := zstd.NewReader(readF)
+	diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize)
+
+	encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel))
 	if err != nil {
-		return nil, fmt.Errorf("error initializing decompressor: %v", err)
+		f.Close()
+		return fmt.Errorf("init encoder: %v", err)
 	}
-	defer decompressor.Close()
 
-	offset := int64(0)
+	_, err = s.buffer.WriteTo(encoder)
+	if err != nil {
+		return err
+	}
+
+	for key, val := range s.bufferDataOffset {
+		s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val}
+	}
+
+	s.bufferDataOffset = make(map[string]int64)
+
+	err = encoder.Close()
+	if err != nil {
+		// TODO: truncate file to previous state
+		return err
+	}
+
+	err = diskWriteBuffer.Flush()
+	if err != nil {
+		// TODO: truncate file to previous state
+		return err
+	}
+
+	err = f.Close()
+	if err != nil {
+		return err
+	}
+
+	// Update index file only on data update
+	if s.options.useIndexFile && l > 0 {
+		err = s.saveIndex()
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func readBlock(r *bufio.Reader) (line []byte, n int, err error) {
+	delim := []byte{0x28, 0xb5, 0x2f, 0xfd}
+
+	line = make([]byte, len(delim))
+	copy(line, delim)
+
 	for {
-		n, record, err := readRecord(decompressor)
-		if err == io.EOF {
+		s, err := r.ReadBytes(delim[len(delim)-1])
+		line = append(line, []byte(s)...)
+		if err != nil {
+			if bytes.Equal(line, delim) { // contains only magic number
+				return []byte{}, 0, err
+			} else {
+				return line, len(s), err
+			}
+		}
+
+		if bytes.Equal(line, append(delim, delim...)) { // first block
+			line = make([]byte, len(delim))
+			copy(line, delim)
+			continue
+		}
+
+		if bytes.HasSuffix(line, delim) {
+			return line[:len(line)-len(delim)], len(s), nil
+		}
+	}
+}
+
+// RebuildIndex renews index from store file
+func (s *Store) RebuildIndex() error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	err := s.rebuildIndex()
+	if err != nil {
+		return err
+	}
+
+	if s.options.useIndexFile {
+		return s.saveIndex()
+	}
+
+	return nil
+}
+
+func (s *Store) rebuildIndex() error {
+	f, err := os.Open(s.filePath)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	r := bufio.NewReader(f)
+
+	var blockOffset int64
+
+	s.dataOffset = make(map[string]Offsets)
+
+	for {
+		l, n, err := readBlock(r)
+		if err != nil {
+			if err != io.EOF {
+				return err
+			} else if err == io.EOF && len(l) == 0 {
+				break
+			}
+		}
+
+		dec, err := zstd.NewReader(bytes.NewReader(l))
+
+		var recordOffset int64
+		for {
+			n, record, err := readRecord(dec)
 			if err != nil {
-				f.Close()
-				return nil, fmt.Errorf("error reading record from file: %v", err)
+				if err == io.EOF {
+					break
+				} else {
+					return err
+				}
 			}
 
 			switch record.Type {
 			case RecordTypeSet:
-				database.dataOffset[string(record.KeyHash)] = offset
+				s.dataOffset[string(record.KeyHash[:])] = Offsets{BlockOffset: blockOffset, RecordOffset: recordOffset}
 			case RecordTypeDelete:
-				delete(database.dataOffset, string(record.KeyHash))
+				delete(s.dataOffset, string(record.KeyHash[:]))
 			}
+			recordOffset += n
 		}
 
-		offset += n
+		blockOffset += int64(n)
 	}
 
-	return database, nil
+	idxBuf := new(bytes.Buffer)
+
+	err = gob.NewEncoder(idxBuf).Encode(s.dataOffset)
+	if err != nil {
+		return err
+	}
+
+	err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *Store) saveIndex() error {
+	f, err := os.OpenFile(s.filePath+indexFileExt, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if err != nil {
+		return err
+	}
+
+	err = gob.NewEncoder(f).Encode(s.dataOffset)
+	if err != nil {
+		return err
	}
 
	return f.Close()
 }
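
An implementation note on the diff above: `readBlock` locates block boundaries by scanning for the zstd frame magic number (bytes `0x28 0xB5 0x2F 0xFD`, the little-endian encoding of `0xFD2FB528`), since every flush writes one self-contained zstd frame. A minimal, self-contained sketch of the same idea over an in-memory buffer; it assumes the stream starts with a frame and, like `readBlock`, it would be confused by magic-like bytes occurring inside a frame:

```go
package main

import (
	"bytes"
	"fmt"
)

// zstdMagic is the four-byte prefix of every zstd frame.
var zstdMagic = []byte{0x28, 0xb5, 0x2f, 0xfd}

// splitFrames cuts a concatenation of zstd frames into individual
// frames by finding each occurrence of the magic number.
func splitFrames(stream []byte) [][]byte {
	var frames [][]byte
	for len(stream) > 0 {
		next := bytes.Index(stream[len(zstdMagic):], zstdMagic)
		if next < 0 {
			return append(frames, stream) // no further magic: last frame
		}
		frames = append(frames, stream[:next+len(zstdMagic)])
		stream = stream[next+len(zstdMagic):]
	}
	return frames
}

func main() {
	var stream []byte
	stream = append(stream, zstdMagic...)
	stream = append(stream, 1, 2, 3) // stand-in for frame 1 payload
	stream = append(stream, zstdMagic...)
	stream = append(stream, 4, 5) // stand-in for frame 2 payload

	fmt.Println(len(splitFrames(stream))) // prints 2
}
```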
zkv_test.go (335 lines changed)
@@ -1,6 +1,8 @@
 package zkv
 
 import (
+	"bufio"
+	"io"
 	"os"
 	"testing"
 
@@ -11,6 +13,7 @@ func TestReadWriteBasic(t *testing.T) {
 	const filePath = "TestReadWriteBasic.zkv"
 	const recordCount = 100
 	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
 
 	db, err := Open(filePath)
 	assert.NoError(t, err)
@@ -20,7 +23,16 @@
 		assert.NoError(t, err)
 	}
 
-	assert.Len(t, db.dataOffset, recordCount)
+	assert.Len(t, db.dataOffset, 0)
+	assert.Len(t, db.bufferDataOffset, recordCount)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
 
 	err = db.Close()
 	assert.NoError(t, err)
@@ -47,6 +59,7 @@ func TestSmallWrites(t *testing.T) {
 	const filePath = "TestSmallWrites.zkv"
 	const recordCount = 100
 	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
 
 	for i := 1; i <= recordCount; i++ {
 		db, err := Open(filePath)
@@ -77,3 +90,323 @@ func TestSmallWrites(t *testing.T) {
 	err = db.Close()
 	assert.NoError(t, err)
 }
+
+func TestDeleteBasic(t *testing.T) {
+	const filePath = "TestDeleteBasic.zkv"
+	const recordCount = 100
+	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
+
+	db, err := Open(filePath)
+	assert.NoError(t, err)
+
+	for i := 1; i <= recordCount; i++ {
+		err = db.Set(i, i)
+		assert.NoError(t, err)
+	}
+
+	assert.Len(t, db.dataOffset, 0)
+	assert.Len(t, db.bufferDataOffset, recordCount)
+
+	err = db.Delete(50)
+	assert.NoError(t, err)
+
+	assert.Len(t, db.dataOffset, 0)
+	assert.Len(t, db.bufferDataOffset, recordCount-1)
+
+	var value int
+	err = db.Get(50, &value)
+	assert.Equal(t, 0, value)
+	assert.ErrorIs(t, err, ErrNotExists)
+
+	err = db.Close()
+	assert.NoError(t, err)
+
+	// try to read
+	db, err = Open(filePath)
+	assert.NoError(t, err)
+
+	assert.Len(t, db.dataOffset, recordCount-1)
+	assert.Len(t, db.bufferDataOffset, 0)
+
+	value = 0
+	err = db.Get(50, &value)
+	assert.Equal(t, 0, value)
+	assert.ErrorIs(t, err, ErrNotExists)
+
+	err = db.Close()
+	assert.NoError(t, err)
+}
+
+func TestBufferBasic(t *testing.T) {
+	const filePath = "TestBuffer.zkv"
+	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
+
+	db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100})
+	assert.NoError(t, err)
+
+	err = db.Set(1, make([]byte, 100))
+	assert.NoError(t, err)
+
+	assert.NotEqual(t, 0, db.dataOffset)
+	assert.Len(t, db.bufferDataOffset, 0)
+	assert.Equal(t, 0, db.buffer.Len())
+
+	var gotValue []byte
+	err = db.Get(1, &gotValue)
+	assert.NoError(t, err)
+
+	assert.Equal(t, make([]byte, 100), gotValue)
+
+	err = db.Close()
+	assert.NoError(t, err)
+}
+
+func TestBufferRead(t *testing.T) {
+	const filePath = "TestBufferRead.zkv"
+	const recordCount = 2
+	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
+
+	db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100})
+	assert.NoError(t, err)
+
+	for i := 1; i <= recordCount; i++ {
+		err = db.Set(i, i)
+		assert.NoError(t, err)
+	}
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	err = db.Close()
+	assert.NoError(t, err)
+
+	// try to read
+	db, err = Open(filePath)
+	assert.NoError(t, err)
+
+	assert.Len(t, db.dataOffset, recordCount)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	err = db.Close()
+	assert.NoError(t, err)
+
+}
+
+func TestBackupBasic(t *testing.T) {
+	const filePath = "TestBackupBasic.zkv"
+	const newFilePath = "TestBackupBasic2.zkv"
+	const recordCount = 100
+	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
+	defer os.Remove(newFilePath)
+	defer os.Remove(newFilePath + indexFileExt)
+
+	db, err := Open(filePath)
+	assert.NoError(t, err)
+
+	for i := 1; i <= recordCount; i++ {
+		err = db.Set(i, i)
+		assert.NoError(t, err)
+	}
+
+	err = db.Backup(newFilePath)
+	assert.NoError(t, err)
+
+	err = db.Close()
+	assert.NoError(t, err)
+
+	db, err = Open(newFilePath)
+	assert.NoError(t, err)
+
+	assert.Len(t, db.dataOffset, recordCount)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	err = db.Close()
+	assert.NoError(t, err)
+
+}
+
+func TestBackupWithDeletedRecords(t *testing.T) {
+	const filePath = "TestBackupWithDeletedRecords.zkv"
+	const newFilePath = "TestBackupWithDeletedRecords2.zkv"
+	const recordCount = 100
+	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
+	defer os.Remove(newFilePath)
+	defer os.Remove(newFilePath + indexFileExt)
+
+	db, err := Open(filePath)
+	assert.NoError(t, err)
+
+	for i := 1; i <= recordCount; i++ {
+		err = db.Set(i, i)
+		assert.NoError(t, err)
+	}
+
+	err = db.Flush()
+	assert.NoError(t, err)
+
+	for i := 1; i <= recordCount; i++ {
+		if i%2 == 1 {
+			continue
+		}
+
+		err = db.Delete(i)
+		assert.NoError(t, err)
+	}
+
+	err = db.Backup(newFilePath)
+	assert.NoError(t, err)
+
+	err = db.Close()
+	assert.NoError(t, err)
+
+	db, err = Open(newFilePath)
+	assert.NoError(t, err)
+
+	assert.Len(t, db.dataOffset, recordCount/2)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		if i%2 == 0 {
+			assert.ErrorIs(t, err, ErrNotExists)
+		} else {
+			assert.NoError(t, err)
+			assert.Equal(t, i, gotValue)
+		}
+	}
+
+	err = db.Close()
+	assert.NoError(t, err)
+}
+
+func TestIndexFileBasic(t *testing.T) {
+	const filePath = "TestReadWriteBasic.zkv"
+	const recordCount = 100
+	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
+
+	db, err := Open(filePath)
+	assert.NoError(t, err)
+
+	for i := 1; i <= recordCount; i++ {
+		err = db.Set(i, i)
+		assert.NoError(t, err)
+	}
+
+	assert.Len(t, db.dataOffset, 0)
+	assert.Len(t, db.bufferDataOffset, recordCount)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	err = db.Close()
+	assert.NoError(t, err)
+
+	// try to read
+	db, err = Open(filePath)
+	assert.NoError(t, err)
+
+	assert.Len(t, db.dataOffset, recordCount)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+
+	err = db.Close()
+	assert.NoError(t, err)
+}
+
+func TestReadBlock(t *testing.T) {
+	file, err := os.Open("testdata/TestReadBlock.zkv")
+	assert.NoError(t, err)
+	defer file.Close()
+
+	r := bufio.NewReader(file)
+
+	line, _, err := readBlock(r)
+	assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x90, 0xff, 0xf4, 0x25, 0x15, 0x70, 0x75, 0x5c, 0xff, 0xf4, 0xff, 0xbc, 0xff, 0xf9, 0xff, 0xde, 0xff, 0x93, 0xff, 0xf8, 0x0d, 0x0e, 0x78, 0x5b, 0xff, 0x81, 0xff, 0x95, 0x6e, 0xff, 0xab, 0x4b, 0xff, 0xe8, 0x37, 0xff, 0x97, 0x68, 0x41, 0x3d, 0x01, 0x04, 0x03, 0x04, 0x00, 0x02, 0x00, 0x25, 0xd5, 0x63, 0x21}, line)
+	line, _, err = readBlock(r)
+	assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x89, 0x04, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x34, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x84, 0xff, 0x84, 0xff, 0xc1, 0x21, 0x02, 0xff, 0x8b, 0xff, 0xd7, 0x6d, 0xff, 0xd0, 0xff, 0xad, 0x1a, 0x55, 0x14, 0x5c, 0xff, 0xb1, 0x04, 0x37, 0x29, 0x2f, 0x78, 0x18, 0xff, 0xb5, 0xff, 0xe4, 0x56, 0x4e, 0xff, 0x8d, 0x19, 0x46, 0x01, 0x04, 0x03, 0x04, 0x00, 0x04, 0x00, 0x0c, 0x3b, 0xbf, 0x39}, line)
+	line, _, err = readBlock(r)
+	assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0x25, 0x79, 0x3e, 0x46, 0x4e, 0xff, 0xac, 0x06, 0x27, 0xff, 0xb1, 0xff, 0xa3, 0xff, 0xaa, 0xff, 0xe3, 0xff, 0xde, 0x37, 0x71, 0x63, 0x72, 0xff, 0x89, 0x0d, 0xff, 0x85, 0x39, 0xff, 0xb5, 0xff, 0xb9, 0xff, 0x8a, 0xff, 0x9e, 0x60, 0xff, 0xad, 0x17, 0x01, 0x04, 0x03, 0x04, 0x00, 0x06, 0x00, 0x52, 0x08, 0x3e, 0x26}, line)
+	line, _, err = readBlock(r)
+	assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0xc9, 0x04, 0x00, 0x91, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x3c, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0xbf, 0x25, 0xff, 0xef, 0xff, 0xc8, 0xff, 0x85, 0x2c, 0xff, 0xbf, 0xff, 0xb5, 0xff, 0xad, 0xff, 0xfa, 0xff, 0xaf, 0x1c, 0xff, 0xe7, 0x71, 0xff, 0xfa, 0x36, 0xff, 0x95, 0x1b, 0xff, 0x91, 0xff, 0xab, 0x36, 0xff, 0xcd, 0x7a, 0x33, 0xff, 0xf7, 0xff, 0xec, 0xff, 0xee, 0xff, 0xc1, 0x01, 0x04, 0x03, 0x04, 0x00, 0x08, 0x00, 0xa5, 0x0e, 0x62, 0x53}, line)
+
+	line, _, err = readBlock(r)
+	assert.Equal(t, line, []byte{})
+	assert.Equal(t, io.EOF, err)
+}
+
+func TestRebuildIndex(t *testing.T) {
+	const filePath = "TestRebuiltIndex.zkv"
+	const recordCount = 4
+	defer os.Remove(filePath)
+	defer os.Remove(filePath + indexFileExt)
+
+	for i := 1; i <= recordCount; i++ {
+		db, err := Open(filePath)
+		assert.NoError(t, err)
+
+		err = db.Set(i, i)
+		assert.NoError(t, err)
+
+		err = db.Close()
+		assert.NoError(t, err)
+	}
+
+	db, err := Open(filePath)
+	assert.NoError(t, err)
+
+	err = db.RebuildIndex()
+	assert.NoError(t, err)
+
+	for i := 1; i <= recordCount; i++ {
+		var gotValue int
+
+		err = db.Get(i, &gotValue)
+		assert.NoError(t, err)
+		assert.Equal(t, i, gotValue)
+	}
+}