mirror of
https://github.com/nxshock/zkv.git
synced 2025-01-17 19:11:10 +05:00
Implement write buffer
This commit is contained in:
parent
8ef0fd240b
commit
56c9dcce00
@ -1,8 +1,13 @@
|
||||
package zkv
|
||||
|
||||
import "github.com/klauspost/compress/zstd"
|
||||
import (
|
||||
"runtime"
|
||||
|
||||
"github.com/klauspost/compress/zstd"
|
||||
)
|
||||
|
||||
var defaultOptions = Options{
|
||||
MaxParallelReads: 64,
|
||||
MaxParallelReads: runtime.NumCPU(),
|
||||
CompressionLevel: zstd.SpeedDefault,
|
||||
BufferSize: 4 * 1024 * 1024,
|
||||
}
|
||||
|
@ -4,10 +4,13 @@ import "github.com/klauspost/compress/zstd"
|
||||
|
||||
type Options struct {
|
||||
// Maximum number of concurrent reads
|
||||
MaxParallelReads uint
|
||||
MaxParallelReads int
|
||||
|
||||
// Compression level
|
||||
CompressionLevel zstd.EncoderLevel
|
||||
|
||||
// Write buffer size in bytes
|
||||
BufferSize int
|
||||
}
|
||||
|
||||
func (o *Options) setDefaults() {
|
||||
|
290
zkv.go
290
zkv.go
@ -11,12 +11,16 @@ import (
|
||||
"github.com/klauspost/compress/zstd"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
type Store struct {
|
||||
dataOffset map[string]int64
|
||||
file *os.File
|
||||
compressor *zstd.Encoder
|
||||
filePath string
|
||||
offset int64
|
||||
|
||||
file *os.File
|
||||
filePath string
|
||||
offset int64
|
||||
encoder *zstd.Encoder
|
||||
|
||||
buffer *bytes.Buffer
|
||||
bufferDataOffset map[string]int64
|
||||
|
||||
options Options
|
||||
|
||||
@ -25,91 +29,7 @@ type Database struct {
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (db *Database) Close() error {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
|
||||
err := db.compressor.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return db.file.Close()
|
||||
}
|
||||
|
||||
func (db *Database) Set(key, value interface{}) error {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
|
||||
record, err := newRecord(RecordTypeSet, key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, err := record.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки
|
||||
|
||||
_, err = db.compressor.Write(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.offset += int64(len(b)) // TODO: удалить хеш и откатить запись в случае ошибки
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *Database) Get(key, value interface{}) error {
|
||||
db.mu.RLock()
|
||||
defer db.mu.RUnlock()
|
||||
|
||||
db.readOrderChan <- struct{}{}
|
||||
defer func() { <-db.readOrderChan }()
|
||||
|
||||
hashToFind, err := hashInterface(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
offset, exists := db.dataOffset[string(hashToFind[:])]
|
||||
if !exists {
|
||||
return ErrNotExists
|
||||
}
|
||||
|
||||
readF, err := os.Open(db.filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer readF.Close()
|
||||
|
||||
decompressor, err := zstd.NewReader(readF)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer decompressor.Close()
|
||||
|
||||
err = skip(decompressor, offset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, record, err := readRecord(decompressor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
|
||||
return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку
|
||||
}
|
||||
|
||||
return decode(record.ValueBytes, value)
|
||||
}
|
||||
|
||||
func OpenWithOptions(filePath string, options Options) (*Database, error) {
|
||||
func OpenWithOptions(filePath string, options Options) (*Store, error) {
|
||||
options.setDefaults()
|
||||
|
||||
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
@ -124,14 +44,16 @@ func OpenWithOptions(filePath string, options Options) (*Database, error) {
|
||||
return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err)
|
||||
}
|
||||
|
||||
database := &Database{
|
||||
dataOffset: make(map[string]int64),
|
||||
offset: 0,
|
||||
file: f,
|
||||
compressor: compressor,
|
||||
filePath: filePath,
|
||||
options: options,
|
||||
readOrderChan: make(chan struct{}, int(options.MaxParallelReads))}
|
||||
database := &Store{
|
||||
dataOffset: make(map[string]int64),
|
||||
bufferDataOffset: make(map[string]int64),
|
||||
offset: 0,
|
||||
file: f,
|
||||
encoder: compressor,
|
||||
buffer: new(bytes.Buffer),
|
||||
filePath: filePath,
|
||||
options: options,
|
||||
readOrderChan: make(chan struct{}, int(options.MaxParallelReads))}
|
||||
|
||||
// restore file data
|
||||
readF, err := os.Open(filePath)
|
||||
@ -172,13 +94,27 @@ func OpenWithOptions(filePath string, options Options) (*Database, error) {
|
||||
return database, nil
|
||||
}
|
||||
|
||||
func Open(filePath string) (*Database, error) {
|
||||
func Open(filePath string) (*Store, error) {
|
||||
return OpenWithOptions(filePath, defaultOptions)
|
||||
}
|
||||
|
||||
func (db *Database) Delete(key interface{}) error {
|
||||
db.mu.Lock()
|
||||
defer db.mu.Unlock()
|
||||
func (s *Store) Set(key, value interface{}) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.set(key, value)
|
||||
}
|
||||
|
||||
func (s *Store) Get(key, value interface{}) error {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return s.get(key, value)
|
||||
}
|
||||
|
||||
func (s *Store) Delete(key interface{}) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
keyHash, err := hashInterface(key)
|
||||
if err != nil {
|
||||
@ -195,14 +131,158 @@ func (db *Database) Delete(key interface{}) error {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(db.dataOffset, string(record.KeyHash[:]))
|
||||
delete(s.dataOffset, string(record.KeyHash[:]))
|
||||
delete(s.bufferDataOffset, string(record.KeyHash[:]))
|
||||
|
||||
_, err = db.compressor.Write(b)
|
||||
_, err = s.buffer.Write(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
db.offset += int64(len(b))
|
||||
if s.buffer.Len() > s.options.BufferSize {
|
||||
err = s.flush()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) Flush() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return s.flush()
|
||||
}
|
||||
|
||||
func (s *Store) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
err := s.flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.encoder.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.file.Close()
|
||||
}
|
||||
|
||||
func (s *Store) set(key, value interface{}) error {
|
||||
record, err := newRecord(RecordTypeSet, key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b, err := record.Marshal()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len())
|
||||
|
||||
_, err = s.buffer.Write(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.buffer.Len() > s.options.BufferSize {
|
||||
err = s.flush()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) get(key, value interface{}) error {
|
||||
s.readOrderChan <- struct{}{}
|
||||
defer func() { <-s.readOrderChan }()
|
||||
|
||||
hashToFind, err := hashInterface(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
offset, exists := s.bufferDataOffset[string(hashToFind[:])]
|
||||
if exists {
|
||||
reader := bytes.NewReader(s.buffer.Bytes())
|
||||
|
||||
err = skip(reader, offset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, record, err := readRecord(reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return decode(record.ValueBytes, value)
|
||||
}
|
||||
|
||||
offset, exists = s.dataOffset[string(hashToFind[:])]
|
||||
if !exists {
|
||||
return ErrNotExists
|
||||
}
|
||||
|
||||
readF, err := os.Open(s.filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer readF.Close()
|
||||
|
||||
decompressor, err := zstd.NewReader(readF)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer decompressor.Close()
|
||||
|
||||
err = skip(decompressor, offset)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, record, err := readRecord(decompressor)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !bytes.Equal(record.KeyHash[:], hashToFind[:]) {
|
||||
return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку
|
||||
}
|
||||
|
||||
return decode(record.ValueBytes, value)
|
||||
}
|
||||
|
||||
func (s *Store) flush() error {
|
||||
l := int64(s.buffer.Len())
|
||||
|
||||
_, err := s.buffer.WriteTo(s.encoder)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for key, val := range s.bufferDataOffset {
|
||||
s.dataOffset[key] = val + s.offset
|
||||
}
|
||||
|
||||
s.bufferDataOffset = make(map[string]int64)
|
||||
|
||||
s.offset += l
|
||||
|
||||
err = s.encoder.Flush()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
95
zkv_test.go
95
zkv_test.go
@ -20,7 +20,16 @@ func TestReadWriteBasic(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
assert.Len(t, db.dataOffset, recordCount)
|
||||
assert.Len(t, db.dataOffset, 0)
|
||||
assert.Len(t, db.bufferDataOffset, recordCount)
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
var gotValue int
|
||||
|
||||
err = db.Get(i, &gotValue)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, i, gotValue)
|
||||
}
|
||||
|
||||
err = db.Close()
|
||||
assert.NoError(t, err)
|
||||
@ -91,12 +100,15 @@ func TestDeleteBasic(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
assert.Len(t, db.dataOffset, recordCount)
|
||||
assert.Len(t, db.dataOffset, 0)
|
||||
assert.Len(t, db.bufferDataOffset, recordCount)
|
||||
|
||||
err = db.Delete(50)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, db.dataOffset, recordCount-1)
|
||||
assert.Len(t, db.dataOffset, 0)
|
||||
assert.Len(t, db.bufferDataOffset, recordCount-1)
|
||||
|
||||
var value int
|
||||
err = db.Get(50, &value)
|
||||
assert.Equal(t, 0, value)
|
||||
@ -110,6 +122,8 @@ func TestDeleteBasic(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, db.dataOffset, recordCount-1)
|
||||
assert.Len(t, db.bufferDataOffset, 0)
|
||||
|
||||
value = 0
|
||||
err = db.Get(50, &value)
|
||||
assert.Equal(t, 0, value)
|
||||
@ -118,3 +132,78 @@ func TestDeleteBasic(t *testing.T) {
|
||||
err = db.Close()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestBufferBasic(t *testing.T) {
|
||||
const filePath = "TestBuffer.zkv"
|
||||
defer os.Remove(filePath)
|
||||
|
||||
db, err := OpenWithOptions(filePath, Options{BufferSize: 100})
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = db.Set(1, make([]byte, 100))
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.NotEqual(t, 0, db.dataOffset)
|
||||
assert.Len(t, db.bufferDataOffset, 0)
|
||||
assert.Equal(t, 0, db.buffer.Len())
|
||||
|
||||
var gotValue []byte
|
||||
err = db.Get(1, &gotValue)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, make([]byte, 100), gotValue)
|
||||
|
||||
err = db.Close()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestBufferRead(t *testing.T) {
|
||||
const filePath = "TestBufferRead.zkv"
|
||||
const recordCount = 100
|
||||
defer os.Remove(filePath)
|
||||
|
||||
db, err := OpenWithOptions(filePath, Options{BufferSize: 100})
|
||||
assert.NoError(t, err)
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
err = db.Set(i, i)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
var gotValue int
|
||||
|
||||
err = db.Get(i, &gotValue)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, i, gotValue)
|
||||
}
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
var gotValue int
|
||||
|
||||
err = db.Get(i, &gotValue)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, i, gotValue)
|
||||
}
|
||||
|
||||
err = db.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// try to read
|
||||
db, err = Open(filePath)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Len(t, db.dataOffset, recordCount)
|
||||
|
||||
for i := 1; i <= recordCount; i++ {
|
||||
var gotValue int
|
||||
|
||||
err = db.Get(i, &gotValue)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, i, gotValue)
|
||||
}
|
||||
|
||||
err = db.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user