From c40ec11e9a552422e754fa9e00b4339aba6ab6a1 Mon Sep 17 00:00:00 2001 From: nxshock Date: Wed, 16 Feb 2022 16:08:20 +0500 Subject: [PATCH] Initial commit --- .gitattributes | 2 + .gitignore | 12 ++++ LICENSE | 21 ++++++ go.mod | 14 ++++ go.sum | 13 ++++ record.go | 82 +++++++++++++++++++++++ utils.go | 46 +++++++++++++ zkv.go | 174 +++++++++++++++++++++++++++++++++++++++++++++++++ zkv_test.go | 79 ++++++++++++++++++++++ 9 files changed, 443 insertions(+) create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 go.mod create mode 100644 go.sum create mode 100644 record.go create mode 100644 utils.go create mode 100644 zkv.go create mode 100644 zkv_test.go diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..dfe0770 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +# Auto detect text files and perform LF normalization +* text=auto diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f1c181e --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..01aa971 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2022 nxshock + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..313f293 --- /dev/null +++ b/go.mod @@ -0,0 +1,14 @@ +module github.com/nxshock/zkv + +go 1.17 + +require ( + github.com/klauspost/compress v1.14.2 + github.com/stretchr/testify v1.7.0 +) + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9feb705 --- /dev/null +++ b/go.sum @@ -0,0 +1,13 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw= +github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/record.go b/record.go new file mode 100644 index 0000000..dae2edb --- /dev/null +++ b/record.go @@ -0,0 +1,82 @@ +package zkv2 + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "io" +) + +type RecordType uint8 + +const ( + RecordTypeSet RecordType = iota + 1 + RecordTypeDelete +) + +type Record struct { + Type RecordType `json:"t"` + KeyHash []byte `json:"h"` + KeyBytes []byte `json:"k,omitempty"` // optional + ValueBytes []byte `json:"v"` +} + +func newRecord(recordType RecordType, key, value interface{}) (*Record, error) { + keyBytes, err := encode(key) + if err != nil { + return nil, err + } + + valueBytes, err := encode(value) + if err != nil { + return nil, err + } + + record := &Record{ + Type: recordType, + KeyHash: hashBytes(keyBytes), + KeyBytes: keyBytes, + ValueBytes: valueBytes} + + return record, nil +} + +func (r *Record) Marshal() ([]byte, error) { + buf := new(bytes.Buffer) + + err := json.NewEncoder(buf).Encode(r) + if err != nil { + return nil, err + } + + buf2 := new(bytes.Buffer) + + err = binary.Write(buf2, binary.LittleEndian, int64(buf.Len())) + if err != nil { + return nil, err + } + + return append(buf2.Bytes(), buf.Bytes()...), nil +} + +func readRecord(r io.Reader) (n int64, record *Record, err error) { + var recordBytesLen int64 + err = binary.Read(r, binary.LittleEndian, &recordBytesLen) + if err != nil { + return 0, nil, err // TODO: вместо нуля должно быть реальное кол-во считанных байт + } + + recordBytes := make([]byte, int(recordBytesLen)) + + _, err = io.ReadAtLeast(r, recordBytes, int(recordBytesLen)) + if err != nil { + return 0, nil, err // TODO: вместо нуля должно быть реальное кол-во считанных байт + } + + err = json.NewDecoder(bytes.NewReader(recordBytes)).Decode(&record) + if err != nil { + return 0, nil, err // TODO: вместо нуля должно быть реальное кол-во считанных байт + } + + return recordBytesLen + 8, record, nil +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..8f0ba49 --- /dev/null +++ b/utils.go @@ -0,0 +1,46 @@ +package zkv2 + +import ( + "bytes" + "crypto/sha256" + "encoding/json" + "io" + "io/ioutil" +) + +func encode(value interface{}) ([]byte, error) { + buf := new(bytes.Buffer) + err := json.NewEncoder(buf).Encode(value) + return buf.Bytes(), err +} + +func decode(b []byte, value interface{}) error { + return json.NewDecoder(bytes.NewReader(b)).Decode(&value) +} + +func hashInterface(value interface{}) ([]byte, error) { + valueBytes, err := encode(value) + if err != nil { + return nil, err + } + + return hashBytes(valueBytes), nil +} + +func hashBytes(b []byte) []byte { + bytes := sha256.Sum224(b) + + return bytes[:] + +} + +func skip(r io.Reader, count int64) (err error) { + switch r := r.(type) { + case io.Seeker: + _, err = r.Seek(count, io.SeekCurrent) + default: + _, err = io.CopyN(ioutil.Discard, r, count) + } + + return err +} diff --git a/zkv.go b/zkv.go new file mode 100644 index 0000000..07a3fe7 --- /dev/null +++ b/zkv.go @@ -0,0 +1,174 @@ +package zkv2 + +import ( + "bytes" + "encoding/base64" + "errors" + "fmt" + "io" + "os" + "sync" + + "github.com/klauspost/compress/zstd" +) + +type Options struct { + SaveKeys bool +} + +type Database struct { + dataOffset map[string]int64 + file *os.File + compressor *zstd.Encoder + filePath string + offset int64 + + options Options + + mu sync.Mutex +} + +func (db *Database) Close() error { + db.mu.Lock() + defer db.mu.Unlock() + + err := db.compressor.Flush() + if err != nil { + return err + } + err = db.compressor.Close() + if err != nil { + return err + } + + return db.file.Close() +} + +func (db *Database) Set(key, value interface{}) error { + db.mu.Lock() + defer db.mu.Unlock() + + record, err := newRecord(RecordTypeSet, key, value) + if err != nil { + return err + } + + if !db.options.SaveKeys { + record.KeyBytes = nil + } + + b, err := record.Marshal() + if err != nil { + return err + } + + db.dataOffset[string(record.KeyHash)] = db.offset // TODO: удалить хеш и откатить запись в случае ошибки + + _, err = db.compressor.Write(b) + if err != nil { + return err + } + + db.offset += int64(len(b)) // TODO: удалить хеш и откатить запись в случае ошибки + + return nil +} + +func (db *Database) Get(key, value interface{}) error { + db.mu.Lock() + defer db.mu.Unlock() + + hashToFind, err := hashInterface(key) + if err != nil { + return err + } + + offset, exists := db.dataOffset[string(hashToFind)] + if !exists { + return errors.New("not exists") // TODO: заменить на константную ошибку + } + + readF, err := os.Open(db.filePath) + if err != nil { + return err + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return err + } + defer decompressor.Close() + + err = skip(decompressor, offset) + if err != nil { + return err + } + + _, record, err := readRecord(decompressor) + if err != nil { + return err + } + + if bytes.Compare(record.KeyHash, hashToFind) != 0 { + return fmt.Errorf("wrong hash on this offset: expected %s, got %s", base64.StdEncoding.EncodeToString(hashToFind), base64.StdEncoding.EncodeToString(record.KeyHash)) // TODO: заменить на константную ошибку + } + + return decode(record.ValueBytes, value) +} + +func Open(filePath string) (*Database, error) { + f, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nil, fmt.Errorf("ошибка при открытии файла для записи: %v", err) + } + + compressor, err := zstd.NewWriter(f) + if err != nil { + return nil, fmt.Errorf("ошибка при инициализации компрессора: %v", err) + } + + database := &Database{ + dataOffset: make(map[string]int64), + offset: 0, + file: f, + compressor: compressor, + filePath: filePath} + + // restore file data + readF, err := os.Open(filePath) + if err != nil { + f.Close() + return nil, fmt.Errorf("ошибка при открытии файла для чтения: %v", err) + } + defer readF.Close() + + decompressor, err := zstd.NewReader(readF) + if err != nil { + return nil, fmt.Errorf("ошибка при инициализации декомпрессора: %v", err) + } + defer decompressor.Close() + + offset := int64(0) + for { + n, record, err := readRecord(decompressor) + if err == io.EOF { + break + } + if err != nil { + f.Close() + return nil, fmt.Errorf("ошибка при чтении записи из файла: %v", err) + } + + switch record.Type { + case RecordTypeSet: + database.dataOffset[string(record.KeyHash)] = offset + case RecordTypeDelete: + delete(database.dataOffset, string(record.KeyHash)) + } + + offset += n + } + + return database, nil +} diff --git a/zkv_test.go b/zkv_test.go new file mode 100644 index 0000000..cba35d2 --- /dev/null +++ b/zkv_test.go @@ -0,0 +1,79 @@ +package zkv2 + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadWriteBasic(t *testing.T) { + const filePath = "TestReadWriteBasic.zkv" + const recordCount = 100 + defer os.Remove(filePath) + + db, err := Open(filePath) + assert.NoError(t, err) + + for i := 1; i <= recordCount; i++ { + err = db.Set(i, i) + assert.NoError(t, err) + } + + assert.Len(t, db.dataOffset, recordCount) + + err = db.Close() + assert.NoError(t, err) + + // try to read + db, err = Open(filePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) +} + +func TestSmallWrites(t *testing.T) { + const filePath = "TestSmallWrites.zkv" + const recordCount = 100 + defer os.Remove(filePath) + + for i := 1; i <= recordCount; i++ { + db, err := Open(filePath) + assert.NoError(t, err) + + err = db.Set(i, i) + assert.NoError(t, err) + + err = db.Close() + assert.NoError(t, err) + } + + // try to read + + db, err := Open(filePath) + assert.NoError(t, err) + + assert.Len(t, db.dataOffset, recordCount) + + for i := 1; i <= recordCount; i++ { + var gotValue int + + err = db.Get(i, &gotValue) + assert.NoError(t, err) + assert.Equal(t, i, gotValue) + } + + err = db.Close() + assert.NoError(t, err) +}