后台任务处理 - Oban


在 Web 应用中,诸如邮件发送、数据批处理等耗时操作若阻塞主线程,将严重影响实时响应。 所以我们需要 Oban 来处理这些耗时操作。 下面的示例,是使用 Oban 在创建用户时,发送邮件。

环境要求

  • PostgreSQL 16.8+
  • Erlang 27.2+
  • Elixir 1.18+
  • Phoenix 1.17+
  • Oban 2.19+
  • Cookiecutter 2.6+

初始化项目

cookiecutter git@github.com:seangong0/cookiecutter-phoenix.git \
    --no-input \
    app_name=oban_demo
cd oban_demo
mix setup

重新配置 oban 队列

# File: config/config.exs
config :oban_demo, Oban,
  engine: Oban.Engines.Basic,
  queues: [
    # 默认队列
    default: 10,
    # 邮件队列
    emails: 5,
    # 高优先级队列
    critical: 1
  ],
  repo: ObanDemo.Repo,
  prefix: "oban"

创建 User Context

mix phx.gen.context \
  Accounts User users \
  name:string email:string:unique

# File: lib/oban_demo/accounts/user.ex
use Ecto.Schema
import Ecto.Changeset

# 替换为
use ObanDemo.Schema

测试邮箱申请

Phoenix 提供了 Swoosh 但处理邮件发送,但我们的模板创建中没有带,而且和真实的环境有些区别。 这次我们使用 mailtrap.io SMTP 来测试邮件发送。

注册帐号,然后在 Email Testing -> Inboxes -> Integration 中,mailtrap 我们提供了 smtp 相关信息。

  • Host: sandbox.smtp.mailtrap.io
  • Port: 25, 465, 587 or 2525
  • Username: xxx
  • Password: ********d506
  • Auth: PLAIN, LOGIN and CRAM-MD5
  • TLS: Optional (STARTTLS on all ports)

添加依赖

# File: mix.exs
defp deps do
  [
    ...,
    {:dotenvy, "~> 1.0.1"},
    {:swoosh, "~> 1.18.2"},
    {:gen_smtp, "~> 1.2.0"}
  ]
end

添加邮箱配置

为了不直接将密码写在代码中,我们使用 dotenvy 来从 .env 文件中读取环境变量。

# File: config/runtime.exs
import Dotenvy

if config_env() != :test do
  env_dir_prefix = System.get_env("RELEASE_ROOT") || Path.expand("./")

  source!([
    Path.absname(".env", env_dir_prefix),
    System.get_env()
  ])

  config :oban_demo, ObanDemo.Mailer,
    adapter: Swoosh.Adapters.SMTP,
    relay: env!("EMAIL_HOST", :string!),
    port: env!("EMAIL_PORT", :integer!),
    username: env!("EMAIL_USERNAME", :string!),
    password: env!("EMAIL_PASSWORD", :string!),
    ssl: :if_available,
    tls: :never,
    auth: :always,
    retries: 2,
    no_mx_lookup: false
end

# File: lib/oban_demo/mailer.ex
defmodule ObanDemo.Mailer do
  use Swoosh.Mailer, otp_app: :oban_demo

创建邮件发送

# File: lib/oban_demo/user_email.ex
defmodule ObanDemo.UserEmail do
  import Swoosh.Email

  def welcome(user) do
    new()
    |> to({user.name, user.email})
    |> from({"Oban Demo", "noreply@example.com"})
    |> subject("Hello, Travelers!")
    |> html_body("<h1>Hello #{user.name}</h1>")
    |> text_body("Hello #{user.name}\n")
  end
end

创建 oban job

# File: lib/oban_demo/workers/email_worker.ex
defmodule ObanDemo.Workers.EmailWorker do
  require Logger
  use Oban.Worker, queue: :emails, max_attempts: 3

  alias ObanDemo.Accounts
  alias ObanDemo.Mailer
  alias ObanDemo.UserEmail

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
    case Accounts.get_user(user_id) do
      nil ->
        {:error, "User #{user_id} not found"}

      user ->
        send_welcome_email(user)
    end
  end

  defp send_welcome_email(user) do
    user
    |> UserEmail.welcome()
    |> Mailer.deliver()
    |> handle_email_delivery()
  end

  defp handle_email_delivery({:ok, _}), do: :ok

  defp handle_email_delivery({:error, reason}) do
    Logger.error("Failed to send email: #{inspect(reason)}")
    {:error, reason}
  end
end

参数说明:

  • queue: 队列名称
  • max_attempts: 最大重试次数

.env 示例

# Mailtrap 测试凭证
EMAIL_HOST=sandbox.smtp.mailtrap.io
EMAIL_PORT=2525
EMAIL_USERNAME=your_username
EMAIL_PASSWORD=your_password

创建 user 时发送欢迎邮件

# File: lib/oban_demo/accounts.ex
def create_user(attrs \\ %{}) do
    %User{}
    |> User.changeset(attrs)
    |> Repo.insert()
    |> case do
      {:ok, user} ->
        %{"user_id" => user.id}
        |> ObanDemo.Workers.EmailWorker.new()
        |> Oban.insert()

        {:ok, user}

      {:error, changeset} ->
        {:error, changeset}
    end
  end

手动测试

# iex -S mix phx.server
ObanDemo.Accounts.create_user(%{name: "test", email: "test@example.com"})

然后,在 mailtrap 中查看邮件是否发送成功。

添加 worker 测试

# File: test/oban_demo/workers/email_worker_test.exs
defmodule ObanDemo.Workers.EmailWorkerTest do
  use ObanDemo.DataCase, async: true
  use Oban.Testing, repo: ObanDemo.Repo

  alias ObanDemo.Workers.EmailWorker

  test "send welcome email" do
    {:ok, %{id: user_id}} =
      ObanDemo.Accounts.create_user(%{name: "test", email: "test@example.com"})

    assert :ok =
             perform_job(EmailWorker, %{"user_id" => user_id})
  end

  test "send welcome email with invalid user" do
    fake_user_id = Ecto.UUID.generate()

    assert {:error, reason} = perform_job(EmailWorker, %{"user_id" => fake_user_id})
    assert reason =~ "User #{fake_user_id} not found"
  end
end

至此,我们就完成了在创建用户时,发送邮件的功能。代码 oban_demo