package main import ( "bytes" "database/sql" "fmt" "io" "os" "path/filepath" "strconv" "strings" "sync" "github.com/dimchansky/utfbom" go_ora "github.com/sijms/go-ora/v2" ) type Row struct { isHeader bool data []any } type Job struct { configFilePath string scriptFilePath string exportFileFormat ExportFormat encoding Encoding exportPath string config Config status string isFinished bool exportedRows int } func (j *Job) init() error { j.status = "Чтение списка серверов..." config, err := loadConfig(j.configFilePath) if err != nil { return err } j.config = *config return nil } func (j *Job) launch() error { j.status = "Чтение файла SQL-скрипта" sqlBytes, err := readFileIgnoreBOM(j.scriptFilePath) if err != nil { return err } branchFieldNum := getBranchFieldNumber(string(sqlBytes)) rowsChan := j.iterateServers(string(sqlBytes), branchFieldNum) err = j.export(rowsChan) if err != nil { return err } return nil } func (j *Job) export(inputRows chan Row) error { converter, err := j.exportFileFormat.GetExporter(j.encoding) if err != nil { return err } fileName := filepath.Join(j.exportPath, filepath.Base(j.scriptFilePath)) fileName = strings.TrimSuffix(fileName, filepath.Ext(fileName)) outputRows := make(chan []any) go func() { defer close(outputRows) gotHeader := false rowsCache := make([][]any, 0) for row := range inputRows { j.exportedRows++ if gotHeader { outputRows <- row.data continue } if row.isHeader { gotHeader = true outputRows <- row.data for _, cachedRow := range rowsCache { outputRows <- cachedRow } } else { rowsCache = append(rowsCache, row.data) // store data row in cache until got a header row } } }() err = converter.Convert(fileName, outputRows) j.status = "Выгрузка завершена." j.isFinished = true if err != nil { return err } return nil } func (j *Job) iterateServers(sqlStr string, branchFieldNum int) chan Row { rowsChan := make(chan Row) go func() { defer func() { close(rowsChan) }() wg := new(sync.WaitGroup) wg.Add(len(j.config.Servers)) for i, server := range j.config.Servers { i := i server := server go func() { defer wg.Done() serverUrl, err := server.Url() if err != nil { j.config.Servers[i].err = err return } server.status = "Подключение к серверу..." db := sql.OpenDB(go_ora.NewConnector(serverUrl)) defer db.Close() err = db.Ping() if err != nil { server.err = err return } server.status = "Выполнение SQL-запроса..." rows, err := db.Query(sqlStr) if err != nil { server.err = err return } defer rows.Close() cols, err := rows.Columns() if err != nil { server.err = err return } if i == 0 { rowsChan <- Row{isHeader: true, data: sliceToAnySlice[string](cols)} // Добавление заголовков } rowNum := 0 server.status = fmt.Sprintf("Выгружено %d строк...", rowNum) for rows.Next() { pointers := make([]any, len(cols)) container := make([]any, len(cols)) for i := range pointers { pointers[i] = &container[i] } err = rows.Scan(pointers...) if err != nil { server.err = err break } var resultRow []any if branchFieldNum != 0 { // Подстановка имени сервера resultRow = append(append(container[:branchFieldNum-1], server.Name), container[branchFieldNum:]...) } else { resultRow = container } rowsChan <- Row{isHeader: false, data: resultRow} rowNum += 1 server.status = fmt.Sprintf("Выгружено %d строк...", rowNum) } server.status = fmt.Sprintf("Выгружено %d строк.", rowNum) }() } j.status = "Ожидание завершения работы с серверами" wg.Wait() }() return rowsChan } func sliceToAnySlice[T string | any](slice []T) []any { result := make([]any, len(slice)) for i := range result { result[i] = slice[i] } return result } // readFileIgnoreBOM возвращает содержимое файла без BOM func readFileIgnoreBOM(filePath string) ([]byte, error) { f, err := os.Open(filePath) if err != nil { return nil, err } defer f.Close() b, err := io.ReadAll(utfbom.SkipOnly(f)) if err != nil { return nil, err } return bytes.ReplaceAll(b, []byte("\r"), []byte{}), nil } // getBranchFieldNumber осуществляет определение номера колонки для подстановки // наименования филиала. В первой строке SQL-скрипта должен быть комментарий, // начинающийся на `// ` и содержащий только номер колонки (нумерация с 1). func getBranchFieldNumber(sqlStr string) int { lines := strings.Split(sqlStr, "\n") if len(lines) == 0 { return 0 } line := lines[0] if !strings.HasPrefix(line, "-- ") { return 0 } line = strings.TrimPrefix(line, "-- ") fieldNum, err := strconv.Atoi(line) if err != nil { return 0 } return fieldNum }