commit c40ec11e9a552422e754fa9e00b4339aba6ab6a1 Author: nxshock Date: Wed Feb 16 16:08:20 2022 +0500 Initial commit diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..dfe0770 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +# Auto detect text files and perform LF normalization +* text=auto diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f1c181e --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..01aa971 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 nxshock + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..313f293 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module github.com/nxshock/zkv + +go 1.17 + +require ( + github.com/klauspost/compress v1.14.2 + github.com/stretchr/testify v1.7.0 +) + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9feb705 --- /dev/null +++ b/go.sum @@ -0,0 +1,13 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= +github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/record.go b/record.go new file mode 100644 index 0000000..dae2edb --- /dev/null +++ b/record.go @@ -0,0 +1,82 @@ +package zkv2 + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "io" +) + +type RecordType 
uint8 + +const ( + RecordTypeSet RecordType = iota + 1 + RecordTypeDelete +) + +type Record struct { + Type RecordType `json:"t"` + KeyHash []byte `json:"h"` + KeyBytes []byte `json:"k,omitempty"` // optional + ValueBytes []byte `json:"v"` +} + +func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { + keyBytes, err := encode(key) + if err != nil { + return nil, err + } + + valueBytes, err := encode(value) + if err != nil { + return nil, err + } + + record := &Record{ + Type: recordType, + KeyHash: hashBytes(keyBytes), + KeyBytes: keyBytes, + ValueBytes: valueBytes} + + return record, nil +} + +func (r *Record) Marshal() ([]byte, error) { + buf := new(bytes.Buffer) + + err := json.NewEncoder(buf).Encode(r) + if err != nil { + return nil, err + } + + buf2 := new(bytes.Buffer) + + err = binary.Write(buf2, binary.LittleEndian, int64(buf.Len())) + if err != nil { + return nil, err + } + + return append(buf2.Bytes(), buf.Bytes()...), nil +} + +func readRecord(r io.Reader) (n int64, record *Record, err error) { + var recordBytesLen int64 + err = binary.Read(r, binary.LittleEndian, &recordBytesLen) + if err != nil { + return 0, nil, err // TODO: вместо нуля должно быть реальное кол-во считанных байт + } + + recordBytes := make([]byte, int(recordBytesLen)) + + _, err = io.ReadAtLeast(r, recordBytes, int(recordBytesLen)) + if err != nil { + return 0, nil, err // TODO: вместо нуля должно быть реальное кол-во считанных байт + } + + err = json.NewDecoder(bytes.NewReader(recordBytes)).Decode(&record) + if err != nil { + return 0, nil, err // TODO: вместо нуля должно быть реальное кол-во считанных байт + } + + return recordBytesLen + 8, record, nil +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..8f0ba49 --- /dev/null +++ b/utils.go @@ -0,0 +1,46 @@ +package zkv2 + +import ( + "bytes" + "crypto/sha256" + "encoding/json" + "io" + "io/ioutil" +) + +func encode(value interface{}) ([]byte, error) { + buf := new(bytes.Buffer) + err := 
// decode parses the JSON document in b and stores the result in the variable
// that value points to.
//
// value must be a non-nil pointer. Any other argument is rejected with a
// *json.InvalidUnmarshalError.
func decode(b []byte, value interface{}) error {
	// Hand value to the decoder unchanged. Taking its address would give the
	// decoder a *interface{}; for a nil or non-pointer value it would then
	// overwrite that local interface variable and report success although
	// nothing reached the caller.
	return json.NewDecoder(bytes.NewReader(b)).Decode(value)
}
Get(key, value interface{}) error { + db.mu.Lock() + defer db.mu.Unlock() + + hashToFind, err := hashInterface(key) + if err != nil { + return err + } + + offset, exists := db.dataOffset[string(hashToFind)] + if !exists { + return errors.New("not exists") // TODO: заменить на константную ошибку + } + + readF, err := os.Open(db.filePath) + if err != nil { + return err + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return err + } + defer decompressor.Close() + + err = skip(decompressor, offset) + if err != nil { + return err + } + + _, record, err := readRecord(decompressor) + if err != nil { + return err + } + + if bytes.Compare(record.KeyHash, hashToFind) != 0 { + return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind), base64.StdEncoding.EncodeToString(record.KeyHash)) // TODO: заменить на константную ошибку + } + + return decode(record.ValueBytes, value) +} + +func Open(filePath string) (*Database, error) { + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) + } + + compressor, err := zstd.NewWriter(f) + if err != nil { + return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) + } + + database := &Database{ + dataOffset: make(map[string]int64), + offset: 0, + file: f, + compressor: compressor, + filePath: filePath} + + // restore file data + readF, err := os.Open(filePath) + if err != nil { + f.Close() + return nil, fmt.Errorf("ошибка при открытии файла для чтения: %v", err) + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) + } + defer decompressor.Close() + + offset := int64(0) + for { + n, record, err := readRecord(decompressor) + if err == io.EOF { + break + } + if err != nil { + f.Close() + return nil, 
fmt.Errorf("ошибка при чтении записи из файла: %v", err) + } + + switch record.Type { + case RecordTypeSet: + database.dataOffset[string(record.KeyHash)] = offset + case RecordTypeDelete: + delete(database.dataOffset, string(record.KeyHash)) + } + + offset += n + } + + return database, nil +} diff --git a/zkv_test.go b/zkv_test.go new file mode 100644 index 0000000..cba35d2 --- /dev/null +++ b/zkv_test.go @@ -0,0 +1,79 @@ +package zkv2 + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadWriteBasic(t *testing.T) { + const filePath = "TestReadWriteBasic.zkv" + const recordCount = 100 + defer os.Remove(filePath) + + db, err := Open(filePath) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + assert.Len(t, db.dataOffset, recordCount) + + err = db.Close() + assert.NoError(t, err) + + // try to read + db, err = Open(filePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) +} + +func TestSmallWrites(t *testing.T) { + const filePath = "TestSmallWrites.zkv" + const recordCount = 100 + defer os.Remove(filePath) + + for i := 1; i <= recordCount; i++ { + db, err := Open(filePath) + assert.NoError(t, err) + + err = db.Set(i, i) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + } + + // try to read + + db, err := Open(filePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) +}