From b3010bc660ed1f3381a61bc1cdcafb32c1282ba3 Mon Sep 17 00:00:00 2001 From: nxshock Date: Sun, 18 Sep 2022 12:43:16 +0500 Subject: [PATCH] first commit --- .gitignore | 1 + README.md | 26 +++++++ csv.go | 16 ++++ encodings.go | 49 ++++++++++++ go.mod | 17 +++++ go.sum | 46 +++++++++++ main.go | 210 +++++++++++++++++++++++++++++++++++++++++++++++++++ make.bat | 1 + sql.go | 40 ++++++++++ types.go | 105 ++++++++++++++++++++++++++ zip.go | 25 ++++++ 11 files changed, 536 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 csv.go create mode 100644 encodings.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 make.bat create mode 100644 sql.go create mode 100644 types.go create mode 100644 zip.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9299574 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/zillow.csv \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..eff5d17 --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ +# 0csv2db + +Bulk CSV files uploader into Microsoft SQL Server. + +## Usage + +``` +Usage: + csv2db [OPTIONS] + +Application Options: + /f, /file: CSV file path + /s, /server: server address (default: 127.0.0.1) + /d, /database: database name + /t, /table: table name + /l, /fields: field types + /c, /comma:[,|;|t] CSV file comma character (default: ,) + /x, /create create table + /o, /overwrite overwrite existing table + /e, /encoding:[utf8|win1251] CSV file charset (default: utf8) + /r, /skiprows: number of rows to skip +``` + +## Build + +Use `make.bat` file to build `csv2db.exe` executable. \ No newline at end of file diff --git a/csv.go b/csv.go new file mode 100644 index 0000000..1222316 --- /dev/null +++ b/csv.go @@ -0,0 +1,16 @@ +package main + +import ( + "fmt" + "os" +) + +func processCsvFile(filePath string) error { + f, err := os.Open(opts.FilePath) + if err != nil { + return fmt.Errorf("open file: %v", err) + } + defer f.Close() + + return processReader(f) +} diff --git a/encodings.go b/encodings.go new file mode 100644 index 0000000..2f7c7c2 --- /dev/null +++ b/encodings.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "io" + + "github.com/dimchansky/utfbom" + "golang.org/x/text/encoding/charmap" +) + +type Encoding int + +const ( + Utf8 Encoding = iota + Win1251 +) + +func (e Encoding) Translate(r io.Reader) (io.Reader, error) { + switch e { + case Utf8: + return utfbom.SkipOnly(r), nil + case Win1251: + return charmap.Windows1251.NewDecoder().Reader(r), nil + } + + return nil, fmt.Errorf("unknown encoding id = %d", e) +} + +func (e Encoding) MarshalText() (text []byte, err error) { + switch e { + case Utf8: + return []byte("utf8"), nil + case Win1251: + return []byte("win1251"), nil + } + + return nil, fmt.Errorf("unknown encoding id = %d", e) +} + +func (e *Encoding) UnmarshalText(text []byte) error { + switch string(text) { + case "utf8": + *e = Utf8 + case "win1251": + *e = Win1251 + } + + return fmt.Errorf("unknown encoding: %s", string(text)) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a227e1d --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module github.com/nxshock/csv2db + +go 1.19 + +require ( + github.com/denisenkom/go-mssqldb v0.12.2 + golang.org/x/text v0.3.7 + github.com/dimchansky/utfbom v1.1.1 +) + +require ( + github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect + github.com/golang-sql/sqlexp v0.1.0 // indirect + github.com/jessevdk/go-flags v1.5.0 + golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect + golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..90a6cdb --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/denisenkom/go-mssqldb v0.12.2 h1:1OcPn5GBIobjWNd+8yjfHNIaFX14B1pWI3F9HZy5KXw= +github.com/denisenkom/go-mssqldb v0.12.2/go.mod h1:lnIw1mZukFRZDJYQ0Pb833QS2IaC3l5HkEfra2LJ+sk= +github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= +github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE= +github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= +github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= +github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= +github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= +github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= +github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= +github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM= +golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41 h1:ohgcoMbSofXygzo6AD2I1kz3BFmW1QArPYTtwEM3UXc= +golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..a19a65c --- /dev/null +++ b/main.go @@ -0,0 +1,210 @@ +package main + +import ( + "bufio" + "database/sql" + "encoding/csv" + "fmt" + "io" + "log" + "os" + "path/filepath" + "strings" + + _ "github.com/denisenkom/go-mssqldb" + mssql "github.com/denisenkom/go-mssqldb" + "github.com/jessevdk/go-flags" +) + +var db *sql.DB +var opts struct { + FilePath string `short:"f" long:"file" description:"CSV file path" required:"true"` + ServerAddress string `short:"s" long:"server" description:"server address" default:"127.0.0.1"` + DatabaseName string `short:"d" long:"database" description:"database name" required:"true"` + TableName string `short:"t" long:"table" description:"table name" required:"true"` + FieldTypes string `short:"l" long:"fields" description:"field types" required:"true"` + Comma string `short:"c" long:"comma" description:"CSV file comma character" choice:"," choice:";" choice:"t" default:","` + CreateTable bool `short:"x" long:"create" description:"create table"` + OverwriteTable bool `short:"o" long:"overwrite" description:"overwrite existing table"` + Encoding string `short:"e" long:"encoding" description:"CSV file charset" choice:"utf8" choice:"win1251" default:"utf8"` + SkipRows int `short:"r" long:"skiprows" description:"number of rows to skip"` +} + +func init() { + log.SetFlags(0) +} + +func main() { + _, err := flags.Parse(&opts) + if err != nil { + os.Exit(1) + } + + db, err = sql.Open("sqlserver", fmt.Sprintf("sqlserver://%s?database=%s", opts.ServerAddress, opts.DatabaseName)) + if err != nil { + log.Fatalln(fmt.Errorf("open database: %v", err)) + } + defer db.Close() + + switch strings.ToLower(filepath.Ext(opts.FilePath)) { + case ".zip": + err = processZipFile(opts.FilePath) + case ".csv": + err = processCsvFile(opts.FilePath) + } + if err != nil { + log.Fatalln(err) + } +} + +func processReader(r io.Reader) error { + var encoding Encoding + err := encoding.UnmarshalText([]byte(opts.Encoding)) + if err != nil { + return fmt.Errorf("get decoder: %v", opts.Encoding) + } + + decoder, err := encoding.Translate(r) + if err != nil { + return fmt.Errorf("enable decoder: %v", opts.Encoding) + } + + bufReader := bufio.NewReaderSize(decoder, 4*1024*1024) + + reader := csv.NewReader(bufReader) + reader.TrimLeadingSpace = true + reader.FieldsPerRecord = len(opts.FieldTypes) + + if []rune(opts.Comma)[0] == 't' { + reader.Comma = '\t' + } else { + reader.Comma = []rune(opts.Comma)[0] + } + + for i := 0; i < opts.SkipRows; i++ { + _, err := reader.Read() + if err == csv.ErrFieldCount { + continue + } + if err != nil { + return fmt.Errorf("skip rows: %v", err) + } + } + + header, err := reader.Read() + if err != nil { + return fmt.Errorf("read header: %v", err) + } + + headerList := `"` + for i, v := range header { + if opts.FieldTypes[i] == ' ' { + continue + } + + headerList += v + + if i+1 < len(header) { + headerList += `", "` + } else { + headerList += `"` + } + } + + var neededHeader []string + for i, v := range header { + if opts.FieldTypes[i] == ' ' { + continue + } + + neededHeader = append(neededHeader, v) + } + + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("start transaction: %v", err) + } + + if opts.CreateTable { + err = createTable(tx, header) + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("create table: %v", err) + } + } + + sql := mssql.CopyIn(opts.TableName, mssql.BulkOptions{Tablock: true}, neededHeader...) + + stmt, err := tx.Prepare(sql) + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("prepare statement: %v", err) + } + + n := 0 + for { + if n%100000 == 0 { + fmt.Fprintf(os.Stderr, "Processed %d records...\r", n) + } + + record, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("read record: %v", err) + } + + var args []any + + for i, v := range record { + var fieldType FieldType + err = fieldType.UnmarshalText([]byte{opts.FieldTypes[i]}) + if err != nil { + return fmt.Errorf("get record type: %v", err) + } + if fieldType == Skip { + continue + } + + parsedValue, err := fieldType.ParseValue(v) + if err != nil { + return fmt.Errorf("parse value: %v", err) + } + + args = append(args, parsedValue) + } + + _, err = stmt.Exec(args...) + if err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return fmt.Errorf("execute statement: %v", err) + } + n++ + } + result, err := stmt.Exec() + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("execute statement: %v", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("calc rows affected: %v", err) + } + fmt.Fprintf(os.Stderr, "Processed %d records. \n", rowsAffected) + + err = stmt.Close() + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("close statement: %v", err) + } + + err = tx.Commit() + if err != nil { + return fmt.Errorf("commit: %v", err) + } + + return nil +} diff --git a/make.bat b/make.bat new file mode 100644 index 0000000..5b21e00 --- /dev/null +++ b/make.bat @@ -0,0 +1 @@ +go build -trimpath -buildmode=pie -ldflags "-linkmode external -s -w" \ No newline at end of file diff --git a/sql.go b/sql.go new file mode 100644 index 0000000..f4a6561 --- /dev/null +++ b/sql.go @@ -0,0 +1,40 @@ +package main + +import ( + "database/sql" + "fmt" +) + +func createTable(tx *sql.Tx, header []string) error { + if opts.OverwriteTable { + _, err := tx.Exec(fmt.Sprintf("IF object_id('%s', 'U') IS NOT NULL DROP TABLE %s", opts.TableName, opts.TableName)) + if err != nil { + return fmt.Errorf("drop table: %v", err) + } + } + + sql := fmt.Sprintf("CREATE TABLE %s (", opts.TableName) + + for i, v := range header { + var fieldType FieldType + err := fieldType.UnmarshalText([]byte(v)) + if err != nil { + return fmt.Errorf("detect field type: %v", err) + } + + sql += fmt.Sprintf(`"%s" %s`, v, fieldType.SqlFieldType()) + + if i+1 < len(header) { + sql += ", " + } else { + sql += ") WITH (DATA_COMPRESSION = PAGE)" + } + } + + _, err := tx.Exec(sql) + if err != nil { + return fmt.Errorf("execute table creation: %v", err) + } + + return nil +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..e77225a --- /dev/null +++ b/types.go @@ -0,0 +1,105 @@ +package main + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +type FieldType int + +const ( + Skip FieldType = iota + Integer + String + Float + Money + Date + Timestamp + TimestampWithoutSeconds +) + +func (ft FieldType) ParseValue(s string) (any, error) { + switch ft { + case String: + return s, nil + case Integer: + return strconv.ParseInt(s, 10, 64) + case Float: + return strconv.ParseFloat(strings.ReplaceAll(s, ",", "."), 64) + case Date: + return time.Parse("02.01.2006", s) + case Timestamp: + return time.Parse("02.01.2006 15:04:05", s) + case TimestampWithoutSeconds: + return time.Parse("02.01.2006 15:04", s) + } + + return nil, fmt.Errorf("unknown type id = %d", ft) +} + +func (ft FieldType) SqlFieldType() string { + switch ft { + case Integer: + return "bigint" + case String: + return "nvarchar(255)" + case Float: + return "float" + case Money: + panic("do not implemented - see https://github.com/denisenkom/go-mssqldb/issues/460") // TODO: https://github.com/denisenkom/go-mssqldb/issues/460 + case Date: + return "date" + case Timestamp, TimestampWithoutSeconds: + return "datetime2" + } + + return "" +} + +func (ft FieldType) MarshalText() (text []byte, err error) { + switch ft { + case Skip: + return []byte(" "), nil + case Integer: + return []byte("i"), nil + case String: + return []byte("s"), nil + case Float: + return []byte("f"), nil + case Money: + return []byte("m"), nil + case Date: + return []byte("d"), nil + case Timestamp: + return []byte("t"), nil + case TimestampWithoutSeconds: + return []byte("w"), nil + } + + return nil, fmt.Errorf("unknown type id = %d", ft) +} + +func (ft *FieldType) UnmarshalText(text []byte) error { + switch string(text) { + case " ": + *ft = Skip + case "i": + *ft = Integer + case "s": + *ft = String + case "f": + *ft = Float + case "m": + *ft = Money + case "d": + *ft = Date + case "t": + *ft = Timestamp + case "w": + *ft = TimestampWithoutSeconds + } + + return fmt.Errorf("unknown format code %s", string(text)) +} diff --git a/zip.go b/zip.go new file mode 100644 index 0000000..ed35ea8 --- /dev/null +++ b/zip.go @@ -0,0 +1,25 @@ +package main + +import ( + "archive/zip" + "fmt" +) + +func processZipFile(filePath string) error { + r, err := zip.OpenReader(filePath) + if err != nil { + return err + } + + if len(r.File) != 1 { + return fmt.Errorf("supported only one file in archive, got %d files", len(r.File)) + } + + zipFileReader, err := r.File[0].Open() + if err != nil { + return err + } + defer zipFileReader.Close() + + return processReader(zipFileReader) +}