后台任务处理 - Oban
在 Web 应用中,邮件发送、数据批处理等耗时操作若阻塞主线程,会严重影响用户体验和响应速度。本文介绍如何用 Oban 来异步处理这些后台任务,完整示例是创建用户时发送欢迎邮件。
环境要求
| 依赖 | 版本 |
|---|---|
| PostgreSQL | 16.8+ |
| Erlang | 27.2+ |
| Elixir | 1.18+ |
| Phoenix | 1.17+ |
| Oban | 2.19+ |
| Cookiecutter | 2.6+ |
初始化项目
cookiecutter git@github.com:seangong0/cookiecutter-phoenix.git \
--no-input \
app_name=oban_demo
cd oban_demo
mix setup
Oban 队列配置
在 config/config.exs 中配置 Oban:
config :oban_demo, Oban,
engine: Oban.Engines.Basic,
queues: [
# 默认队列,并发数 10
default: 10,
# 邮件队列,并发数 5
emails: 5,
# 高优先级队列,并发数 1
critical: 1
],
repo: ObanDemo.Repo,
prefix: "oban"
配置说明:
- engine: 使用 Basic 引擎,适合单节点应用
- queues: 定义队列及并发数,数字越小优先级越高
- prefix: 表前缀,支持多租户场景
创建 User Context
mix phx.gen.context Accounts User users name:string email:string:unique
修改 lib/oban_demo/accounts/user.ex,继承 ObanDemo.Schema:
use ObanDemo.Schema
配置测试邮件服务
我们使用 Mailtrap 的 SMTP 服务来测试邮件发送。
注册并获取凭证
注册后在 Email Testing -> Inboxes -> Integration 中获取 SMTP 配置:
| 配置项 | 值 |
|---|---|
| Host | sandbox.smtp.mailtrap.io |
| Port | 25, 465, 587 或 2525 |
| Username | 从 Mailtrap 获取 |
| Password | 从 Mailtrap 获取 |
| Auth | PLAIN, LOGIN, CRAM-MD5 |
| TLS | 可选 (所有端口支持 STARTTLS) |
添加依赖
# mix.exs
defp deps do
[
{:dotenvy, "~> 1.0.1"},
{:swoosh, "~> 1.18.2"},
{:gen_smtp, "~> 1.2.0"}
]
end
配置邮件发送
config/runtime.exs:
import Dotenvy
if config_env() != :test do
env_dir_prefix = System.get_env("RELEASE_ROOT") || Path.expand("./")
source!([
Path.absname(".env", env_dir_prefix),
System.get_env()
])
config :oban_demo, ObanDemo.Mailer,
adapter: Swoosh.Adapters.SMTP,
relay: env!("EMAIL_HOST", :string!),
port: env!("EMAIL_PORT", :integer!),
username: env!("EMAIL_USERNAME", :string!),
password: env!("EMAIL_PASSWORD", :string!),
ssl: :if_available,
tls: :never,
auth: :always,
retries: 2,
no_mx_lookup: false
end
lib/oban_demo/mailer.ex:
defmodule ObanDemo.Mailer do
use Swoosh.Mailer, otp_app: :oban_demo
end
定义邮件模板
lib/oban_demo/user_email.ex:
defmodule ObanDemo.UserEmail do
import Swoosh.Email
def welcome(user) do
new()
|> to({user.name, user.email})
|> from({"Oban Demo", "noreply@example.com"})
|> subject("欢迎注册 Oban Demo")
|> html_body("<h1>Hello #{user.name}</h1>")
|> text_body("Hello #{user.name}\n")
end
end
创建 Oban Worker
lib/oban_demo/workers/email_worker.ex:
defmodule ObanDemo.Workers.EmailWorker do
require Logger
use Oban.Worker, queue: :emails, max_attempts: 3
alias ObanDemo.Accounts
alias ObanDemo.Mailer
alias ObanDemo.UserEmail
@impl Oban.Worker
def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
case Accounts.get_user(user_id) do
nil ->
{:error, "User #{user_id} not found"}
user ->
send_welcome_email(user)
end
end
defp send_welcome_email(user) do
user
|> UserEmail.welcome()
|> Mailer.deliver()
|> handle_email_delivery()
end
defp handle_email_delivery({:ok, _}), do: :ok
defp handle_email_delivery({:error, reason}) do
Logger.error("邮件发送失败: #{inspect(reason)}")
{:error, reason}
end
end
参数说明:
- queue: 指定任务队列名称
- max_attempts: 最大重试次数,失败后会进入 retry 队列
.env 配置示例
EMAIL_HOST=sandbox.smtp.mailtrap.io
EMAIL_PORT=2525
EMAIL_USERNAME=your_username
EMAIL_PASSWORD=your_password
用户创建时触发邮件发送
lib/oban_demo/accounts.ex:
def create_user(attrs \\ %{}) do
%User{}
|> User.changeset(attrs)
|> Repo.insert()
|> case do
{:ok, user} ->
# 异步插入 Oban 任务
%{"user_id" => user.id}
|> ObanDemo.Workers.EmailWorker.new()
|> Oban.insert()
{:ok, user}
{:error, changeset} ->
{:error, changeset}
end
end
测试验证
# 启动服务器
iex -S mix phx.server
# 创建用户,触发邮件发送
ObanDemo.Accounts.create_user(%{name: "test", email: "test@example.com"})
执行后登录 Mailtrap 查看是否收到欢迎邮件。
编写 Worker 测试
test/oban_demo/workers/email_worker_test.exs:
defmodule ObanDemo.Workers.EmailWorkerTest do
use ObanDemo.DataCase, async: true
use Oban.Testing, repo: ObanDemo.Repo
alias ObanDemo.Workers.EmailWorker
test "发送欢迎邮件" do
{:ok, %{id: user_id}} =
ObanDemo.Accounts.create_user(%{name: "test", email: "test@example.com"})
assert :ok = perform_job(EmailWorker, %{"user_id" => user_id})
end
test "用户不存在时返回错误" do
fake_user_id = Ecto.UUID.generate()
assert {:error, reason} = perform_job(EmailWorker, %{"user_id" => fake_user_id})
assert reason =~ "User #{fake_user_id} not found"
end
end
运行测试:
mix test test/oban_demo/workers/email_worker_test.exs
总结
本文介绍了:
- Oban 队列配置 - 定义不同优先级的任务队列
- 邮件发送集成 - 使用 Swoosh + SMTP
- Worker 实现 - 定义后台任务处理逻辑
- 任务触发 - 在业务操作中插入异步任务
- 测试方法 - 使用 Oban.Testing 进行单元测试
完整示例代码:oban_demo