0%

golang cron v3 定时任务

最近需要在 golang 中使用的定时任务,用到了一个 cron 库,现在的是 v3 版本,网上挺多都是 v2 的教程

在以前的旧版本的里面默认的 cron 表示不是标准的,第一个位是秒级的定义。
现在 v3 版本直接用标准 cron 表示式就行了,主要看 godoc 文档部分

cron 表示式

推荐使用在线工具来看自己写的 cron 对不对,不过简单的一般问题不大,这里推荐一个 crontab.guru

以下内容摘自维基百科

1
2
3
4
5
6
7
8
# 文件格式說明
# ┌──分鐘(0 - 59)
# │ ┌──小時(0 - 23)
# │ │ ┌──日(1 - 31)
# │ │ │ ┌─月(1 - 12)
# │ │ │ │ ┌─星期(0 - 6,表示从周日到周六)
# │ │ │ │ │
# * * * * * 被執行的命令

注:
在某些系统里,星期日也可以为 7
不很直观的用法:如果日期和星期同时被设定,那么其中的一个条件被满足时,指令便会被执行。请参考下例。
前 5 个域称之分时日月周,可方便个人记忆。
从第六个域起,指明要执行的命令。

安装

现在都是用的 Go module 进行模块的管理,直接在 goland 中使用 alt + 回车即可同步对应的包 “github.com/robfig/cron/v3”

使用 go get 安装方式如下

1
go get github.com/robfig/cron/v3

创建配置

建议使用标准的 cron 表达式

1
2
3
4
5
6
// 使用默认的配置
c := cron.New()
// 可以配置如果当前任务正在进行,那么跳过
c := cron.New(cron.WithChain(cron.SkipIfStillRunning(logger)))
// 官方也提供了旧版本的秒级的定义,这个注意你需要传入的 cron 表达式不再是标准 cron 表达式
c := cron.New(cron.WithSeconds())

在上面的代码中出现了一个 logger,我使用的是 logrus,在源码中可以看到 cron 需要的 logger 的定义

1
2
3
4
5
6
7
8
// Logger is the interface used in this package for logging, so that any backend
// can be plugged in. It is a subset of the github.com/go-logr/logr interface.
type Logger interface {
// Info logs routine messages about cron's operation.
Info(msg string, keysAndValues ...interface{})
// Error logs an error condition.
Error(err error, msg string, keysAndValues ...interface{})
}

那么我们定义了一个 Clog 结构体,实现对应的接口就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import (
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
)

type CLog struct {
clog *log.Logger
}

func (l *CLog) Info(msg string, keysAndValues ...interface{}) {
l.clog.WithFields(log.Fields{
"data": keysAndValues,
}).Info(msg)
}

func (l *CLog) Error(err error, msg string, keysAndValues ...interface{}) {
l.clog.WithFields(log.Fields{
"msg": msg,
"data": keysAndValues,
}).Warn(msg)
}

添加任务

传入函数

还有一个部分是任务调度,我们看到文档中给出的范例,可以看到任务的添加是通过 c.AddFunc() 这个函数来进行的,直接传入一个函数即可,可以看到定义是 func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Runs at 6am in time.Local
cron.New().AddFunc("0 6 * * ?", ...)

# Runs at 6am in America/New_York
nyc, _ := time.LoadLocation("America/New_York")
c := cron.New(cron.WithLocation(nyc))
c.AddFunc("0 6 * * ?", ...)

// AddFunc adds a func to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
}

举个例子,如果你传入的任务仅仅就是一个简单函数进行执行,使用 AddFunc() 就行了,同时也可以通过闭包来引用函数外面的变量,下面是一个完整的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"github.com/robfig/cron/v3"
"time"
)

func TestCron() {
c := cron.New()
i := 1
c.AddFunc("*/1 * * * *", func() {
fmt.Println("每分钟执行一次", i)
i++
})
c.Start()
time.Sleep(time.Minute * 5)
}
func main() {
TestCron()
}

/* output

每分钟执行一次 1
每分钟执行一次 2
每分钟执行一次 3
每分钟执行一次 4
每分钟执行一次 5
*/

传入任务

但是如果我们定义的任务里面还需要留存其他信息呢,可以使用 AddJob() 这个函数,追溯一下源码定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
}

// 可以看到需要传入两个参数,`spec` 就是 cron 表达式,Job 类型我们好像还没见过,点进去看
// Job is an interface for submitted cron jobs.
type Job interface {
Run()
}

现在知道我们任务只需要实现 Run() 这个函数就行了,所以我们可以给出自己的 Job 定义
但是这里有一点需要注意,在 Run() 里面获取的变量都是 Job 初始化时的值,如果想要修改,请使用指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Job struct {
A int `json:"a"`
B *int `json:"b"`
C *string `json:"c"`
Shut chan int `json:"shut"` // 这里定义了一个管道,可以用来关闭任务
}

// implement Run() interface to start rsync job
func (this Job) Run() {
this.A++
fmt.Printf("A: %d\n", this.A)
*this.B++
fmt.Printf("B: %d\n", *this.B)
*this.C += "str"
fmt.Printf("C: %s\n", *this.C)
}

代码例子

给出一个完整代码的示例,我封装了一个 StartJob 函数,方便自己的管理,当然在 c.AddJob()可添加多个任务,都会 cron 的要求执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package main

import (
"fmt"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
"time"
)

// 定时任务计划
/*
- spec,传入 cron 时间设置
- job,对应执行的任务
*/
func StartJob(spec string, job Job) {
logger := &CLog{clog: log.New()}
logger.clog.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05",
})
// cron.WithChain(cron.SkipIfStillRunning(logger)) 是配置选项
// 如果当前任务正在执行,那就跳过
c := cron.New(cron.WithChain(cron.SkipIfStillRunning(logger)))

// 这里可以添加多个任务,多次 c.AddJob()
c.AddJob(spec, job)

// 启动执行任务
c.Start()
// 退出时关闭计划任务
defer c.Stop()

// 如果使用 select{} 那么就一直会循环
select {
case <-job.Shut:
return
}
}

// 可以调用 StopJob 来关闭任务
func StopJob(shut chan int) {
shut <- 0
}

type CLog struct {
clog *log.Logger
}

func (l *CLog) Info(msg string, keysAndValues ...interface{}) {
l.clog.WithFields(log.Fields{
"data": keysAndValues,
}).Info(msg)
}

func (l *CLog) Error(err error, msg string, keysAndValues ...interface{}) {
l.clog.WithFields(log.Fields{
"msg": msg,
"data": keysAndValues,
}).Warn(msg)
}

type Job struct {
A int `json:"a"`
B *int `json:"b"`
C *string `json:"c"`
Shut chan int `json:"shut"`
}

// implement Run() interface to start rsync job
func (this Job) Run() {
this.A++
fmt.Printf("A: %d\n", this.A)
*this.B++
fmt.Printf("B: %d\n", *this.B)
*this.C += "str"
fmt.Printf("C: %s\n", *this.C)
}

func main() {
job1 := Job{
A: 0,
B: new(int),
C: new(string),
Shut: make(chan int, 1),
}
// 每分钟执行一次
go StartJob("*/1 * * * *", job1)
time.Sleep(time.Minute * 3)
}
/*
output

A: 1
B: 1
C: str
A: 1
B: 2
C: strstr
A: 1
B: 3
C: strstrstr
*/

总结

  • 这个 cron 库的 v3 版本直接使用标准 cron 表达式即可
  • 启动 cron 任务有添加函数和添加对象两种方法,如果需要管理建议实现自己的 Job 类
  • 在使用 Job 类的时候,每次任务启动类中对象值都是创建时的值,如果需要动态修改可以使用指针或者其他存储方式(全局缓存,数据库,消息队列等)

参考资料

如果对您有帮助,请我喝杯咖啡?