diff --git a/defaults.go b/defaults.go index 959736c..ad476f8 100644 --- a/defaults.go +++ b/defaults.go @@ -1,8 +1,13 @@ package zkv -import "github.com/klauspost/compress/zstd" +import ( + "runtime" + + "github.com/klauspost/compress/zstd" +) var defaultOptions = Options{ - MaxParallelReads: 64, + MaxParallelReads: runtime.NumCPU(), CompressionLevel: zstd.SpeedDefault, + BufferSize: 4 * 1024 * 1024, } diff --git a/options.go b/options.go index dbbd27c..69b3c28 100644 --- a/options.go +++ b/options.go @@ -4,10 +4,13 @@ import "github.com/klauspost/compress/zstd" type Options struct { // Maximum number of concurrent reads - MaxParallelReads uint + MaxParallelReads int // Compression level CompressionLevel zstd.EncoderLevel + + // Write buffer size in bytes + BufferSize int } func (o *Options) setDefaults() { diff --git a/zkv.go b/zkv.go index 1dbc52b..7cc346d 100644 --- a/zkv.go +++ b/zkv.go @@ -11,12 +11,16 @@ import ( "github.com/klauspost/compress/zstd" ) -type Database struct { +type Store struct { dataOffset map[string]int64 - file *os.File - compressor *zstd.Encoder - filePath string - offset int64 + + file *os.File + filePath string + offset int64 + encoder *zstd.Encoder + + buffer *bytes.Buffer + bufferDataOffset map[string]int64 options Options @@ -25,91 +29,7 @@ type Database struct { mu sync.RWMutex } -func (db *Database) Close() error { - db.mu.Lock() - defer db.mu.Unlock() - - err := db.compressor.Close() - if err != nil { - return err - } - - return db.file.Close() -} - -func (db *Database) Set(key, value interface{}) error { - db.mu.Lock() - defer db.mu.Unlock() - - record, err := newRecord(RecordTypeSet, key, value) - if err != nil { - return err - } - - b, err := record.Marshal() - if err != nil { - return err - } - - db.dataOffset[string(record.KeyHash[:])] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки - - _, err = db.compressor.Write(b) - if err != nil { - return err - } - - db.offset += int64(len(b)) // TODO: удалить хеш и откатить запись в случае ошибки - - return nil -} - -func (db *Database) Get(key, value interface{}) error { - db.mu.RLock() - defer db.mu.RUnlock() - - db.readOrderChan <- struct{}{} - defer func() { <-db.readOrderChan }() - - hashToFind, err := hashInterface(key) - if err != nil { - return err - } - - offset, exists := db.dataOffset[string(hashToFind[:])] - if !exists { - return ErrNotExists - } - - readF, err := os.Open(db.filePath) - if err != nil { - return err - } - defer readF.Close() - - decompressor, err := zstd.NewReader(readF) - if err != nil { - return err - } - defer decompressor.Close() - - err = skip(decompressor, offset) - if err != nil { - return err - } - - _, record, err := readRecord(decompressor) - if err != nil { - return err - } - - if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { - return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку - } - - return decode(record.ValueBytes, value) -} - -func OpenWithOptions(filePath string, options Options) (*Database, error) { +func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) @@ -124,14 +44,16 @@ func OpenWithOptions(filePath string, options Options) (*Database, error) { return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) } - database := &Database{ - dataOffset: make(map[string]int64), - offset: 0, - file: f, - compressor: compressor, - filePath: filePath, - options: options, - readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} + database := &Store{ + dataOffset: make(map[string]int64), + bufferDataOffset: make(map[string]int64), + offset: 0, + file: f, + encoder: compressor, + buffer: new(bytes.Buffer), + filePath: filePath, + options: options, + readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} // restore file data readF, err := os.Open(filePath) @@ -172,13 +94,27 @@ func OpenWithOptions(filePath string, options Options) (*Database, error) { return database, nil } -func Open(filePath string) (*Database, error) { +func Open(filePath string) (*Store, error) { return OpenWithOptions(filePath, defaultOptions) } -func (db *Database) Delete(key interface{}) error { - db.mu.Lock() - defer db.mu.Unlock() +func (s *Store) Set(key, value interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.set(key, value) +} + +func (s *Store) Get(key, value interface{}) error { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.get(key, value) +} + +func (s *Store) Delete(key interface{}) error { + s.mu.Lock() + defer s.mu.Unlock() keyHash, err := hashInterface(key) if err != nil { @@ -195,14 +131,158 @@ func (db *Database) Delete(key interface{}) error { return err } - delete(db.dataOffset, string(record.KeyHash[:])) + delete(s.dataOffset, string(record.KeyHash[:])) + delete(s.bufferDataOffset, string(record.KeyHash[:])) - _, err = db.compressor.Write(b) + _, err = s.buffer.Write(b) if err != nil { return err } - db.offset += int64(len(b)) + if s.buffer.Len() > s.options.BufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil +} + +func (s *Store) Flush() error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.flush() +} + +func (s *Store) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + + err := s.flush() + if err != nil { + return err + } + + err = s.encoder.Close() + if err != nil { + return err + } + + return s.file.Close() +} + +func (s *Store) set(key, value interface{}) error { + record, err := newRecord(RecordTypeSet, key, value) + if err != nil { + return err + } + + b, err := record.Marshal() + if err != nil { + return err + } + + s.bufferDataOffset[string(record.KeyHash[:])] = int64(s.buffer.Len()) + + _, err = s.buffer.Write(b) + if err != nil { + return err + } + + if s.buffer.Len() > s.options.BufferSize { + err = s.flush() + + if err != nil { + return err + } + } + + return nil +} + +func (s *Store) get(key, value interface{}) error { + s.readOrderChan <- struct{}{} + defer func() { <-s.readOrderChan }() + + hashToFind, err := hashInterface(key) + if err != nil { + return err + } + + offset, exists := s.bufferDataOffset[string(hashToFind[:])] + if exists { + reader := bytes.NewReader(s.buffer.Bytes()) + + err = skip(reader, offset) + if err != nil { + return err + } + + _, record, err := readRecord(reader) + if err != nil { + return err + } + + return decode(record.ValueBytes, value) + } + + offset, exists = s.dataOffset[string(hashToFind[:])] + if !exists { + return ErrNotExists + } + + readF, err := os.Open(s.filePath) + if err != nil { + return err + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return err + } + defer decompressor.Close() + + err = skip(decompressor, offset) + if err != nil { + return err + } + + _, record, err := readRecord(decompressor) + if err != nil { + return err + } + + if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { + return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind[:]), base64.StdEncoding.EncodeToString(record.KeyHash[:])) // TODO: заменить на константную ошибку + } + + return decode(record.ValueBytes, value) +} + +func (s *Store) flush() error { + l := int64(s.buffer.Len()) + + _, err := s.buffer.WriteTo(s.encoder) + if err != nil { + return err + } + + for key, val := range s.bufferDataOffset { + s.dataOffset[key] = val + s.offset + } + + s.bufferDataOffset = make(map[string]int64) + + s.offset += l + + err = s.encoder.Flush() + if err != nil { + return err + } return nil } diff --git a/zkv_test.go b/zkv_test.go index 36f6aac..a497a3c 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -20,7 +20,16 @@ func TestReadWriteBasic(t *testing.T) { assert.NoError(t, err) } - assert.Len(t, db.dataOffset, recordCount) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } err = db.Close() assert.NoError(t, err) @@ -91,12 +100,15 @@ func TestDeleteBasic(t *testing.T) { assert.NoError(t, err) } - assert.Len(t, db.dataOffset, recordCount) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount) err = db.Delete(50) assert.NoError(t, err) - assert.Len(t, db.dataOffset, recordCount-1) + assert.Len(t, db.dataOffset, 0) + assert.Len(t, db.bufferDataOffset, recordCount-1) + var value int err = db.Get(50, &value) assert.Equal(t, 0, value) @@ -110,6 +122,8 @@ func TestDeleteBasic(t *testing.T) { assert.NoError(t, err) assert.Len(t, db.dataOffset, recordCount-1) + assert.Len(t, db.bufferDataOffset, 0) + value = 0 err = db.Get(50, &value) assert.Equal(t, 0, value) @@ -118,3 +132,78 @@ func TestDeleteBasic(t *testing.T) { err = db.Close() assert.NoError(t, err) } + +func TestBufferBasic(t *testing.T) { + const filePath = "TestBuffer.zkv" + defer os.Remove(filePath) + + db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + assert.NoError(t, err) + + err = db.Set(1, make([]byte, 100)) + assert.NoError(t, err) + + assert.NotEqual(t, 0, db.dataOffset) + assert.Len(t, db.bufferDataOffset, 0) + assert.Equal(t, 0, db.buffer.Len()) + + var gotValue []byte + err = db.Get(1, &gotValue) + assert.NoError(t, err) + + assert.Equal(t, make([]byte, 100), gotValue) + + err = db.Close() + assert.NoError(t, err) +} + +func TestBufferRead(t *testing.T) { + const filePath = "TestBufferRead.zkv" + const recordCount = 100 + defer os.Remove(filePath) + + db, err := OpenWithOptions(filePath, Options{BufferSize: 100}) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + + // try to read + db, err = Open(filePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) + +}