1
0
mirror of https://github.com/nxshock/gron.git synced 2024-11-27 03:41:00 +05:00
gron/job.go

330 lines
7.3 KiB
Go
Raw Permalink Normal View History

2022-03-26 13:23:39 +05:00
package main
import (
2022-05-10 19:44:41 +05:00
"database/sql"
"errors"
"fmt"
2022-03-29 21:38:04 +05:00
"io"
2022-03-26 13:23:39 +05:00
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"github.com/BurntSushi/toml"
2022-03-29 21:38:04 +05:00
formatter "github.com/antonfisher/nested-logrus-formatter"
2022-03-26 13:23:39 +05:00
log "github.com/sirupsen/logrus"
)
2022-03-29 19:45:31 +05:00
// JobConfig is a TOML representation of job
2022-03-26 13:23:39 +05:00
type JobConfig struct {
2022-05-10 19:44:41 +05:00
Type JobType
2022-10-21 20:12:41 +05:00
Category string
2022-05-10 19:44:41 +05:00
// JobType = Cmd
Command string // command for execution
// JobType = Sql
Driver string
ConnectionString string
SqlText string
// Other fields
2022-03-29 21:38:04 +05:00
Cron string // cron decription
Description string // job description
NumberOfRestartAttemts int
RestartSec int // the time to sleep before restarting a job (seconds)
RestartRule RestartRule // Configures whether the job shall be restarted when the job process exits
2022-06-21 22:14:50 +05:00
OnSuccessMessageFmt string // Success message format
OnErrorMessageFmt string // Error message format
OnSuccessCmd string
OnErrorCmd string
2022-06-21 22:14:50 +05:00
OnSuccessHttpPostUrl string
OnErrorHttpPostUrl string
OnSuccessHttpGetUrl string
OnErrorHttpGetUrl string
2022-03-26 13:23:39 +05:00
}
2022-03-28 19:55:43 +05:00
type Job struct {
Name string // from filename
2022-03-29 21:38:04 +05:00
JobConfig JobConfig
2022-03-28 19:55:43 +05:00
// Fields for stats
Status Status
2022-03-28 19:55:43 +05:00
CurrentRunningCount int
LastStartTime string
LastEndTime string
LastExecutionDuration string
LastError string
NextLaunch string
}
2022-03-26 20:38:39 +05:00
// globalMutex is held by runTry while it updates the statistics fields of a
// Job (running count, status, last start/end time, duration, last error).
var globalMutex sync.RWMutex
2022-03-26 13:23:39 +05:00
func readJob(filePath string) (*Job, error) {
var jobConfig JobConfig
_, err := toml.DecodeFile(filePath, &jobConfig)
if err != nil {
return nil, err
}
2022-05-10 19:44:41 +05:00
if jobConfig.Type <= 0 {
return nil, errors.New("job type is not specified")
}
if jobConfig.Type > 2 {
return nil, fmt.Errorf("unknown job type id: %v", int(jobConfig.Type)) // TODO: add job name to log
}
2022-03-26 13:23:39 +05:00
job := &Job{
2022-03-29 19:45:31 +05:00
Name: strings.TrimSuffix(filepath.Base(filePath), filepath.Ext(filePath)),
Status: Inactive,
2022-03-29 19:45:31 +05:00
JobConfig: jobConfig}
2022-03-26 13:23:39 +05:00
return job, nil
}
// splitCommandAndParams splits the command line s into the executable and
// its arguments.
//
// Fields are separated by spaces; a space inside a double-quoted section does
// not split. Runs of separators are collapsed. After splitting, leading and
// trailing double quotes are stripped from every field.
//
// When s contains no fields (empty or only spaces) it returns an empty
// command and nil params instead of panicking.
func splitCommandAndParams(s string) (command string, params []string) {
	var items []string

	// The scan is written out explicitly instead of using strings.FieldsFunc
	// with a stateful closure: FieldsFunc does not guarantee the order in
	// which it calls its predicate, and the quote tracking depends on it.
	quoted := false
	start := -1 // byte offset of the field being collected, -1 if none
	for i, r := range s {
		if r == '"' {
			quoted = !quoted
		}
		if !quoted && r == ' ' {
			if start >= 0 {
				items = append(items, s[start:i])
				start = -1
			}
			continue
		}
		if start < 0 {
			start = i
		}
	}
	if start >= 0 {
		items = append(items, s[start:])
	}

	if len(items) == 0 {
		return "", nil
	}

	for i := range items {
		items[i] = strings.Trim(items[i], `"`)
	}

	return items[0], items[1:]
}
// logEntry - logger which merged with main logger,
// jobLogFile - job log file with needs to be closed after job is done
func (j *Job) openAndMergeLog() (logEntry *log.Entry, jobLogFile *os.File) {
jobLogFile, _ = os.OpenFile(filepath.Join(config.LogFilesPath, j.Name+".log"), os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) // TODO: handle error
2022-11-05 14:07:55 +05:00
_, _ = jobLogFile.WriteString("\n")
2022-03-26 13:23:39 +05:00
2022-05-11 18:59:33 +05:00
logWriter := io.MultiWriter(mainLogFile, jobLogFile)
2022-03-26 13:23:39 +05:00
2022-03-29 21:38:04 +05:00
log := log.New()
log.SetFormatter(&formatter.Formatter{
TimestampFormat: config.TimeFormat,
HideKeys: true,
NoColors: true,
TrimMessages: true})
log.SetOutput(logWriter)
logEntry = log.WithField("job", j.Name)
return logEntry, jobLogFile
}
func (j *Job) Run() {
log, jobLogFile := j.openAndMergeLog()
defer jobLogFile.Close()
2022-03-26 13:23:39 +05:00
2022-03-29 21:38:04 +05:00
for i := 0; i < j.JobConfig.NumberOfRestartAttemts+1; i++ {
2022-05-10 21:36:34 +05:00
err := j.runTry(log, jobLogFile)
2022-03-29 21:38:04 +05:00
if err == nil {
break
}
if j.JobConfig.RestartRule == No || j.JobConfig.NumberOfRestartAttemts == 0 {
break
}
2022-03-26 13:23:39 +05:00
2022-03-29 21:38:04 +05:00
if i == 0 {
log.Printf("Job failed, restarting in %d seconds.", j.JobConfig.RestartSec)
j.Status = Restarting
} else if i < j.JobConfig.NumberOfRestartAttemts {
j.Status = Restarting
log.Printf("Retry attempt №%d of %d failed, restarting in %d seconds.", i, j.JobConfig.NumberOfRestartAttemts, j.JobConfig.RestartSec)
2022-03-29 21:38:04 +05:00
} else {
log.Printf("Retry attempt №%d of %d failed.", i, j.JobConfig.NumberOfRestartAttemts)
2022-03-29 21:38:04 +05:00
}
time.Sleep(time.Duration(j.JobConfig.RestartSec) * time.Second)
}
2022-03-26 13:23:39 +05:00
}
2022-05-10 19:44:41 +05:00
2022-05-10 21:36:34 +05:00
func (j *Job) runTry(log *log.Entry, jobLogFile *os.File) error {
log.Info("Started.")
startTime := time.Now()
globalMutex.Lock()
j.CurrentRunningCount++
j.Status = Running
j.LastStartTime = startTime.Format(config.TimeFormat)
globalMutex.Unlock()
var err error
switch j.JobConfig.Type {
case Cmd:
err = j.runCmd(jobLogFile)
case Sql:
err = j.runSql(jobLogFile)
}
if err != nil {
j.Status = Error
log.Error(err.Error())
globalMutex.Lock()
j.LastError = err.Error()
globalMutex.Unlock()
} else {
j.Status = Inactive
globalMutex.Lock()
j.LastError = ""
globalMutex.Unlock()
}
2022-11-05 14:07:55 +05:00
go j.runFinishCallback(log, err)
2022-05-10 21:36:34 +05:00
endTime := time.Now()
log.Infof("Finished (%s).", endTime.Sub(startTime).Truncate(time.Second).String())
globalMutex.Lock()
j.CurrentRunningCount--
j.LastEndTime = endTime.Format(config.TimeFormat)
j.LastExecutionDuration = endTime.Sub(startTime).Truncate(time.Second).String()
globalMutex.Unlock()
return err
}
2022-05-10 19:44:41 +05:00
func (j *Job) runCmd(jobLogFile *os.File) error {
command, params := splitCommandAndParams(j.JobConfig.Command)
2022-05-10 19:44:41 +05:00
cmd := exec.Command(command, params...)
cmd.Stdout = jobLogFile
cmd.Stderr = jobLogFile
return cmd.Run()
}
func (j *Job) runSql(jobLogFile *os.File) error {
2022-05-14 14:17:52 +05:00
var (
db *sql.DB
err error
)
switch j.JobConfig.Driver {
case "mssql", "sqlserver":
db, err = openMsSqlDb(j.JobConfig.ConnectionString, jobLogFile)
default:
db, err = sql.Open(j.JobConfig.Driver, j.JobConfig.ConnectionString)
}
2022-05-10 19:44:41 +05:00
if err != nil {
return err
}
defer db.Close()
_, err = db.Exec(j.JobConfig.SqlText)
if err != nil {
return err
}
return nil
}
2022-11-05 14:07:55 +05:00
func (j *Job) runFinishCallback(log *log.Entry, jobErr error) error {
s := j.JobConfig.OnSuccessCmd
// Fill variables
errStr := ""
2022-11-05 14:07:55 +05:00
if jobErr != nil {
errStr = jobErr.Error()
}
2022-11-05 14:07:55 +05:00
if jobErr != nil {
2022-06-21 22:14:50 +05:00
s = format(s, struct{ Error string }{Error: errStr})
}
2022-11-05 14:07:55 +05:00
if jobErr == nil && j.JobConfig.OnSuccessCmd != "" {
cmd, params := splitCommandAndParams(s)
return runSimpleCmd(cmd, params...)
}
2022-11-05 14:07:55 +05:00
if jobErr == nil && j.JobConfig.OnSuccessHttpPostUrl != "" {
err := httpPost(j.JobConfig.OnSuccessHttpPostUrl, j.successMessage())
if err != nil {
log.Error(err)
}
2022-06-21 22:14:50 +05:00
}
2022-11-05 14:07:55 +05:00
if jobErr == nil && j.JobConfig.OnSuccessHttpGetUrl != "" {
err := httpGet(j.JobConfig.OnSuccessHttpPostUrl, j.Name, j.successMessage())
if err != nil {
log.Error(err)
}
2022-06-21 22:14:50 +05:00
}
2022-11-05 14:07:55 +05:00
if jobErr != nil && j.JobConfig.OnErrorCmd != "" {
cmd, params := splitCommandAndParams(s)
return runSimpleCmd(cmd, params...)
}
2022-11-05 14:07:55 +05:00
if jobErr != nil && j.JobConfig.OnErrorHttpPostUrl != "" {
err := httpPost(j.JobConfig.OnErrorHttpPostUrl, j.errorMessage(jobErr))
if err != nil {
log.Error(err)
}
2022-06-21 22:14:50 +05:00
}
2022-11-05 14:07:55 +05:00
if jobErr != nil && j.JobConfig.OnErrorHttpGetUrl != "" {
err := httpGet(j.JobConfig.OnErrorHttpGetUrl, j.Name, jobErr.Error())
if err != nil {
log.Errorf("OnErrorHttpGetUrl error: %v", err) // TODO: сделать формат сообщения по стандарту
2022-06-21 22:14:50 +05:00
}
}
return nil
}
2022-06-21 22:14:50 +05:00
// successMessage renders the job's success notification text. It uses
// OnSuccessMessageFmt, or defaultOnSuccessMessageFmt when that is empty, and
// passes it to format with the job name available as JobName.
func (j *Job) successMessage() string {
	msgFmt := j.JobConfig.OnSuccessMessageFmt
	if len(msgFmt) == 0 {
		msgFmt = defaultOnSuccessMessageFmt
	}

	return format(msgFmt, struct {
		JobName string
	}{
		JobName: j.Name,
	})
}
// errorMessage renders the job's failure notification text. It uses
// OnErrorMessageFmt, or defaultOnErrorMessageFmt when that is empty, and
// passes it to format with the job name available as JobName and the text of
// err as Error. err must not be nil.
func (j *Job) errorMessage(err error) string {
	msgFmt := j.JobConfig.OnErrorMessageFmt
	if len(msgFmt) == 0 {
		msgFmt = defaultOnErrorMessageFmt
	}

	return format(msgFmt, struct {
		JobName string
		Error   string
	}{
		JobName: j.Name,
		Error:   err.Error(),
	})
}
// runSimpleCmd logs the command and its arguments, runs the command and
// waits for it to finish. A failure is logged with a ">>" prefix and
// returned to the caller.
func runSimpleCmd(command string, args ...string) error {
	log.Println(command)
	log.Println(args)

	if err := exec.Command(command, args...).Run(); err != nil {
		log.Println(">>", err)
		return err
	}

	return nil
}