后台任务处理 - Oban


在 Web 应用中,邮件发送、数据批处理等耗时操作若阻塞主线程,会严重影响用户体验和响应速度。本文介绍如何用 Oban 来异步处理这些后台任务,完整示例是创建用户时发送欢迎邮件。

环境要求

依赖版本
PostgreSQL16.8+
Erlang27.2+
Elixir1.18+
Phoenix1.17+
Oban2.19+
Cookiecutter2.6+

初始化项目

cookiecutter git@github.com:seangong0/cookiecutter-phoenix.git \
    --no-input \
    app_name=oban_demo
cd oban_demo
mix setup

Oban 队列配置

config/config.exs 中配置 Oban:

config :oban_demo, Oban,
  engine: Oban.Engines.Basic,
  queues: [
    # 默认队列,并发数 10
    default: 10,
    # 邮件队列,并发数 5
    emails: 5,
    # 高优先级队列,并发数 1
    critical: 1
  ],
  repo: ObanDemo.Repo,
  prefix: "oban"

配置说明:

  • engine: 使用 Basic 引擎,适合单节点应用
  • queues: 定义队列及并发数,数字越小优先级越高
  • prefix: 表前缀,支持多租户场景

创建 User Context

mix phx.gen.context Accounts User users name:string email:string:unique

修改 lib/oban_demo/accounts/user.ex,继承 ObanDemo.Schema

use ObanDemo.Schema

配置测试邮件服务

我们使用 Mailtrap 的 SMTP 服务来测试邮件发送。

注册并获取凭证

注册后在 Email Testing -> Inboxes -> Integration 中获取 SMTP 配置:

配置项
Hostsandbox.smtp.mailtrap.io
Port25, 465, 587 或 2525
Username从 Mailtrap 获取
Password从 Mailtrap 获取
AuthPLAIN, LOGIN, CRAM-MD5
TLS可选 (所有端口支持 STARTTLS)

添加依赖

# mix.exs
defp deps do
  [
    {:dotenvy, "~> 1.0.1"},
    {:swoosh, "~> 1.18.2"},
    {:gen_smtp, "~> 1.2.0"}
  ]
end

配置邮件发送

config/runtime.exs:

import Dotenvy

if config_env() != :test do
  env_dir_prefix = System.get_env("RELEASE_ROOT") || Path.expand("./")

  source!([
    Path.absname(".env", env_dir_prefix),
    System.get_env()
  ])

  config :oban_demo, ObanDemo.Mailer,
    adapter: Swoosh.Adapters.SMTP,
    relay: env!("EMAIL_HOST", :string!),
    port: env!("EMAIL_PORT", :integer!),
    username: env!("EMAIL_USERNAME", :string!),
    password: env!("EMAIL_PASSWORD", :string!),
    ssl: :if_available,
    tls: :never,
    auth: :always,
    retries: 2,
    no_mx_lookup: false
end

lib/oban_demo/mailer.ex:

defmodule ObanDemo.Mailer do
  use Swoosh.Mailer, otp_app: :oban_demo
end

定义邮件模板

lib/oban_demo/user_email.ex:

defmodule ObanDemo.UserEmail do
  import Swoosh.Email

  def welcome(user) do
    new()
    |> to({user.name, user.email})
    |> from({"Oban Demo", "noreply@example.com"})
    |> subject("欢迎注册 Oban Demo")
    |> html_body("<h1>Hello #{user.name}</h1>")
    |> text_body("Hello #{user.name}\n")
  end
end

创建 Oban Worker

lib/oban_demo/workers/email_worker.ex:

defmodule ObanDemo.Workers.EmailWorker do
  require Logger
  use Oban.Worker, queue: :emails, max_attempts: 3

  alias ObanDemo.Accounts
  alias ObanDemo.Mailer
  alias ObanDemo.UserEmail

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
    case Accounts.get_user(user_id) do
      nil ->
        {:error, "User #{user_id} not found"}

      user ->
        send_welcome_email(user)
    end
  end

  defp send_welcome_email(user) do
    user
    |> UserEmail.welcome()
    |> Mailer.deliver()
    |> handle_email_delivery()
  end

  defp handle_email_delivery({:ok, _}), do: :ok

  defp handle_email_delivery({:error, reason}) do
    Logger.error("邮件发送失败: #{inspect(reason)}")
    {:error, reason}
  end
end

参数说明:

  • queue: 指定任务队列名称
  • max_attempts: 最大重试次数,失败后会进入 retry 队列

.env 配置示例

EMAIL_HOST=sandbox.smtp.mailtrap.io
EMAIL_PORT=2525
EMAIL_USERNAME=your_username
EMAIL_PASSWORD=your_password

用户创建时触发邮件发送

lib/oban_demo/accounts.ex:

def create_user(attrs \\ %{}) do
  %User{}
  |> User.changeset(attrs)
  |> Repo.insert()
  |> case do
    {:ok, user} ->
      # 异步插入 Oban 任务
      %{"user_id" => user.id}
      |> ObanDemo.Workers.EmailWorker.new()
      |> Oban.insert()

      {:ok, user}

    {:error, changeset} ->
      {:error, changeset}
  end
end

测试验证

# 启动服务器
iex -S mix phx.server

# 创建用户,触发邮件发送
ObanDemo.Accounts.create_user(%{name: "test", email: "test@example.com"})

执行后登录 Mailtrap 查看是否收到欢迎邮件。

编写 Worker 测试

test/oban_demo/workers/email_worker_test.exs:

defmodule ObanDemo.Workers.EmailWorkerTest do
  use ObanDemo.DataCase, async: true
  use Oban.Testing, repo: ObanDemo.Repo

  alias ObanDemo.Workers.EmailWorker

  test "发送欢迎邮件" do
    {:ok, %{id: user_id}} =
      ObanDemo.Accounts.create_user(%{name: "test", email: "test@example.com"})

    assert :ok = perform_job(EmailWorker, %{"user_id" => user_id})
  end

  test "用户不存在时返回错误" do
    fake_user_id = Ecto.UUID.generate()

    assert {:error, reason} = perform_job(EmailWorker, %{"user_id" => fake_user_id})
    assert reason =~ "User #{fake_user_id} not found"
  end
end

运行测试:

mix test test/oban_demo/workers/email_worker_test.exs

总结

本文介绍了:

  1. Oban 队列配置 - 定义不同优先级的任务队列
  2. 邮件发送集成 - 使用 Swoosh + SMTP
  3. Worker 实现 - 定义后台任务处理逻辑
  4. 任务触发 - 在业务操作中插入异步任务
  5. 测试方法 - 使用 Oban.Testing 进行单元测试

完整示例代码:oban_demo