omq/kernel.go

253 lines
5.2 KiB
Go
Raw Permalink Normal View History

2023-11-17 20:34:20 +05:00
package main
import (
"bytes"
"database/sql"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/dimchansky/utfbom"
go_ora "github.com/sijms/go-ora/v2"
)
type Row struct {
isHeader bool
data []any
}
2023-11-20 20:34:09 +05:00
type Job struct {
configFilePath string
scriptFilePath string
exportFileFormat ExportFormat
encoding Encoding
config Config
2023-11-20 20:34:09 +05:00
status string
err error
isFinished bool
exportedRows int
2023-11-20 20:34:09 +05:00
}
func (j *Job) init() error {
j.status = "Чтение списка серверов..."
config, err := loadConfig(j.configFilePath)
2023-11-17 20:34:20 +05:00
if err != nil {
return err
}
j.config = *config
2023-11-20 20:34:09 +05:00
return nil
}
func (j *Job) launch() error {
j.status = "Чтение файла SQL-скрипта"
2023-11-20 20:34:09 +05:00
sqlBytes, err := readFileIgnoreBOM(j.scriptFilePath)
2023-11-17 20:34:20 +05:00
if err != nil {
return err
}
branchFieldNum := getBranchFieldNumber(string(sqlBytes))
2023-11-20 20:34:09 +05:00
rowsChan := j.iterateServers(string(sqlBytes), branchFieldNum)
2023-11-17 20:34:20 +05:00
2023-11-20 20:34:09 +05:00
err = j.export(rowsChan)
2023-11-17 20:34:20 +05:00
if err != nil {
return err
}
return nil
}
2023-11-20 20:34:09 +05:00
func (j *Job) export(inputRows chan Row) error {
converter, err := j.exportFileFormat.GetExporter(j.encoding)
2023-11-17 20:34:20 +05:00
if err != nil {
return err
}
2023-11-20 20:34:09 +05:00
fileName := filepath.Base(j.scriptFilePath)
2023-11-17 20:34:20 +05:00
fileName = strings.TrimSuffix(fileName, filepath.Ext(fileName))
outputRows := make(chan []any)
go func() {
defer close(outputRows)
gotHeader := false
rowsCache := make([][]any, 0)
for row := range inputRows {
j.exportedRows++
2023-11-17 20:34:20 +05:00
if gotHeader {
outputRows <- row.data
continue
}
if row.isHeader {
gotHeader = true
outputRows <- row.data
for _, cachedRow := range rowsCache {
outputRows <- cachedRow
}
} else {
2023-11-20 20:34:09 +05:00
rowsCache = append(rowsCache, row.data) // store data row in cache until got a header row
2023-11-17 20:34:20 +05:00
}
}
}()
2023-11-20 20:34:09 +05:00
err = converter.Convert(fileName, outputRows)
j.status = "Выгрузка завершена."
2023-11-20 20:34:09 +05:00
j.isFinished = true
if err != nil {
return err
}
return nil
2023-11-17 20:34:20 +05:00
}
2023-11-20 20:34:09 +05:00
func (j *Job) iterateServers(sqlStr string, branchFieldNum int) chan Row {
2023-11-17 20:34:20 +05:00
rowsChan := make(chan Row)
go func() {
defer func() {
close(rowsChan)
}()
wg := new(sync.WaitGroup)
wg.Add(len(j.config.Servers))
2023-11-17 20:34:20 +05:00
for i, server := range j.config.Servers {
2023-11-17 20:34:20 +05:00
i := i
server := server
go func() {
defer wg.Done()
serverUrl, err := server.Url()
if err != nil {
j.config.Servers[i].err = err
return
}
server.status = "Подключение к серверу..."
db := sql.OpenDB(go_ora.NewConnector(serverUrl))
2023-11-20 20:34:09 +05:00
defer db.Close()
err = db.Ping()
2023-11-17 20:34:20 +05:00
if err != nil {
server.err = err
2023-11-17 20:34:20 +05:00
return
}
server.status = "Выполнение SQL-запроса..."
2023-11-17 20:34:20 +05:00
rows, err := db.Query(sqlStr)
if err != nil {
server.err = err
2023-11-17 20:34:20 +05:00
return
}
defer rows.Close()
cols, err := rows.Columns()
if err != nil {
server.err = err
2023-11-17 20:34:20 +05:00
return
}
if i == 0 {
rowsChan <- Row{isHeader: true, data: sliceToAnySlice[string](cols)} // Добавление заголовков
}
rowNum := 0
server.status = fmt.Sprintf("Выгружено %d строк...", rowNum)
2023-11-17 20:34:20 +05:00
for rows.Next() {
2023-11-20 20:34:09 +05:00
2023-11-17 20:34:20 +05:00
pointers := make([]any, len(cols))
container := make([]any, len(cols))
for i := range pointers {
pointers[i] = &container[i]
}
err = rows.Scan(pointers...)
if err != nil {
server.err = err
2023-11-17 20:34:20 +05:00
break
}
resultRow := make([]any, 0)
if branchFieldNum != 0 {
// Подстановка имени сервера
resultRow = append(append(container[:branchFieldNum-1], server.Name), container[branchFieldNum:]...)
} else {
resultRow = container
}
rowsChan <- Row{isHeader: false, data: resultRow}
2023-11-17 20:34:20 +05:00
rowNum += 1
server.status = fmt.Sprintf("Выгружено %d строк...", rowNum)
2023-11-17 20:34:20 +05:00
}
server.status = fmt.Sprintf("Выгружено %d строк.", rowNum)
2023-11-17 20:34:20 +05:00
}()
}
2023-11-20 20:34:09 +05:00
j.status = "Ожидание завершения работы с серверами"
2023-11-17 20:34:20 +05:00
wg.Wait()
}()
return rowsChan
}
func sliceToAnySlice[T string | any](slice []T) []any {
result := make([]any, len(slice))
for i := range result {
result[i] = slice[i]
}
return result
}
// readFileIgnoreBOM возвращает содержимое файла без BOM
2023-11-17 20:34:20 +05:00
func readFileIgnoreBOM(filePath string) ([]byte, error) {
f, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer f.Close()
b, err := io.ReadAll(utfbom.SkipOnly(f))
if err != nil {
return nil, err
}
return bytes.ReplaceAll(b, []byte("\r"), []byte{}), nil
}
// getBranchFieldNumber осуществляет определение номера колонки для подстановки
// наименования филиала. В первой строке SQL-скрипта должен быть комментарий,
// начинающийся на `// ` и содержащий только номер колонки (нумерация с 1).
func getBranchFieldNumber(sqlStr string) int {
lines := strings.Split(sqlStr, "\n")
if len(lines) == 0 {
return 0
2023-11-17 20:34:20 +05:00
}
line := lines[0]
if !strings.HasPrefix(line, "-- ") {
return 0
2023-11-17 20:34:20 +05:00
}
line = strings.TrimPrefix(line, "-- ")
fieldNum, err := strconv.Atoi(line)
if err != nil {
return 0
2023-11-17 20:34:20 +05:00
}
return fieldNum
}