2022-02-16 16:16:29 +05:00
package zkv
2022-02-16 16:08:20 +05:00
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"os"
"sync"
"github.com/klauspost/compress/zstd"
)
type Database struct {
dataOffset map [ string ] int64
file * os . File
compressor * zstd . Encoder
filePath string
offset int64
2022-12-03 12:40:36 +05:00
options Options
readOrderChan chan struct { }
2022-12-02 21:37:15 +05:00
mu sync . RWMutex
2022-02-16 16:08:20 +05:00
}
func ( db * Database ) Close ( ) error {
db . mu . Lock ( )
defer db . mu . Unlock ( )
2022-02-16 17:45:46 +05:00
err := db . compressor . Close ( )
2022-02-16 16:08:20 +05:00
if err != nil {
return err
}
return db . file . Close ( )
}
func ( db * Database ) Set ( key , value interface { } ) error {
db . mu . Lock ( )
defer db . mu . Unlock ( )
record , err := newRecord ( RecordTypeSet , key , value )
if err != nil {
return err
}
b , err := record . Marshal ( )
if err != nil {
return err
}
2022-12-02 21:37:34 +05:00
db . dataOffset [ string ( record . KeyHash [ : ] ) ] = db . offset // TODO: удалить хеш и откатить запись в случае ошибки
2022-02-16 16:08:20 +05:00
_ , err = db . compressor . Write ( b )
if err != nil {
return err
}
db . offset += int64 ( len ( b ) ) // TODO: удалить хеш и откатить запись в случае ошибки
return nil
}
func ( db * Database ) Get ( key , value interface { } ) error {
2022-12-02 21:37:15 +05:00
db . mu . RLock ( )
defer db . mu . RUnlock ( )
2022-02-16 16:08:20 +05:00
2022-12-03 12:40:36 +05:00
db . readOrderChan <- struct { } { }
defer func ( ) { <- db . readOrderChan } ( )
2022-02-16 16:08:20 +05:00
hashToFind , err := hashInterface ( key )
if err != nil {
return err
}
2022-12-02 21:37:34 +05:00
offset , exists := db . dataOffset [ string ( hashToFind [ : ] ) ]
2022-02-16 16:08:20 +05:00
if ! exists {
2022-12-02 20:32:09 +05:00
return ErrNotExists
2022-02-16 16:08:20 +05:00
}
readF , err := os . Open ( db . filePath )
if err != nil {
return err
}
defer readF . Close ( )
decompressor , err := zstd . NewReader ( readF )
if err != nil {
return err
}
defer decompressor . Close ( )
err = skip ( decompressor , offset )
if err != nil {
return err
}
_ , record , err := readRecord ( decompressor )
if err != nil {
return err
}
2022-12-02 21:37:34 +05:00
if ! bytes . Equal ( record . KeyHash [ : ] , hashToFind [ : ] ) {
return fmt . Errorf ( "wrong hash on this offset: expected %s, got %s" , base64 . StdEncoding . EncodeToString ( hashToFind [ : ] ) , base64 . StdEncoding . EncodeToString ( record . KeyHash [ : ] ) ) // TODO: заменить на константную ошибку
2022-02-16 16:08:20 +05:00
}
return decode ( record . ValueBytes , value )
}
2022-12-03 12:40:36 +05:00
func OpenWithOptions ( filePath string , options Options ) ( * Database , error ) {
2022-02-16 16:08:20 +05:00
f , err := os . OpenFile ( filePath , os . O_APPEND | os . O_CREATE | os . O_WRONLY , 0644 )
if err != nil {
2022-12-03 12:40:36 +05:00
f . Close ( )
2022-02-16 16:08:20 +05:00
return nil , fmt . Errorf ( "ошибка при открытии файла для записи: %v" , err )
}
2022-12-03 12:40:36 +05:00
if options . Validate ( ) != nil {
return nil , err
}
2022-02-16 16:08:20 +05:00
compressor , err := zstd . NewWriter ( f )
if err != nil {
2022-12-03 12:40:36 +05:00
f . Close ( )
2022-02-16 16:08:20 +05:00
return nil , fmt . Errorf ( "ошибка при инициализации компрессора: %v" , err )
}
database := & Database {
2022-12-03 12:40:36 +05:00
dataOffset : make ( map [ string ] int64 ) ,
offset : 0 ,
file : f ,
compressor : compressor ,
filePath : filePath ,
options : options ,
readOrderChan : make ( chan struct { } , int ( options . MaxParallelReads ) ) }
2022-02-16 16:08:20 +05:00
// restore file data
readF , err := os . Open ( filePath )
if err != nil {
f . Close ( )
return nil , fmt . Errorf ( "ошибка при открытии файла для чтения: %v" , err )
}
defer readF . Close ( )
decompressor , err := zstd . NewReader ( readF )
if err != nil {
2022-12-03 12:40:36 +05:00
f . Close ( )
2022-02-16 16:08:20 +05:00
return nil , fmt . Errorf ( "ошибка при инициализации декомпрессора: %v" , err )
}
defer decompressor . Close ( )
offset := int64 ( 0 )
for {
n , record , err := readRecord ( decompressor )
if err == io . EOF {
break
}
if err != nil {
f . Close ( )
return nil , fmt . Errorf ( "ошибка при чтении записи из файла: %v" , err )
}
switch record . Type {
case RecordTypeSet :
2022-12-02 21:37:34 +05:00
database . dataOffset [ string ( record . KeyHash [ : ] ) ] = offset
2022-02-16 16:08:20 +05:00
case RecordTypeDelete :
2022-12-02 21:37:34 +05:00
delete ( database . dataOffset , string ( record . KeyHash [ : ] ) )
2022-02-16 16:08:20 +05:00
}
offset += n
}
return database , nil
}
2022-12-02 20:32:09 +05:00
2022-12-03 12:40:36 +05:00
func Open ( filePath string ) ( * Database , error ) {
return OpenWithOptions ( filePath , defaultOptions )
}
2022-12-02 20:32:09 +05:00
func ( db * Database ) Delete ( key interface { } ) error {
db . mu . Lock ( )
defer db . mu . Unlock ( )
keyHash , err := hashInterface ( key )
if err != nil {
return err
}
record := & Record {
Type : RecordTypeDelete ,
KeyHash : keyHash ,
}
b , err := record . Marshal ( )
if err != nil {
return err
}
2022-12-02 21:37:34 +05:00
delete ( db . dataOffset , string ( record . KeyHash [ : ] ) )
2022-12-02 20:32:09 +05:00
_ , err = db . compressor . Write ( b )
if err != nil {
return err
}
db . offset += int64 ( len ( b ) )
return nil
}