omq/kernel.go
2023-11-17 20:34:20 +05:00

254 lines
6.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bytes"
"database/sql"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/dimchansky/utfbom"
go_ora "github.com/sijms/go-ora/v2"
"gopkg.in/ini.v1"
)
type Row struct {
isHeader bool
data []any
}
func launch(configFilePath, scriptFilePath string, exportFileFormat ExportFormat, encoding Encoding) error {
sqlBytes, err := readFileIgnoreBOM(scriptFilePath)
if err != nil {
return err
}
servers, err := loadConfig(configFilePath)
if err != nil {
return err
}
branchFieldNum := getBranchFieldNumber(string(sqlBytes))
if branchFieldNum <= 0 {
return fmt.Errorf("Некорректное значение номера поля филиала: %v", branchFieldNum)
}
rowsChan := iterateServers(servers, string(sqlBytes), branchFieldNum)
err = export(scriptFilePath, exportFileFormat, encoding, rowsChan)
if err != nil {
return err
}
return nil
}
func export(scriptFilePath string, exportFileFormat ExportFormat, encoding Encoding, inputRows chan Row) error {
converter, err := exportFileFormat.GetExporter(encoding)
if err != nil {
return err
}
fileName := filepath.Base(scriptFilePath)
fileName = strings.TrimSuffix(fileName, filepath.Ext(fileName))
outputRows := make(chan []any)
go func() {
defer close(outputRows)
gotHeader := false
rowsCache := make([][]any, 0)
rowCount := -1
for row := range inputRows {
rowCount += 1
if gotHeader {
outputRows <- row.data
continue
}
if row.isHeader {
gotHeader = true
outputRows <- row.data
for _, cachedRow := range rowsCache {
outputRows <- cachedRow
}
} else {
rowsCache = append(rowsCache, row.data)
}
}
}()
return converter.Convert(fileName, outputRows)
}
func iterateServers(servers []Server, sqlStr string, branchFieldNum int) chan Row {
rowsChan := make(chan Row)
slog.Info("Выгрузка начата...")
go func() {
defer func() {
close(rowsChan)
}()
wg := new(sync.WaitGroup)
wg.Add(len(servers))
for i, server := range servers {
i := i
server := server
go func() {
defer wg.Done()
db, err := sql.Open("oracle", server.Url)
if err != nil {
slog.Error("Ошибка подключения к серверу", slog.String("server", server.Url), slog.Any("err", err))
return
}
rows, err := db.Query(sqlStr)
if err != nil {
slog.Error("Ошибка выполнения запроса", slog.String("server", server.Url), slog.Any("err", err))
return
}
defer rows.Close()
cols, err := rows.Columns()
if err != nil {
slog.Error("Ошибка получения списка колонок", slog.String("server", server.Url), slog.Any("err", err))
return
}
if i == 0 {
rowsChan <- Row{isHeader: true, data: sliceToAnySlice[string](cols)} // Добавление заголовков
}
rowNum := 0
for rows.Next() {
pointers := make([]any, len(cols))
container := make([]any, len(cols))
for i := range pointers {
pointers[i] = &container[i]
}
err = rows.Scan(pointers...)
if err != nil {
slog.Error("Ошибка получения строки", slog.String("server", server.Url), slog.Any("err", err))
break
}
rowsChan <- Row{isHeader: false, data: append(append(container[:branchFieldNum-1], server.Name), container[branchFieldNum:]...)} // Добавление имени сервера
rowNum += 1
}
slog.Info("Получение строк завершено", slog.String("server", server.Name), slog.Int("rowCount", rowNum))
}()
}
wg.Wait()
}()
return rowsChan
}
// loadConfig считывает конфиг и возвращает список параметров серверов
func loadConfig(filePath string) ([]Server, error) {
servers := make([]Server, 0)
iniBytes, err := readFileIgnoreBOM(filePath)
if err != nil {
return nil, err
}
cfg, err := ini.Load(iniBytes)
if err != nil {
return nil, err
}
cfg.DeleteSection("DEFAULT")
for _, server := range cfg.SectionStrings() {
loginKey, err := cfg.Section(server).GetKey("Login")
if err != nil {
return nil, err
}
passwordKey, err := cfg.Section(server).GetKey("Password")
if err != nil {
return nil, err
}
nameKey, err := cfg.Section(server).GetKey("Name")
if err != nil {
return nil, err
}
serv := strings.Split(server, "/")[0]
service := strings.Split(server, "/")[1]
dbUrl := go_ora.BuildUrl(serv, 1521, service, loginKey.String(), passwordKey.String(), nil)
server := Server{
Url: dbUrl,
Name: nameKey.String()}
servers = append(servers, server)
}
return servers, nil
}
func sliceToAnySlice[T string | any](slice []T) []any {
result := make([]any, len(slice))
for i := range result {
result[i] = slice[i]
}
return result
}
func readFileIgnoreBOM(filePath string) ([]byte, error) {
f, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer f.Close()
b, err := io.ReadAll(utfbom.SkipOnly(f))
if err != nil {
return nil, err
}
return bytes.ReplaceAll(b, []byte("\r"), []byte{}), nil
}
// getBranchFieldNumber осуществляет определение номера колонки для подстановки
// наименования филиала. В первой строке SQL-скрипта должен быть комментарий,
// начинающийся на `// ` и содержащий только номер колонки (нумерация с 1).
func getBranchFieldNumber(sqlStr string) int {
lines := strings.Split(sqlStr, "\n")
if len(lines) == 0 {
return 1
}
line := lines[0]
if !strings.HasPrefix(line, "-- ") {
slog.Warn("Не указан номер колонки для вывода филиала, будет перезаписан первый столбец!")
return 1
}
line = strings.TrimPrefix(line, "-- ")
fieldNum, err := strconv.Atoi(line)
if err != nil {
slog.Warn("Неверно указан номер колонки для вывода филиала, будет перезаписан первый столбец!")
return 1
}
return fieldNum
}