commit b3010bc660ed1f3381a61bc1cdcafb32c1282ba3 Author: nxshock Date: Sun Sep 18 12:43:16 2022 +0500 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9299574 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/zillow.csv \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..eff5d17 --- /dev/null +++ b/README.md @@ -0,0 +1,26 @@ +# csv2db + +Bulk CSV file uploader for Microsoft SQL Server. + +## Usage + +``` +Usage: + csv2db [OPTIONS] + +Application Options: + /f, /file: CSV file path + /s, /server: server address (default: 127.0.0.1) + /d, /database: database name + /t, /table: table name + /l, /fields: field types + /c, /comma:[,|;|t] CSV file comma character (default: ,) + /x, /create create table + /o, /overwrite overwrite existing table + /e, /encoding:[utf8|win1251] CSV file charset (default: utf8) + /r, /skiprows: number of rows to skip +``` + +## Build + +Use the `make.bat` file to build the `csv2db.exe` executable. 
\ No newline at end of file diff --git a/csv.go b/csv.go new file mode 100644 index 0000000..1222316 --- /dev/null +++ b/csv.go @@ -0,0 +1,16 @@ +package main + +import ( + "fmt" + "os" +) + +func processCsvFile(filePath string) error { + f, err := os.Open(opts.FilePath) + if err != nil { + return fmt.Errorf("open file: %v", err) + } + defer f.Close() + + return processReader(f) +} diff --git a/encodings.go b/encodings.go new file mode 100644 index 0000000..2f7c7c2 --- /dev/null +++ b/encodings.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "io" + + "github.com/dimchansky/utfbom" + "golang.org/x/text/encoding/charmap" +) + +type Encoding int + +const ( + Utf8 Encoding = iota + Win1251 +) + +func (e Encoding) Translate(r io.Reader) (io.Reader, error) { + switch e { + case Utf8: + return utfbom.SkipOnly(r), nil + case Win1251: + return charmap.Windows1251.NewDecoder().Reader(r), nil + } + + return nil, fmt.Errorf("unknown encoding id = %d", e) +} + +func (e Encoding) MarshalText() (text []byte, err error) { + switch e { + case Utf8: + return []byte("utf8"), nil + case Win1251: + return []byte("win1251"), nil + } + + return nil, fmt.Errorf("unknown encoding id = %d", e) +} + +func (e *Encoding) UnmarshalText(text []byte) error { + switch string(text) { + case "utf8": + *e = Utf8 + case "win1251": + *e = Win1251 + } + + return fmt.Errorf("unknown encoding: %s", string(text)) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a227e1d --- /dev/null +++ b/go.mod @@ -0,0 +1,17 @@ +module github.com/nxshock/csv2db + +go 1.19 + +require ( + github.com/denisenkom/go-mssqldb v0.12.2 + golang.org/x/text v0.3.7 + github.com/dimchansky/utfbom v1.1.1 +) + +require ( + github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect + github.com/golang-sql/sqlexp v0.1.0 // indirect + github.com/jessevdk/go-flags v1.5.0 + golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect + golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41 // 
indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..90a6cdb --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/denisenkom/go-mssqldb v0.12.2 h1:1OcPn5GBIobjWNd+8yjfHNIaFX14B1pWI3F9HZy5KXw= +github.com/denisenkom/go-mssqldb v0.12.2/go.mod h1:lnIw1mZukFRZDJYQ0Pb833QS2IaC3l5HkEfra2LJ+sk= +github.com/dimchansky/utfbom v1.1.1 h1:vV6w1AhK4VMnhBno/TPVCoK9U/LP0PkLCS9tbxHdi/U= +github.com/dimchansky/utfbom v1.1.1/go.mod h1:SxdoEBH5qIqFocHMyGOXVAybYJdr71b1Q/j0mACtrfE= +github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= +github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= +github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= +github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= +github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= +github.com/jessevdk/go-flags v1.5.0 h1:1jKYvbxEjfUl0fmqTCOfonvskHHXMjBySTLW4y9LFvc= +github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= +github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= +github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod 
h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM= +golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41 h1:ohgcoMbSofXygzo6AD2I1kz3BFmW1QArPYTtwEM3UXc= +golang.org/x/sys v0.0.0-20220915200043-7b5979e65e41/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..a19a65c --- /dev/null +++ b/main.go @@ -0,0 +1,210 @@ +package main + +import ( + "bufio" + "database/sql" + "encoding/csv" + "fmt" + "io" + "log" + "os" + "path/filepath" + "strings" + + _ "github.com/denisenkom/go-mssqldb" + mssql "github.com/denisenkom/go-mssqldb" + "github.com/jessevdk/go-flags" +) + +var db *sql.DB +var opts struct { + FilePath string `short:"f" long:"file" description:"CSV file path" required:"true"` + ServerAddress string `short:"s" long:"server" description:"server address" default:"127.0.0.1"` + DatabaseName string `short:"d" long:"database" description:"database name" required:"true"` + TableName string `short:"t" long:"table" description:"table name" required:"true"` + FieldTypes string `short:"l" long:"fields" description:"field types" required:"true"` + Comma string `short:"c" long:"comma" description:"CSV file comma character" choice:"," choice:";" choice:"t" default:","` + CreateTable bool `short:"x" long:"create" description:"create table"` + OverwriteTable bool `short:"o" long:"overwrite" description:"overwrite existing table"` + Encoding string `short:"e" long:"encoding" description:"CSV file charset" choice:"utf8" 
choice:"win1251" default:"utf8"` + SkipRows int `short:"r" long:"skiprows" description:"number of rows to skip"` +} + +func init() { + log.SetFlags(0) +} + +func main() { + _, err := flags.Parse(&opts) + if err != nil { + os.Exit(1) + } + + db, err = sql.Open("sqlserver", fmt.Sprintf("sqlserver://%s?database=%s", opts.ServerAddress, opts.DatabaseName)) + if err != nil { + log.Fatalln(fmt.Errorf("open database: %v", err)) + } + defer db.Close() + + switch strings.ToLower(filepath.Ext(opts.FilePath)) { + case ".zip": + err = processZipFile(opts.FilePath) + case ".csv": + err = processCsvFile(opts.FilePath) + } + if err != nil { + log.Fatalln(err) + } +} + +func processReader(r io.Reader) error { + var encoding Encoding + err := encoding.UnmarshalText([]byte(opts.Encoding)) + if err != nil { + return fmt.Errorf("get decoder: %v", opts.Encoding) + } + + decoder, err := encoding.Translate(r) + if err != nil { + return fmt.Errorf("enable decoder: %v", opts.Encoding) + } + + bufReader := bufio.NewReaderSize(decoder, 4*1024*1024) + + reader := csv.NewReader(bufReader) + reader.TrimLeadingSpace = true + reader.FieldsPerRecord = len(opts.FieldTypes) + + if []rune(opts.Comma)[0] == 't' { + reader.Comma = '\t' + } else { + reader.Comma = []rune(opts.Comma)[0] + } + + for i := 0; i < opts.SkipRows; i++ { + _, err := reader.Read() + if err == csv.ErrFieldCount { + continue + } + if err != nil { + return fmt.Errorf("skip rows: %v", err) + } + } + + header, err := reader.Read() + if err != nil { + return fmt.Errorf("read header: %v", err) + } + + headerList := `"` + for i, v := range header { + if opts.FieldTypes[i] == ' ' { + continue + } + + headerList += v + + if i+1 < len(header) { + headerList += `", "` + } else { + headerList += `"` + } + } + + var neededHeader []string + for i, v := range header { + if opts.FieldTypes[i] == ' ' { + continue + } + + neededHeader = append(neededHeader, v) + } + + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("start transaction: 
%v", err) + } + + if opts.CreateTable { + err = createTable(tx, header) + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("create table: %v", err) + } + } + + sql := mssql.CopyIn(opts.TableName, mssql.BulkOptions{Tablock: true}, neededHeader...) + + stmt, err := tx.Prepare(sql) + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("prepare statement: %v", err) + } + + n := 0 + for { + if n%100000 == 0 { + fmt.Fprintf(os.Stderr, "Processed %d records...\r", n) + } + + record, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("read record: %v", err) + } + + var args []any + + for i, v := range record { + var fieldType FieldType + err = fieldType.UnmarshalText([]byte{opts.FieldTypes[i]}) + if err != nil { + return fmt.Errorf("get record type: %v", err) + } + if fieldType == Skip { + continue + } + + parsedValue, err := fieldType.ParseValue(v) + if err != nil { + return fmt.Errorf("parse value: %v", err) + } + + args = append(args, parsedValue) + } + + _, err = stmt.Exec(args...) + if err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return fmt.Errorf("execute statement: %v", err) + } + n++ + } + result, err := stmt.Exec() + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("execute statement: %v", err) + } + rowsAffected, err := result.RowsAffected() + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("calc rows affected: %v", err) + } + fmt.Fprintf(os.Stderr, "Processed %d records. 
\n", rowsAffected) + + err = stmt.Close() + if err != nil { + _ = tx.Rollback() + return fmt.Errorf("close statement: %v", err) + } + + err = tx.Commit() + if err != nil { + return fmt.Errorf("commit: %v", err) + } + + return nil +} diff --git a/make.bat b/make.bat new file mode 100644 index 0000000..5b21e00 --- /dev/null +++ b/make.bat @@ -0,0 +1 @@ +go build -trimpath -buildmode=pie -ldflags "-linkmode external -s -w" \ No newline at end of file diff --git a/sql.go b/sql.go new file mode 100644 index 0000000..f4a6561 --- /dev/null +++ b/sql.go @@ -0,0 +1,40 @@ +package main + +import ( + "database/sql" + "fmt" +) + +func createTable(tx *sql.Tx, header []string) error { + if opts.OverwriteTable { + _, err := tx.Exec(fmt.Sprintf("IF object_id('%s', 'U') IS NOT NULL DROP TABLE %s", opts.TableName, opts.TableName)) + if err != nil { + return fmt.Errorf("drop table: %v", err) + } + } + + sql := fmt.Sprintf("CREATE TABLE %s (", opts.TableName) + + for i, v := range header { + var fieldType FieldType + err := fieldType.UnmarshalText([]byte(v)) + if err != nil { + return fmt.Errorf("detect field type: %v", err) + } + + sql += fmt.Sprintf(`"%s" %s`, v, fieldType.SqlFieldType()) + + if i+1 < len(header) { + sql += ", " + } else { + sql += ") WITH (DATA_COMPRESSION = PAGE)" + } + } + + _, err := tx.Exec(sql) + if err != nil { + return fmt.Errorf("execute table creation: %v", err) + } + + return nil +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..e77225a --- /dev/null +++ b/types.go @@ -0,0 +1,105 @@ +package main + +import ( + "fmt" + "strconv" + "strings" + "time" +) + +type FieldType int + +const ( + Skip FieldType = iota + Integer + String + Float + Money + Date + Timestamp + TimestampWithoutSeconds +) + +func (ft FieldType) ParseValue(s string) (any, error) { + switch ft { + case String: + return s, nil + case Integer: + return strconv.ParseInt(s, 10, 64) + case Float: + return strconv.ParseFloat(strings.ReplaceAll(s, ",", "."), 64) + case Date: 
+ return time.Parse("02.01.2006", s) + case Timestamp: + return time.Parse("02.01.2006 15:04:05", s) + case TimestampWithoutSeconds: + return time.Parse("02.01.2006 15:04", s) + } + + return nil, fmt.Errorf("unknown type id = %d", ft) +} + +func (ft FieldType) SqlFieldType() string { + switch ft { + case Integer: + return "bigint" + case String: + return "nvarchar(255)" + case Float: + return "float" + case Money: + panic("do not implemented - see https://github.com/denisenkom/go-mssqldb/issues/460") // TODO: https://github.com/denisenkom/go-mssqldb/issues/460 + case Date: + return "date" + case Timestamp, TimestampWithoutSeconds: + return "datetime2" + } + + return "" +} + +func (ft FieldType) MarshalText() (text []byte, err error) { + switch ft { + case Skip: + return []byte(" "), nil + case Integer: + return []byte("i"), nil + case String: + return []byte("s"), nil + case Float: + return []byte("f"), nil + case Money: + return []byte("m"), nil + case Date: + return []byte("d"), nil + case Timestamp: + return []byte("t"), nil + case TimestampWithoutSeconds: + return []byte("w"), nil + } + + return nil, fmt.Errorf("unknown type id = %d", ft) +} + +func (ft *FieldType) UnmarshalText(text []byte) error { + switch string(text) { + case " ": + *ft = Skip + case "i": + *ft = Integer + case "s": + *ft = String + case "f": + *ft = Float + case "m": + *ft = Money + case "d": + *ft = Date + case "t": + *ft = Timestamp + case "w": + *ft = TimestampWithoutSeconds + } + + return fmt.Errorf("unknown format code %s", string(text)) +} diff --git a/zip.go b/zip.go new file mode 100644 index 0000000..ed35ea8 --- /dev/null +++ b/zip.go @@ -0,0 +1,25 @@ +package main + +import ( + "archive/zip" + "fmt" +) + +func processZipFile(filePath string) error { + r, err := zip.OpenReader(filePath) + if err != nil { + return err + } + + if len(r.File) != 1 { + return fmt.Errorf("supported only one file in archive, got %d files", len(r.File)) + } + + zipFileReader, err := r.File[0].Open() + if 
err != nil { + return err + } + defer zipFileReader.Close() + + return processReader(zipFileReader) +}