本文共 5826 字,大约阅读时间需要 19 分钟。
定时任务 robfig/cron
go get github.com/robfig/cron/v3@v3.0.0
https://github.com/robfig/cron
https://godoc.org/github.com/robfig/cron注意:v3开始执行策略为5个参数,不是6个,实际上秒级的定时任务也没有存在的意义。
robfig/cron 是一个常驻内存的进程,通过多个协程内部的定时器来来实现定时任务,因此需要考虑一下几个问题。
因为你无法预测一个任务要多久才能被执行完,比如有一个任务没十分钟执行一次,但是无法保证每一次都在十分钟之内就执行完毕。
默认情况下,新的任务会开始重新执行,原先正在执行的任务还在继续执行,相当于是并行了。
{ "* * * * *", fun1}func fun1() { fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111 start") for i := 0; i < 40; i++ { fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111") time.Sleep(time.Second * 2) } fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 1111 end")}func fun2() { fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 2222 start") time.Sleep(time.Second * 10) fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "job 2222 end")}
可以通过设置 Job Wrappers 来控制
c := cron.New(cron.WithChain( cron.SkipIfStillRunning(FileLogger{ }),))测试 { "* * * * *", fun1},{ "*/2 * * * *", fun2},
然而,SkipIfStillRunning 的控制并不是我理解的那样,我想说的是相同的任务如何避免并行的情况,而它控制的却是不同任务避免并行的情况。
SkipIfStillRunning 会控制你所有的任务池中,在同一时刻只能有一个在运行,这样会导致有些任务永远无法被运行,这显然不能满足需求,我想要的是同一个任务只有一个在运行,而不同的任务各自不受影响。
而如果使用 DelayIfStillRunning 的话,不同任务可以并行,相同任务串行,冲突的任务会延迟执行,这会导致任务可能不是设置的时刻执行的,且延迟超过1分钟会有日志。
貌似 DelayIfStillRunning 勉强能满足需求。
不会
每一个任务都是在协程里面运行的。不会
Stop 会暂停任务调度
,不会触发新的任务了,但是正在运行的任务不会受到影响,可通过调用 Start 再次启动任务调度,与 Start 是相反的操作。 没法知道,需要自己实现,在执行前后埋点,改变变量的值。
两者是不同的效果,前者是从启动是开始计时,后者是按照绝对时间来的。
https://www.jianshu.com/p/fd3dda663953
命令行文件 CronTab.go
/*-- @Time : 2020/9/27 18:02-- @Author : raoxiaoya-- @Desc : CronTabexample:./voteapi CronTab --action=run./voteapi CronTab --action=stop*/package CronTabimport ( "fmt" "github.com/robfig/cron/v3" "log" "time" "voteapi/commands/Interfaces" "voteapi/pkg/logging" "voteapi/pkg/util")var startTime = time.Now().Unix()var JobStatus = make(map[string]int8)var isRunning bool// 自定义loggertype FileLogger struct{ }func (fl FileLogger) Info(msg string, keysAndValues ...interface{ }) { logging.Info(msg, keysAndValues)}func (fl FileLogger) Error(err error, msg string, keysAndValues ...interface{ }) { logging.Error(err, msg, keysAndValues)}type CronTab struct { Interfaces.CommandInfo}func (c CronTab) GetCommand() (cmd Interfaces.CommandInfo) { return Interfaces.CommandInfo{ Signature: "CronTab", Description: "CronTab Schedule", }}func (c CronTab) Handle() { param := util.GetTerminalInput() action, ok := param["action"] if !ok { log.Fatal(fmt.Sprintf("the command %s missing parameter action", c.GetCommand().Signature)) } switch action { case "run": run(c) case "stop": c.SendStopSignal(c.GetCommand().Signature) }}func run(c CronTab) { defer func() { logging.Info("CronTab is stopped") if err := recover(); err != nil { logging.Info(err) // 自启动 c.Reboot() } }() logging.Info("CronTab is starting") cr := cron.New(cron.WithChain( cron.DelayIfStillRunning(FileLogger{ }), )) jobLen := len(Schedule) if jobLen == 0 { return } for _, v := range Schedule { // 注意golang的闭包问题,不能直接使用v.T, v.Job t := v.T j := v.Job name := v.Name JobStatus[name] = 0 _, err := cr.AddFunc(t, func() { defer func() { if err := recover(); err != nil { logging.Info("Job Error: ", err) } JobStatus[name] = 0 }() JobStatus[name] = 1 start := time.Now().Unix() logging.Info(name + " start") j() // 执行任务 end := time.Now().Unix() logging.Info(name+" end, time = ", end-start) }) if err != nil { logging.Info("AddFunc Error: ", err) return } } cr.Start() isRunning = true for { if ok, _ := c.CheckIfNeedExit(c.GetCommand().Signature, startTime); ok { if isRunning { cr.Stop() isRunning = false } else { stopedJobLen := 0 for _, v := range JobStatus { if v == 0 { stopedJobLen++ } } if stopedJobLen == jobLen { break } } } time.Sleep(time.Second * 1) }}
定义任务 CronJobs.go
package CronTabimport ( "voteapi/service/Vote2Service")type JobMap struct { T string Job func() Name string}var Schedule = []JobMap{ JobMap{ "*/10 * * * *", Vote2StoreVisiteLog, "Vote2StoreVisiteLog"},}// vote2: 回写总访问数,总投票数,作品访问量func Vote2StoreVisiteLog() { err := Vote2Service.Vote2StoreVisiteLog() if err != nil { panic(err) }}
其他
// 命令行模式下获取输入参数,flag 不适合// ./voteapi Vote2MQConsumer --action=runfunc GetTerminalInput() (param map[string]string) { param = make(map[string]string) list := os.Args if len(list) <= 2 { return param } for _, v := range list { if strings.Contains(v, "--") { arr := strings.Split(v, "=") key := string(([]byte(arr[0]))[2:]) param[key] = arr[1] } } return param}func (c CommandInfo) Reboot() { // 延迟执行,防止无限循环 time.Sleep(time.Minute * 1) path := os.Args[0] var args []string if len(os.Args) > 1 { args = os.Args[1:] } cmd := exec.Command(path, args...) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.Env = os.Environ() err := cmd.Start() if err != nil { logging.Fatal("Reboot: Failed to launch, error: %v", err) } else { logging.Info(fmt.Sprintf("Reboot %s success", c.Signature)) }}
1、robfig/cron
定时任务需要被当成常驻内存的进程来运行。
SendStopSignal
就是向一个文件写入当前时间戳,CheckIfNeedExit
就是比较程序启动时的时间戳和信号时间戳。 启动命令 nohup ./voteapi CronTab --action=run > /dev/null 2>&1 &
./voteapi CronTab --action=stop
查看进程 ps -ef |grep CronTab
转载地址:http://ycaui.baihongyu.com/