package main import ( "bytes" "database/sql" "fmt" "io" "log/slog" "os" "path/filepath" "strconv" "strings" "sync" "github.com/dimchansky/utfbom" go_ora "github.com/sijms/go-ora/v2" ) type Row struct { isHeader bool data []any } type Job struct { configFilePath string scriptFilePath string exportFileFormat ExportFormat encoding Encoding servers []Server status string isFinished bool } func (j *Job) init() error { j.status = "Чтение списка серверов..." servers, err := loadConfig(j.configFilePath) if err != nil { return err } j.servers = servers return nil } func (j *Job) launch() error { j.status = "Чтение файла SQL-скрипта..." sqlBytes, err := readFileIgnoreBOM(j.scriptFilePath) if err != nil { return err } branchFieldNum := getBranchFieldNumber(string(sqlBytes)) if branchFieldNum <= 0 { return fmt.Errorf("Некорректное значение номера поля филиала: %v", branchFieldNum) } rowsChan := j.iterateServers(string(sqlBytes), branchFieldNum) err = j.export(rowsChan) if err != nil { return err } return nil } func (j *Job) export(inputRows chan Row) error { converter, err := j.exportFileFormat.GetExporter(j.encoding) if err != nil { return err } fileName := filepath.Base(j.scriptFilePath) fileName = strings.TrimSuffix(fileName, filepath.Ext(fileName)) outputRows := make(chan []any) go func() { defer close(outputRows) gotHeader := false rowsCache := make([][]any, 0) rowCount := -1 for row := range inputRows { rowCount += 1 if gotHeader { outputRows <- row.data continue } if row.isHeader { gotHeader = true outputRows <- row.data for _, cachedRow := range rowsCache { outputRows <- cachedRow } } else { rowsCache = append(rowsCache, row.data) // store data row in cache until got a header row } } }() err = converter.Convert(fileName, outputRows) j.status = "ЗАВЕРШЕНО." j.isFinished = true if err != nil { return err } return nil } func (j *Job) iterateServers(sqlStr string, branchFieldNum int) chan Row { rowsChan := make(chan Row) go func() { defer func() { close(rowsChan) }() wg := new(sync.WaitGroup) wg.Add(len(j.servers)) for i, server := range j.servers { i := i server := server go func() { defer wg.Done() j.servers[i].Status = "Подключение к серверу" db := sql.OpenDB(go_ora.NewConnector(server.Url)) defer db.Close() err := db.Ping() if err != nil { j.servers[i].Error = err return } j.servers[i].Status = "Выполнение SQL-запроса" rows, err := db.Query(sqlStr) if err != nil { j.servers[i].Error = err return } defer rows.Close() cols, err := rows.Columns() if err != nil { j.servers[i].Error = err return } if i == 0 { rowsChan <- Row{isHeader: true, data: sliceToAnySlice[string](cols)} // Добавление заголовков } rowNum := 0 for rows.Next() { j.servers[i].Status = fmt.Sprintf("Выгружено %d строк", rowNum) pointers := make([]any, len(cols)) container := make([]any, len(cols)) for i := range pointers { pointers[i] = &container[i] } err = rows.Scan(pointers...) if err != nil { j.servers[i].Error = err break } rowsChan <- Row{isHeader: false, data: append(append(container[:branchFieldNum-1], server.Name), container[branchFieldNum:]...)} // Добавление имени сервера rowNum += 1 } j.servers[i].Status = fmt.Sprintf("ЗАВЕРШЕНО: Выгружено %d строк.", rowNum) }() } j.status = "Ожидание завершения работы с серверами" wg.Wait() }() return rowsChan } func sliceToAnySlice[T string | any](slice []T) []any { result := make([]any, len(slice)) for i := range result { result[i] = slice[i] } return result } func readFileIgnoreBOM(filePath string) ([]byte, error) { f, err := os.Open(filePath) if err != nil { return nil, err } defer f.Close() b, err := io.ReadAll(utfbom.SkipOnly(f)) if err != nil { return nil, err } return bytes.ReplaceAll(b, []byte("\r"), []byte{}), nil } // getBranchFieldNumber осуществляет определение номера колонки для подстановки // наименования филиала. В первой строке SQL-скрипта должен быть комментарий, // начинающийся на `// ` и содержащий только номер колонки (нумерация с 1). func getBranchFieldNumber(sqlStr string) int { lines := strings.Split(sqlStr, "\n") if len(lines) == 0 { return 1 } line := lines[0] if !strings.HasPrefix(line, "-- ") { slog.Warn("Не указан номер колонки для вывода филиала, будет перезаписан первый столбец!") return 1 } line = strings.TrimPrefix(line, "-- ") fieldNum, err := strconv.Atoi(line) if err != nil { slog.Warn("Неверно указан номер колонки для вывода филиала, будет перезаписан первый столбец!") return 1 } return fieldNum }