1
0
mirror of https://github.com/nxshock/zkv.git synced 2024-11-27 11:21:02 +05:00

Add option for limit a number of parallel reads

This commit is contained in:
nxshock 2022-12-03 12:40:36 +05:00
parent b5043e1319
commit 350634de38
3 changed files with 44 additions and 6 deletions

5
defaults.go Normal file
View File

@ -0,0 +1,5 @@
package zkv
var defaultOptions = Options{
MaxParallelReads: 64,
}

13
options.go Normal file
View File

@ -0,0 +1,13 @@
package zkv
type Options struct {
MaxParallelReads uint
}
func (o *Options) Validate() error {
if o.MaxParallelReads == 0 {
o.MaxParallelReads = defaultOptions.MaxParallelReads
}
return nil
}

32
zkv.go
View File

@ -18,6 +18,10 @@ type Database struct {
filePath string filePath string
offset int64 offset int64
options Options
readOrderChan chan struct{}
mu sync.RWMutex mu sync.RWMutex
} }
@ -63,6 +67,9 @@ func (db *Database) Get(key, value interface{}) error {
db.mu.RLock() db.mu.RLock()
defer db.mu.RUnlock() defer db.mu.RUnlock()
db.readOrderChan <- struct{}{}
defer func() { <-db.readOrderChan }()
hashToFind, err := hashInterface(key) hashToFind, err := hashInterface(key)
if err != nil { if err != nil {
return err return err
@ -102,23 +109,31 @@ func (db *Database) Get(key, value interface{}) error {
return decode(record.ValueBytes, value) return decode(record.ValueBytes, value)
} }
func Open(filePath string) (*Database, error) { func OpenWithOptions(filePath string, options Options) (*Database, error) {
f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
f.Close()
return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err)
} }
if options.Validate() != nil {
return nil, err
}
compressor, err := zstd.NewWriter(f) compressor, err := zstd.NewWriter(f)
if err != nil { if err != nil {
f.Close()
return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err)
} }
database := &Database{ database := &Database{
dataOffset: make(map[string]int64), dataOffset: make(map[string]int64),
offset: 0, offset: 0,
file: f, file: f,
compressor: compressor, compressor: compressor,
filePath: filePath} filePath: filePath,
options: options,
readOrderChan: make(chan struct{}, int(options.MaxParallelReads))}
// restore file data // restore file data
readF, err := os.Open(filePath) readF, err := os.Open(filePath)
@ -130,6 +145,7 @@ func Open(filePath string) (*Database, error) {
decompressor, err := zstd.NewReader(readF) decompressor, err := zstd.NewReader(readF)
if err != nil { if err != nil {
f.Close()
return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err)
} }
defer decompressor.Close() defer decompressor.Close()
@ -158,6 +174,10 @@ func Open(filePath string) (*Database, error) {
return database, nil return database, nil
} }
func Open(filePath string) (*Database, error) {
return OpenWithOptions(filePath, defaultOptions)
}
func (db *Database) Delete(key interface{}) error { func (db *Database) Delete(key interface{}) error {
db.mu.Lock() db.mu.Lock()
defer db.mu.Unlock() defer db.mu.Unlock()