mirror of
https://github.com/nxshock/zkv.git
synced 2025-04-20 09:21:50 +05:00
Compare commits
25 Commits
Author | SHA1 | Date | |
---|---|---|---|
583f60da9f | |||
832d12c43b | |||
822504e8a0 | |||
2791b39d48 | |||
d950b6546c | |||
5f0d33828f | |||
412ddb11a8 | |||
0458ac5152 | |||
28f43e56d5 | |||
82a36a1b9e | |||
533eddaed4 | |||
fe90a55322 | |||
f093b7feed | |||
23ee15dc23 | |||
8dfc73af1d | |||
e166e07daa | |||
80540a662a | |||
9f116ad35e | |||
20a99d9f35 | |||
aa1a2edda6 | |||
56c9dcce00 | |||
8ef0fd240b | |||
8ab9e96ef6 | |||
06a429ae4c | |||
350634de38 |
63
README.md
63
README.md
@ -4,14 +4,15 @@ Simple key-value store for single-user applications.
|
|||||||
|
|
||||||
## Pros
|
## Pros
|
||||||
|
|
||||||
* Simple file structure
|
* Simple two file structure (data file and index file)
|
||||||
* Internal compression
|
* Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd)
|
||||||
* Threadsafe operations through `sync.RWMutex`
|
* Threadsafe operations through `sync.RWMutex`
|
||||||
|
|
||||||
## Cons
|
## Cons
|
||||||
|
|
||||||
* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`)
|
* Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`)
|
||||||
* Need to read the whole file on store open to create file index
|
* No transaction system
|
||||||
|
* Index file is fully rewrited on every store commit
|
||||||
* No way to recover disk space from deleted records
|
* No way to recover disk space from deleted records
|
||||||
* Write/Delete operations block Read and each other operations
|
* Write/Delete operations block Read and each other operations
|
||||||
|
|
||||||
@ -20,7 +21,7 @@ Simple key-value store for single-user applications.
|
|||||||
Create or open existing file:
|
Create or open existing file:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
db, err := Open("path to file")
|
db, err := zkv.Open("path to file")
|
||||||
```
|
```
|
||||||
|
|
||||||
Data operations:
|
Data operations:
|
||||||
@ -31,12 +32,41 @@ err = db.Set(key, value) // key and value can be any of type
|
|||||||
|
|
||||||
// Read data
|
// Read data
|
||||||
var value ValueType
|
var value ValueType
|
||||||
err = db.Get(key)
|
err = db.Get(key, &value)
|
||||||
|
|
||||||
// Delete data
|
// Delete data
|
||||||
err = db.Delete(key)
|
err = db.Delete(key)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Other methods:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Flush data to disk
|
||||||
|
err = db.Flush()
|
||||||
|
|
||||||
|
// Backup data to another file
|
||||||
|
err = db.Backup("new/file/path")
|
||||||
|
```
|
||||||
|
|
||||||
|
## Store options
|
||||||
|
|
||||||
|
```go
|
||||||
|
type Options struct {
|
||||||
|
// Maximum number of concurrent reads
|
||||||
|
MaxParallelReads int
|
||||||
|
|
||||||
|
// Compression level
|
||||||
|
CompressionLevel zstd.EncoderLevel
|
||||||
|
|
||||||
|
// Memory write buffer size in bytes
|
||||||
|
MemoryBufferSize int
|
||||||
|
|
||||||
|
// Disk write buffer size in bytes
|
||||||
|
DiskBufferSize int
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
## File structure
|
## File structure
|
||||||
|
|
||||||
Record is `encoding/gob` structure:
|
Record is `encoding/gob` structure:
|
||||||
@ -53,3 +83,26 @@ File is log stuctured list of commands:
|
|||||||
| -------| ------------------------ | -------- |
|
| -------| ------------------------ | -------- |
|
||||||
| Length | Record body bytes length | int64 |
|
| Length | Record body bytes length | int64 |
|
||||||
| Body | Gob-encoded record | variable |
|
| Body | Gob-encoded record | variable |
|
||||||
|
|
||||||
|
Index file is simple gob-encoded map:
|
||||||
|
|
||||||
|
```go
|
||||||
|
map[string]struct {
|
||||||
|
BlockOffset int64
|
||||||
|
RecordOffset int64
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
where map key is data key hash and value - data offset in data file.
|
||||||
|
|
||||||
|
## Resource consumption
|
||||||
|
|
||||||
|
Store requirements:
|
||||||
|
|
||||||
|
* around 300 Mb of RAM per 1 million of keys
|
||||||
|
* around 34 Mb of disk space for index file per 1 million of keys
|
||||||
|
|
||||||
|
## TODO
|
||||||
|
|
||||||
|
- [ ] Add recovery previous state of store file on write error
|
||||||
|
- [ ] Add method for index rebuild
|
||||||
|
BIN
TestDeleteBasic.zkv.idx
Normal file
BIN
TestDeleteBasic.zkv.idx
Normal file
Binary file not shown.
BIN
TestSmallWrites.zkv.idx
Normal file
BIN
TestSmallWrites.zkv.idx
Normal file
Binary file not shown.
17
defaults.go
Normal file
17
defaults.go
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
package zkv
|
||||||
|
|
||||||
|
import (
|
||||||
|
"runtime"
|
||||||
|
|
||||||
|
"github.com/klauspost/compress/zstd"
|
||||||
|
)
|
||||||
|
|
||||||
|
var defaultOptions = Options{
|
||||||
|
MaxParallelReads: runtime.NumCPU(),
|
||||||
|
CompressionLevel: zstd.SpeedDefault,
|
||||||
|
MemoryBufferSize: 4 * 1024 * 1024,
|
||||||
|
DiskBufferSize: 1 * 1024 * 1024,
|
||||||
|
useIndexFile: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
const indexFileExt = ".idx"
|
4
go.mod
4
go.mod
@ -3,8 +3,8 @@ module github.com/nxshock/zkv
|
|||||||
go 1.19
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.15.12
|
github.com/klauspost/compress v1.16.4
|
||||||
github.com/stretchr/testify v1.8.1
|
github.com/stretchr/testify v1.8.2
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
8
go.sum
8
go.sum
@ -1,8 +1,8 @@
|
|||||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
|
github.com/klauspost/compress v1.16.4 h1:91KN02FnsOYhuunwU4ssRe8lc2JosWmizWa91B5v1PU=
|
||||||
github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
|
github.com/klauspost/compress v1.16.4/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
@ -10,8 +10,8 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
|
|||||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
40
options.go
Normal file
40
options.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package zkv
|
||||||
|
|
||||||
|
import "github.com/klauspost/compress/zstd"
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
// Maximum number of concurrent reads
|
||||||
|
MaxParallelReads int
|
||||||
|
|
||||||
|
// Compression level
|
||||||
|
CompressionLevel zstd.EncoderLevel
|
||||||
|
|
||||||
|
// Memory write buffer size in bytes
|
||||||
|
MemoryBufferSize int
|
||||||
|
|
||||||
|
// Disk write buffer size in bytes
|
||||||
|
DiskBufferSize int
|
||||||
|
|
||||||
|
// Use index file
|
||||||
|
useIndexFile bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *Options) setDefaults() {
|
||||||
|
o.useIndexFile = true // TODO: implement database search without index
|
||||||
|
|
||||||
|
if o.MaxParallelReads == 0 {
|
||||||
|
o.MaxParallelReads = defaultOptions.MaxParallelReads
|
||||||
|
}
|
||||||
|
|
||||||
|
if o.CompressionLevel == 0 {
|
||||||
|
o.CompressionLevel = defaultOptions.CompressionLevel
|
||||||
|
}
|
||||||
|
|
||||||
|
if o.MemoryBufferSize == 0 {
|
||||||
|
o.MemoryBufferSize = defaultOptions.MemoryBufferSize
|
||||||
|
}
|
||||||
|
|
||||||
|
if o.DiskBufferSize == 0 {
|
||||||
|
o.DiskBufferSize = defaultOptions.DiskBufferSize
|
||||||
|
}
|
||||||
|
}
|
19
record.go
19
record.go
@ -2,6 +2,7 @@ package zkv
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
"io"
|
"io"
|
||||||
@ -20,8 +21,17 @@ type Record struct {
|
|||||||
ValueBytes []byte
|
ValueBytes []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newRecordBytes(recordType RecordType, keyHash [sha256.Size224]byte, valueBytes []byte) (*Record, error) {
|
||||||
|
record := &Record{
|
||||||
|
Type: recordType,
|
||||||
|
KeyHash: keyHash,
|
||||||
|
ValueBytes: valueBytes}
|
||||||
|
|
||||||
|
return record, nil
|
||||||
|
}
|
||||||
|
|
||||||
func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
|
func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
|
||||||
keyBytes, err := encode(key)
|
keyHash, err := hashInterface(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -31,12 +41,7 @@ func newRecord(recordType RecordType, key, value interface{}) (*Record, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
record := &Record{
|
return newRecordBytes(recordType, keyHash, valueBytes)
|
||||||
Type: recordType,
|
|
||||||
KeyHash: hashBytes(keyBytes),
|
|
||||||
ValueBytes: valueBytes}
|
|
||||||
|
|
||||||
return record, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Record) Marshal() ([]byte, error) {
|
func (r *Record) Marshal() ([]byte, error) {
|
||||||
|
35
record_test.go
Normal file
35
record_test.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package zkv
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRecord(t *testing.T) {
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
|
||||||
|
var records []Record
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
record, err := newRecord(RecordTypeSet, i, i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
records = append(records, *record)
|
||||||
|
|
||||||
|
b, err := record.Marshal()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = buf.Write(b)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
_, record, err := readRecord(buf)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, record.KeyHash, records[i].KeyHash)
|
||||||
|
assert.Equal(t, record.ValueBytes, records[i].ValueBytes)
|
||||||
|
}
|
||||||
|
}
|
BIN
testdata/TestReadBlock.zkv
vendored
Normal file
BIN
testdata/TestReadBlock.zkv
vendored
Normal file
Binary file not shown.
12
utils.go
12
utils.go
@ -4,7 +4,9 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
func encode(value interface{}) ([]byte, error) {
|
func encode(value interface{}) ([]byte, error) {
|
||||||
@ -40,3 +42,13 @@ func skip(r io.Reader, count int64) (err error) {
|
|||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isFileExists(filePath string) (bool, error) {
|
||||||
|
if _, err := os.Stat(filePath); err == nil {
|
||||||
|
return true, nil
|
||||||
|
} else if errors.Is(err, os.ErrNotExist) {
|
||||||
|
return false, nil
|
||||||
|
} else {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
584
zkv.go
584
zkv.go
@ -1,8 +1,11 @@
|
|||||||
package zkv
|
package zkv
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
|
"encoding/gob"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
@ -11,156 +14,88 @@ import (
|
|||||||
"github.com/klauspost/compress/zstd"
|
"github.com/klauspost/compress/zstd"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database struct {
|
type Offsets struct {
|
||||||
dataOffset map[string]int64
|
BlockOffset int64
|
||||||
file *os.File
|
RecordOffset int64
|
||||||
compressor *zstd.Encoder
|
}
|
||||||
filePath string
|
|
||||||
offset int64
|
type Store struct {
|
||||||
|
dataOffset map[string]Offsets
|
||||||
|
|
||||||
|
filePath string
|
||||||
|
|
||||||
|
buffer *bytes.Buffer
|
||||||
|
bufferDataOffset map[string]int64
|
||||||
|
|
||||||
|
options Options
|
||||||
|
|
||||||
|
readOrderChan chan struct{}
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Database) Close() error {
|
func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
||||||
db.mu.Lock()
|
options.setDefaults()
|
||||||
defer db.mu.Unlock()
|
|
||||||
|
|
||||||
err := db.compressor.Close()
|
store := &Store{
|
||||||
if err != nil {
|
dataOffset: make(map[string]Offsets),
|
||||||
return err
|
bufferDataOffset: make(map[string]int64),
|
||||||
|
buffer: new(bytes.Buffer),
|
||||||
|
filePath: filePath,
|
||||||
|
options: options,
|
||||||
|
readOrderChan: make(chan struct{}, int(options.MaxParallelReads))}
|
||||||
|
|
||||||
|
if options.useIndexFile {
|
||||||
|
idxFile, err := os.Open(filePath + indexFileExt)
|
||||||
|
if err == nil {
|
||||||
|
err = gob.NewDecoder(idxFile).Decode(&store.dataOffset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return store, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.file.Close()
|
exists, err := isFileExists(filePath)
|
||||||
}
|
|
||||||
|
|
||||||
func (db *Database) Set(key, value interface{}) error {
|
|
||||||
db.mu.Lock()
|
|
||||||
defer db.mu.Unlock()
|
|
||||||
|
|
||||||
record, err := newRecord(RecordTypeSet, key, value)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := record.Marshal()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки
|
|
||||||
|
|
||||||
_, err = db.compressor.Write(b)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
db.offset += int64(len(b)) // TODO: удалить хеш и откатить запись в случае ошибки
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (db *Database) Get(key, value interface{}) error {
|
|
||||||
db.mu.RLock()
|
|
||||||
defer db.mu.RUnlock()
|
|
||||||
|
|
||||||
hashToFind, err := hashInterface(key)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
offset, exists := db.dataOffset[string(hashToFind[:])]
|
|
||||||
if !exists {
|
if !exists {
|
||||||
return ErrNotExists
|
return store, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
readF, err := os.Open(db.filePath)
|
err = store.rebuildIndex()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
|
||||||
defer readF.Close()
|
|
||||||
|
|
||||||
decompressor, err := zstd.NewReader(readF)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer decompressor.Close()
|
|
||||||
|
|
||||||
err = skip(decompressor, offset)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, record, err := readRecord(decompressor)
|
return store, nil
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
|
|
||||||
return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку
|
|
||||||
}
|
|
||||||
|
|
||||||
return decode(record.ValueBytes, value)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Open(filePath string) (*Database, error) {
|
func Open(filePath string) (*Store, error) {
|
||||||
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
options := defaultOptions
|
||||||
if err != nil {
|
return OpenWithOptions(filePath, options)
|
||||||
return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
compressor, err := zstd.NewWriter(f)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
database := &Database{
|
|
||||||
dataOffset: make(map[string]int64),
|
|
||||||
offset: 0,
|
|
||||||
file: f,
|
|
||||||
compressor: compressor,
|
|
||||||
filePath: filePath}
|
|
||||||
|
|
||||||
// restore file data
|
|
||||||
readF, err := os.Open(filePath)
|
|
||||||
if err != nil {
|
|
||||||
f.Close()
|
|
||||||
return nil, fmt.Errorf("ошибка при открытии файла для чтения: %v", err)
|
|
||||||
}
|
|
||||||
defer readF.Close()
|
|
||||||
|
|
||||||
decompressor, err := zstd.NewReader(readF)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err)
|
|
||||||
}
|
|
||||||
defer decompressor.Close()
|
|
||||||
|
|
||||||
offset := int64(0)
|
|
||||||
for {
|
|
||||||
n, record, err := readRecord(decompressor)
|
|
||||||
if err == io.EOF {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
f.Close()
|
|
||||||
return nil, fmt.Errorf("ошибка при чтении записи из файла: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
switch record.Type {
|
|
||||||
case RecordTypeSet:
|
|
||||||
database.dataOffset[string(record.KeyHash[:])] = offset
|
|
||||||
case RecordTypeDelete:
|
|
||||||
delete(database.dataOffset, string(record.KeyHash[:]))
|
|
||||||
}
|
|
||||||
|
|
||||||
offset += n
|
|
||||||
}
|
|
||||||
|
|
||||||
return database, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *Database) Delete(key interface{}) error {
|
func (s *Store) Set(key, value interface{}) error {
|
||||||
db.mu.Lock()
|
s.mu.Lock()
|
||||||
defer db.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
return s.set(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Get(key, value interface{}) error {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
return s.get(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Delete(key interface{}) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
keyHash, err := hashInterface(key)
|
keyHash, err := hashInterface(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -177,14 +112,397 @@ func (db *Database) Delete(key interface{}) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(db.dataOffset, string(record.KeyHash[:]))
|
delete(s.dataOffset, string(record.KeyHash[:]))
|
||||||
|
delete(s.bufferDataOffset, string(record.KeyHash[:]))
|
||||||
|
|
||||||
_, err = db.compressor.Write(b)
|
_, err = s.buffer.Write(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
db.offset += int64(len(b))
|
if s.buffer.Len() > s.options.MemoryBufferSize {
|
||||||
|
err = s.flush()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) Flush() error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
return s.flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) BackupWithOptions(filePath string, newFileOptions Options) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
err := s.flush()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
newStore, err := OpenWithOptions(filePath, newFileOptions)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for keyHashStr := range s.dataOffset {
|
||||||
|
var keyHash [sha256.Size224]byte
|
||||||
|
copy(keyHash[:], keyHashStr)
|
||||||
|
|
||||||
|
valueBytes, err := s.getGobBytes(keyHash)
|
||||||
|
if err != nil {
|
||||||
|
newStore.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = newStore.setBytes(keyHash, valueBytes)
|
||||||
|
if err != nil {
|
||||||
|
newStore.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return newStore.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Backup(filePath string) error {
|
||||||
|
return s.BackupWithOptions(filePath, defaultOptions)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) Close() error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
err := s.flush()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error {
|
||||||
|
record, err := newRecordBytes(RecordTypeSet, keyHash, valueBytes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := record.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
|
||||||
|
|
||||||
|
_, err = s.buffer.Write(b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.buffer.Len() > s.options.MemoryBufferSize {
|
||||||
|
err = s.flush()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) set(key, value interface{}) error {
|
||||||
|
record, err := newRecord(RecordTypeSet, key, value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := record.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
|
||||||
|
|
||||||
|
_, err = s.buffer.Write(b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.buffer.Len() > s.options.MemoryBufferSize {
|
||||||
|
err = s.flush()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) {
|
||||||
|
s.readOrderChan <- struct{}{}
|
||||||
|
defer func() { <-s.readOrderChan }()
|
||||||
|
|
||||||
|
offset, exists := s.bufferDataOffset[string(keyHash[:])]
|
||||||
|
if exists {
|
||||||
|
reader := bytes.NewReader(s.buffer.Bytes())
|
||||||
|
|
||||||
|
err := skip(reader, offset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, record, err := readRecord(reader)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return record.ValueBytes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
offsets, exists := s.dataOffset[string(keyHash[:])]
|
||||||
|
if !exists {
|
||||||
|
return nil, ErrNotExists
|
||||||
|
}
|
||||||
|
|
||||||
|
readF, err := os.Open(s.filePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer readF.Close()
|
||||||
|
|
||||||
|
_, err = readF.Seek(offsets.BlockOffset, io.SeekStart)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
decompressor, err := zstd.NewReader(readF)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer decompressor.Close()
|
||||||
|
|
||||||
|
err = skip(decompressor, offsets.RecordOffset)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, record, err := readRecord(decompressor)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(record.KeyHash[:], keyHash[:]) {
|
||||||
|
expectedHashStr := base64.StdEncoding.EncodeToString(keyHash[:])
|
||||||
|
gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:])
|
||||||
|
return nil, fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return record.ValueBytes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) get(key, value interface{}) error {
|
||||||
|
s.readOrderChan <- struct{}{}
|
||||||
|
defer func() { <-s.readOrderChan }()
|
||||||
|
|
||||||
|
hashToFind, err := hashInterface(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := s.getGobBytes(hashToFind)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return decode(b, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) flush() error {
|
||||||
|
l := int64(s.buffer.Len())
|
||||||
|
|
||||||
|
f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open store file: %v", err)
|
||||||
|
}
|
||||||
|
stat, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
f.Close()
|
||||||
|
return fmt.Errorf("stat store file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize)
|
||||||
|
|
||||||
|
encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel))
|
||||||
|
if err != nil {
|
||||||
|
f.Close()
|
||||||
|
return fmt.Errorf("init encoder: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.buffer.WriteTo(encoder)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, val := range s.bufferDataOffset {
|
||||||
|
s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.bufferDataOffset = make(map[string]int64)
|
||||||
|
|
||||||
|
err = encoder.Close()
|
||||||
|
if err != nil {
|
||||||
|
// TODO: truncate file to previous state
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = diskWriteBuffer.Flush()
|
||||||
|
if err != nil {
|
||||||
|
// TODO: truncate file to previous state
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = f.Close()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update index file only on data update
|
||||||
|
if s.options.useIndexFile && l > 0 {
|
||||||
|
err = s.saveIndex()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func readBlock(r *bufio.Reader) (line []byte, n int, err error) {
|
||||||
|
delim := []byte{0x28, 0xb5, 0x2f, 0xfd}
|
||||||
|
|
||||||
|
line = make([]byte, len(delim))
|
||||||
|
copy(line, delim)
|
||||||
|
|
||||||
|
for {
|
||||||
|
s, err := r.ReadBytes(delim[len(delim)-1])
|
||||||
|
line = append(line, []byte(s)...)
|
||||||
|
if err != nil {
|
||||||
|
if bytes.Equal(line, delim) { // contains only magic number
|
||||||
|
return []byte{}, 0, err
|
||||||
|
} else {
|
||||||
|
return line, len(s), err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes.Equal(line, append(delim, delim...)) { // first block
|
||||||
|
line = make([]byte, len(delim))
|
||||||
|
copy(line, delim)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if bytes.HasSuffix(line, delim) {
|
||||||
|
return line[:len(line)-len(delim)], len(s), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RebuildIndex renews index from store file
|
||||||
|
func (s *Store) RebuildIndex() error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
err := s.rebuildIndex()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.options.useIndexFile {
|
||||||
|
return s.saveIndex()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) rebuildIndex() error {
|
||||||
|
f, err := os.Open(s.filePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
r := bufio.NewReader(f)
|
||||||
|
|
||||||
|
var blockOffset int64
|
||||||
|
|
||||||
|
s.dataOffset = make(map[string]Offsets)
|
||||||
|
|
||||||
|
for {
|
||||||
|
l, n, err := readBlock(r)
|
||||||
|
if err != nil {
|
||||||
|
if err != io.EOF {
|
||||||
|
return err
|
||||||
|
} else if err == io.EOF && len(l) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dec, err := zstd.NewReader(bytes.NewReader(l))
|
||||||
|
|
||||||
|
var recordOffset int64
|
||||||
|
for {
|
||||||
|
n, record, err := readRecord(dec)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
switch record.Type {
|
||||||
|
case RecordTypeSet:
|
||||||
|
s.dataOffset[string(record.KeyHash[:])] = Offsets{BlockOffset: blockOffset, RecordOffset: recordOffset}
|
||||||
|
case RecordTypeDelete:
|
||||||
|
delete(s.dataOffset, string(record.KeyHash[:]))
|
||||||
|
}
|
||||||
|
recordOffset += n
|
||||||
|
}
|
||||||
|
|
||||||
|
blockOffset += int64(n)
|
||||||
|
}
|
||||||
|
|
||||||
|
idxBuf := new(bytes.Buffer)
|
||||||
|
|
||||||
|
err = gob.NewEncoder(idxBuf).Encode(s.dataOffset)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = os.WriteFile(s.filePath+indexFileExt, idxBuf.Bytes(), 0644)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) saveIndex() error {
|
||||||
|
f, err := os.OpenFile(s.filePath+indexFileExt, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = gob.NewEncoder(f).Encode(s.dataOffset)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return f.Close()
|
||||||
|
}
|
||||||
|
298
zkv_test.go
298
zkv_test.go
@ -1,6 +1,8 @@
|
|||||||
package zkv
|
package zkv
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bufio"
|
||||||
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -11,6 +13,7 @@ func TestReadWriteBasic(t *testing.T) {
|
|||||||
const filePath = "TestReadWriteBasic.zkv"
|
const filePath = "TestReadWriteBasic.zkv"
|
||||||
const recordCount = 100
|
const recordCount = 100
|
||||||
defer os.Remove(filePath)
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
|
||||||
db, err := Open(filePath)
|
db, err := Open(filePath)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
@ -20,7 +23,16 @@ func TestReadWriteBasic(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Len(t, db.dataOffset, recordCount)
|
assert.Len(t, db.dataOffset, 0)
|
||||||
|
assert.Len(t, db.bufferDataOffset, recordCount)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
|
||||||
err = db.Close()
|
err = db.Close()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
@ -47,6 +59,7 @@ func TestSmallWrites(t *testing.T) {
|
|||||||
const filePath = "TestSmallWrites.zkv"
|
const filePath = "TestSmallWrites.zkv"
|
||||||
const recordCount = 100
|
const recordCount = 100
|
||||||
defer os.Remove(filePath)
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
|
||||||
for i := 1; i <= recordCount; i++ {
|
for i := 1; i <= recordCount; i++ {
|
||||||
db, err := Open(filePath)
|
db, err := Open(filePath)
|
||||||
@ -82,6 +95,7 @@ func TestDeleteBasic(t *testing.T) {
|
|||||||
const filePath = "TestDeleteBasic.zkv"
|
const filePath = "TestDeleteBasic.zkv"
|
||||||
const recordCount = 100
|
const recordCount = 100
|
||||||
defer os.Remove(filePath)
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
|
||||||
db, err := Open(filePath)
|
db, err := Open(filePath)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
@ -91,12 +105,15 @@ func TestDeleteBasic(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Len(t, db.dataOffset, recordCount)
|
assert.Len(t, db.dataOffset, 0)
|
||||||
|
assert.Len(t, db.bufferDataOffset, recordCount)
|
||||||
|
|
||||||
err = db.Delete(50)
|
err = db.Delete(50)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
assert.Len(t, db.dataOffset, recordCount-1)
|
assert.Len(t, db.dataOffset, 0)
|
||||||
|
assert.Len(t, db.bufferDataOffset, recordCount-1)
|
||||||
|
|
||||||
var value int
|
var value int
|
||||||
err = db.Get(50, &value)
|
err = db.Get(50, &value)
|
||||||
assert.Equal(t, 0, value)
|
assert.Equal(t, 0, value)
|
||||||
@ -110,6 +127,8 @@ func TestDeleteBasic(t *testing.T) {
|
|||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
assert.Len(t, db.dataOffset, recordCount-1)
|
assert.Len(t, db.dataOffset, recordCount-1)
|
||||||
|
assert.Len(t, db.bufferDataOffset, 0)
|
||||||
|
|
||||||
value = 0
|
value = 0
|
||||||
err = db.Get(50, &value)
|
err = db.Get(50, &value)
|
||||||
assert.Equal(t, 0, value)
|
assert.Equal(t, 0, value)
|
||||||
@ -118,3 +137,276 @@ func TestDeleteBasic(t *testing.T) {
|
|||||||
err = db.Close()
|
err = db.Close()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBufferBasic(t *testing.T) {
|
||||||
|
const filePath = "TestBuffer.zkv"
|
||||||
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
|
||||||
|
db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Set(1, make([]byte, 100))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.NotEqual(t, 0, db.dataOffset)
|
||||||
|
assert.Len(t, db.bufferDataOffset, 0)
|
||||||
|
assert.Equal(t, 0, db.buffer.Len())
|
||||||
|
|
||||||
|
var gotValue []byte
|
||||||
|
err = db.Get(1, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, make([]byte, 100), gotValue)
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBufferRead(t *testing.T) {
|
||||||
|
const filePath = "TestBufferRead.zkv"
|
||||||
|
const recordCount = 2
|
||||||
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
|
||||||
|
db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
err = db.Set(i, i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// try to read
|
||||||
|
db, err = Open(filePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Len(t, db.dataOffset, recordCount)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBackupBasic(t *testing.T) {
|
||||||
|
const filePath = "TestBackupBasic.zkv"
|
||||||
|
const newFilePath = "TestBackupBasic2.zkv"
|
||||||
|
const recordCount = 100
|
||||||
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
defer os.Remove(newFilePath)
|
||||||
|
defer os.Remove(newFilePath + indexFileExt)
|
||||||
|
|
||||||
|
db, err := Open(filePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
err = db.Set(i, i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Backup(newFilePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
db, err = Open(newFilePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Len(t, db.dataOffset, recordCount)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBackupWithDeletedRecords(t *testing.T) {
|
||||||
|
const filePath = "TestBackupWithDeletedRecords.zkv"
|
||||||
|
const newFilePath = "TestBackupWithDeletedRecords2.zkv"
|
||||||
|
const recordCount = 100
|
||||||
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
defer os.Remove(newFilePath)
|
||||||
|
defer os.Remove(newFilePath + indexFileExt)
|
||||||
|
|
||||||
|
db, err := Open(filePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
err = db.Set(i, i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Flush()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
if i%2 == 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Delete(i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Backup(newFilePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
db, err = Open(newFilePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Len(t, db.dataOffset, recordCount/2)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
if i%2 == 0 {
|
||||||
|
assert.ErrorIs(t, err, ErrNotExists)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIndexFileBasic(t *testing.T) {
|
||||||
|
const filePath = "TestReadWriteBasic.zkv"
|
||||||
|
const recordCount = 100
|
||||||
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
|
||||||
|
db, err := Open(filePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
err = db.Set(i, i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Len(t, db.dataOffset, 0)
|
||||||
|
assert.Len(t, db.bufferDataOffset, recordCount)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// try to read
|
||||||
|
db, err = Open(filePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Len(t, db.dataOffset, recordCount)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReadBlock(t *testing.T) {
|
||||||
|
file, err := os.Open("testdata/TestReadBlock.zkv")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
r := bufio.NewReader(file)
|
||||||
|
|
||||||
|
line, _, err := readBlock(r)
|
||||||
|
assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x90, 0xff, 0xf4, 0x25, 0x15, 0x70, 0x75, 0x5c, 0xff, 0xf4, 0xff, 0xbc, 0xff, 0xf9, 0xff, 0xde, 0xff, 0x93, 0xff, 0xf8, 0x0d, 0x0e, 0x78, 0x5b, 0xff, 0x81, 0xff, 0x95, 0x6e, 0xff, 0xab, 0x4b, 0xff, 0xe8, 0x37, 0xff, 0x97, 0x68, 0x41, 0x3d, 0x01, 0x04, 0x03, 0x04, 0x00, 0x02, 0x00, 0x25, 0xd5, 0x63, 0x21}, line)
|
||||||
|
line, _, err = readBlock(r)
|
||||||
|
assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x89, 0x04, 0x00, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x34, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0x84, 0xff, 0x84, 0xff, 0xc1, 0x21, 0x02, 0xff, 0x8b, 0xff, 0xd7, 0x6d, 0xff, 0xd0, 0xff, 0xad, 0x1a, 0x55, 0x14, 0x5c, 0xff, 0xb1, 0x04, 0x37, 0x29, 0x2f, 0x78, 0x18, 0xff, 0xb5, 0xff, 0xe4, 0x56, 0x4e, 0xff, 0x8d, 0x19, 0x46, 0x01, 0x04, 0x03, 0x04, 0x00, 0x04, 0x00, 0x0c, 0x3b, 0xbf, 0x39}, line)
|
||||||
|
line, _, err = readBlock(r)
|
||||||
|
assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0x99, 0x04, 0x00, 0x8b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x36, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0x25, 0x79, 0x3e, 0x46, 0x4e, 0xff, 0xac, 0x06, 0x27, 0xff, 0xb1, 0xff, 0xa3, 0xff, 0xaa, 0xff, 0xe3, 0xff, 0xde, 0x37, 0x71, 0x63, 0x72, 0xff, 0x89, 0x0d, 0xff, 0x85, 0x39, 0xff, 0xb5, 0xff, 0xb9, 0xff, 0x8a, 0xff, 0x9e, 0x60, 0xff, 0xad, 0x17, 0x01, 0x04, 0x03, 0x04, 0x00, 0x06, 0x00, 0x52, 0x08, 0x3e, 0x26}, line)
|
||||||
|
line, _, err = readBlock(r)
|
||||||
|
assert.Equal(t, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x00, 0xc9, 0x04, 0x00, 0x91, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x39, 0xff, 0x81, 0x03, 0x01, 0x01, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x01, 0xff, 0x82, 0x00, 0x01, 0x03, 0x01, 0x04, 0x54, 0x79, 0x70, 0x65, 0x01, 0x06, 0x00, 0x01, 0x07, 0x4b, 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x01, 0xff, 0x84, 0x00, 0x01, 0x0a, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x79, 0x74, 0x65, 0x73, 0x01, 0x0a, 0x00, 0x00, 0x00, 0x19, 0xff, 0x83, 0x01, 0x01, 0x01, 0x09, 0x5b, 0x32, 0x38, 0x5d, 0x75, 0x69, 0x6e, 0x74, 0x38, 0x01, 0xff, 0x84, 0x00, 0x01, 0x06, 0x01, 0x38, 0x00, 0x00, 0x3c, 0xff, 0x82, 0x01, 0x01, 0x01, 0x1c, 0xff, 0xbf, 0x25, 0xff, 0xef, 0xff, 0xc8, 0xff, 0x85, 0x2c, 0xff, 0xbf, 0xff, 0xb5, 0xff, 0xad, 0xff, 0xfa, 0xff, 0xaf, 0x1c, 0xff, 0xe7, 0x71, 0xff, 0xfa, 0x36, 0xff, 0x95, 0x1b, 0xff, 0x91, 0xff, 0xab, 0x36, 0xff, 0xcd, 0x7a, 0x33, 0xff, 0xf7, 0xff, 0xec, 0xff, 0xee, 0xff, 0xc1, 0x01, 0x04, 0x03, 0x04, 0x00, 0x08, 0x00, 0xa5, 0x0e, 0x62, 0x53}, line)
|
||||||
|
|
||||||
|
line, _, err = readBlock(r)
|
||||||
|
assert.Equal(t, line, []byte{})
|
||||||
|
assert.Equal(t, io.EOF, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRebuildIndex(t *testing.T) {
|
||||||
|
const filePath = "TestRebuiltIndex.zkv"
|
||||||
|
const recordCount = 4
|
||||||
|
defer os.Remove(filePath)
|
||||||
|
defer os.Remove(filePath + indexFileExt)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
db, err := Open(filePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Set(i, i)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Close()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := Open(filePath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.RebuildIndex()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 1; i <= recordCount; i++ {
|
||||||
|
var gotValue int
|
||||||
|
|
||||||
|
err = db.Get(i, &gotValue)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, i, gotValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user