Asynq 是一个基于 Redis 的 Go 语言异步任务队列框架,由 hibiken 开发维护。它提供了简洁的 API 来创建、入队和处理后台任务,非常适合处理:
耗时操作:图片/视频生成、文件处理
异步流程:邮件发送、第三方 API 调用
定时任务:定期清理、数据同步
重试机制:失败任务自动重试
|
概念 |
说明 |
|---|---|
|
Client |
任务生产者,负责创建和入队任务 |
|
Server |
任务消费者,负责处理任务 |
|
Task |
任务实体,包含类型和载荷 |
|
Handler |
任务处理函数 |
|
Scheduler |
定时任务调度器 |
go get github.com/hibiken/asynq```
### 3.2 基本使用
**创建任务客户端:**
```go
package queue
import "github.com/hibiken/asynq"
var Client *asynq.Client
func InitClient() error {
Client = asynq.NewClient(asynq.RedisClientOpt{ Addr: "127.0.0.1:6379", Password: "", DB: 0, }) return nil}
定义任务类型:
package worker
import (
"encoding/json" "github.com/hibiken/asynq")
// 任务类型常量
const (
TypeTaskSubmit = "task:submit" TypeTaskPoll = "task:poll" TypeTaskUpload = "task:upload")
// 任务载荷结构
type TaskSubmitPayload struct {
TaskID uint `json:"task_id"`}
// 创建任务
func NewTaskSubmit(taskID uint) (*asynq.Task, error) {
payload, err := json.Marshal(TaskSubmitPayload{TaskID: taskID}) if err != nil { return nil, err } return asynq.NewTask(TypeTaskSubmit, payload), nil}
// 入队任务
func EnqueueTaskSubmit(taskID uint) error {
task, err := NewTaskSubmit(taskID) if err != nil { return err } _, err = queue.Client.Enqueue(task) return err}
编写任务处理器:
package worker
import (
"context" "encoding/json" "github.com/hibiken/asynq")
func HandleTaskSubmit(ctx context.Context, t *asynq.Task) error {
// 1. 解析载荷
var payload TaskSubmitPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return err }
// 2. 处理业务逻辑
// ... 执行实际任务 ...
// 3. 链式入队下一个任务
return EnqueueTaskPoll(payload.TaskID, 0)}
启动 Worker 服务:
package main
import (
"github.com/hibiken/asynq" "your-project/internal/worker")
func startWorker() {
srv := asynq.NewServer( asynq.RedisClientOpt{Addr: "127.0.0.1:6379"}, asynq.Config{ Concurrency: 10, Queues: map[string]int{ "critical": 6, // 高优先级
"default": 3, // 默认
"low": 1, // 低优先级
}, }, )
mux := asynq.NewServeMux() mux.HandleFunc(worker.TypeTaskSubmit, worker.HandleTaskSubmit) mux.HandleFunc(worker.TypeTaskPoll, worker.HandleTaskPoll)
srv.Run(mux)}
适用于轮询等需要间隔执行的场景:
// 5 秒后执行
_, err := queue.Client.Enqueue(task, asynq.ProcessIn(5*time.Second))
// 指定时间执行
_, err := queue.Client.Enqueue(task, asynq.ProcessAt(time.Now().Add(1*time.Hour)))
// 入队到高优先级队列
_, err := queue.Client.Enqueue(task, asynq.Queue("critical"))
// 入队到低优先级队列
_, err := queue.Client.Enqueue(task, asynq.Queue("low"))
// 最多重试 5 次
_, err := queue.Client.Enqueue(task, asynq.MaxRetry(5))
// 任务唯一性(防重复)
_, err := queue.Client.Enqueue(task, asynq.Unique(1*time.Hour))
// 设置超时
_, err := queue.Client.Enqueue(task, asynq.Timeout(30*time.Second))
func startScheduler() {
scheduler := asynq.NewScheduler( asynq.RedisClientOpt{Addr: "127.0.0.1:6379"}, nil, )
// 每 5 分钟执行一次超时检查
task := asynq.NewTask("task:timeout_check", nil) scheduler.Register("*/5 * * * *", task)
scheduler.Run()}
以 Prism 项目为例,展示完整的任务链:
用户请求 -> Submit(提交) -> Poll(轮询) -> Upload(上传) -> Notify(回调)
type TaskStatus string
const (
TaskStatusPending TaskStatus = "pending" // 待提交
TaskStatusProcessing TaskStatus = "processing" // 处理中
TaskStatusSuccess TaskStatus = "success" // 成功
TaskStatusFailed TaskStatus = "failed" // 失败
)
const MaxPollCount = 360 // 最多轮询 360 次,约 30 分钟
func HandleTaskPoll(ctx context.Context, t *asynq.Task) error {
var payload TaskPollPayload json.Unmarshal(t.Payload(), &payload)
// 超时保护
if payload.PollCount >= MaxPollCount { taskService.UpdateTaskFail(payload.TaskID, "poll timeout") return nil }
// 查询上游进度
result, err := provider.GetProgress(ctx, task.VendorTaskID)
switch result.Status { case StatusSuccess: return enqueueUpload(task.ID, result.URLs)
case StatusFail: taskService.UpdateTaskFail(task.ID, result.Error) return nil
case StatusProcessing: // 继续轮询,延迟 5 秒
return requeuePoll(payload.TaskID, payload.PollCount+1) }
return nil}
func requeuePoll(taskID uint, pollCount int) error {
payload := TaskPollPayload{TaskID: taskID, PollCount: pollCount} payloadBytes, _ := json.Marshal(payload) task := asynq.NewTask(TypeTaskPoll, payloadBytes) _, err := queue.Client.Enqueue(task, asynq.ProcessIn(5*time.Second)) return err}
func HandleTaskTimeoutCheck(ctx context.Context, t *asynq.Task) error {
timeout := time.Now().Add(-30 * time.Minute)
var tasks []model.Task model.DB().Where("status = ? AND updated_at < ?", model.TaskStatusProcessing, timeout, ).Find(&tasks)
for _, task := range tasks { taskService.UpdateTaskFail(task.ID, "task timeout") }
return nil}
go install github.com/hibiken/asynqmon/cmd/asynqmon@latest
asynqmon --port=8080 --redis-addr=127.0.0.1:6379```
访问 `http://localhost:8080` 查看:
- 队列状态
- 任务详情
- 失败任务
- 定时任务
### 6.2 编程式查询
```go
inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"})
// 获取队列信息
info, _ := inspector.GetQueueInfo("default")
fmt.Printf("pending: %d, active: %d, completed: %d\n",
info.Pending, info.Active, info.Completed)
// 删除失败任务
inspector.DeleteAllArchivedTasks("default")
|
实践 |
说明 |
|---|---|
|
幂等处理 |
任务可能重试,确保处理逻辑幂等 |
|
合理超时 |
设置任务超时,避免永久阻塞 |
|
优先级队列 |
关键任务使用高优先级队列 |
|
错误分类 |
区分可重试错误和永久错误 |
|
日志追踪 |
记录 TaskID 便于排查问题 |
|
健康检查 |
监控 Redis 连接和队列积压 |
|
特性 |
Asynq |
Machinery |
Go-workers |
|---|---|---|---|
|
依赖 |
Redis |
Redis/多种 |
Redis |
|
API 复杂度 |
简单 |
中等 |
简单 |
|
定时任务 |
支持 |
支持 |
不支持 |
|
Web UI |
支持 |
不支持 |
不支持 |
|
社区活跃度 |
高 |
中 |
低 |
pkg/
queue/ queue.go # Redis 队列客户端初始化
internal/
worker/ worker.go # Handler 注册
types.go # 任务类型定义
submit_worker.go # 提交处理器
poll_worker.go # 轮询处理器
upload_worker.go # 上传处理器
notify_worker.go # 通知处理器
timeout_checker.go # 超时检查处理器
cmd/
server/ main.go # 应用启动入口
+----------------+ | 用户请求 | +-------+--------+ | v +-------+--------+ | API 创建 Task | +-------+--------+ | v +-------+--------+ | EnqueueSubmit | +-------+--------+ | v +-------------+-------------+ | HandleTaskSubmit | | - 获取渠道信息 | | - 调用上游 API | | - 更新状态为 Processing | +-------------+-------------+ | +-------------+-------------+ | | v v +---------+----------+ +----------+---------+ | Poll 模式 | | Callback 模式 | | EnqueuePoll | | 等待上游回调 | +---------+----------+ +--------------------+ | v +---------+----------+ | HandleTaskPoll | <--+ | - 查询进度 | | | - 更新进度 | | 循环轮询
+---------+----------+ | (最多 360 次)
| | +----------------+ | v (完成)
+---------+----------+ | HandleTaskUpload | | - 下载原始文件 | | - 上传到 COS | +---------+----------+ | v +---------+----------+ | HandleTaskNotify | | - 发送回调通知 | +--------------------+```
### 9.3 配置示例
```yaml
# configs/config.yaml
redis:
addr: 127.0.0.1:6379 password: "" db: 0
worker:
concurrency: 10 poll_interval: 5s max_retry: 3