From 5f0d33828f6b611cb8bcdfbfcabedb0981937ef7 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 11 Dec 2022 21:00:36 +0500 Subject: [PATCH] Improve store read speed by skipping store blocks --- TestBufferRead.zkv.idx | Bin 0 -> 131 bytes TestSmallWrites.zkv.idx | Bin 0 -> 3465 bytes defaults.go | 2 +- options.go | 4 +++- zkv.go | 38 ++++++++++++++++++++++++-------------- zkv_test.go | 15 ++++++++++++--- 6 files changed, 40 insertions(+), 19 deletions(-) create mode 100644 TestBufferRead.zkv.idx create mode 100644 TestSmallWrites.zkv.idx diff --git a/TestBufferRead.zkv.idx b/TestBufferRead.zkv.idx new file mode 100644 index 0000000000000000000000000000000000000000..ced4e784f1da329060e7333b77539b4bdd131895 GIT binary patch literal 131 zcmd=8-_F9w^uL3Fk%#er8v}#x|5j!ozm0*BiILkWCqFscKP{~|wSXhCVrmpwo4P5!~lR}tMfHE*@|3-jq2j#5a|!?5xIDQpieV6eT2kC40P) z%n+gSW*ooguiu}~^SPh%+}CyA*LB~AbtjSz((FV5$OLyH0Fc=UrzM^v0HlF?wcWk! z-He@_d>#ED9T8#bXz%6Y@c$P8U}gj2QwQW3l90=0!OGm6y;V1+w3roMupm&!VxX0I zKh+0fmnndR`jt0sdgz-^O+3CTZg$Jfmv@xCv`O*tFS2?*QBGTyTjV_NE5((3$W zdL6jnB2MOcWFg8Ud1WmhV*gtJ#Tn>X{FTWtW0TBzt5MiJd$(IuW$VGrmQ%gLeu&vz zh%@YBe)>5l?>HiRpYaj>yN+wuokGvxZd(g`Q;1J=0c5JwxAE|1!zKM8VH>f<+Cu4P z?ET;MRE!?ej&ecF{1u=qJMx4B%|4Tt`8MryGRbqvu>tjNW)C&2@|S8LzE%&Ar)?t1 z_q$%YMnmD%d47hwVjgbt?4)by640m#Vu2EX)W40sOe>{&*2V<;O_tmeJ9l&Kn`=oB zIg0tcJH+lU0rCy-`20Px?PkB~s_BZ{CLizq6v+eH#abWrk0e6uoDGmjA7?mgr;omJ zNw%IZdtdbIt*fo#dI`eYiUA%F2ks%TC}wghdA~KOkeE}I{CwQ+kL~U=x^=niVJ1~i zAy#Yx=+Q~1YrJinpK^YsM&>fA6r`?r5C7NJ9yfjZ#7T&a#{fFzdnj2p>P8@0k5y|& zZmQ{&05^NOk1l0pLuUwL(KLV@NnD*3caFayCGpudcqojLhqoM5q%3)qf0_G2e5e~B zB`)q1iT`YjN7~lJWesj1TL+fVdskx~k|UG}KCV0hNW#Ba>S~s(1q+XUOS(s5=HQJ= z?iURI^664N3?R1u1CW>#C|?g+c;Fl`o*&dj%O9z9Q|sgNm`^u?4vs=Bdj_ChAwFd7 z{zPE#v~0E38$CwBS2JFL_VzqYt!LLEwnzl%$Iv22ewN5{g9d56r(~hy4P$br^xEnv z1A}n_5YrQkvHaBfC6$^M`SFOCtY0hby=9?-uYGpS2LykPWI=o~4v|Q61C*0EVD(WjVy;ZWA zfZ2ndw#sp9Pmaw#fLLD|pat7^IDTn#v8IQF9NPY-)}UCx_o|ul&~TC-#1aY4)EeUWk>6zwkRqeB=Ez*X+WF z*oTfzxmPyYcSmGISo<{ZIuEe~5m625-%V4~cB17KvaKa&Slyn<%yiHCiNOo|L?L!m zAgX`4Ty5Kzr^SvlX2G?ejh1FC%b&x+(T?6gR10D^KY$?ri_Lw@r|chV^Cx`r+9WK+NwB(D&d;W(NH`C`r_GU3Y|}Awu30m3E49 zlRl~9GsGem0HqF(e+oEMoSghSOAvL_g;$YEGes@af3DxSk`A$50YDzJq(g4=6489` z3i$6B>i_1(_thD=J?o=sA{rn*q76{uFHw%G{+7S0|BfY7nOOo!w^Ka}PtPgcchNdd zWJ&iVkEMD{C5=s*i4F=~S~RQgHqf8iUP&n+l%P%YDCcxqw|HIytx>PZTj!3vo674} zNhv=sljP><%pg{61L!rE(Ms!%Kw*WFp5j&mjK|S;?o;ot^-MJAaIuUo7yI1=Pe+<#Z zyNK?opora;m2S!C*U~awu-NT1z?*uq=@QfHo*FXY$3d|~Z)EzNr{PRuBeu}NntEPzeG!P${0f_lK z=qWfR>Kdx)vP-Bu$FQfS{kd`*aH}$->OgFi4p3y8tj+VT$H0G&_-W;Y!}5|LX2V`S z%2pCp@_7(T^#L^JD}03OujiKsWk%2P=%1dNVpzSld*>Gu(=7!xi20-mEE8qS7Sw-q zBGrqdriXjAPem5Jvy^X4AaBbN{=+H&(4RXM4j0?=XoC+HcRC|sWsS1RcaD>Db4j7^ z31>Ly08rlF&zxuHf6JxT|mNTv8VAIbjW;)_jLM=$6 zoL|qAY78Sq0YVAET|^#g1{b>IsH#VNTV@uSC$|fuotOCxwZ_y!HT@x$Co&k4&p!ibYhC=E)MQS=n`Vlv_+tgI#@;(wRdXzNr&FA8ms5*`>m&2SpYv$Hr=PB` zjw7X=|5ThOH!t;JVOs0L8pI}i0PRZpvb{wW4T=BshSA{WkKbQcT6V`Ko$RyW5-EpR zm<^z?un&SXF@ql6MF%hLw|qsXBq0~bRWW5|6vwRxG1LU;Rr-K{ZB8Be+_XJYO~mmm zV@HOEG#w#Us%uMe5NpH|v(~+>NJgQxmoeP^RI?hLdnl_*ht9y#8~jo7EW~m|#D5>W zeUWWp!co$r#D>W03Q@tTrC$x@9!T!VeFic63xMu~<#z;7{6xOF>5?Pq=LHo8R&9+R z9vc>*)DUQ|8vs;XPHjywYP+57_&&|G>!O;Tf2ZfFuu`KuLu4q#XATjQWAt3?Wc!|; zP5C{)4+sv<$@2whR;rCHhx#cJ`wIu5?4lH^Vx-CtE5(#sC_$%nW}wJ9jbr6aN(O62 zGsL9*08t(_(lSf`K5~z@NrK)!$g4EaJIq@tkAe56gf_(9#HMwx=X7=5%NI$xVhXj6 z+~!Ns26EoxXUoUz`Wc8kB;$!Qbh`#s>twjy?kH_fG-iH|^`xrt(e3(JMkCk-@zGF# zKFVh|2D6<%{4y(KmOGUJ2Fr*~ht4`x@#tP4Ci+<$fUZ>duBsfqazEpfgHPu&kCf`Y z%jG9>$>uHOHCu>vf&gL!Ts=B|8{`{Ke~Q2KTIx)>_B(G|p5u$On`D952Lt3{=dQRq zb8>n$gm%caW@XLr&vMwNWA|`~de0q*c?eGrpcxF%c1tkrSo56>4vs$iEWn3pp1 zPdrBnx71xC_PHNJ3Kyu>6w-ByP>HPIx3-UE6La(wI9;Q{4zZIfKm*DW2SQ1?4$b}{ z6q&LRI)AMaX|k9BrFWt91H>vs^}7S}ng66E*-fHs~td~akWI`s4Ynjba6f2zKqy-aF%%gT+%Oszd2wk7!JUCAESFbd1A)PM5Zk7M?& zyY>UFFX8iz;_{0JAvPd%dyyg$9a+KM`c*usr$=?5#G{x2gQbAmwk{ku;tZvtZ`E0gXF|M>K|K*}7f6ZjEb6R4zlJF0i6ICtf$ z>C^SZRzmwqGz1nJkOB=cD5i!#I53@j?!&A%@dw~ zn)qXEX4Yp^ziY-kw_yy}DL0#dSdaqH<$pUPnprE06eG6+lczHGcH7)Paz6TdyYC~i zFT|Xg0IdWCJnee`OX#!o6LZMQh5WI79~J5N7$ycL!n*V!y2w jU2X}NuB|BAC`Pm18^=<4?iC*E3_7yDxI|Qrwi^5o$az2m literal 0 HcmV?d00001 diff --git a/defaults.go b/defaults.go index d263685..cb4a086 100644 --- a/defaults.go +++ b/defaults.go @@ -11,7 +11,7 @@ var defaultOptions = Options{ CompressionLevel: zstd.SpeedDefault, MemoryBufferSize: 4 * 1024 * 1024, DiskBufferSize: 1 * 1024 * 1024, - UseIndexFile: false, + useIndexFile: true, } const indexFileExt = ".idx" diff --git a/options.go b/options.go index f42f14f..148992b 100644 --- a/options.go +++ b/options.go @@ -16,10 +16,12 @@ type Options struct { DiskBufferSize int // Use index file - UseIndexFile bool + useIndexFile bool } func (o *Options) setDefaults() { + o.useIndexFile = true // TODO: implement database search without index + if o.MaxParallelReads == 0 { o.MaxParallelReads = defaultOptions.MaxParallelReads } diff --git a/zkv.go b/zkv.go index de1263d..1c4d543 100644 --- a/zkv.go +++ b/zkv.go @@ -14,11 +14,15 @@ import ( "github.com/klauspost/compress/zstd" ) +type Offsets struct { + BlockOffset int64 + RecordOffset int64 +} + type Store struct { - dataOffset map[string]int64 + dataOffset map[string]Offsets filePath string - offset int64 buffer *bytes.Buffer bufferDataOffset map[string]int64 @@ -34,15 +38,14 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { options.setDefaults() database := &Store{ - dataOffset: make(map[string]int64), + dataOffset: make(map[string]Offsets), bufferDataOffset: make(map[string]int64), - offset: 0, buffer: new(bytes.Buffer), filePath: filePath, options: options, readOrderChan: make(chan struct{}, int(options.MaxParallelReads))} - if options.UseIndexFile { + if options.useIndexFile { idxFile, err := os.Open(filePath + indexFileExt) if err == nil { err = gob.NewDecoder(idxFile).Decode(&database.dataOffset) @@ -80,7 +83,7 @@ func OpenWithOptions(filePath string, options Options) (*Store, error) { switch record.Type { case RecordTypeSet: - database.dataOffset[string(record.KeyHash[:])] = offset + database.dataOffset[string(record.KeyHash[:])] = Offsets{} // offset case RecordTypeDelete: delete(database.dataOffset, string(record.KeyHash[:])) } @@ -283,7 +286,7 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { return record.ValueBytes, nil } - offset, exists = s.dataOffset[string(keyHash[:])] + offsets, exists := s.dataOffset[string(keyHash[:])] if !exists { return nil, ErrNotExists } @@ -294,13 +297,18 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { } defer readF.Close() + _, err = readF.Seek(offsets.BlockOffset, io.SeekStart) + if err != nil { + return nil, err + } + decompressor, err := zstd.NewReader(readF) if err != nil { return nil, err } defer decompressor.Close() - err = skip(decompressor, offset) + err = skip(decompressor, offsets.RecordOffset) if err != nil { return nil, err } @@ -317,7 +325,6 @@ func (s *Store) getGobBytes(keyHash [sha256.Size224]byte) ([]byte, error) { } return record.ValueBytes, nil - } func (s *Store) get(key, value interface{}) error { @@ -344,13 +351,18 @@ func (s *Store) flush() error { if err != nil { return fmt.Errorf("open store file: %v", err) } + stat, err := f.Stat() + if err != nil { + f.Close() + return fmt.Errorf("stat store file: %v", err) + } diskWriteBuffer := bufio.NewWriterSize(f, s.options.DiskBufferSize) encoder, err := zstd.NewWriter(diskWriteBuffer, zstd.WithEncoderLevel(s.options.CompressionLevel)) if err != nil { f.Close() - return fmt.Errorf("open store file: %v", err) + return fmt.Errorf("init encoder: %v", err) } _, err = s.buffer.WriteTo(encoder) @@ -359,13 +371,11 @@ func (s *Store) flush() error { } for key, val := range s.bufferDataOffset { - s.dataOffset[key] = val + s.offset + s.dataOffset[key] = Offsets{BlockOffset: stat.Size(), RecordOffset: val} } s.bufferDataOffset = make(map[string]int64) - s.offset += l - err = encoder.Close() if err != nil { // TODO: truncate file to previous state @@ -384,7 +394,7 @@ func (s *Store) flush() error { } // Update index file only on data update - if s.options.UseIndexFile && l > 0 { + if s.options.useIndexFile && l > 0 { idxBuf := new(bytes.Buffer) err = gob.NewEncoder(idxBuf).Encode(s.dataOffset) diff --git a/zkv_test.go b/zkv_test.go index b10a2a2..4ec0d41 100644 --- a/zkv_test.go +++ b/zkv_test.go @@ -39,6 +39,7 @@ func TestReadWriteBasic(t *testing.T) { const filePath = "TestReadWriteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -84,6 +85,7 @@ func TestSmallWrites(t *testing.T) { const filePath = "TestSmallWrites.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) for i := 1; i <= recordCount; i++ { db, err := Open(filePath) @@ -119,6 +121,7 @@ func TestDeleteBasic(t *testing.T) { const filePath = "TestDeleteBasic.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -164,6 +167,7 @@ func TestDeleteBasic(t *testing.T) { func TestBufferBasic(t *testing.T) { const filePath = "TestBuffer.zkv" defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) @@ -187,8 +191,9 @@ func TestBufferBasic(t *testing.T) { func TestBufferRead(t *testing.T) { const filePath = "TestBufferRead.zkv" - const recordCount = 100 + const recordCount = 2 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) db, err := OpenWithOptions(filePath, Options{MemoryBufferSize: 100}) assert.NoError(t, err) @@ -241,7 +246,9 @@ func TestBackupBasic(t *testing.T) { const newFilePath = "TestBackupBasic2.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -280,7 +287,9 @@ func TestBackupWithDeletedRecords(t *testing.T) { const newFilePath = "TestBackupWithDeletedRecords2.zkv" const recordCount = 100 defer os.Remove(filePath) + defer os.Remove(filePath + indexFileExt) defer os.Remove(newFilePath) + defer os.Remove(newFilePath + indexFileExt) db, err := Open(filePath) assert.NoError(t, err) @@ -335,7 +344,7 @@ func TestIndexFileBasic(t *testing.T) { defer os.Remove(filePath) defer os.Remove(filePath + indexFileExt) - db, err := OpenWithOptions(filePath, Options{UseIndexFile: true}) + db, err := Open(filePath) assert.NoError(t, err) for i := 1; i <= recordCount; i++ { @@ -358,7 +367,7 @@ func TestIndexFileBasic(t *testing.T) { assert.NoError(t, err) // try to read - db, err = OpenWithOptions(filePath, Options{UseIndexFile: true}) + db, err = Open(filePath) assert.NoError(t, err) assert.Len(t, db.dataOffset, recordCount)