package main import ( "bytes" "database/sql" "fmt" "io" "log/slog" "os" "path/filepath" "strconv" "strings" "sync" "github.com/dimchansky/utfbom" go_ora "github.com/sijms/go-ora/v2" "gopkg.in/ini.v1" ) type Row struct { isHeader bool data []any } func launch(configFilePath, scriptFilePath string, exportFileFormat ExportFormat, encoding Encoding) error { sqlBytes, err := readFileIgnoreBOM(scriptFilePath) if err != nil { return err } servers, err := loadConfig(configFilePath) if err != nil { return err } branchFieldNum := getBranchFieldNumber(string(sqlBytes)) if branchFieldNum <= 0 { return fmt.Errorf("Некорректное значение номера поля филиала: %v", branchFieldNum) } rowsChan := iterateServers(servers, string(sqlBytes), branchFieldNum) err = export(scriptFilePath, exportFileFormat, encoding, rowsChan) if err != nil { return err } return nil } func export(scriptFilePath string, exportFileFormat ExportFormat, encoding Encoding, inputRows chan Row) error { converter, err := exportFileFormat.GetExporter(encoding) if err != nil { return err } fileName := filepath.Base(scriptFilePath) fileName = strings.TrimSuffix(fileName, filepath.Ext(fileName)) outputRows := make(chan []any) go func() { defer close(outputRows) gotHeader := false rowsCache := make([][]any, 0) rowCount := -1 for row := range inputRows { rowCount += 1 if gotHeader { outputRows <- row.data continue } if row.isHeader { gotHeader = true outputRows <- row.data for _, cachedRow := range rowsCache { outputRows <- cachedRow } } else { rowsCache = append(rowsCache, row.data) } } }() return converter.Convert(fileName, outputRows) } func iterateServers(servers []Server, sqlStr string, branchFieldNum int) chan Row { rowsChan := make(chan Row) slog.Info("Выгрузка начата...") go func() { defer func() { close(rowsChan) }() wg := new(sync.WaitGroup) wg.Add(len(servers)) for i, server := range servers { i := i server := server go func() { defer wg.Done() db, err := sql.Open("oracle", server.Url) if err != nil { slog.Error("Ошибка подключения к серверу", slog.String("server", server.Url), slog.Any("err", err)) return } rows, err := db.Query(sqlStr) if err != nil { slog.Error("Ошибка выполнения запроса", slog.String("server", server.Url), slog.Any("err", err)) return } defer rows.Close() cols, err := rows.Columns() if err != nil { slog.Error("Ошибка получения списка колонок", slog.String("server", server.Url), slog.Any("err", err)) return } if i == 0 { rowsChan <- Row{isHeader: true, data: sliceToAnySlice[string](cols)} // Добавление заголовков } rowNum := 0 for rows.Next() { pointers := make([]any, len(cols)) container := make([]any, len(cols)) for i := range pointers { pointers[i] = &container[i] } err = rows.Scan(pointers...) if err != nil { slog.Error("Ошибка получения строки", slog.String("server", server.Url), slog.Any("err", err)) break } rowsChan <- Row{isHeader: false, data: append(append(container[:branchFieldNum-1], server.Name), container[branchFieldNum:]...)} // Добавление имени сервера rowNum += 1 } slog.Info("Получение строк завершено", slog.String("server", server.Name), slog.Int("rowCount", rowNum)) }() } wg.Wait() }() return rowsChan } // loadConfig считывает конфиг и возвращает список параметров серверов func loadConfig(filePath string) ([]Server, error) { servers := make([]Server, 0) iniBytes, err := readFileIgnoreBOM(filePath) if err != nil { return nil, err } cfg, err := ini.Load(iniBytes) if err != nil { return nil, err } cfg.DeleteSection("DEFAULT") for _, server := range cfg.SectionStrings() { loginKey, err := cfg.Section(server).GetKey("Login") if err != nil { return nil, err } passwordKey, err := cfg.Section(server).GetKey("Password") if err != nil { return nil, err } nameKey, err := cfg.Section(server).GetKey("Name") if err != nil { return nil, err } serv := strings.Split(server, "/")[0] service := strings.Split(server, "/")[1] dbUrl := go_ora.BuildUrl(serv, 1521, service, loginKey.String(), passwordKey.String(), nil) server := Server{ Url: dbUrl, Name: nameKey.String()} servers = append(servers, server) } return servers, nil } func sliceToAnySlice[T string | any](slice []T) []any { result := make([]any, len(slice)) for i := range result { result[i] = slice[i] } return result } func readFileIgnoreBOM(filePath string) ([]byte, error) { f, err := os.Open(filePath) if err != nil { return nil, err } defer f.Close() b, err := io.ReadAll(utfbom.SkipOnly(f)) if err != nil { return nil, err } return bytes.ReplaceAll(b, []byte("\r"), []byte{}), nil } // getBranchFieldNumber осуществляет определение номера колонки для подстановки // наименования филиала. В первой строке SQL-скрипта должен быть комментарий, // начинающийся на `// ` и содержащий только номер колонки (нумерация с 1). func getBranchFieldNumber(sqlStr string) int { lines := strings.Split(sqlStr, "\n") if len(lines) == 0 { return 1 } line := lines[0] if !strings.HasPrefix(line, "-- ") { slog.Warn("Не указан номер колонки для вывода филиала, будет перезаписан первый столбец!") return 1 } line = strings.TrimPrefix(line, "-- ") fieldNum, err := strconv.Atoi(line) if err != nil { slog.Warn("Неверно указан номер колонки для вывода филиала, будет перезаписан первый столбец!") return 1 } return fieldNum }