2022-02-16 16:16:29 +05:00
package zkv
2022-02-16 16:08:20 +05:00
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"os"
"sync"
"github.com/klauspost/compress/zstd"
)
type Database struct {
dataOffset map [ string ] int64
file * os . File
compressor * zstd . Encoder
filePath string
offset int64
mu sync . Mutex
}
func ( db * Database ) Close ( ) error {
db . mu . Lock ( )
defer db . mu . Unlock ( )
2022-02-16 17:45:46 +05:00
err := db . compressor . Close ( )
2022-02-16 16:08:20 +05:00
if err != nil {
return err
}
return db . file . Close ( )
}
func ( db * Database ) Set ( key , value interface { } ) error {
db . mu . Lock ( )
defer db . mu . Unlock ( )
record , err := newRecord ( RecordTypeSet , key , value )
if err != nil {
return err
}
b , err := record . Marshal ( )
if err != nil {
return err
}
db . dataOffset [ string ( record . KeyHash ) ] = db . offset // TODO: удалить хеш и откатить запись в случае ошибки
_ , err = db . compressor . Write ( b )
if err != nil {
return err
}
db . offset += int64 ( len ( b ) ) // TODO: удалить хеш и откатить запись в случае ошибки
return nil
}
func ( db * Database ) Get ( key , value interface { } ) error {
db . mu . Lock ( )
defer db . mu . Unlock ( )
hashToFind , err := hashInterface ( key )
if err != nil {
return err
}
offset , exists := db . dataOffset [ string ( hashToFind ) ]
if ! exists {
2022-12-02 20:32:09 +05:00
return ErrNotExists
2022-02-16 16:08:20 +05:00
}
readF , err := os . Open ( db . filePath )
if err != nil {
return err
}
defer readF . Close ( )
decompressor , err := zstd . NewReader ( readF )
if err != nil {
return err
}
defer decompressor . Close ( )
err = skip ( decompressor , offset )
if err != nil {
return err
}
_ , record , err := readRecord ( decompressor )
if err != nil {
return err
}
2022-12-02 20:32:09 +05:00
if ! bytes . Equal ( record . KeyHash , hashToFind ) {
2022-02-16 16:08:20 +05:00
return fmt . Errorf ( "wrong hash on this offset: expected %s, got %s" , base64 . StdEncoding . EncodeToString ( hashToFind ) , base64 . StdEncoding . EncodeToString ( record . KeyHash ) ) // TODO: заменить на константную ошибку
}
return decode ( record . ValueBytes , value )
}
func Open ( filePath string ) ( * Database , error ) {
f , err := os . OpenFile ( filePath , os . O_APPEND | os . O_CREATE | os . O_WRONLY , 0644 )
if err != nil {
return nil , fmt . Errorf ( "ошибка при открытии файла для записи: %v" , err )
}
compressor , err := zstd . NewWriter ( f )
if err != nil {
return nil , fmt . Errorf ( "ошибка при инициализации компрессора: %v" , err )
}
database := & Database {
dataOffset : make ( map [ string ] int64 ) ,
offset : 0 ,
file : f ,
compressor : compressor ,
filePath : filePath }
// restore file data
readF , err := os . Open ( filePath )
if err != nil {
f . Close ( )
return nil , fmt . Errorf ( "ошибка при открытии файла для чтения: %v" , err )
}
defer readF . Close ( )
decompressor , err := zstd . NewReader ( readF )
if err != nil {
return nil , fmt . Errorf ( "ошибка при инициализации декомпрессора: %v" , err )
}
defer decompressor . Close ( )
offset := int64 ( 0 )
for {
n , record , err := readRecord ( decompressor )
if err == io . EOF {
break
}
if err != nil {
f . Close ( )
return nil , fmt . Errorf ( "ошибка при чтении записи из файла: %v" , err )
}
switch record . Type {
case RecordTypeSet :
database . dataOffset [ string ( record . KeyHash ) ] = offset
case RecordTypeDelete :
delete ( database . dataOffset , string ( record . KeyHash ) )
}
offset += n
}
return database , nil
}
2022-12-02 20:32:09 +05:00
func ( db * Database ) Delete ( key interface { } ) error {
db . mu . Lock ( )
defer db . mu . Unlock ( )
keyHash , err := hashInterface ( key )
if err != nil {
return err
}
record := & Record {
Type : RecordTypeDelete ,
KeyHash : keyHash ,
}
b , err := record . Marshal ( )
if err != nil {
return err
}
delete ( db . dataOffset , string ( record . KeyHash ) )
_ , err = db . compressor . Write ( b )
if err != nil {
return err
}
db . offset += int64 ( len ( b ) )
return nil
}