From 350634de38c1afd514292b89be1e8037da3e5cea Mon Sep 17 00:00:00 2001 From: nxshock Date: Sat, 3 Dec 2022 12:40:36 +0500 Subject: [PATCH] Add option for limit a number of parallel reads --- defaults.go | 5 +++++ options.go | 13 +++++++++++++ zkv.go | 32 ++++++++++++++++++++++++++------ 3 files changed, 44 insertions(+), 6 deletions(-) create mode 100644 defaults.go create mode 100644 options.go diff --git a/defaults.go b/defaults.go new file mode 100644 index 0000000..2e3437f --- /dev/null +++ b/defaults.go @@ -0,0 +1,5 @@ +package zkv + +var defaultOptions = Options{ + MaxParallelReads: 64, +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..7105683 --- /dev/null +++ b/options.go @@ -0,0 +1,13 @@ +package zkv + +type Options struct { + MaxParallelReads uint +} + +func (o *Options) Validate() error { + if o.MaxParallelReads == 0 { + o.MaxParallelReads = defaultOptions.MaxParallelReads + } + + return nil +} diff --git a/zkv.go b/zkv.go index 4df780b..f12364c 100644 --- a/zkv.go +++ b/zkv.go @@ -18,6 +18,10 @@ type Database struct { filePath string offset int64 + options Options + + readOrderChan chan struct{} + mu sync.RWMutex } @@ -63,6 +67,9 @@ func (db *Database) Get(key, value interface{}) error { db.mu.RLock() defer db.mu.RUnlock() + db.readOrderChan <- struct{}{} + defer func() { <-db.readOrderChan }() + hashToFind, err := hashInterface(key) if err != nil { return err @@ -102,23 +109,31 @@ func (db *Database) Get(key, value interface{}) error { return decode(record.ValueBytes, value) } -func Open(filePath string) (*Database, error) { +func OpenWithOptions(filePath string, options Options) (*Database, error) { f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { + f.Close() return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) } + if options.Validate() != nil { + return nil, err + } + compressor, err := zstd.NewWriter(f) if err != nil { + f.Close() return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) } database := &Database{ - dataOffset: make(map[string]int64), - offset: 0, - file: f, - compressor: compressor, - filePath: filePath} + dataOffset: make(map[string]int64), + offset: 0, + file: f, + compressor: compressor, + filePath: filePath, + options: options, + readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} // restore file data readF, err := os.Open(filePath) @@ -130,6 +145,7 @@ func Open(filePath string) (*Database, error) { decompressor, err := zstd.NewReader(readF) if err != nil { + f.Close() return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) } defer decompressor.Close() @@ -158,6 +174,10 @@ func Open(filePath string) (*Database, error) { return database, nil } +func Open(filePath string) (*Database, error) { + return OpenWithOptions(filePath, defaultOptions) +} + func (db *Database) Delete(key interface{}) error { db.mu.Lock() defer db.mu.Unlock()