上线我的 2.0

上线我的 2.0

马图图

岁月变迁何必不悔,尘世喧嚣怎能无愧。

20 文章数
1 评论数

Asynq - Go 语言异步任务队列框架

Matuto
2026-02-03 / 0 评论 / 59 阅读 / 0 点赞

一、什么是 Asynq?

Asynq 是一个基于 Redis 的 Go 语言异步任务队列框架,由 hibiken 开发维护。它提供了简洁的 API 来创建、入队和处理后台任务,非常适合处理:

  • 耗时操作:图片/视频生成、文件处理

  • 异步流程:邮件发送、第三方 API 调用

  • 定时任务:定期清理、数据同步

  • 重试机制:失败任务自动重试

二、核心概念

概念

说明

Client

任务生产者,负责创建和入队任务

Server

任务消费者,负责处理任务

Task

任务实体,包含类型和载荷

Handler

任务处理函数

Scheduler

定时任务调度器

三、快速开始

3.1 安装

go get github.com/hibiken/asynq```  
  
### 3.2 基本使用  
  
**创建任务客户端:**  
  
```go  
package queue  
  
import "github.com/hibiken/asynq"  
  
var Client *asynq.Client  
  
func InitClient() error {  
    Client = asynq.NewClient(asynq.RedisClientOpt{        Addr:     "127.0.0.1:6379",        Password: "",        DB:       0,    })    return nil}  

定义任务类型:

package worker  
  
import (  
    "encoding/json"    "github.com/hibiken/asynq")  
  
// 任务类型常量  
const (  
    TypeTaskSubmit = "task:submit"    TypeTaskPoll   = "task:poll"    TypeTaskUpload = "task:upload")  
  
// 任务载荷结构  
type TaskSubmitPayload struct {  
    TaskID uint `json:"task_id"`}  
  
// 创建任务  
func NewTaskSubmit(taskID uint) (*asynq.Task, error) {  
    payload, err := json.Marshal(TaskSubmitPayload{TaskID: taskID})    if err != nil {        return nil, err    }    return asynq.NewTask(TypeTaskSubmit, payload), nil}  
  
// 入队任务  
func EnqueueTaskSubmit(taskID uint) error {  
    task, err := NewTaskSubmit(taskID)    if err != nil {        return err    }    _, err = queue.Client.Enqueue(task)    return err}  

编写任务处理器:

package worker  
  
import (  
    "context"    "encoding/json"    "github.com/hibiken/asynq")  
  
func HandleTaskSubmit(ctx context.Context, t *asynq.Task) error {  
    // 1. 解析载荷  
    var payload TaskSubmitPayload    if err := json.Unmarshal(t.Payload(), &payload); err != nil {        return err    }  
    // 2. 处理业务逻辑  
    // ... 执行实际任务 ...  
    // 3. 链式入队下一个任务  
    return EnqueueTaskPoll(payload.TaskID, 0)}  

启动 Worker 服务:

package main  
  
import (  
    "github.com/hibiken/asynq"    "your-project/internal/worker")  
  
func startWorker() {  
    srv := asynq.NewServer(        asynq.RedisClientOpt{Addr: "127.0.0.1:6379"},        asynq.Config{            Concurrency: 10,            Queues: map[string]int{                "critical": 6,  // 高优先级  
                "default":  3,  // 默认  
                "low":      1,  // 低优先级  
            },        },    )  
    mux := asynq.NewServeMux()    mux.HandleFunc(worker.TypeTaskSubmit, worker.HandleTaskSubmit)    mux.HandleFunc(worker.TypeTaskPoll, worker.HandleTaskPoll)  
    srv.Run(mux)}  

四、高级特性

4.1 延迟入队

适用于轮询等需要间隔执行的场景:

// 5 秒后执行  
_, err := queue.Client.Enqueue(task, asynq.ProcessIn(5*time.Second))  
  
// 指定时间执行  
_, err := queue.Client.Enqueue(task, asynq.ProcessAt(time.Now().Add(1*time.Hour)))  

4.2 指定队列优先级

// 入队到高优先级队列  
_, err := queue.Client.Enqueue(task, asynq.Queue("critical"))  
  
// 入队到低优先级队列  
_, err := queue.Client.Enqueue(task, asynq.Queue("low"))  

4.3 自定义重试策略

// 最多重试 5 次  
_, err := queue.Client.Enqueue(task, asynq.MaxRetry(5))  
  
// 任务唯一性(防重复)  
_, err := queue.Client.Enqueue(task, asynq.Unique(1*time.Hour))  
  
// 设置超时  
_, err := queue.Client.Enqueue(task, asynq.Timeout(30*time.Second))  

4.4 定时任务调度器 (Scheduler)

func startScheduler() {  
    scheduler := asynq.NewScheduler(        asynq.RedisClientOpt{Addr: "127.0.0.1:6379"},        nil,    )  
    // 每 5 分钟执行一次超时检查  
    task := asynq.NewTask("task:timeout_check", nil)    scheduler.Register("*/5 * * * *", task)  
    scheduler.Run()}  

五、实战案例:图像生成任务流水线

以 Prism 项目为例,展示完整的任务链:

用户请求 -> Submit(提交) -> Poll(轮询) -> Upload(上传) -> Notify(回调)  

5.1 任务状态机

type TaskStatus string  
  
const (  
    TaskStatusPending    TaskStatus = "pending"     // 待提交  
    TaskStatusProcessing TaskStatus = "processing"  // 处理中  
    TaskStatusSuccess    TaskStatus = "success"     // 成功  
    TaskStatusFailed     TaskStatus = "failed"      // 失败  
)  

5.2 轮询处理器(带超时保护)

const MaxPollCount = 360 // 最多轮询 360 次,约 30 分钟  
  
func HandleTaskPoll(ctx context.Context, t *asynq.Task) error {  
    var payload TaskPollPayload    json.Unmarshal(t.Payload(), &payload)  
    // 超时保护  
    if payload.PollCount >= MaxPollCount {        taskService.UpdateTaskFail(payload.TaskID, "poll timeout")        return nil    }  
    // 查询上游进度  
    result, err := provider.GetProgress(ctx, task.VendorTaskID)  
    switch result.Status {    case StatusSuccess:        return enqueueUpload(task.ID, result.URLs)  
    case StatusFail:        taskService.UpdateTaskFail(task.ID, result.Error)        return nil  
    case StatusProcessing:        // 继续轮询,延迟 5 秒  
        return requeuePoll(payload.TaskID, payload.PollCount+1)    }  
    return nil}  
  
func requeuePoll(taskID uint, pollCount int) error {  
    payload := TaskPollPayload{TaskID: taskID, PollCount: pollCount}    payloadBytes, _ := json.Marshal(payload)    task := asynq.NewTask(TypeTaskPoll, payloadBytes)    _, err := queue.Client.Enqueue(task, asynq.ProcessIn(5*time.Second))    return err}  

5.3 定时超时检查

func HandleTaskTimeoutCheck(ctx context.Context, t *asynq.Task) error {  
    timeout := time.Now().Add(-30 * time.Minute)  
    var tasks []model.Task    model.DB().Where("status = ? AND updated_at < ?",        model.TaskStatusProcessing,        timeout,    ).Find(&tasks)  
    for _, task := range tasks {        taskService.UpdateTaskFail(task.ID, "task timeout")    }  
    return nil}  

六、监控与管理

6.1 Asynqmon Web UI

go install github.com/hibiken/asynqmon/cmd/asynqmon@latest  
asynqmon --port=8080 --redis-addr=127.0.0.1:6379```  
  
访问 `http://localhost:8080` 查看:  
  
- 队列状态  
- 任务详情  
- 失败任务  
- 定时任务  
  
### 6.2 编程式查询  
  
```go  
inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"})  
  
// 获取队列信息  
info, _ := inspector.GetQueueInfo("default")  
fmt.Printf("pending: %d, active: %d, completed: %d\n",  
    info.Pending, info.Active, info.Completed)  
// 删除失败任务  
inspector.DeleteAllArchivedTasks("default")  

七、最佳实践

实践

说明

幂等处理

任务可能重试,确保处理逻辑幂等

合理超时

设置任务超时,避免永久阻塞

优先级队列

关键任务使用高优先级队列

错误分类

区分可重试错误和永久错误

日志追踪

记录 TaskID 便于排查问题

健康检查

监控 Redis 连接和队列积压

八、与其他方案对比

特性

Asynq

Machinery

Go-workers

依赖

Redis

Redis/多种

Redis

API 复杂度

简单

中等

简单

定时任务

支持

支持

不支持

Web UI

支持

不支持

不支持

社区活跃度

九、Prism 项目中的 asynq 架构

9.1 文件结构

pkg/  
  queue/    queue.go           # Redis 队列客户端初始化  
  
internal/  
  worker/    worker.go          # Handler 注册  
    types.go           # 任务类型定义  
    submit_worker.go   # 提交处理器  
    poll_worker.go     # 轮询处理器  
    upload_worker.go   # 上传处理器  
    notify_worker.go   # 通知处理器  
    timeout_checker.go # 超时检查处理器  
  
cmd/  
  server/    main.go            # 应用启动入口  

9.2 任务流程图

                    +----------------+                    |   用户请求      |                    +-------+--------+                            |                            v                    +-------+--------+                    | API 创建 Task  |                    +-------+--------+                            |                            v                    +-------+--------+                    | EnqueueSubmit  |                    +-------+--------+                            |                            v              +-------------+-------------+              |    HandleTaskSubmit       |              |  - 获取渠道信息            |              |  - 调用上游 API            |              |  - 更新状态为 Processing   |              +-------------+-------------+                            |              +-------------+-------------+              |                           |              v                           v    +---------+----------+     +----------+---------+    |  Poll 模式         |     |  Callback 模式     |    |  EnqueuePoll       |     |  等待上游回调       |    +---------+----------+     +--------------------+              |              v    +---------+----------+    |   HandleTaskPoll   |  <--+    |  - 查询进度         |     |    |  - 更新进度         |     | 循环轮询  
    +---------+----------+     | (最多 360 次)  
              |                |              +----------------+              |              v (完成)  
    +---------+----------+    |  HandleTaskUpload  |    |  - 下载原始文件     |    |  - 上传到 COS       |    +---------+----------+              |              v    +---------+----------+    |  HandleTaskNotify  |    |  - 发送回调通知     |    +--------------------+```  
  
### 9.3 配置示例  
  
```yaml  
# configs/config.yaml  
redis:  
  addr: 127.0.0.1:6379  password: ""  db: 0  
worker:  
  concurrency: 10  poll_interval: 5s  max_retry: 3  

十、参考资料

下一篇
评论
来首音乐
光阴似箭
今日已经过去小时
这周已经过去
本月已经过去
今年已经过去个月
文章目录
每日一句