From 412ddb11a8f7be09e4f09f920312a8283d3e5414 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 11 Dec 2022 18:16:23 +0500 Subject: [PATCH 1/3] Remove duplicated code --- zkv.go | 49 ++----------------------------------------------- 1 file changed, 2 insertions(+), 47 deletions(-) diff --git a/zkv.go b/zkv.go index 4706c28..de1263d 100644 --- a/zkv.go +++ b/zkv.go @@ -329,57 +329,12 @@ func (s *Store) get(key, value interface{}) error { return err } - offset, exists := s.bufferDataOffset[string(hashToFind[:])] - if exists { - reader := bytes.NewReader(s.buffer.Bytes()) - - err = skip(reader, offset) - if err != nil { - return err - } - - _, record, err := readRecord(reader) - if err != nil { - return err - } - - return decode(record.ValueBytes, value) - } - - offset, exists = s.dataOffset[string(hashToFind[:])] - if !exists { - return ErrNotExists - } - - readF, err := os.Open(s.filePath) - if err != nil { - return err - } - defer readF.Close() - - decompressor, err := zstd.NewReader(readF) - if err != nil { - return err - } - defer decompressor.Close() - - err = skip(decompressor, offset) + b, err := s.getGobBytes(hashToFind) if err != nil { return err } - _, record, err := readRecord(decompressor) - if err != nil { - return err - } - - if !bytes.Equal(record.KeyHash[:], hashToFind[:]) { - expectedHashStr := base64.StdEncoding.EncodeToString(hashToFind[:]) - gotHashStr := base64.StdEncoding.EncodeToString(record.KeyHash[:]) - return fmt.Errorf("wrong hash of offset %d: expected %s, got %s", offset, expectedHashStr, gotHashStr) - } - - return decode(record.ValueBytes, value) + return decode(b, value) } func (s *Store) flush() error { From 5f0d33828f6b611cb8bcdfbfcabedb0981937ef7 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 11 Dec 2022 21:00:36 +0500 Subject: [PATCH 2/3] Improve store read speed by skipping store blocks --- TestBufferRead.zkv.idx | Bin 0 -> 131 bytes TestSmallWrites.zkv.idx | Bin 0 -> 3465 bytes defaults.go | 2 +- options.go | 4 +++- zkv.go | 38 ++++++++++++++++++++++++-------------- zkv_test.go | 15 ++++++++++++--- 6 files changed, 40 insertions(+), 19 deletions(-) create mode 100644 TestBufferRead.zkv.idx create mode 100644 TestSmallWrites.zkv.idx diff --git a/TestBufferRead.zkv.idx b/TestBufferRead.zkv.idx new file mode 100644 index 0000000000000000000000000000000000000000..ced4e784f1da329060e7333b77539b4bdd131895 GIT binary patch literal 131 zcmd=8-_F9w^uL3Fk%#er8v}#x|5j#3rvGgWj7*H&PC5C>+5Ty1#i=EXEDVf1L8;04 zMJaFr1_qb^9Slq|6TYa57L>+(+4J+>Y^Xrv#WBkYHzyJUezb&5t literal 0 HcmV?d00001 diff --git a/TestSmallWrites.zkv.idx b/TestSmallWrites.zkv.idx new file mode 100644 index 0000000000000000000000000000000000000000..15a6d389fca5b0bafaacc87682cfb2e099a9f8ad GIT binary patch literal 3465 zcmd=8-_F9w^uL3Fk%#er8v}#x|5j#3rvGgWj7*H&PC5C>+5Ty1#i=EXEDVf1L8;04 zMJaFr28MsU9{)QSQe^V#`j-nt@beyD^5?)k^EZoISG`s^z~|w(VEWV|#(x=47-W>L zTKuZdbq{~?aBhJ{SVMNP>}|nK$4uvb$+S7m_^+UeLB^lse|ONP^bIG|?>_y`knwM= z81E~Y2vupV*v~5&|CKf{$n<)+Mf}vA7bc*!beG-g3(q?*s9OG+@$669AtMpSe*zf{ zGX6>3A4<|E|4)@kF1mlqy)*T5Wm=QZzYB>G$w7?&?9VaC1ess?HM9Omr27r!1ob!j zSL-Yiy!z73(r+&FZ3)JIybl;;7AIvsOkolVT(>ge^b6g|FDCbtAIc7!X&1Nh-9E;D zWrrAK@)9RR7r%6yV|QeA!7F*rPW9Yu1HtIBIhu@q#~A-9Y+#VFd31Z-v`s(q_IFp8 zJl@ct9#H@JQPzg4%nshYIgI~u)-lKwm*-x5*?y}2s@12UkNUsmWJRWEi94-#JnU{f zf$?AZ5(XKSD`KtuXN%k|HY{-~7QE8+yrJNvhFib#U(@nj#(xzY3^GBc3&hL|cLi?M zd|{<^(YEAUBD;_4f#rfNfjbv6{xdzsAT!G)txWdR@AFH)Ol@DzWw~nV$HJSlPo3_4 z>TU1B_|N|ygN#?P+GM?s+KNm!e#ig%PmXyhND9s=a=r8Mm-7wAf2z|MWKyFg&TehC z-5xzjF7ZgN(e2Eee^M;9W27yHS|1cWfD{PnkE)> z&&+JIIL7!db{2z-X6bS5g2j3feA4bG=HyOTaJ_biBT=o|N=iNrmBK#_@;CUw?Yf_%Gjz zK}NClgWxqzofmx)sgBRK2o#`T^s=m=Fe;V_Q5I{a7v)_}k(3&B?8er*}L$U827C zKhJaF4q3*37Jdve@+q2+WY51io*^eAc*EpL&W@wW7ux2v#TOlC3ugSM>Bk_mhvCrf zr$46s?=mo260h})KYNkxvkT8l)EPsBR2l!J8ZpScjoM=Uw^;f_l34fatg8adO!pR- zil(F{v3jUFGXBdhVUS^zU-Mh&y;t&FC;9&KdB1msM89EOm)~Q`oOfvlpsYEcy zOuc#ke7V~C$&R+PEG(z-Kx(*o6)IUl~)#?E21S5d zTYQh3H0487wEioWFvfpYrx;|mN%(y{`CsecKDp^-iQX=Y?{Du}|N7Ro6^wr+dKmv1 zaWTlq_i^6H?T;+K#=UMncZbub7ZNAZY+B~>&D?+HDC55b2?m+`@-CB%1&be^I=xJN z;g0@IJa3m7?C0#iwt{W)KgNIDaSSrjDiva4vrep(o|U!f?`j!-w%iTrcDAh-UN_y@ z%J|QQhe5_Jl-n)yF&obnMQIz|`<+v^-RWO#Z6vVh5}o^fhw9QxH<=mtiZ&bWN%v>`=edDFM!PlhnW9do zsCHc<@07bwW~fZbxq2tDXydWAy-bY%#Gf(9@V;cYw8~61tH~jQO=-(g&rADGFS9ts zki9ePhcn|pzc~yt?bGxUmYti+P|Bgn!ho`~Fn(?2U4uj0rhnsmLeh53a+pO<+dh@cASNq!C zF$PEbGym$pXZ**nz##Ljd234O=@rb?QtQvA%P3peZQijv_3?`rlbZI{GX9fFVUStz z^P-sVtFL;NxjQR8{aBt)S-Weg+uEaniJvYnXZ#m=hCxPx>#z8{I|)tO;tKj!#AsGM zeY)Oh;?B3>zGn9G82|CiV~~k|8a(+0cN)(LdoBNjK)vs$_CL&DU~5vdGQC}n@t>py zgG`}~r>R)qbV&)8?KzT_-|r}v$Gwid{D&dMNBalkzn}~T86F+|p#7o(?n~G67^iLc zv1;0#;O{H6kG4d2Fw9~6SAC8_M#9=7{k@#0<2~yphf>CW z24@&#T)&-I8I>yOyY!5j%%S`G?^JdwSS3!p>a;u0T88mo+zkeqPn(_2MT(p$yzd~; z_NqkYqf~c?!EK{wPc+Xw?qmEHyof>O&v%X8+K-p^Z$ExVPh+kT!)L+Ek9Y1fTmJv7 zhAQJfuQv=bjWXee7gsR9oa23HPjB?J;PdNO=)clF`1+}1$Y;iXfpQEoY?JQ({qsY$ zuJ8PIE|2=RUmtut!QL~;eWzr$L-jQ{j|7-VK2EG(8uT-|RHJ!`^|_DZ9UYO75T zJoRU2aV%fN_)l;RgG_VF$}{D6N>m`e0FVZ`ZUpx+o#OqpLd+`U$h8= z%$-?BnR#`-8c&oxrpcOImA|Q?u%*y^1*hx>O(({Gg@+hqCSLO1b70+?Nz2uZ_NPjQ zzw7eQFTC%&qFJncZyu_un}-?|e$p z+4s`gRuiMP*e}lvKau%4k@25%6@v^HgTy80l3$s%Y2Vi0y&U8GBj<(PhY=JNDY9rKN>eIGx2etX~YJN3fNI-5()jQ^xpG02oNT`zaa?hiWi zx%hE)b(im=@*+a}ou45^6+@O=G?s4bx>rDq4|5^4h$Xuvc$@^{E zq@;%@*^Q+3Je?WOv{^}|U;XTHc!CT57% z_Ihe}H5`c7SityC=pBPh$?x*$1=kNKyzSaoVeZh_arwsj_at`A^b0!9v z3rzlN&9?qHwaWa>9EWEo57fxbn9OMMoip;>|38fXWS%g{ggToyCOO#WeLE)Ko$&1Y zGpm;l-OCcV%KvW(d&u}t@eYGbRL$A8E23ur79=ur$+RwS3Q&!=@~9T+&m`7cu@5kztUzyk%YbkMOH;aqHt8 zss;R=*4~x*&#Zd=-0Z2kdW`?Lcjnt~NVcW&HikyMv7XnD;OM0LXbj0{{R3 literal 0 HcmV?d00001 diff --git a/defaults.go b/defaults.go index d263685..cb4a086 100644 --- a/defaults.go +++ b/defaults.go @@ -11,7 +11,7 @@ var defaultOptions = Options{ CompressionLevel: zstd.SpeedDefault, MemoryBufferSize: 4 * 1024 * 1024, DiskBufferSize: 1 * 1024 * 1024, - UseIndexFile: false, + useIndexFile: true, } const indexFileExt = ".idx" diff --git a/options.go b/options.go index f42f14f..148992b 100644 --- a/options.go +++ b/options.go @@ -16,10 +16,12 @@ type Options struct { DiskBufferSize int // Use index file - UseIndexFile bool + useIndexFile bool } func (o *Options) setDefaults() { + o.useIndexFile = true // TODO: implement database search without index + if o.MaxParallelReads == 0 { o.MaxParallelReads = defaultOptions.MaxParallelReads } diff --git a/zkv.go b/zkv.go index de1263d..1c4d543 100644 --- a/zkv.go +++ b/zkv.go @@ -14,11 +14,15 @@ import ( "github.com/klauspost/compress/zstd" ) +type Offsets struct { + BlockOffset int64 + RecordOffset int64 +} + type Store struct { - dataOffset map[string]int64 + dataOffset map[string]Offsets filePath string - offset int64 buffer *bytes.Buffer bufferDataOffset map[string]int64 @@ -34,15 +38,14 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() database := &Store{ - dataOffset: make(map[string]int64), + dataOffset: make(map[string]Offsets), bufferDataOffset: make(map[string]int64), - offset: 0, buffer: new(bytes.Buffer), filePath: filePath, options: options, readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} - if options.UseIndexFile { + if options.useIndexFile { idxFile, err := os.Open(filePath + indexFileExt) if err == nil { err = gob.NewDecoder(idxFile).Decode(&database.dataOffset) @@ -80,7 +83,7 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { switch record.Type { case RecordTypeSet: - database.dataOffset[string(record.KeyHash[:])] = offset + database.dataOffset[string(record.KeyHash[:])] = Offsets{} // offset case RecordTypeDelete: delete(database.dataOffset, string(record.KeyHash[:])) } @@ -283,7 +286,7 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { return record.ValueBytes, nil } - offset, exists = s.dataOffset[string(keyHash[:])] + offsets, exists := s.dataOffset[string(keyHash[:])] if !exists { return nil, ErrNotExists } @@ -294,13 +297,18 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { } defer readF.Close() + _, err = readF.Seek(offsets.BlockOffset, io.SeekStart) + if err != nil { + return nil, err + } + decompressor, err := zstd.NewReader(readF) if err != nil { return nil, err } defer decompressor.Close() - err = skip(decompressor, offset) + err = skip(decompressor, offsets.RecordOffset) if err != nil { return nil, err } @@ -317,7 +325,6 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { } return record.ValueBytes, nil - } func (s *Store) get(key, value interface{}) error { @@ -344,13 +351,18 @@ func (s *Store) flush() error { if err != nil { return fmt.Errorf("open store file: %v", err) } + stat, err := f.Stat() + if err != nil { + f.Close() + return fmt.Errorf("stat store file: %v", err) + } diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize) encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel)) if err != nil { f.Close() - return fmt.Errorf("open store file: %v", err) + return fmt.Errorf("init encoder: %v", err) } _, err = s.buffer.WriteTo(encoder) @@ -359,13 +371,11 @@ func (s *Store) flush() error { } for key, val := range s.bufferDataOffset { - s.dataOffset[key] = val + s.offset + s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val} } s.bufferDataOffset = make(map[string]int64) - s.offset += l - err = encoder.Close() if err != nil { // TODO: truncate file to previous state @@ -384,7 +394,7 @@ func (s *Store) flush() error { } // Update index file only on data update - if s.options.UseIndexFile && l > 0 { + if s.options.useIndexFile && l > 0 { idxBuf := new(bytes.Buffer) err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) diff --git a/zkv_test.go b/zkv_test.go index b10a2a2..4ec0d41 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -39,6 +39,7 @@ func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -84,6 +85,7 @@ func TestSmallWrites(t *testing.T) { const filePath = "TestSmallWrites.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) for i := 1; i <= recordCount; i++ { db, err := Open(filePath) @@ -119,6 +121,7 @@ func TestDeleteBasic(t *testing.T) { const filePath = "TestDeleteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -164,6 +167,7 @@ func TestDeleteBasic(t *testing.T) { func TestBufferBasic(t *testing.T) { const filePath = "TestBuffer.zkv" defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) @@ -187,8 +191,9 @@ func TestBufferBasic(t *testing.T) { func TestBufferRead(t *testing.T) { const filePath = "TestBufferRead.zkv" - const recordCount = 100 + const recordCount = 2 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) @@ -241,7 +246,9 @@ func TestBackupBasic(t *testing.T) { const newFilePath = "TestBackupBasic2.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -280,7 +287,9 @@ func TestBackupWithDeletedRecords(t *testing.T) { const newFilePath = "TestBackupWithDeletedRecords2.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -335,7 +344,7 @@ func TestIndexFileBasic(t *testing.T) { defer os.Remove(filePath) defer os.Remove(filePath + indexFileExt) - db, err := OpenWithOptions(filePath, Options{UseIndexFile: true}) + db, err := Open(filePath) assert.NoError(t, err) for i := 1; i <= recordCount; i++ { @@ -358,7 +367,7 @@ func TestIndexFileBasic(t *testing.T) { assert.NoError(t, err) // try to read - db, err = OpenWithOptions(filePath, Options{UseIndexFile: true}) + db, err = Open(filePath) assert.NoError(t, err) assert.Len(t, db.dataOffset, recordCount) From d950b6546c1fba17c5ee980e4235f3d424b7ac98 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 11 Dec 2022 21:33:51 +0500 Subject: [PATCH 3/3] Add notes about current store state --- README.md | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 5ed2b58..4474fec 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,17 @@ Simple key-value store for single-user applications. ## Pros -* Simple one file structure +* Simple two file structure (data file and index file) * Internal Zstandard compression by [klauspost/compress/zstd](https://github.com/klauspost/compress/tree/master/zstd) * Threadsafe operations through `sync.RWMutex` ## Cons * Index stored in memory (`map[key hash (28 bytes)]file offset (int64)`) -* Need to read the whole file on store open to create file index (you can use index file options to avoid this) +* No transaction system +* Index file is fully rewrited on every store commit * No way to recover disk space from deleted records * Write/Delete operations block Read and each other operations -* Need to decode whole file until stored value ## Usage @@ -63,9 +63,6 @@ type Options struct { // Disk write buffer size in bytes DiskBufferSize int - - // Use index file - UseIndexFile bool } ``` @@ -87,6 +84,17 @@ File is log stuctured list of commands: | Length | Record body bytes length | int64 | | Body | Gob-encoded record | variable | +Index file is simple gob-encoded map: + +```go +map[string]struct { + BlockOffset int64 + RecordOffset int64 +} +``` + +where map key is data key hash and value - data offset in data file. + ## Resource consumption Store requirements: @@ -97,4 +105,4 @@ Store requirements: ## TODO - [ ] Add recovery previous state of store file on write error -- [ ] Add fast file seek to value (add compressed block start position) +- [ ] Add method for index rebuild