From e166e07daaa21ced22f35fcb387dc8fd6bc13d63 Mon Sep 17 00:00:00 2001 From: nxshock Date: Wed, 7 Dec 2022 21:06:36 +0500 Subject: [PATCH] Do not block file for writes --- README.md | 1 + zkv.go | 52 ++++++++++++++++++++++++---------------------------- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index b66f815..0622264 100644 --- a/README.md +++ b/README.md @@ -68,3 +68,4 @@ File is log stuctured list of commands: - [ ] Test [seekable zstd streams](https://github.com/SaveTheRbtz/zstd-seekable-format-go) - [ ] Implement optional separate index file to speedup store initialization +- [ ] Add recovery previous state of store file on write error diff --git a/zkv.go b/zkv.go index d1bedae..1bbd4b4 100644 --- a/zkv.go +++ b/zkv.go @@ -15,10 +15,8 @@ import ( type Store struct { dataOffset map[string]int64 - file *os.File filePath string offset int64 - encoder *zstd.Encoder buffer *bytes.Buffer bufferDataOffset map[string]int64 @@ -33,24 +31,10 @@ type Store struct { func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() - f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - f.Close() - return nil, fmt.Errorf("open store file: %v", err) - } - - compressor, err := zstd.NewWriter(f) - if err != nil { - f.Close() - return nil, fmt.Errorf("compressor initialization: %v", err) - } - database := &Store{ dataOffset: make(map[string]int64), bufferDataOffset: make(map[string]int64), offset: 0, - file: f, - encoder: compressor, buffer: new(bytes.Buffer), filePath: filePath, options: options, @@ -58,15 +42,16 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { // restore file data readF, err := os.Open(filePath) - if err != nil { - f.Close() + if os.IsNotExist(err) { + // Empty datebase + return database, nil + } else if err != nil { return nil, fmt.Errorf("open file for indexing: %v", err) } defer readF.Close() decompressor, err := zstd.NewReader(readF) if err != nil { - f.Close() return nil, fmt.Errorf("decompressor initialization: %v", err) } defer decompressor.Close() @@ -78,7 +63,6 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { break } if err != nil { - f.Close() return nil, fmt.Errorf("read record error: %v", err) } @@ -204,12 +188,7 @@ func (s *Store) Close() error { return err } - err = s.encoder.Close() - if err != nil { - return err - } - - return s.file.Close() + return nil } func (s *Store) setBytes(keyHash [sha256.Size224]byte, valueBytes []byte) error { @@ -393,7 +372,18 @@ func (s *Store) get(key, value interface{}) error { func (s *Store) flush() error { l := int64(s.buffer.Len()) - _, err := s.buffer.WriteTo(s.encoder) + f, err := os.OpenFile(s.filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("open store file: %v", err) + } + + encoder, err := zstd.NewWriter(f, zstd.WithEncoderLevel(s.options.CompressionLevel)) + if err != nil { + f.Close() + return fmt.Errorf("open store file: %v", err) + } + + _, err = s.buffer.WriteTo(encoder) if err != nil { return err } @@ -406,7 +396,13 @@ func (s *Store) flush() error { s.offset += l - err = s.encoder.Flush() + err = encoder.Close() + if err != nil { + // TODO: truncate file to previous state + return err + } + + err = f.Close() if err != nil { return err }