mirror of https://github.com/nxshock/zkv.git synced 2024-11-27 11:21:02 +05:00

Compare commits


6 Commits

| SHA1 | Message | Date |
| ---------- | ----------------------------------------------- | -------------------------- |
| aa1a2edda6 | Add TODOs | 2022-12-03 21:03:27 +05:00 |
| 56c9dcce00 | Implement write buffer | 2022-12-03 20:59:17 +05:00 |
| 8ef0fd240b | Update README.md | 2022-12-03 20:57:50 +05:00 |
| 8ab9e96ef6 | Add compression level option | 2022-12-03 12:55:42 +05:00 |
| 06a429ae4c | Add some notes | 2022-12-03 12:41:11 +05:00 |
| 350634de38 | Add option for limit a number of parallel reads | 2022-12-03 12:40:36 +05:00 |
5 changed files with 341 additions and 104 deletions

README.md

@@ -4,8 +4,8 @@ Simple key-value store for single-user applications.
## Pros
* Simple file structure
* Internal compression
* Simple one-file structure
* Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd)
* Thread-safe operations through `sync.RWMutex`
## Cons
@@ -31,12 +31,19 @@ err = db.Set(key, value) // key and value can be of any type
// Read data
var value ValueType
err = db.Get(key)
err = db.Get(key, &value)
// Delete data
err = db.Delete(key)
```
Other methods:
```go
// Flush data to disk
err = db.Flush()
```
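
This changeset also introduces configurable options (see options.go below). A minimal usage sketch, assuming the module is imported as `github.com/nxshock/zkv` (taken from the mirror URL); the file name and option values are illustrative:

```go
package main

import (
	"log"

	"github.com/klauspost/compress/zstd"
	"github.com/nxshock/zkv" // assumed import path, from the repo URL
)

func main() {
	// Open with explicit options instead of the defaults.
	db, err := zkv.OpenWithOptions("data.zkv", zkv.Options{
		MaxParallelReads: 4,                         // cap concurrent readers
		CompressionLevel: zstd.SpeedBestCompression, // favor ratio over speed
		BufferSize:       1024 * 1024,               // flush after ~1 MiB buffered
	})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Set("hello", "world"); err != nil {
		log.Fatal(err)
	}
}
```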
## File structure
Each record is an `encoding/gob`-encoded structure:
@@ -53,3 +60,9 @@ The file is a log-structured list of commands:
| Name | Description | Type |
| -------| ------------------------ | -------- |
| Length | Record body bytes length | int64 |
| Body | Gob-encoded record | variable |
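
To make the framing concrete, here is a hypothetical scanner for this layout. It assumes the length prefix is serialized as a fixed-width big-endian int64; the store's actual `readRecord` helper may encode it differently:

```go
package zkv

import (
	"encoding/binary"
	"io"
)

// scanFrames walks the log frame by frame, passing each gob-encoded
// record body to handle. Hypothetical sketch: assumes an 8-byte
// big-endian length prefix before each body.
func scanFrames(r io.Reader, handle func(body []byte) error) error {
	for {
		var length int64
		err := binary.Read(r, binary.BigEndian, &length)
		if err == io.EOF {
			return nil // clean end of the log
		}
		if err != nil {
			return err
		}
		body := make([]byte, length)
		if _, err := io.ReadFull(r, body); err != nil {
			return err
		}
		if err := handle(body); err != nil {
			return err
		}
	}
}
```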
## TODO
- [ ] Implement `Copy()` method to copy store without deleted records (see the sketch after this list)
- [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go)
- [ ] Implement optional separate index file to speed up store initialization
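
For the first item, a hypothetical sketch of what `Copy()` could look like, written against the code in zkv.go below. It assumes `readRecord`'s first return value is the number of bytes consumed, and it ignores unflushed buffered writes (a caller would `Flush()` first); neither detail is confirmed by this changeset:

```go
// Copy writes a compacted version of the store to dstPath, skipping
// deleted and overwritten records. Hypothetical sketch only.
func (s *Store) Copy(dstPath string) error {
	s.mu.RLock()
	defer s.mu.RUnlock()

	dst, err := OpenWithOptions(dstPath, s.options)
	if err != nil {
		return err
	}

	readF, err := os.Open(s.filePath)
	if err != nil {
		return err
	}
	defer readF.Close()

	decompressor, err := zstd.NewReader(readF)
	if err != nil {
		return err
	}
	defer decompressor.Close()

	var offset int64
	for offset < s.offset {
		n, record, err := readRecord(decompressor) // n: bytes consumed (assumed)
		if err != nil {
			return err
		}
		// Keep a record only if it is still the live version of its key;
		// delete records and stale versions are absent from dataOffset.
		if live, ok := s.dataOffset[string(record.KeyHash[:])]; ok && live == offset {
			b, err := record.Marshal()
			if err != nil {
				return err
			}
			dst.dataOffset[string(record.KeyHash[:])] = dst.offset
			if _, err := dst.encoder.Write(b); err != nil {
				return err
			}
			dst.offset += int64(len(b))
		}
		offset += n
	}

	return dst.Close()
}
```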

defaults.go (new file)

@@ -0,0 +1,13 @@
package zkv
import (
"runtime"
"github.com/klauspost/compress/zstd"
)
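// defaultOptions holds the fallback values that Options.setDefaults
// applies to any field left at its zero value.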
var defaultOptions = Options{
MaxParallelReads: runtime.NumCPU(),
CompressionLevel: zstd.SpeedDefault,
BufferSize: 4 * 1024 * 1024,
}

options.go (new file)

@@ -0,0 +1,24 @@
package zkv
import "github.com/klauspost/compress/zstd"
type Options struct {
// Maximum number of concurrent reads
MaxParallelReads int
// Compression level
CompressionLevel zstd.EncoderLevel
// Write buffer size in bytes
BufferSize int
}
func (o *Options) setDefaults() {
if o.MaxParallelReads == 0 {
o.MaxParallelReads = defaultOptions.MaxParallelReads
}
	if o.CompressionLevel == 0 {
		o.CompressionLevel = defaultOptions.CompressionLevel
	}
	if o.BufferSize == 0 {
		o.BufferSize = defaultOptions.BufferSize
	}
}

zkv.go

@@ -11,114 +11,49 @@ import (
"github.com/klauspost/compress/zstd"
)
type Database struct {
type Store struct {
dataOffset map[string]int64
file *os.File
compressor *zstd.Encoder
filePath string
offset int64
file *os.File
filePath string
offset int64
encoder *zstd.Encoder
buffer *bytes.Buffer
bufferDataOffset map[string]int64
options Options
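	// readOrderChan acts as a counting semaphore capping concurrent
	// reads at Options.MaxParallelReads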
readOrderChan chan struct{}
mu sync.RWMutex
}
func (db *Database) Close() error {
db.mu.Lock()
defer db.mu.Unlock()
func OpenWithOptions(filePath string, options Options) (*Store, error) {
options.setDefaults()
err := db.compressor.Close()
if err != nil {
return err
}
return db.file.Close()
}
func (db *Database) Set(key, value interface{}) error {
db.mu.Lock()
defer db.mu.Unlock()
record, err := newRecord(RecordTypeSet, key, value)
if err != nil {
return err
}
b, err := record.Marshal()
if err != nil {
return err
}
db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: remove the hash and roll back the write on error
_, err = db.compressor.Write(b)
if err != nil {
return err
}
db.offset += int64(len(b)) // TODO: remove the hash and roll back the write on error
return nil
}
func (db *Database) Get(key, value interface{}) error {
db.mu.RLock()
defer db.mu.RUnlock()
hashToFind, err := hashInterface(key)
if err != nil {
return err
}
offset, exists := db.dataOffset[string(hashToFind[:])]
if !exists {
return ErrNotExists
}
readF, err := os.Open(db.filePath)
if err != nil {
return err
}
defer readF.Close()
decompressor, err := zstd.NewReader(readF)
if err != nil {
return err
}
defer decompressor.Close()
err = skip(decompressor, offset)
if err != nil {
return err
}
_, record, err := readRecord(decompressor)
if err != nil {
return err
}
if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку
}
return decode(record.ValueBytes, value)
}
func Open(filePath string) (*Database, error) {
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, fmt.Errorf("failed to open file for writing: %v", err)
}
compressor, err := zstd.NewWriter(f)
if err != nil {
f.Close()
return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err)
}
database := &Database{
dataOffset: make(map[string]int64),
offset: 0,
file: f,
compressor: compressor,
filePath: filePath}
database := &Store{
dataOffset: make(map[string]int64),
bufferDataOffset: make(map[string]int64),
offset: 0,
file: f,
encoder: compressor,
buffer: new(bytes.Buffer),
filePath: filePath,
options: options,
readOrderChan: make(chan struct{}, int(options.MaxParallelReads))}
// restore file data
readF, err := os.Open(filePath)
@@ -130,6 +65,7 @@ func Open(filePath string) (*Database, error) {
decompressor, err := zstd.NewReader(readF)
if err != nil {
f.Close()
return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err)
}
defer decompressor.Close()
@@ -158,9 +94,27 @@ func Open(filePath string) (*Database, error) {
return database, nil
}
func (db *Database) Delete(key interface{}) error {
db.mu.Lock()
defer db.mu.Unlock()
func Open(filePath string) (*Store, error) {
return OpenWithOptions(filePath, defaultOptions)
}
func (s *Store) Set(key, value interface{}) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.set(key, value)
}
func (s *Store) Get(key, value interface{}) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.get(key, value)
}
func (s *Store) Delete(key interface{}) error {
s.mu.Lock()
defer s.mu.Unlock()
keyHash, err := hashInterface(key)
if err != nil {
@@ -177,14 +131,158 @@ func (db *Database) Delete(key interface{}) error {
return err
}
delete(db.dataOffset, string(record.KeyHash[:]))
delete(s.dataOffset, string(record.KeyHash[:]))
delete(s.bufferDataOffset, string(record.KeyHash[:]))
_, err = db.compressor.Write(b)
_, err = s.buffer.Write(b)
if err != nil {
return err
}
db.offset += int64(len(b))
if s.buffer.Len() > s.options.BufferSize {
err = s.flush()
if err != nil {
return err
}
}
return nil
}
func (s *Store) Flush() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.flush()
}
func (s *Store) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.flush()
if err != nil {
return err
}
err = s.encoder.Close()
if err != nil {
return err
}
return s.file.Close()
}
func (s *Store) set(key, value interface{}) error {
record, err := newRecord(RecordTypeSet, key, value)
if err != nil {
return err
}
b, err := record.Marshal()
if err != nil {
return err
}
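	// offsets recorded here are relative to the start of the write buffer;
	// flush() rebases them onto the file-wide offset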
s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
_, err = s.buffer.Write(b)
if err != nil {
return err
}
if s.buffer.Len() > s.options.BufferSize {
err = s.flush()
if err != nil {
return err
}
}
return nil
}
func (s *Store) get(key, value interface{}) error {
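	// acquire a semaphore slot; at most MaxParallelReads reads run at once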
s.readOrderChan <- struct{}{}
defer func() { <-s.readOrderChan }()
hashToFind, err := hashInterface(key)
if err != nil {
return err
}
offset, exists := s.bufferDataOffset[string(hashToFind[:])]
if exists {
reader := bytes.NewReader(s.buffer.Bytes())
err = skip(reader, offset)
if err != nil {
return err
}
_, record, err := readRecord(reader)
if err != nil {
return err
}
return decode(record.ValueBytes, value)
}
offset, exists = s.dataOffset[string(hashToFind[:])]
if !exists {
return ErrNotExists
}
readF, err := os.Open(s.filePath)
if err != nil {
return err
}
defer readF.Close()
decompressor, err := zstd.NewReader(readF)
if err != nil {
return err
}
defer decompressor.Close()
err = skip(decompressor, offset)
if err != nil {
return err
}
_, record, err := readRecord(decompressor)
if err != nil {
return err
}
if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку
}
return decode(record.ValueBytes, value)
}
func (s *Store) flush() error {
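	// capture the buffered byte count before WriteTo drains the buffer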
l := int64(s.buffer.Len())
_, err := s.buffer.WriteTo(s.encoder)
if err != nil {
return err
}
for key, val := range s.bufferDataOffset {
s.dataOffset[key] = val + s.offset
}
s.bufferDataOffset = make(map[string]int64)
s.offset += l
err = s.encoder.Flush()
if err != nil {
return err
}
return nil
}

zkv_test.go

@@ -20,7 +20,16 @@ func TestReadWriteBasic(t *testing.T) {
assert.NoError(t, err)
}
assert.Len(t, db.dataOffset, recordCount)
assert.Len(t, db.dataOffset, 0)
assert.Len(t, db.bufferDataOffset, recordCount)
for i := 1; i <= recordCount; i++ {
var gotValue int
err = db.Get(i, &gotValue)
assert.NoError(t, err)
assert.Equal(t, i, gotValue)
}
err = db.Close()
assert.NoError(t, err)
@@ -91,12 +100,15 @@ func TestDeleteBasic(t *testing.T) {
assert.NoError(t, err)
}
assert.Len(t, db.dataOffset, recordCount)
assert.Len(t, db.dataOffset, 0)
assert.Len(t, db.bufferDataOffset, recordCount)
err = db.Delete(50)
assert.NoError(t, err)
assert.Len(t, db.dataOffset, recordCount-1)
assert.Len(t, db.dataOffset, 0)
assert.Len(t, db.bufferDataOffset, recordCount-1)
var value int
err = db.Get(50, &value)
assert.Equal(t, 0, value)
@@ -110,6 +122,8 @@ func TestDeleteBasic(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, db.dataOffset, recordCount-1)
assert.Len(t, db.bufferDataOffset, 0)
value = 0
err = db.Get(50, &value)
assert.Equal(t, 0, value)
@@ -118,3 +132,78 @@ func TestDeleteBasic(t *testing.T) {
err = db.Close()
assert.NoError(t, err)
}
func TestBufferBasic(t *testing.T) {
const filePath = "TestBuffer.zkv"
defer os.Remove(filePath)
db, err := OpenWithOptions(filePath, Options{BufferSize: 100})
assert.NoError(t, err)
err = db.Set(1, make([]byte, 100))
assert.NoError(t, err)
assert.NotEqual(t, 0, len(db.dataOffset))
assert.Len(t, db.bufferDataOffset, 0)
assert.Equal(t, 0, db.buffer.Len())
var gotValue []byte
err = db.Get(1, &gotValue)
assert.NoError(t, err)
assert.Equal(t, make([]byte, 100), gotValue)
err = db.Close()
assert.NoError(t, err)
}
func TestBufferRead(t *testing.T) {
const filePath = "TestBufferRead.zkv"
const recordCount = 100
defer os.Remove(filePath)
db, err := OpenWithOptions(filePath, Options{BufferSize: 100})
assert.NoError(t, err)
for i := 1; i <= recordCount; i++ {
err = db.Set(i, i)
assert.NoError(t, err)
}
for i := 1; i <= recordCount; i++ {
var gotValue int
err = db.Get(i, &gotValue)
assert.NoError(t, err)
assert.Equal(t, i, gotValue)
}
for i := 1; i <= recordCount; i++ {
var gotValue int
err = db.Get(i, &gotValue)
assert.NoError(t, err)
assert.Equal(t, i, gotValue)
}
err = db.Close()
assert.NoError(t, err)
// try to read
db, err = Open(filePath)
assert.NoError(t, err)
assert.Len(t, db.dataOffset, recordCount)
for i := 1; i <= recordCount; i++ {
var gotValue int
err = db.Get(i, &gotValue)
assert.NoError(t, err)
assert.Equal(t, i, gotValue)
}
err = db.Close()
assert.NoError(t, err)
}