omq/kernel.go
2024-03-05 21:41:51 +05:00

254 lines
5.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bytes"
"database/sql"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/dimchansky/utfbom"
go_ora "github.com/sijms/go-ora/v2"
)
type Row struct {
isHeader bool
data []any
}
type Job struct {
configFilePath string
scriptFilePath string
exportFileFormat ExportFormat
encoding Encoding
exportPath string
config Config
status string
err error
isFinished bool
exportedRows int
}
func (j *Job) init() error {
j.status = "Чтение списка серверов..."
config, err := loadConfig(j.configFilePath)
if err != nil {
return err
}
j.config = *config
return nil
}
func (j *Job) launch() error {
j.status = "Чтение файла SQL-скрипта"
sqlBytes, err := readFileIgnoreBOM(j.scriptFilePath)
if err != nil {
return err
}
branchFieldNum := getBranchFieldNumber(string(sqlBytes))
rowsChan := j.iterateServers(string(sqlBytes), branchFieldNum)
err = j.export(rowsChan)
if err != nil {
return err
}
return nil
}
func (j *Job) export(inputRows chan Row) error {
converter, err := j.exportFileFormat.GetExporter(j.encoding)
if err != nil {
return err
}
fileName := filepath.Join(j.exportPath, filepath.Base(j.scriptFilePath))
fileName = strings.TrimSuffix(fileName, filepath.Ext(fileName))
outputRows := make(chan []any)
go func() {
defer close(outputRows)
gotHeader := false
rowsCache := make([][]any, 0)
for row := range inputRows {
j.exportedRows++
if gotHeader {
outputRows <- row.data
continue
}
if row.isHeader {
gotHeader = true
outputRows <- row.data
for _, cachedRow := range rowsCache {
outputRows <- cachedRow
}
} else {
rowsCache = append(rowsCache, row.data) // store data row in cache until got a header row
}
}
}()
err = converter.Convert(fileName, outputRows)
j.status = "Выгрузка завершена."
j.isFinished = true
if err != nil {
return err
}
return nil
}
func (j *Job) iterateServers(sqlStr string, branchFieldNum int) chan Row {
rowsChan := make(chan Row)
go func() {
defer func() {
close(rowsChan)
}()
wg := new(sync.WaitGroup)
wg.Add(len(j.config.Servers))
for i, server := range j.config.Servers {
i := i
server := server
go func() {
defer wg.Done()
serverUrl, err := server.Url()
if err != nil {
j.config.Servers[i].err = err
return
}
server.status = "Подключение к серверу..."
db := sql.OpenDB(go_ora.NewConnector(serverUrl))
defer db.Close()
err = db.Ping()
if err != nil {
server.err = err
return
}
server.status = "Выполнение SQL-запроса..."
rows, err := db.Query(sqlStr)
if err != nil {
server.err = err
return
}
defer rows.Close()
cols, err := rows.Columns()
if err != nil {
server.err = err
return
}
if i == 0 {
rowsChan <- Row{isHeader: true, data: sliceToAnySlice[string](cols)} // Добавление заголовков
}
rowNum := 0
server.status = fmt.Sprintf("Выгружено %d строк...", rowNum)
for rows.Next() {
pointers := make([]any, len(cols))
container := make([]any, len(cols))
for i := range pointers {
pointers[i] = &container[i]
}
err = rows.Scan(pointers...)
if err != nil {
server.err = err
break
}
resultRow := make([]any, 0)
if branchFieldNum != 0 {
// Подстановка имени сервера
resultRow = append(append(container[:branchFieldNum-1], server.Name), container[branchFieldNum:]...)
} else {
resultRow = container
}
rowsChan <- Row{isHeader: false, data: resultRow}
rowNum += 1
server.status = fmt.Sprintf("Выгружено %d строк...", rowNum)
}
server.status = fmt.Sprintf("Выгружено %d строк.", rowNum)
}()
}
j.status = "Ожидание завершения работы с серверами"
wg.Wait()
}()
return rowsChan
}
func sliceToAnySlice[T string | any](slice []T) []any {
result := make([]any, len(slice))
for i := range result {
result[i] = slice[i]
}
return result
}
// readFileIgnoreBOM возвращает содержимое файла без BOM
func readFileIgnoreBOM(filePath string) ([]byte, error) {
f, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer f.Close()
b, err := io.ReadAll(utfbom.SkipOnly(f))
if err != nil {
return nil, err
}
return bytes.ReplaceAll(b, []byte("\r"), []byte{}), nil
}
// getBranchFieldNumber осуществляет определение номера колонки для подстановки
// наименования филиала. В первой строке SQL-скрипта должен быть комментарий,
// начинающийся на `// ` и содержащий только номер колонки (нумерация с 1).
func getBranchFieldNumber(sqlStr string) int {
lines := strings.Split(sqlStr, "\n")
if len(lines) == 0 {
return 0
}
line := lines[0]
if !strings.HasPrefix(line, "-- ") {
return 0
}
line = strings.TrimPrefix(line, "-- ")
fieldNum, err := strconv.Atoi(line)
if err != nil {
return 0
}
return fieldNum
}