Tuesday, October 13, 2020

How To Test OpenRC Services with Docker-Compose

Similar to how I abused Docker conceptually to test systemd services with docker-compose, I spent some time recently trying to do the same thing with OpenRC for Alpine Linux.

It basically requires the same steps as systemd. With the base 3.12 Alpine image, it's a matter of:

  1. Install OpenRC
  2. Optionally map /sys/fs/cgroup
  3. Start up with /sbin/init
  4. Run tests via docker exec

1. Install OpenRC

The base Alpine images don't include OpenRC, so you have to install it with apk. I do this in my Dockerfile:

FROM alpine:3.12
RUN apk add openrc
CMD ["/sbin/init"]

2. Optionally map /sys/fs/cgroup

Unlike with systemd, I didn't have to set up any tmpfs mounts to get OpenRC services running. I also didn't have to map the /sys/fs/cgroup directory -- but if I didn't, I would get a bunch of cgroup-related error messages when starting and stopping services (although the services themselves still seemed to work fine). So I just went ahead and mapped the dir in my docker-compose.yml to avoid those error messages:

version: '3'
services:
  my_test_container:
    build: .
    image: my_test_image
    volumes:
      - /sys/fs/cgroup:/sys/fs/cgroup:ro
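If you'd rather skip docker-compose entirely, the equivalent plain docker invocation would be something like the following (a sketch, patterned on the docker run commands from the systemd post below):

docker run \
    --volume /sys/fs/cgroup:/sys/fs/cgroup:ro \
    --detach --rm \
    --name my_test_container my_test_image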

3. Start up with /sbin/init

With the Alpine openrc package, the traditional /sbin/init startup command works to start OpenRC. I added CMD ["/sbin/init"] to my Dockerfile to start up with it, but you could instead add command: /sbin/init to the service in your docker-compose.yml file.
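For example, the compose service with the command override would look something like this (a sketch — the same service as above, plus the command line, assuming you drop the CMD from the Dockerfile or are happy to override it):

version: '3'
services:
  my_test_container:
    build: .
    image: my_test_image
    command: /sbin/init
    volumes:
      - /sys/fs/cgroup:/sys/fs/cgroup:ro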

4. Run tests via docker exec

The above docker-compose.yml and Dockerfile will allow you to start up OpenRC in my_test_container with one command:

docker-compose up -d my_test_container

With OpenRC up and running, you can use a second command to execute a shell on the very same container to test it out:

docker-compose exec my_test_container /bin/sh

Or use exec to run other commands to test the services managed by OpenRC:

docker-compose exec my_test_container rc-status --servicelist
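Or, if you've installed a service of your own into the image, you can poke at it directly with rc-service (a sketch — my_service is just a placeholder for whatever service you're testing):

# "my_service" is a placeholder service name
docker-compose exec my_test_container rc-service my_service start
docker-compose exec my_test_container rc-service my_service status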

Cleaning up

The clean up steps with OpenRC are also basically the same as with systemd:

  1. Stop the running container: docker-compose stop my_test_container
  2. Remove the saved container state: docker-compose rm my_test_container
  3. Remove the built image: docker image rm my_test_image
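As with systemd, you can also do all of the clean-up in one shot (including removing any other containers/images referenced by your docker-compose.yml file):

docker-compose down --rmi all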

Friday, October 2, 2020

Testing Systemd Services on Arch, Fedora, and Friends

Following up on a previous post about how to test systemd services with docker-compose on Ubuntu, I spent some time recently trying to do the same thing with a few other Linux distributions. I was able to get the same tricks to work on these other distributions:

  • Amazon Linux
  • Arch
  • CentOS
  • Debian
  • Fedora
  • openSUSE
  • RHEL

A few of those distros required an additional tweak, however.

One more tmpfs directory for Arch and Fedora

For Arch and Fedora, I had to do one more thing: add /tmp as a tmpfs mount.

So the docker-compose.yml file for those distros should look like this:

version: '3'
services:
  my_test_container:
    build: .
    image: my_test_image
    tmpfs:
      - /run
      - /run/lock
      - /tmp
    volumes:
      - /sys/fs/cgroup:/sys/fs/cgroup:ro

Or when running as a regular docker command, start the container like this:

docker run \
    --tmpfs /run --tmpfs /run/lock --tmpfs /tmp \
    --volume /sys/fs/cgroup:/sys/fs/cgroup:ro \
    --detach --rm \
    --name my_test_container my_test_image

Different init script location for openSUSE

For openSUSE, the systemd init script is located at /usr/lib/systemd/systemd instead of just /lib/systemd/systemd. So the Dockerfile I used for it looks like this:

FROM opensuse/leap:15
RUN zypper install -y systemd
CMD ["/usr/lib/systemd/systemd"]

Monday, September 14, 2020

Elixir Ed25519 Signatures With Enacl

The most-actively supported library for using ed25519 with Elixir currently looks to be enacl. It provides straightforward, idiomatic Erlang bindings for libsodium.

Installing

Installing enacl for a Mix project requires first installing your operating system's libsodium-dev package on your dev & build machines (as well as the regular libsodium package anywhere else you run your project binaries). Then in the mix.exs file of your project, add {:enacl, "~> 1.0.0"} to the deps section of that file; and then run mix deps.get to download the enacl package from Hex.
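For reference, the deps section would end up looking something like this (a minimal sketch — your project will have other entries alongside it):

# mix.exs
defp deps do
  [
    {:enacl, "~> 1.0.0"}
  ]
end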

Keys

In the parlance of libsodium, the "secret key" is the full keypair, the "public key" is the public part of the keypair (the public curve point), and the "seed" is the private part of the keypair (the 256-bit secret). The seed is represented in enacl as a 32-byte binary string, as is the public key; and the secret key is the 64-byte binary concatenation of the seed plus the public key.

You can generate a brand new ed25519 keypair with enacl via the sign_keypair/0 function. After generating, usually you'd want to save the keypair somewhere as a base64- or hex-encoded string:

iex> keypair = :enacl.sign_keypair()
%{
  public: <<215, 90, 152, 1, 130, 177, 10, 183, 213, 75, 254, 211, 201, 100, 7, 58,
    14, 225, 114, 243, 218, 166, 35, 37, 175, 2, 26, 104, 247, 7, 81, 26>>,
  secret: <<157, 97, 177, 157, 239, 253, 90, 96, 186, 132, 74, 244, 146, 236, 44, 196,
    68, 73, 197, 105, 123, 50, 105, 25, 112, 59, 172, 3, 28, 174, 127, 96,
    215, 90, 152, 1, 130, 177, 10, 183, 213, 75, 254, 211, 201, 100, 7, 58, ...>>
}
iex> <<seed::binary-size(32), public_key::binary>> = keypair.secret
<<157, 97, 177, 157, 239, 253, 90, 96, 186, 132, 74, 244, 146, 236, 44, 196,
  68, 73, 197, 105, 123, 50, 105, 25, 112, 59, 172, 3, 28, 174, 127, 96,
  215, 90, 152, 1, 130, 177, 10, 183, 213, 75, 254, 211, 201, 100, 7, 58, 14, 225, ...>>
iex> public_key == keypair.public
true
iex> seed <> public_key == keypair.secret
true
iex> public_key_base64 = public_key |> Base.encode64()
"11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo="
iex> public_key_hex = public_key |> Base.encode16(case: :lower)
"d75a980182b10ab7d54bfed3c964073a0ee172f3daa62325af021a68f707511a"
iex> private_key_base64 = seed |> Base.encode64()
"nWGxne/9WmC6hEr0kuwsxERJxWl7MmkZcDusAxyuf2A="
iex> private_key_hex = seed |> Base.encode16(case: :lower)
"9d61b19deffd5a60ba844af492ec2cc44449c5697b326919703bac031cae7f60"

You can also reconstitute a keypair from just the private part (the "seed") with the enacl sign_seed_keypair/1 function:

iex> reloaded_keypair = (
...>   "9d61b19deffd5a60ba844af492ec2cc44449c5697b326919703bac031cae7f60"
...>   |> Base.decode16!(case: :lower)
...>   |> :enacl.sign_seed_keypair()
...> )
%{
  public: <<215, 90, 152, 1, 130, 177, 10, 183, 213, 75, 254, 211, 201, 100, 7, 58,
    14, 225, 114, 243, 218, 166, 35, 37, 175, 2, 26, 104, 247, 7, 81, 26>>,
  secret: <<157, 97, 177, 157, 239, 253, 90, 96, 186, 132, 74, 244, 146, 236, 44, 196,
    68, 73, 197, 105, 123, 50, 105, 25, 112, 59, 172, 3, 28, 174, 127, 96,
    215, 90, 152, 1, 130, 177, 10, 183, 213, 75, 254, 211, 201, 100, 7, 58, ...>>
}
iex> reloaded_keypair == keypair
true

Signing

Libsodium has a series of functions for signing large documents that won't fit into memory or otherwise have to be split into chunks — but for most cases, the simpler enacl sign/2 or sign_detached/2 functions are what you want to use.

The enacl sign/2 function produces a binary string that combines the original message with the message signature, which the sign_open/2 function can later unpack and verify. This is ideal for preventing misuse, since it makes it harder to just use the message without verifying the signature first.

The enacl sign_detached/2 function produces the message signature as a stand-alone 64-byte binary string — if you need to store or send the signature separately from the message itself, this is the function you'd use. And often when using detached signatures, you will also base64- or hex-encode the resulting signature:

iex> message = "test"
"test"
iex> signed_message = :enacl.sign(message, keypair.secret)
<<143, 152, 176, 38, 66, 39, 246, 31, 9, 107, 120, 221, 227, 176, 240, 13,
  25, 1, 236, 254, 16, 80, 94, 65, 71, 57, 6, 144, 122, 82, 53, 107,
  233, 83, 26, 215, 109, 77, 1, 219, 7, 67, 77, 72, 147, 94, 245, 81, 222, 80, ...>>
iex> signature = :enacl.sign_detached(message, keypair.secret)
<<143, 152, 176, 38, 66, 39, 246, 31, 9, 107, 120, 221, 227, 176, 240, 13,
  25, 1, 236, 254, 16, 80, 94, 65, 71, 57, 6, 144, 122, 82, 53, 107,
  233, 83, 26, 215, 109, 77, 1, 219, 7, 67, 77, 72, 147, 94, 245, 81, 222, 80, ...>>
iex> signature <> message == signed_message
true
iex> signature |> Base.encode64()
"j5iwJkIn9h8Ja3jd47DwDRkB7P4QUF5BRzkGkHpSNWvpUxrXbU0B2wdDTUiTXvVR3lBULDNm0/t1DY8GBoxfCA=="
iex> signature |> Base.encode16(case: :lower)
"8f98b0264227f61f096b78dde3b0f00d1901ecfe10505e41473906907a52356be9531ad76d4d01db07434d48935ef551de50542c3366d3fb750d8f06068c5f08"

Verifying

To verify a signed message (the message combined with the signature), and then access the message itself, you'd use the enacl sign_open/2 function:

iex> unpacked_message = :enacl.sign_open(signed_message, public_key)
{:ok, "test"}

If you try to verify the signed message with a different public key (or if the message is otherwise improperly signed or not signed at all), you'll get an error result from the sign_open/2 function:

iex> wrong_public_key = (
...>   "3d4017c3e843895a92b70aa74d1b7ebc9c982ccf2ec4968cc0cd55f12af4660c"
...>   |> Base.decode16!(case: :lower)
...> )
<<61, 64, 23, 195, 232, 67, 137, 90, 146, 183, 10, 167, 77, 27, 126, 188,
  156, 152, 44, 207, 46, 196, 150, 140, 192, 205, 85, 241, 42, 244, 102, 12>>
iex> error_result = :enacl.sign_open(signed_message, wrong_public_key)
{:error, :failed_verification}

To verify a message with a detached signature, you need the original message itself (in the same binary form with which it was signed), and the signature (in binary form as well). You pass them both, plus the public key, to the sign_verify_detached/3 function; sign_verify_detached/3 returns true if the signature is legit, and false otherwise:

iex> :enacl.sign_verify_detached(signature, message, public_key)
true
iex> :enacl.sign_verify_detached(signature, "wrong message", public_key)
false
iex> :enacl.sign_verify_detached(signature, message, wrong_public_key)
false

Full Example

To put it all together, if you have an ed25519 private key, like "nWGxne/9WmC6hEr0kuwsxERJxWl7MmkZcDusAxyuf2A=", and you want to sign a message ("test") that someone else already has in their possession, you'd do the following to produce a stand-alone signature that you can send them:

iex> secret_key = (
...>   "nWGxne/9WmC6hEr0kuwsxERJxWl7MmkZcDusAxyuf2A="
...>   |> Base.decode64!()
...>   |> :enacl.sign_seed_keypair()
...>   |> Map.get(:secret)
...> )
<<157, 97, 177, 157, 239, 253, 90, 96, 186, 132, 74, 244, 146, 236, 44, 196,
  68, 73, 197, 105, 123, 50, 105, 25, 112, 59, 172, 3, 28, 174, 127, 96,
  215, 90, 152, 1, 130, 177, 10, 183, 213, 75, 254, 211, 201, 100, 7, 58, 14, 225, ...>>
iex> signature_base64 = (
...>   "test"
...>   |> :enacl.sign_detached(secret_key)
...>   |> Base.encode64()
...> )
"j5iwJkIn9h8Ja3jd47DwDRkB7P4QUF5BRzkGkHpSNWvpUxrXbU0B2wdDTUiTXvVR3lBULDNm0/t1DY8GBoxfCA=="

And if you're the one given an ed25519 public key ("11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo=") and signature ("j5iwJkIn9h8Ja3jd47DwDRkB7P4QUF5BRzkGkHpSNWvpUxrXbU0B2wdDTUiTXvVR3lBULDNm0/t1DY8GBoxfCA=="), with the original message ("test") in hand you can verify the signature like the following:

iex> public_key = (
...>   "11qYAYKxCrfVS/7TyWQHOg7hcvPapiMlrwIaaPcHURo="
...>   |> Base.decode64!()
...> )
<<215, 90, 152, 1, 130, 177, 10, 183, 213, 75, 254, 211, 201, 100, 7, 58,
  14, 225, 114, 243, 218, 166, 35, 37, 175, 2, 26, 104, 247, 7, 81, 26>>
iex> signature_legitimate? = (
...>   "j5iwJkIn9h8Ja3jd47DwDRkB7P4QUF5BRzkGkHpSNWvpUxrXbU0B2wdDTUiTXvVR3lBULDNm0/t1DY8GBoxfCA=="
...>   |> Base.decode64!()
...>   |> :enacl.sign_verify_detached("test", public_key)
...> )
true

Friday, August 28, 2020

Postgrex Ecto Types

One thing I found confusing about Postgrex, the excellent PostgreSQL adapter for Elixir, was how to use PostgreSQL-specific data types (cidr, inet, interval, lexeme, range, etc) with Ecto. As far as I can tell, while Postgrex includes structs that can be converted to each type (like Postgrex.INET etc), you still have to write your own Ecto.Type implementation for each type you use in an Ecto schema.

For example, to implement an Ecto schema for a table like the following:

CREATE TABLE hits (
    id BIGSERIAL NOT NULL PRIMARY KEY,
    url TEXT NOT NULL,
    ip INET NOT NULL,
    inserted_at TIMESTAMP NOT NULL
);

You'd need minimally to create an Ecto type implementation like the following:

# lib/my_app/inet_type.ex
defmodule MyApp.InetType do
  @moduledoc """
  `Ecto.Type` implementation for postgres `INET` type.
  """

  use Ecto.Type

  def type, do: :inet
  def cast(term), do: {:ok, term}
  def dump(term), do: {:ok, term}
  def load(term), do: {:ok, term}
end

So that you could then define the Ecto schema with your custom Ecto type:

# lib/my_app/hits/hit.ex
defmodule MyApp.Hits.Hit do
  @moduledoc """
  The Hit schema.
  """

  use Ecto.Schema
  import Ecto.Changeset

  schema "hits" do
    field :url, :string
    field :ip, MyApp.InetType
    timestamps updated_at: false
  end

  def changeset(hit, attrs) do
    hit
    |> cast(attrs, [:url, :ip])
    |> validate_required([:url, :ip])
  end
end

Note that in your migration code, you use the native database type name (eg inet), not your custom type name:

# priv/repo/migrations/202001010000_create_hits.exs
defmodule MyApp.Repo.Migrations.CreateHits do
  use Ecto.Migration

  def change do
    create table(:hits) do
      add :url, :text, null: false
      add :ip, :inet, null: false
      timestamps updated_at: false
    end
  end
end

A Fancier Type

However, the above basic InetType implementation limits this example Hit schema to working only with Postgrex.INET structs for its ip field — so, while creating a Hit record with the IP address specified via a Postgrex.INET struct works nicely:

iex> (
...>   %MyApp.Hits.Hit{}
...>   |> MyApp.Hits.Hit.changeset(%{url: "/", ip: %Postgrex.INET{address: {127, 0, 0, 1}}})
...>   |> MyApp.Repo.insert!()
...>   |> Map.get(:ip)
...> )
%Postgrex.INET{address: {127, 0, 0, 1}}

Creating one with the IP address specified as a string (or even a plain tuple like {127, 0, 0, 1}) won't work:

iex> (
...>   %MyApp.Hits.Hit{}
...>   |> MyApp.Hits.Hit.changeset(%{url: "/", ip: "127.0.0.1"})
...>   |> MyApp.Repo.insert!()
...>   |> Map.get(:ip)
...> )
** (Ecto.InvalidChangesetError) could not perform insert because changeset is invalid.

This can be solved by implementing a fancier version of the cast function in the MyApp.InetType module, enabling Ecto to cast strings (and tuples) to the Postgrex.INET type. Here's a version of MyApp.InetType that does that, as well as allows the Postgrex.INET struct to be serialized as a string (including when serialized to JSON with the Jason library, or when rendered as part of a Phoenix HTML template):

# lib/my_app/inet_type.ex
defmodule MyApp.InetType do
  @moduledoc """
  `Ecto.Type` implementation for postgres `INET` type.
  """

  use Bitwise
  use Ecto.Type

  alias Postgrex.INET

  def type, do: :inet

  def cast(nil), do: {:ok, nil}
  def cast(""), do: {:ok, nil}
  def cast(%INET{address: nil}), do: {:ok, nil}
  def cast(%INET{} = term), do: {:ok, term}
  def cast(term) when is_tuple(term), do: {:ok, %INET{address: term}}

  def cast(term) when is_binary(term) do
    [addr | mask] = String.split(term, "/", parts: 2)

    with {:ok, address} <- parse_address(addr),
         {:ok, number} <- parse_netmask(mask),
         {:ok, netmask} <- validate_netmask(number, address) do
      {:ok, %INET{address: address, netmask: netmask}}
    else
      message -> {:error, [message: message]}
    end
  end

  def cast(_), do: :error

  def dump(term), do: {:ok, term}

  def load(term), do: {:ok, term}

  defp parse_address(addr) do
    case :inet.parse_strict_address(String.to_charlist(addr)) do
      {:ok, address} -> {:ok, address}
      _ -> "not a valid IP address"
    end
  end

  defp parse_netmask([]), do: {:ok, nil}

  defp parse_netmask([mask]) do
    case Integer.parse(mask) do
      {number, ""} -> {:ok, number}
      _ -> "not a CIDR netmask"
    end
  end

  defp validate_netmask(nil, _addr), do: {:ok, nil}

  defp validate_netmask(mask, _addr) when mask < 0 do
    "CIDR netmask cannot be negative"
  end

  defp validate_netmask(mask, addr) when mask > 32 and tuple_size(addr) == 4 do
    "CIDR netmask cannot be greater than 32"
  end

  defp validate_netmask(mask, _addr) when mask > 128 do
    "CIDR netmask cannot be greater than 128"
  end

  defp validate_netmask(mask, addr) do
    ipv4 = tuple_size(addr) == 4
    max = if ipv4, do: 32, else: 128
    subnet = if ipv4, do: 8, else: 16

    bits =
      addr
      |> Tuple.to_list()
      |> Enum.reverse()
      |> Enum.with_index()
      |> Enum.reduce(0, fn {value, index}, acc ->
        acc + (value <<< (index * subnet))
      end)

    bitmask = ((1 <<< max) - 1) ^^^ ((1 <<< (max - mask)) - 1)

    if (bits &&& bitmask) == bits do
      {:ok, mask}
    else
      "masked bits of IP address all must be 0s"
    end
  end
end

defimpl String.Chars, for: Postgrex.INET do
  def to_string(%{address: address, netmask: netmask}) do
    "#{address_to_string(address)}#{netmask_to_string(netmask)}"
  end

  defp address_to_string(nil), do: ""
  defp address_to_string(address), do: address |> :inet.ntoa()

  defp netmask_to_string(nil), do: ""
  defp netmask_to_string(netmask), do: "/#{netmask}"
end

defimpl Jason.Encoder, for: Postgrex.INET do
  def encode(term, opts), do: term |> to_string() |> Jason.Encode.string(opts)
end

defimpl Phoenix.HTML.Safe, for: Postgrex.INET do
  def to_iodata(term), do: term |> to_string()
end

Alternative Canonical Representation

Note that an alternative way of implementing your Ecto type would be to make the dump and load functions round-trip the Postgrex.INET struct to and from some more convenient canonical representation (like a plain string). For example, a MyApp.InetType like the following would allow you to use plain strings to represent IP address values in your schemas (instead of Postgrex.INET structs). It would dump each such string to a Postgrex.INET struct when Ecto attempts to save the value to the database, and load the value from a Postgrex.INET struct into a string when Ecto attempts to load the value from the database:

# lib/my_app/inet_type.ex
defmodule MyApp.InetType do
  @moduledoc """
  `Ecto.Type` implementation for postgres `INET` type.
  """

  use Bitwise
  use Ecto.Type

  alias Postgrex.INET

  def type, do: :inet

  def cast(nil), do: {:ok, ""}
  def cast(term) when is_tuple(term), do: {:ok, address_to_string(term)}
  def cast(term) when is_binary(term), do: {:ok, term}
  def cast(_), do: :error

  def dump(nil), do: {:ok, nil}
  def dump(""), do: {:ok, nil}

  def dump(term) when is_binary(term) do
    [addr | mask] = String.split(term, "/", parts: 2)

    with {:ok, address} <- parse_address(addr),
         {:ok, number} <- parse_netmask(mask),
         {:ok, netmask} <- validate_netmask(number, address) do
      {:ok, %INET{address: address, netmask: netmask}}
    else
      message -> {:error, [message: message]}
    end
  end

  def dump(_), do: :error

  def load(nil), do: {:ok, ""}

  def load(%INET{address: address, netmask: netmask}) do
    {:ok, "#{address_to_string(address)}#{netmask_to_string(netmask)}"}
  end

  def load(_), do: :error

  defp parse_address(addr) do
    case :inet.parse_strict_address(String.to_charlist(addr)) do
      {:ok, address} -> {:ok, address}
      _ -> "not a valid IP address"
    end
  end

  defp parse_netmask([]), do: {:ok, nil}

  defp parse_netmask([mask]) do
    case Integer.parse(mask) do
      {number, ""} -> {:ok, number}
      _ -> "not a CIDR netmask"
    end
  end

  defp validate_netmask(nil, _addr), do: {:ok, nil}

  defp validate_netmask(mask, _addr) when mask < 0 do
    "CIDR netmask cannot be negative"
  end

  defp validate_netmask(mask, addr) when mask > 32 and tuple_size(addr) == 4 do
    "CIDR netmask cannot be greater than 32"
  end

  defp validate_netmask(mask, _addr) when mask > 128 do
    "CIDR netmask cannot be greater than 128"
  end

  defp validate_netmask(mask, addr) do
    ipv4 = tuple_size(addr) == 4
    max = if ipv4, do: 32, else: 128
    subnet = if ipv4, do: 8, else: 16

    bits =
      addr
      |> Tuple.to_list()
      |> Enum.reverse()
      |> Enum.with_index()
      |> Enum.reduce(0, fn {value, index}, acc ->
        acc + (value <<< (index * subnet))
      end)

    bitmask = ((1 <<< max) - 1) ^^^ ((1 <<< (max - mask)) - 1)

    if (bits &&& bitmask) == bits do
      {:ok, mask}
    else
      "masked bits of IP address all must be 0s"
    end
  end

  defp address_to_string(nil), do: ""
  defp address_to_string(address), do: address |> :inet.ntoa()

  defp netmask_to_string(nil), do: ""
  defp netmask_to_string(netmask), do: "/#{netmask}"
end
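With this alternative implementation, the earlier Hit schema works directly with strings for its ip field — a quick sketch (assuming the same Hit schema and repo as before):

iex> (
...>   %MyApp.Hits.Hit{}
...>   |> MyApp.Hits.Hit.changeset(%{url: "/", ip: "127.0.0.1"})
...>   |> MyApp.Repo.insert!()
...>   |> Map.get(:ip)
...> )
"127.0.0.1"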

Friday, August 14, 2020

Elixir Event Queue

As I'm learning Elixir, I went searching for the idiomatic way to build an event queue in Elixir. After a few twists and turns, I found that it's easy, elegant, and pretty well documented — you just need to know what to look for.

There are a number of nifty "job queue" libraries for Elixir (like Honeydew or Oban), but they're directed more toward queueing jobs themselves, rather than enabling a single job to work on a queue of items. What I was looking for was this:

  1. A singleton queue that would have events enqueued from multiple processes (in Elixir-world, this would take the form of an Agent).
  2. A client app running multiple processes that would enqueue events (in my case, a Phoenix app).
  3. A worker process that dequeues a batch of events and processes them (in Elixir-world, this would be a GenServer).

An Example

Following is an example of what I found to be the idiomatic Elixir way of implementing this, with 1) a generic agent that holds a queue as its state (QueueAgent), 2) the bits of a Phoenix app that listens for Phoenix telemetry events and enqueues some data from them onto this queue (RequestListener), and 3) a gen-server worker that dequeues those events and saves them to the DB (RequestSaver). These three components each are started by the Phoenix application's supervisor (4).

1. The Queue Agent

The QueueAgent module holds the state of the queue, as a Qex struct. Qex is a wrapper around the native Erlang/OTP :queue module, adding some Elixir syntactic sugar and implementing the Inspect, Collectable, and Enumerable protocols.

The QueueAgent module can pretty much just proxy basic Qex calls through the core agent get, update, and get_and_update functions. Each of these functions accepts a function itself, to which the current state of the agent (the Qex queue) is passed. The functions accepted by update and get_and_update also return the new state of the agent (the updated Qex queue).

# lib/my_app/queue_agent.ex
defmodule MyApp.QueueAgent do
  @moduledoc """
  Agent that holds a queue as its state.
  """

  use Agent

  @doc """
  Starts the agent with the specified options.
  """
  @spec start_link(GenServer.options()) :: Agent.on_start()
  def start_link(opts \\ []) do
    Agent.start_link(&Qex.new/0, opts)
  end

  @doc """
  Returns the length of the queue.
  """
  @spec count(Agent.agent()) :: integer
  def count(agent) do
    Agent.get(agent, & &1) |> Enum.count()
  end

  @doc """
  Enqueues the specified item to the end of the queue.
  """
  @spec push(Agent.agent(), any) :: :ok
  def push(agent, value) do
    Agent.update(agent, &Qex.push(&1, value))
  end

  @doc """
  Dequeues the first item from the front of the queue, and returns it.
  If the queue is empty, returns the specified default value.
  """
  @spec pop(Agent.agent(), any) :: any
  def pop(agent, default \\ nil) do
    case Agent.get_and_update(agent, &Qex.pop/1) do
      {:value, value} -> value
      _ -> default
    end
  end

  @doc """
  Takes the specified number of items off the front of the queue, and returns them.
  If the queue has less than the specified number of items, empties the queue
  and returns all items.
  """
  @spec split(Agent.agent(), integer) :: Qex.t()
  def split(agent, max) do
    Agent.get_and_update(agent, fn queue ->
      Qex.split(queue, Enum.min([Enum.count(queue), max]))
    end)
  end
end
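A quick iex session shows how the agent behaves (a sketch — the MyQueue name is arbitrary, just like the MyApp.RequestQueue name used later on):

iex> {:ok, _pid} = MyApp.QueueAgent.start_link(name: MyQueue)
iex> MyApp.QueueAgent.push(MyQueue, %{event: 1})
:ok
iex> MyApp.QueueAgent.push(MyQueue, %{event: 2})
:ok
iex> MyApp.QueueAgent.count(MyQueue)
2
iex> MyApp.QueueAgent.pop(MyQueue)
%{event: 1}
iex> MyApp.QueueAgent.split(MyQueue, 100) |> Enum.to_list()
[%{event: 2}]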

2. The Request Listener

The RequestListener module attaches a Telemetry listener (with the arbitrary name "my_app_web_request_listener") to handle one specific event (the "response sent" event from the Phoenix Logger, identified by [:phoenix, :endpoint, :stop]). The listener's handle_event function will be called whenever a response is sent (including error responses), and the response's Plug.Conn struct will be included under the :conn key of the event metadata.

In handling the event, the RequestListener simply enqueues a new map containing the details about the request that I want to save to a named QueueAgent queue. The name can be arbitrary — in this example it's MyApp.RequestQueue (a module name that doesn't happen to exist) — what's important is that a QueueAgent with that name has been started (it will be started by the application, later on in step 4), and that the RequestSaver (later on in step 3) will use the same name to dequeue events.

# lib/my_app_web/request_listener.ex
defmodule MyAppWeb.RequestListener do
  @moduledoc """
  Listens for request telemetry events, and queues them to be saved.
  """

  require Logger

  @response_sent [:phoenix, :endpoint, :stop]
  @events [@response_sent]

  @doc """
  Sets up event listener.
  """
  def setup do
    :telemetry.attach_many("my_app_web_request_listener", @events, &handle_event/4, nil)
  end

  @doc """
  Telemetry callback to handle specified event.
  """
  def handle_event(@response_sent, measurement, metadata, _config) do
    handle_response_sent(measurement, metadata, MyApp.RequestQueue)
  end

  @doc """
  Handles Phoenix response sent event.
  """
  def handle_response_sent(measurement, metadata, queue_name) do
    conn = metadata.conn
    reason = conn.assigns[:reason]

    MyApp.QueueAgent.push(queue_name, %{
      inserted_at: DateTime.utc_now(),
      ip: conn.remote_ip,
      request_id: Logger.metadata()[:request_id],
      controller: conn.private[:phoenix_controller],
      action: conn.private[:phoenix_action],
      status: conn.status,
      method: conn.method,
      path: conn.request_path,
      query: conn.query_string,
      error: if(reason, do: Exception.message(reason)),
      # nanoseconds
      duration: measurement.duration
    })
  end
end

3. The Request Saver

The RequestSaver module is run as a dedicated process, dequeueing batches of up to 100 events, and saving each batch. When done saving, it will "sleep" for a minute, then try to dequeue some more events. Everything but the do_work, save_next_batch, save_batch, and batch_changeset functions is boilerplate gen-server functionality for running a process periodically.

The do_work function uses the same MyApp.RequestQueue name as the RequestListener to identify the queue, ensuring that both modules use the same QueueAgent instance. The save_next_batch function dequeues up to 100 events and saves them via the save_batch function (and continues working until it has emptied the queue). The save_batch and batch_changeset functions create and commit an Ecto.Multi changeset using the app's MyApp.RequestEvent schema (not included in this example, but as you can imagine, it would include fields for the various properties that the RequestListener extracted from the event metadata).

The handle_info callback is the entry point for the gen-server's processing. It ignores the gen-server's state (it doesn't need to maintain any state itself) — it simply does some work, and then calls schedule_work to schedule itself to be called again in another minute.

# lib/my_app/request_saver.ex
defmodule MyApp.RequestSaver do
  @moduledoc """
  Saves queued events to the DB.
  """

  use GenServer

  @doc """
  Starts the server with the specified options.
  """
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{})
  end

  @doc """
  GenServer callback to start process.
  """
  @impl true
  def init(state) do
    schedule_work()
    {:ok, state}
  end

  @doc """
  GenServer callback to handle process messages.
  """
  @impl true
  def handle_info(:work, state) do
    do_work()
    schedule_work()
    {:noreply, state}
  end

  @doc """
  Does the next unit of work.
  """
  def do_work do
    save_next_batch(MyApp.RequestQueue)
  end

  @doc """
  Pops the next 100 events from the specified queue and saves them.
  """
  def save_next_batch(queue_name) do
    batch = MyApp.QueueAgent.split(queue_name, 100)

    if Enum.count(batch) > 0 do
      save_batch(batch)
      save_next_batch(queue_name)
    end
  end

  @doc """
  Saves the specified list of events in one big transaction.
  """
  def save_batch(batch) do
    batch_changeset(batch) |> MyApp.Repo.transaction()
  end

  @doc """
  Creates an Ecto.Multi from the specified list of events.
  """
  def batch_changeset(batch) do
    batch
    |> Enum.reduce(Ecto.Multi.new(), fn event, multi ->
      changeset = MyApp.RequestEvent.changeset(event)
      Ecto.Multi.insert(multi, {:event, event.request_id}, changeset)
    end)
  end

  defp schedule_work do
    # in 1 minute
    Process.send_after(self(), :work, 60 * 1000)
  end
end

4. The Application Supervisor

The above three components are all started in my Phoenix app via the standard Phoenix Application module. On start, it calls the RequestListener setup function, registering the RequestListener to receive Phoenix Telemetry events. Then the RequestSaver gen-server is started as a child process of the app (with no arguments, identified by its own module name); and the QueueAgent agent is also started as a child process — but with a name option, so that it can be identified via the MyApp.RequestQueue name. (The RequestListener.setup() call and the RequestSaver and QueueAgent child entries are the additions to the boilerplate Phoenix Application module.)

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    MyAppWeb.RequestListener.setup()

    children = [
      MyApp.Repo,
      MyAppWeb.Endpoint,
      MyApp.RequestSaver,
      {MyApp.QueueAgent, name: MyApp.RequestQueue}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

However, you usually don't want periodic jobs popping up randomly while you run your unit tests; so I added a little extra logic to avoid starting up the RequestSaver in test mode:

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc false

  use Application

  def start(_type, _args) do
    MyAppWeb.RequestListener.setup()

    periodic_jobs =
      if Mix.env != :test do
        [MyApp.RequestSaver]
      else
        []
      end

    children = [
      MyApp.Repo,
      MyAppWeb.Endpoint,
      {MyApp.QueueAgent, name: MyApp.RequestQueue}
    ] ++ periodic_jobs

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

The overall processing flow of this queueing system, then, works like this:

  1. A request is handled by Phoenix, which raises a "response sent" telemetry event.
  2. The RequestListener handle_event function is called by the Phoenix process.
  3. The RequestListener calls the QueueAgent push function to queue the event (which the QueueAgent does within its own internal process).
  4. Once a minute, the RequestSaver process runs the handle_info function, which tries to dequeue the next batch of events via the QueueAgent split function (again with the QueueAgent managing the state update in its own internal process).
  5. The RequestSaver, continuing on in its process, saves any dequeued events to the DB.

Friday, July 31, 2020

WeeChat Light Theme

When I started using WeeChat as my IRC client, I searched around for a light theme to use with it (since I use a light theme for my terminal, and the default WeeChat colors make several UI bits difficult to read with a light terminal theme). I couldn't find much in the way of WeeChat light themes, however, so I set up my own — I went through all the color settings listed in the WeeChat User's Guide, and replaced defaults that showed up as dark-on-dark or light-on-light in my terminal with colors that instead would be dark-on-light or light-on-dark.

These are the settings I ended up modifying to build the light theme I now use (you can look up each setting in the User's Guide for a brief description of each, if you're curious).

/set weechat.bar.status.color_bg gray
/set weechat.bar.title.color_bg gray
/set weechat.color.chat_buffer black
/set weechat.color.chat_channel black
/set weechat.color.chat_nick_offline_highlight_bg gray
/set weechat.color.chat_nick_self darkgray
/set weechat.color.chat_prefix_action darkgray
/set weechat.color.status_data_msg lightblue
/set weechat.color.status_more lightblue
/set weechat.color.status_name darkgray
/set weechat.color.status_number lightblue
/set buflist.format.buffer_current "${color:,yellow}${format_buffer}"
/set buflist.format.hotlist_low "${color:cyan}"
/set irc.color.topic_new red
/set relay.color.text_selected lightblue

There are a lot more color settings, but these were the only ones I needed to change to fix the dark-on-dark and light-on-light issues (I left the other settings alone). I use InWee to manage my custom WeeChat settings; I store these color settings in a file called colors.txt, and then start up WeeChat and run a command like inwee colors.txt whenever I want to make changes.

Here's a screenshot of what it looks like in my terminal's color scheme:

Sunday, February 23, 2020

IPv6 for Private AWS Subnets

Instead of setting up "private" subnets in AWS with the use of moderately-expensive AWS NAT Gateways, I've been experimenting with free Egress-Only Internet Gateways (EIGW). The downside with EIGW is that it's IPv6 only — so you can communicate to the outside world from instances in your private subnet only through IPv6.

In theory, that shouldn't be a problem — it's 2020 and IPv6 just works everywhere, right? Well, actually, in practice it does work pretty well — but there are still a few gotchas. Here are a few hurdles I had to work around when setting up a fleet of Ubuntu Linux EC2 instances with Ansible:

APT Repos

The base AMIs that Ubuntu provides are configured to use the Ubuntu APT repos hosted by AWS (like us-east-1.ec2.archive.ubuntu.com); however, these repos only support IPv4. So the first thing you need to do is change the repos listed in /etc/apt/sources.list to use external repos that support IPv6 (like us.archive.ubuntu.com, or other Ubuntu mirrors you might find).
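On us-east-1 instances, for example, that can be a one-line substitution (a sketch — pick whatever mirror you prefer):

# swap the AWS-hosted mirror for one that supports IPv6
sudo sed -i 's/us-east-1\.ec2\.archive\.ubuntu\.com/us.archive.ubuntu.com/g' /etc/apt/sources.list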

And since you won't be able to use IPv4 to access the repos, you can speed up APT updates by configuring APT to try only IPv6. To do so, add a file in your /etc/apt/apt.conf.d/ directory (call it something like 99force-ipv6) with the following content:

Acquire::ForceIPv6 "true";

Also don't forget that if you do set up a restrictive Network ACL for your private subnet, you'll need to allow inbound TCP access to the standard Linux ephemeral port range (32768-61000) from whatever APT repos you use.
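If you manage that Network ACL with the AWS CLI, the inbound rule would look something like this (a sketch — the ACL ID and rule number are placeholders, and you'd ideally narrow the ::/0 source down to your mirrors' addresses):

aws ec2 create-network-acl-entry \
    --network-acl-id acl-0123456789abcdef0 \
    --ingress --rule-number 300 --rule-action allow \
    --protocol tcp --port-range From=32768,To=61000 \
    --ipv6-cidr-block ::/0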

NTP Pools

The NTP pools used by the base AMIs also don't support IPv6. I use the traditional NTP daemon provided by the Ubuntu ntp package, rather than the default systemd-timesyncd service. To configure the NTP daemon, I remove all the default pools from the /etc/ntp.conf file, and instead just use the 2.us.pool.ntp.org pool (the convention with NTP is that for domains that have numbered pools, like 0.us.pool.ntp.org, 1.us.pool.ntp.org, 2.us.pool.ntp.org, etc, the pool numbered 2 is the one that supports IPv6).

Specifically, this is how I configure the 2.us.pool.ntp.org pool in /etc/ntp.conf:

pool -6 2.us.pool.ntp.org iburst minpoll 10 maxpoll 12

The -6 flag means to use IPv6; the iburst part is supposed to help speed up initial synchronization; the minpoll 10 part means to poll no more often than every 2^10 seconds (around 17 minutes); and the maxpoll 12 part means to poll no less often than every 2^12 seconds (around 68 minutes).

Also, if you set up a restrictive Network ACL for your private subnet, you'll need to allow inbound access to UDP port 123.

AWS APIs

If you are planning to directly call AWS APIs (either through the various per-language SDKs, or the CLI), a huge gotcha is that very few AWS services as of yet provide IPv6 endpoints. This means that you won't be able to use most AWS services at all from within your private IPv6 subnet (with the exception of services that consist of instances that themselves reside within your VPCs, like RDS; rather than endpoints hosted outside of your VPCs, like DynamoDB).

The only major AWS service I've tried that does support IPv6 through its APIs is S3. When connecting to it via CLI, you can get it to use IPv6 by explicitly specifying the "dualstack" endpoint via command-line flag, like this:

aws --endpoint-url https://s3.dualstack.us-east-1.amazonaws.com --region us-east-1 s3 ls

Or, alternately, you can enable IPv6 usage via the AWS config file (~/.aws/config), like this:

[default]
region = us-east-1
s3 =
    use_dualstack_endpoint = true
    addressing_style = virtual

Ansible Inventory

To access EC2 instances in a private subnet, typically you'd use a VPN running in a public subnet of the same (or bridged) VPC, with the VPN client set to route the VPC's private IPv4 block through the VPN. For IPv6, I also have my VPN set to route the VPC's IPv6 block through the VPN, too.

Using Ansible through a VPN with IPv4 is pretty much as simple as configuring Ansible's ec2.ini file to set its destination_variable and vpc_destination_variable settings to private_ip_address. But since I decided to disallow any IPv4 access to my private subnets (even from other subnets within the same VPC), I had to jump through a few extra hoops:

1. Custom Internal Domain Names

I use a custom internal domain name for all my servers (I'll use example.net as the custom domain in the following examples), and assign each server its own domain name (like db1.example.net or mail2.example.net, etc). When I launch a new EC2 server, I create a DNS AAAA record for it (via Route53), pointing the DNS record to the IPv6 address of the newly-launched server. In this way I can use the DNS name to refer to the same server throughout its lifetime.

I also tag my EC2 instances as soon as I launch them with tags from which the DNS name can be constructed. For example, I'd assign the server with the DNS name of fe3.example.net a "node" tag of fe and a "number" tag of 3.
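Both steps are easy to script with the AWS CLI — something like the following (a sketch; the instance ID, hosted zone ID, and IPv6 address are all placeholders):

# tag the new instance so its DNS name can be derived later
aws ec2 create-tags --resources i-0123456789abcdef0 \
    --tags Key=node,Value=fe Key=number,Value=3

# point fe3.example.net at the instance's IPv6 address
aws route53 change-resource-record-sets \
    --hosted-zone-id Z0123456789ABCDEFGHIJ \
    --change-batch '{"Changes": [{"Action": "UPSERT", "ResourceRecordSet": {
        "Name": "fe3.example.net", "Type": "AAAA", "TTL": 300,
        "ResourceRecords": [{"Value": "2600:1f18:1234:5678::1"}]}}]}'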

2. SSH Config

In my SSH config file (~/.ssh/config), I have an entry like the following, to make sure SSH (and Ansible) only tries to access my EC2 instances through IPv6:

Host *.example.net
    AddressFamily inet6

3. Ansible EC2 Config

With the above two elements in place, I can then enable the destination_format (and destination_format_tags) settings in the Ansible ec2.ini configuration file to direct Ansible to use DNS names instead of IP addresses for EC2 inventory. With the "node" and "number" tags described above, I can use the following configuration in my ec2.ini file:

destination_format = {0}{1}.example.net
destination_format_tags = node,number

When the above is set up correctly, you can run the ec2.py script (eg as ./ec2.py), and see your DNS names in its output (like db1.example.net or mail2.example.net, etc), instead of IPv4 addresses. And when you run an ad-hoc Ansible module (like ansible fe3.example.net -i ec2.py -m setup) everything should "just work".

Wednesday, January 8, 2020

Testing Systemd Services with Docker-Compose

I've been using Docker containers to test out the install process for a project I've been working on, and have found it can be a little tricky to get systemd booted up and running in Docker. Normally running a service manager like systemd within a container would be redundant and unnecessary, but in this case I'm specifically trying to test out systemd service files (and directory paths, and user permissions, etc) that have been set up by my install process.

With the base 18.04 Ubuntu image, there are 4 key steps to getting systemd running and testable in a Docker container:

  1. Install systemd
  2. Map a few key system directories
  3. Start up with /lib/systemd/systemd
  4. Use docker exec to test

1. Install systemd

With the base Ubuntu image, it's as simple as installing the systemd package with apt-get — like this Dockerfile:

FROM ubuntu:18.04
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update && apt-get install -y systemd
CMD ["/lib/systemd/systemd"]

2. Map a few key system directories

I found I had to mount /run and /run/lock as tmpfs directories and map /sys/fs/cgroup to my local /sys/fs/cgroup directory. You can do that with this docker-compose.yml file:

version: '3'
services:
  my_test_container:
    build: .
    image: my_test_image
    tmpfs:
      - /run
      - /run/lock
    volumes:
      - /sys/fs/cgroup:/sys/fs/cgroup:ro

Or, alternately, when using the docker run command, specify the --tmpfs /run --tmpfs /run/lock --volume /sys/fs/cgroup:/sys/fs/cgroup:ro flags.

3. Start up with /lib/systemd/systemd

I added CMD ["/lib/systemd/systemd"] to my Dockerfile to start the container with /lib/systemd/systemd by default; but you can instead add command: /lib/systemd/systemd to a service in your docker-compose.yml file, or just run /lib/systemd/systemd directly with the docker run command.
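For example, the compose service with the command override would look like this (a sketch — the same file as in step 2, plus the command line):

version: '3'
services:
  my_test_container:
    build: .
    image: my_test_image
    command: /lib/systemd/systemd
    tmpfs:
      - /run
      - /run/lock
    volumes:
      - /sys/fs/cgroup:/sys/fs/cgroup:ro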

4. Use docker exec to test

With the above docker-compose.yml and Dockerfile, you can start up the test container with one command:

docker-compose up -d my_test_container

And then, with systemd running, use a second command to execute a shell on the container to test it out:

docker-compose exec my_test_container bash

Or use exec to run whatever other commands you need to test systemd:

docker-compose exec my_test_container systemctl list-units

Alternately, if you built and ran the above Dockerfile with docker commands instead of docker-compose, you'd use the following command to test the container out:

docker exec -it my_test_container bash

Cleaning up

To clean everything up, stop the container with docker-compose:

docker-compose stop my_test_container

Then remove the container:

docker-compose rm my_test_container

And finally, remove the image:

docker image rm my_test_image

Or execute all 3 clean-up steps at once (as well as removing all other containers/images referenced by your docker-compose.yml file), in a single command:

docker-compose down --rmi all

Without docker-compose

The following docker commands would allow you to build, run, and clean up the above Dockerfile without using docker-compose at all:

docker build --tag my_test_image .

docker run \
    --tmpfs /run --tmpfs /run/lock \
    --volume /sys/fs/cgroup:/sys/fs/cgroup:ro \
    --detach --rm \
    --name my_test_container my_test_image

docker exec --interactive --tty my_test_container bash

docker stop my_test_container

docker image rm my_test_image