Friday, August 28, 2020

Postgrex Ecto Types

One thing I found confusing about Postgrex, the excellent PostgreSQL adapter for Elixir, was how to use PostgreSQL-specific data types (cidr, inet, interval, lexeme, range, etc.) with Ecto. As far as I can tell, while Postgrex includes structs that map to each of these types (like Postgrex.INET), you still have to write your own Ecto.Type implementation for each type you use in an Ecto schema.
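For reference, here's the shape of the Postgrex.INET struct itself: it holds the address as an Erlang-style tuple, plus (in recent Postgrex versions) an optional CIDR netmask field:

```elixir
# A plain IPv4 host address (netmask is nil when no CIDR suffix applies):
%Postgrex.INET{address: {127, 0, 0, 1}, netmask: nil}

# An IPv6 address uses an 8-element tuple of 16-bit segments:
%Postgrex.INET{address: {0, 0, 0, 0, 0, 0, 0, 1}, netmask: 128}
```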

For example, to implement an Ecto schema for a table like the following:

CREATE TABLE hits (
    id BIGSERIAL NOT NULL PRIMARY KEY,
    url TEXT NOT NULL,
    ip INET NOT NULL,
    inserted_at TIMESTAMP NOT NULL
);

You'd need minimally to create an Ecto type implementation like the following:

# lib/my_app/inet_type.ex
defmodule MyApp.InetType do
  @moduledoc """
  `Ecto.Type` implementation for postgres `INET` type.
  """
  use Ecto.Type

  def type, do: :inet
  def cast(term), do: {:ok, term}
  def dump(term), do: {:ok, term}
  def load(term), do: {:ok, term}
end

So that you could then define the Ecto schema with your custom Ecto type:

# lib/my_app/hits/hit.ex
defmodule MyApp.Hits.Hit do
  @moduledoc """
  The Hit schema.
  """
  use Ecto.Schema
  import Ecto.Changeset

  schema "hits" do
    field :url, :string
    field :ip, MyApp.InetType
    timestamps updated_at: false
  end

  def changeset(hit, attrs) do
    hit
    |> cast(attrs, [:url, :ip])
    |> validate_required([:url, :ip])
  end
end

Note that in your migration code, you use the native database type name (eg inet), not your custom type name:

# priv/repo/migrations/202001010000_create_hits.exs
defmodule MyApp.Repo.Migrations.CreateHits do
  use Ecto.Migration

  def change do
    create table(:hits) do
      add :url, :text, null: false
      add :ip, :inet, null: false
      timestamps updated_at: false
    end
  end
end

A Fancier Type

However, the above basic InetType implementation limits this example Hit schema to working only with Postgrex.INET structs for its ip field — so, while creating a Hit record with the IP address specified via a Postgrex.INET struct works nicely:

iex> (
...>   %MyApp.Hits.Hit{}
...>   |> MyApp.Hits.Hit.changeset(%{url: "/", ip: %Postgrex.INET{address: {127, 0, 0, 1}}})
...>   |> MyApp.Repo.insert!()
...>   |> Map.get(:ip)
...> )
%Postgrex.INET{address: {127, 0, 0, 1}}

Creating one with the IP address specified as a string (or even a plain tuple like {127, 0, 0, 1}) won't work:

iex> (
...>   %MyApp.Hits.Hit{}
...>   |> MyApp.Hits.Hit.changeset(%{url: "/", ip: "127.0.0.1"})
...>   |> MyApp.Repo.insert!()
...>   |> Map.get(:ip)
...> )
** (Ecto.InvalidChangesetError) could not perform insert because changeset is invalid.

This can be solved by implementing a fancier version of the cast function in the MyApp.InetType module, enabling Ecto to cast strings (and tuples) to the Postgrex.INET type. Here's a version of MyApp.InetType that does that, as well as allows the Postgrex.INET struct to be serialized as a string (including when serialized to JSON with the Jason library, or when rendered as part of a Phoenix HTML template):

# lib/my_app/inet_type.ex
defmodule MyApp.InetType do
  @moduledoc """
  `Ecto.Type` implementation for postgres `INET` type.
  """
  use Bitwise
  use Ecto.Type

  alias Postgrex.INET

  def type, do: :inet

  def cast(nil), do: {:ok, nil}
  def cast(""), do: {:ok, nil}
  def cast(%INET{address: nil}), do: {:ok, nil}
  def cast(%INET{} = term), do: {:ok, term}
  def cast(term) when is_tuple(term), do: {:ok, %INET{address: term}}

  def cast(term) when is_binary(term) do
    [addr | mask] = String.split(term, "/", parts: 2)

    with {:ok, address} <- parse_address(addr),
         {:ok, number} <- parse_netmask(mask),
         {:ok, netmask} <- validate_netmask(number, address) do
      {:ok, %INET{address: address, netmask: netmask}}
    else
      message -> {:error, [message: message]}
    end
  end

  def cast(_), do: :error

  def dump(term), do: {:ok, term}
  def load(term), do: {:ok, term}

  defp parse_address(addr) do
    case :inet.parse_strict_address(String.to_charlist(addr)) do
      {:ok, address} -> {:ok, address}
      _ -> "not a valid IP address"
    end
  end

  defp parse_netmask([]), do: {:ok, nil}

  defp parse_netmask([mask]) do
    case Integer.parse(mask) do
      {number, ""} -> {:ok, number}
      _ -> "not a CIDR netmask"
    end
  end

  defp validate_netmask(nil, _addr), do: {:ok, nil}

  defp validate_netmask(mask, _addr) when mask < 0 do
    "CIDR netmask cannot be negative"
  end

  defp validate_netmask(mask, addr) when mask > 32 and tuple_size(addr) == 4 do
    "CIDR netmask cannot be greater than 32"
  end

  defp validate_netmask(mask, _addr) when mask > 128 do
    "CIDR netmask cannot be greater than 128"
  end

  defp validate_netmask(mask, addr) do
    ipv4 = tuple_size(addr) == 4
    max = if ipv4, do: 32, else: 128
    subnet = if ipv4, do: 8, else: 16

    bits =
      addr
      |> Tuple.to_list()
      |> Enum.reverse()
      |> Enum.with_index()
      |> Enum.reduce(0, fn {value, index}, acc ->
        acc + (value <<< (index * subnet))
      end)

    bitmask = ((1 <<< max) - 1) ^^^ ((1 <<< (max - mask)) - 1)

    if (bits &&& bitmask) == bits do
      {:ok, mask}
    else
      "masked bits of IP address all must be 0s"
    end
  end
end

defimpl String.Chars, for: Postgrex.INET do
  def to_string(%{address: address, netmask: netmask}) do
    "#{address_to_string(address)}#{netmask_to_string(netmask)}"
  end

  defp address_to_string(nil), do: ""
  defp address_to_string(address), do: address |> :inet.ntoa()

  defp netmask_to_string(nil), do: ""
  defp netmask_to_string(netmask), do: "/#{netmask}"
end

defimpl Jason.Encoder, for: Postgrex.INET do
  def encode(term, opts), do: term |> to_string() |> Jason.Encode.string(opts)
end

defimpl Phoenix.HTML.Safe, for: Postgrex.INET do
  def to_iodata(term), do: term |> to_string()
end
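With this fancier cast in place, the insert that failed earlier should now succeed, and the String.Chars implementation makes the struct render cleanly as a string. A sketch of what that looks like in iex (assuming the same Hit schema as above):

```elixir
iex> (
...>   %MyApp.Hits.Hit{}
...>   |> MyApp.Hits.Hit.changeset(%{url: "/", ip: "127.0.0.1"})
...>   |> MyApp.Repo.insert!()
...>   |> Map.get(:ip)
...> )
%Postgrex.INET{address: {127, 0, 0, 1}, netmask: nil}

iex> to_string(%Postgrex.INET{address: {10, 0, 0, 0}, netmask: 8})
"10.0.0.0/8"
```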

Alternative Canonical Representation

Note that an alternative way of implementing your Ecto type would be to make the dump and load functions round-trip the Postgrex.INET struct to and from some more convenient canonical representation (like a plain string). For example, a MyApp.InetType like the following would allow you to use plain strings to represent IP address values in your schemas (instead of Postgrex.INET structs). It would dump each such string to a Postgrex.INET struct when Ecto attempts to save the value to the database, and load the value from a Postgrex.INET struct into a string when Ecto attempts to load the value from the database:

# lib/my_app/inet_type.ex
defmodule MyApp.InetType do
  @moduledoc """
  `Ecto.Type` implementation for postgres `INET` type.
  """
  use Bitwise
  use Ecto.Type

  alias Postgrex.INET

  def type, do: :inet

  def cast(nil), do: {:ok, ""}
  def cast(term) when is_tuple(term), do: {:ok, address_to_string(term)}
  def cast(term) when is_binary(term), do: {:ok, term}
  def cast(_), do: :error

  def dump(nil), do: {:ok, nil}
  def dump(""), do: {:ok, nil}

  def dump(term) when is_binary(term) do
    [addr | mask] = String.split(term, "/", parts: 2)

    with {:ok, address} <- parse_address(addr),
         {:ok, number} <- parse_netmask(mask),
         {:ok, netmask} <- validate_netmask(number, address) do
      {:ok, %INET{address: address, netmask: netmask}}
    else
      message -> {:error, [message: message]}
    end
  end

  def dump(_), do: :error

  def load(nil), do: {:ok, ""}

  def load(%INET{address: address, netmask: netmask}) do
    {:ok, "#{address_to_string(address)}#{netmask_to_string(netmask)}"}
  end

  def load(_), do: :error

  defp parse_address(addr) do
    case :inet.parse_strict_address(String.to_charlist(addr)) do
      {:ok, address} -> {:ok, address}
      _ -> "not a valid IP address"
    end
  end

  defp parse_netmask([]), do: {:ok, nil}

  defp parse_netmask([mask]) do
    case Integer.parse(mask) do
      {number, ""} -> {:ok, number}
      _ -> "not a CIDR netmask"
    end
  end

  defp validate_netmask(nil, _addr), do: {:ok, nil}

  defp validate_netmask(mask, _addr) when mask < 0 do
    "CIDR netmask cannot be negative"
  end

  defp validate_netmask(mask, addr) when mask > 32 and tuple_size(addr) == 4 do
    "CIDR netmask cannot be greater than 32"
  end

  defp validate_netmask(mask, _addr) when mask > 128 do
    "CIDR netmask cannot be greater than 128"
  end

  defp validate_netmask(mask, addr) do
    ipv4 = tuple_size(addr) == 4
    max = if ipv4, do: 32, else: 128
    subnet = if ipv4, do: 8, else: 16

    bits =
      addr
      |> Tuple.to_list()
      |> Enum.reverse()
      |> Enum.with_index()
      |> Enum.reduce(0, fn {value, index}, acc ->
        acc + (value <<< (index * subnet))
      end)

    bitmask = ((1 <<< max) - 1) ^^^ ((1 <<< (max - mask)) - 1)

    if (bits &&& bitmask) == bits do
      {:ok, mask}
    else
      "masked bits of IP address all must be 0s"
    end
  end

  defp address_to_string(nil), do: ""
  defp address_to_string(address), do: address |> :inet.ntoa() |> to_string()

  defp netmask_to_string(nil), do: ""
  defp netmask_to_string(netmask), do: "/#{netmask}"
end
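With this version of the type, the ip field holds a plain string end to end, with dump converting the string to a Postgrex.INET struct on the way into the database and load converting it back on the way out. A sketch of the round trip (assuming the same Hit schema as above):

```elixir
iex> hit =
...>   %MyApp.Hits.Hit{}
...>   |> MyApp.Hits.Hit.changeset(%{url: "/", ip: "10.0.0.0/8"})
...>   |> MyApp.Repo.insert!()
iex> hit.ip
"10.0.0.0/8"

# dump/1 is what Ecto calls before writing the value to the database:
iex> MyApp.InetType.dump("10.0.0.0/8")
{:ok, %Postgrex.INET{address: {10, 0, 0, 0}, netmask: 8}}
```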

Friday, August 14, 2020

Elixir Event Queue

As I'm learning Elixir, I went looking for the idiomatic way to build an event queue. After a few twists and turns, I found that it's easy, elegant, and pretty well documented; you just need to know what to look for.

There are a number of nifty "job queue" libraries for Elixir (like Honeydew or Oban), but they're directed more toward queueing jobs themselves, rather than enabling a single job to work on a queue of items. What I was looking for was this:

  1. A singleton queue that would have events enqueued from multiple processes (in Elixir-world, this would take the form of an Agent).
  2. A client app running multiple processes that would enqueue events (in my case, a Phoenix app).
  3. A worker process that dequeues a batch of events and processes them (in Elixir-world, this would be a GenServer).

An Example

Following is an example of what I found to be the idiomatic Elixir way of implementing this: 1) a generic agent that holds a queue as its state (QueueAgent); 2) the bits of a Phoenix app that listen for Phoenix telemetry events and enqueue some data from them onto this queue (RequestListener); and 3) a gen-server worker that dequeues those events and saves them to the DB (RequestSaver). Each of these components is started by the Phoenix application's supervisor (step 4).

1. The Queue Agent

The QueueAgent module holds the state of the queue, as a Qex struct. Qex is a wrapper around the native Erlang/OTP :queue module, adding some Elixir syntactic sugar and implementing the Inspect, Collectable, and Enumerable protocols.
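If you haven't used Qex before, a quick sketch of its basic API (push appends to the back, pop takes from the front, and the Enumerable implementation lets the usual Enum functions work on it):

```elixir
iex> q = Qex.new()
iex> q = Qex.push(q, 1)
iex> q = Qex.push(q, 2)
iex> {{:value, first}, q} = Qex.pop(q)
iex> first
1
iex> Enum.to_list(q)
[2]
```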

The QueueAgent module can pretty much just proxy basic Qex calls through the core agent get, update, and get_and_update functions. Each of these functions itself accepts a function, to which the current state of the agent (the Qex queue) is passed. The function passed to update must return the new state of the agent (the updated Qex queue); the function passed to get_and_update must return a tuple containing both a return value and the new state.

# lib/my_app/queue_agent.ex
defmodule MyApp.QueueAgent do
  @moduledoc """
  Agent that holds a queue as its state.
  """
  use Agent

  @doc """
  Starts the agent with the specified options.
  """
  @spec start_link(GenServer.options()) :: Agent.on_start()
  def start_link(opts \\ []) do
    Agent.start_link(&Qex.new/0, opts)
  end

  @doc """
  Returns the length of the queue.
  """
  @spec count(Agent.agent()) :: integer
  def count(agent) do
    Agent.get(agent, & &1) |> Enum.count()
  end

  @doc """
  Enqueues the specified item to the end of the queue.
  """
  @spec push(Agent.agent(), any) :: :ok
  def push(agent, value) do
    Agent.update(agent, &Qex.push(&1, value))
  end

  @doc """
  Dequeues the first item from the front of the queue, and returns it.
  If the queue is empty, returns the specified default value.
  """
  @spec pop(Agent.agent(), any) :: any
  def pop(agent, default \\ nil) do
    case Agent.get_and_update(agent, &Qex.pop/1) do
      {:value, value} -> value
      _ -> default
    end
  end

  @doc """
  Takes the specified number of items off the front of the queue, and
  returns them. If the queue has fewer than the specified number of items,
  empties the queue and returns all items.
  """
  @spec split(Agent.agent(), integer) :: Qex.t()
  def split(agent, max) do
    Agent.get_and_update(agent, fn queue ->
      Qex.split(queue, Enum.min([Enum.count(queue), max]))
    end)
  end
end
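A quick iex session demonstrating the QueueAgent in action (a sketch; in the app itself the agent would be started under the supervision tree rather than directly):

```elixir
iex> {:ok, agent} = MyApp.QueueAgent.start_link()
iex> MyApp.QueueAgent.push(agent, :a)
:ok
iex> MyApp.QueueAgent.push(agent, :b)
:ok
iex> MyApp.QueueAgent.count(agent)
2
iex> MyApp.QueueAgent.pop(agent)
:a
iex> MyApp.QueueAgent.split(agent, 100) |> Enum.to_list()
[:b]
```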

2. The Request Listener

The RequestListener module attaches a Telemetry listener (with the arbitrary name "my_app_web_request_listener") to handle one specific event (the "response sent" event from the Phoenix Logger, identified by [:phoenix, :endpoint, :stop]). The listener's handle_event function will be called whenever a response is sent (including error responses), and the response's Plug.Conn struct will be included under the :conn key of the event metadata.

To handle the event, the RequestListener simply pushes a new map, containing the details about the request that I want to save, onto a named QueueAgent queue. The name can be arbitrary; in this example it's MyApp.RequestQueue (a module name that doesn't happen to exist). What's important is that a QueueAgent with that name has been started (it will be started by the application, later on in step 4), and that the RequestSaver (later on in step 3) uses the same name to dequeue events.

# lib/my_app_web/request_listener.ex
defmodule MyAppWeb.RequestListener do
  @moduledoc """
  Listens for request telemetry events, and queues them to be saved.
  """
  require Logger

  @response_sent [:phoenix, :endpoint, :stop]
  @events [@response_sent]

  @doc """
  Sets up event listener.
  """
  def setup do
    :telemetry.attach_many("my_app_web_request_listener", @events, &handle_event/4, nil)
  end

  @doc """
  Telemetry callback to handle specified event.
  """
  def handle_event(@response_sent, measurement, metadata, _config) do
    handle_response_sent(measurement, metadata, MyApp.RequestQueue)
  end

  @doc """
  Handles Phoenix response sent event.
  """
  def handle_response_sent(measurement, metadata, queue_name) do
    conn = metadata.conn
    reason = conn.assigns[:reason]

    MyApp.QueueAgent.push(queue_name, %{
      inserted_at: DateTime.utc_now(),
      ip: conn.remote_ip,
      request_id: Logger.metadata()[:request_id],
      controller: conn.private[:phoenix_controller],
      action: conn.private[:phoenix_action],
      status: conn.status,
      method: conn.method,
      path: conn.request_path,
      query: conn.query_string,
      error: if(reason, do: Exception.message(reason)),
      # nanoseconds
      duration: measurement.duration
    })
  end
end

3. The Request Saver

The RequestSaver module runs as a dedicated process, dequeueing batches of up to 100 events and saving each batch. When done saving, it "sleeps" for a minute, then tries to dequeue some more events. Everything but the do_work, save_next_batch, save_batch, and batch_changeset functions is boilerplate gen-server functionality for running a process periodically.

The do_work function uses the same MyApp.RequestQueue name as the RequestListener to identify the queue, ensuring that both modules use the same QueueAgent instance. The save_next_batch function dequeues up to 100 events and saves them via the save_batch function (and continues working until it has emptied the queue). The save_batch and batch_changeset functions create and commit an Ecto.Multi changeset using the app's MyApp.RequestEvent schema (not included in this example, but as you can imagine, it would include fields for the various properties that the RequestListener extracted from the event metadata).

The handle_info callback is the entry point for the gen-server's processing. It ignores the gen-server's state (it doesn't need to maintain any state itself) — it simply does some work, and then calls schedule_work to schedule itself to be called again in another minute.

# lib/my_app/request_saver.ex
defmodule MyApp.RequestSaver do
  @moduledoc """
  Saves queued events to the DB.
  """
  use GenServer

  @doc """
  Starts the server with the specified options.
  """
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{})
  end

  @doc """
  GenServer callback to start process.
  """
  @impl true
  def init(state) do
    schedule_work()
    {:ok, state}
  end

  @doc """
  GenServer callback to handle process messages.
  """
  @impl true
  def handle_info(:work, state) do
    do_work()
    schedule_work()
    {:noreply, state}
  end

  @doc """
  Does the next unit of work.
  """
  def do_work do
    save_next_batch(MyApp.RequestQueue)
  end

  @doc """
  Pops the next 100 events from the specified queue and saves them.
  """
  def save_next_batch(queue_name) do
    batch = MyApp.QueueAgent.split(queue_name, 100)

    if Enum.count(batch) > 0 do
      save_batch(batch)
      save_next_batch(queue_name)
    end
  end

  @doc """
  Saves the specified list of events in one big transaction.
  """
  def save_batch(batch) do
    batch_changeset(batch) |> MyApp.Repo.transaction()
  end

  @doc """
  Creates an Ecto.Multi from the specified list of events.
  """
  def batch_changeset(batch) do
    batch
    |> Enum.reduce(Ecto.Multi.new(), fn event, multi ->
      changeset = MyApp.RequestEvent.changeset(event)
      Ecto.Multi.insert(multi, {:event, event.request_id}, changeset)
    end)
  end

  defp schedule_work do
    # in 1 minute
    Process.send_after(self(), :work, 60 * 1000)
  end
end

4. The Application Supervisor

The above three components are all started in my Phoenix app via the standard Phoenix Application module. On start, it calls the RequestListener setup function, registering the RequestListener to receive Phoenix telemetry events. Then the RequestSaver gen-server is started as a child process of the app (with no arguments, identified by its own module name); and the QueueAgent agent is also started as a child process, but with a name option, so that it can be identified via the MyApp.RequestQueue name.

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc false
  use Application

  def start(_type, _args) do
    MyAppWeb.RequestListener.setup()

    children = [
      MyApp.Repo,
      MyAppWeb.Endpoint,
      MyApp.RequestSaver,
      {MyApp.QueueAgent, name: MyApp.RequestQueue}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

However, you usually don't want periodic jobs popping up randomly while you run your unit tests; so I added a little extra logic to avoid starting up the RequestSaver in test mode:

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc false
  use Application

  def start(_type, _args) do
    MyAppWeb.RequestListener.setup()

    periodic_jobs =
      if Mix.env() != :test do
        [MyApp.RequestSaver]
      else
        []
      end

    children =
      [
        MyApp.Repo,
        MyAppWeb.Endpoint,
        {MyApp.QueueAgent, name: MyApp.RequestQueue}
      ] ++ periodic_jobs

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

The overall processing flow of this queueing system, then, works like this:

  1. A request is handled by Phoenix, which emits a "response sent" telemetry event.
  2. The RequestListener handle_event function is called by the Phoenix process.
  3. The RequestListener calls the QueueAgent push function to queue the event (which the QueueAgent does within its own internal process).
  4. Once a minute, the RequestSaver process runs the handle_info function, which tries to dequeue the next batch of events via the QueueAgent split function (again with the QueueAgent managing the state update in its own internal process).
  5. The RequestSaver, continuing on in its process, saves any dequeued events to the DB.
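One nice side effect of threading the queue name through handle_response_sent is testability: a test can start its own QueueAgent and pass in a fabricated conn, without touching the globally named queue. A sketch of such a test (the %Plug.Conn{} fields set here are just the ones the listener reads; everything else falls back to struct defaults):

```elixir
defmodule MyAppWeb.RequestListenerTest do
  use ExUnit.Case, async: true

  test "enqueues request details when a response is sent" do
    # a private queue for this test, identified by pid rather than name
    {:ok, queue} = MyApp.QueueAgent.start_link()

    conn = %Plug.Conn{
      remote_ip: {127, 0, 0, 1},
      status: 200,
      method: "GET",
      request_path: "/",
      query_string: ""
    }

    MyAppWeb.RequestListener.handle_response_sent(
      %{duration: 1_000},
      %{conn: conn},
      queue
    )

    event = MyApp.QueueAgent.pop(queue)
    assert event.status == 200
    assert event.path == "/"
  end
end
```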