Wednesday, December 30, 2020

Ecto RDS SSL Connection with Certificate Verification

It's nice and easy to connect to an AWS RDS instance with Elixir Ecto over SSL/TLS, as long as you're not worried about verifying the database server's certificate. You just add an ssl: true setting when you configure the Ecto Repo, like this snippet from a config/releases.exs file for a hypothetical "myapp":

# config/releases.exs
config :myapp, MyApp.Repo,
  hostname: System.get_env("DB_HOSTNAME"),
  database: System.get_env("DB_DATABASE"),
  username: System.get_env("DB_USERNAME"),
  password: System.get_env("DB_PASSWORD"),
  ssl: true

That's probably good enough for most cloud environments; but if you want to defend against a sophisticated attacker eavesdropping on or manipulating the SSL connections between your DB client and the RDS server, you also need to configure your Ecto Repo's ssl_opts setting to verify the server's certificate.

Unfortunately, this is not so straightforward. You need to either write your own certificate verification function (not trivial), or use one supplied by another library — like the ssl_verify_fun.erl library.

To use the :ssl_verify_hostname verification function from the ssl_verify_fun.erl library, first add the library as a dependency to your mix.exs file:

# mix.exs
defp deps do
  [
    {:ecto_sql, "~> 3.5"},
    {:ssl_verify_fun, ">= 0.0.0"}
  ]
end

Then add the following ssl_opts setting to your Ecto Repo config:

# config/releases.exs
check_hostname = String.to_charlist(System.get_env("DB_HOSTNAME"))

config :myapp, MyApp.Repo,
  hostname: System.get_env("DB_HOSTNAME"),
  database: System.get_env("DB_DATABASE"),
  username: System.get_env("DB_USERNAME"),
  password: System.get_env("DB_PASSWORD"),
  ssl: true,
  ssl_opts: [
    cacertfile: "/etc/ssl/certs/rds-ca-2019-root.pem",
    server_name_indication: check_hostname,
    verify: :verify_peer,
    verify_fun: {&:ssl_verify_hostname.verify_fun/3, [check_hostname: check_hostname]}
  ]

Note that the RDS server hostname (which would be something like my-rds-cluster.cluster-abcd1234efgh.us-east-1.rds.amazonaws.com) needs to be passed to the server_name_indication and check_hostname options as a charlist. The above example also assumes that you have downloaded the root RDS SSL certificate to /etc/ssl/certs/rds-ca-2019-root.pem on your DB client hosts.
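
If you don't already have the RDS root certificate on your client hosts, you can download it with something like the following (a minimal sketch; at the time of writing AWS published the 2019 root certificate at this URL, but check the RDS SSL docs for the current location, and adjust the destination path to wherever your config expects it):

# download the RDS 2019 root CA certificate to the path used in the example above
sudo curl -sSf -o /etc/ssl/certs/rds-ca-2019-root.pem \
    https://s3.amazonaws.com/rds-downloads/rds-ca-2019-root.pem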

I'd also suggest pulling out the generation of ssl_opts into a function, to make it easy to set up multiple repos. This is the way I'd do it with our hypothetical "myapp" repo: I'd add one environment variable (DB_SSL) to trigger the Ecto ssl setting (with or without verifying the server cert), and another environment variable (DB_SSL_CA_CERT) to specify the path for the cacertfile option (triggering cert verification):

# config/releases.exs
make_ssl_opts = fn
  "", _hostname ->
    []

  cacertfile, hostname ->
    check_hostname = String.to_charlist(hostname)

    [
      cacertfile: cacertfile,
      server_name_indication: check_hostname,
      verify: :verify_peer,
      verify_fun: {&:ssl_verify_hostname.verify_fun/3, [check_hostname: check_hostname]}
    ]
end

db_ssl_ca_cert = System.get_env("DB_SSL_CA_CERT", "")
db_ssl = db_ssl_ca_cert != "" or System.get_env("DB_SSL", "") != ""
db_hostname = System.get_env("DB_HOSTNAME")

config :myapp, MyApp.Repo,
  hostname: db_hostname,
  database: System.get_env("DB_DATABASE"),
  username: System.get_env("DB_USERNAME"),
  password: System.get_env("DB_PASSWORD"),
  ssl: db_ssl,
  ssl_opts: make_ssl_opts.(db_ssl_ca_cert, db_hostname)

With this verification in place, you'd see an error like the following if your DB client tries to connect to a server with an SSL certificate signed by a CA other than the one you configured:

{:tls_alert, {:unknown_ca, 'TLS client: In state certify at ssl_handshake.erl:1950 generated CLIENT ALERT: Fatal - Unknown CA\n'}}

And you'd see an error like the following if the certificate was signed by the expected CA, but for a different hostname:

{bad_cert,unable_to_match_altnames} - {:tls_alert, {:handshake_failure, 'TLS client: In state certify at ssl_handshake.erl:1952 generated CLIENT ALERT: Fatal - Handshake Failure\n {bad_cert,unable_to_match_altnames}'}}

Wednesday, December 16, 2020

Using Logstash to Ingest CloudFront Logs Into Elasticsearch

Elasticsearch can be a good way of monitoring usage of your AWS CloudFront websites. There are some fairly straightforward paths to shipping CloudFront logs to hosted Elasticsearch services like Logz.io or Amazon Elasticsearch. Here's how to do it with your own self-hosted Elasticsearch and Logstash instances:

  1. Set up CloudFront logging
  2. Set up SQS notifications
  3. Set up test Logstash pipeline
  4. Set up main Logstash pipeline
  5. View logs in Kibana

Set up CloudFront logging

First, you need an S3 bucket to store your CloudFront logs. You can use an existing bucket, or create a new one. You don't need to set up any special permissions for the bucket — but you probably will want to make sure the bucket denies public access to its content by default. In this example, we'll use an S3 bucket for logs called my-log-bucket, and we'll store our CloudFront logs under a directory of the bucket called my-cloudfront-logs. Also, we'll store each CloudFront distribution's logs in their own subdirectory of that directory; so for the distribution serving the www.example.com domain, we'll store the distribution's logs under the my-cloudfront-logs/www.example.com subdirectory.
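
If you're creating a new bucket, a rough sketch of doing so with the AWS CLI (using this article's example bucket name; region flags omitted, so this assumes us-east-1) might look like:

# create the log bucket
aws s3api create-bucket --bucket my-log-bucket

# deny public access to the bucket's content by default
aws s3api put-public-access-block \
    --bucket my-log-bucket \
    --public-access-block-configuration \
    BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true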

With the S3 logging bucket created and available, update each of your CloudFront distributions to log to it. You can do this via the AWS console by editing the distribution, turning the "Standard Logging" setting on, setting the "S3 Bucket for Logs" to your S3 logging bucket (my-log-bucket.s3.amazonaws.com), and setting the "Log Prefix" to the directory path of the subdirectory of the S3 bucket under which you'll store the logs (my-cloudfront-logs/www.example.com/). Save your changes, and every few minutes CloudFront will save a new .gz file to the my-cloudfront-logs/www.example.com/ subdirectory of the my-log-bucket (see the CloudFront access logs docs for details).

Set up SQS notifications

Next, create a new SQS queue. We'll call ours my-cloudfront-log-notifications, and we'll create it in the us-east-1 AWS region. When you create the queue, configure its "Receive message wait time" setting to 10 seconds or so; this will ensure the SQS client doesn't make way more SQS requests than needed (a setting of 10 seconds should keep the cost of this queue down to less than $1/month).
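
If you'd rather create the queue with the AWS CLI than the console, it would look roughly like this (same example queue name and region as above):

# create the SQS queue with a 10-second receive message wait time (long polling)
aws sqs create-queue \
    --queue-name my-cloudfront-log-notifications \
    --region us-east-1 \
    --attributes ReceiveMessageWaitTimeSeconds=10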

The only other thing special you need to do when you create the queue is add an access policy to it that allows S3 to send messages to it. The policy should look like this (replace my-cloudfront-log-notifications with the name of your queue, us-east-1 with your queue's region, my-log-bucket with the name of your log bucket, and 123456789012 with your AWS account ID):

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": "SQS:SendMessage", "Resource": "arn:aws:sqs:us-east-1:123456789012:my-cloudfront-log-notifications", "Condition": { "StringEquals": { "aws:SourceAccount": "123456789012" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:my-log-bucket" } } } ] }

With the SQS queue created, update the S3 bucket to send all object-create events to the queue. You can do this via the AWS console by selecting the bucket and opening the "Events" block in the "Advanced Settings" section of the "Properties" tab of the bucket. There you can add a notification; name it my-cloudfront-log-configuration, check the "All object create events" checkbox, set the "Prefix" to my-cloudfront-logs/, and send it to your SQS queue my-cloudfront-log-notifications.

Alternately, you can add a notification with the same settings as above via the put-bucket-notification-configuration command of the s3api CLI, using a notification-configuration JSON file like the following:

{ "QueueConfigurations": [ { "Id": "my-cloudfront-log-configuration", "QueueArn": "arn:aws:sqs:us-east-1:123456789012:my-cloudfront-log-notifications", "Events": [ "s3:ObjectCreated:*" ], "Filter": { "Key": { "FilterRules": [ { "Name": "prefix", "Value": "my-cloudfront-logs/" } ] } } } ] }

Now that you've hooked up S3 bucket notifications to the SQS queue, if you look in the AWS console for the SQS queue, under the Monitoring tab's charts you'll start to see messages received every few minutes.

Set up test Logstash pipeline

Download a sample .gz log file from your S3 logging bucket, and copy it over to the machine you have Logstash running on. Move the file to a directory that Logstash can access, and make sure it has read permissions on the file. Our sample file will live at /var/log/my-cloudfront-logs/www.example.com/E123456789ABCD.2020-01-02-03.abcd1234.gz.
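
One way to grab a sample file is with the AWS CLI; here's a sketch using this article's example bucket and paths, and assuming Logstash runs as the logstash user:

# copy one .gz log file from the logging bucket to the Logstash machine
aws s3 cp \
    s3://my-log-bucket/my-cloudfront-logs/www.example.com/E123456789ABCD.2020-01-02-03.abcd1234.gz \
    /var/log/my-cloudfront-logs/www.example.com/

# make sure the logstash user can read it
sudo chown -R logstash:logstash /var/log/my-cloudfront-logs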

Copy the following my-cloudfront-pipeline.conf file into the /etc/logstash/conf.d directory on your Logstash machine (replacing the input path with your sample .gz log file), tail the Logstash logs (journalctl -u logstash -f if managed with systemd), and restart the Logstash service (sudo systemctl restart logstash):

# /etc/logstash/conf.d/my-cloudfront-pipeline.conf
input {
  file {
    file_completed_action => "log"
    file_completed_log_path => "/var/lib/logstash/cloudfront-completed.log"
    mode => "read"
    path => "/var/log/my-cloudfront-logs/www.example.com/E123456789ABCD.2020-01-02-03.abcd1234.gz"
    sincedb_path => "/var/lib/logstash/cloudfront-since.db"
    type => "cloudfront"
  }
}

filter {
  if [type] == "cloudfront" {
    if (("#Version: 1.0" in [message]) or ("#Fields: date" in [message])) {
      drop {}
    }

    mutate {
      rename => { "type" => "[@metadata][type]" }
      # strip dashes that indicate empty fields
      gsub => ["message", "\t-(?=\t)", "	"] # literal tab
    }

    #Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
    csv {
      separator => "	" # literal tab
      columns => [
        "date", "time", "x_edge_location", "sc_bytes", "c_ip", "cs_method",
        "cs_host", "cs_uri_stem", "sc_status", "cs_referer", "cs_user_agent",
        "cs_uri_query", "cs_cookie", "x_edge_result_type", "x_edge_request_id",
        "x_host_header", "cs_protocol", "cs_bytes", "time_taken",
        "x_forwarded_for", "ssl_protocol", "ssl_cipher",
        "x_edge_response_result_type", "cs_protocol_version", "fle_status",
        "fle_encrypted_fields", "c_port", "time_to_first_byte",
        "x_edge_detailed_result_type", "sc_content_type", "sc_content_len",
        "sc_range_start", "sc_range_end"
      ]
      convert => {
        "c_port" => "integer"
        "cs_bytes" => "integer"
        "sc_bytes" => "integer"
        "sc_content_len" => "integer"
        "sc_range_end" => "integer"
        "sc_range_start" => "integer"
        "sc_status" => "integer"
        "time_taken" => "float"
        "time_to_first_byte" => "float"
      }
      add_field => {
        "datetime" => "%{date} %{time}"
        "[@metadata][document_id]" => "%{x_edge_request_id}"
      }
      remove_field => ["cloudfront_fields", "cloudfront_version", "message"]
    }

    # parse datetime
    date {
      match => ["datetime", "yy-MM-dd HH:mm:ss"]
      remove_field => ["datetime", "date", "time"]
    }

    # lookup geolocation of client ip address
    geoip {
      source => "c_ip"
      target => "geo"
    }

    # parse user-agent into subfields
    urldecode {
      field => "cs_user_agent"
    }
    useragent {
      source => "cs_user_agent"
      target => "ua"
      add_field => {
        "user_agent.name" => "%{[ua][name]}"
        "user_agent.version" => "%{[ua][major]}"
        "user_agent.device.name" => "%{[ua][device]}"
        "user_agent.os.name" => "%{[ua][os_name]}"
        "user_agent.os.version" => "%{[ua][os_major]}"
      }
      remove_field => ["cs_user_agent", "ua"]
    }

    # pull logfile path from s3 metadata, if present
    if [@metadata][s3][object_key] {
      mutate {
        add_field => { "path" => "%{[@metadata][s3][object_key]}" }
      }
    }

    # strip directory path from logfile path, and canonicalize field name
    mutate {
      rename => { "path" => "log.file.path" }
      gsub => ["log.file.path", ".*/", ""]
      remove_field => "host"
    }

    # canonicalize field names, and drop unwanted fields
    mutate {
      rename => {
        "c_ip" => "client.ip"
        "cs_bytes" => "http.request.bytes"
        "sc_content_len" => "http.response.body.bytes"
        "sc_content_type" => "http.response.body.type"
        "cs_method" => "http.request.method"
        "cs_protocol" => "url.scheme"
        "cs_protocol_version" => "http.version"
        "cs_referer" => "http.request.referrer"
        "cs_uri_query" => "url.query"
        "cs_uri_stem" => "url.path"
        "sc_bytes" => "http.response.bytes"
        "sc_status" => "http.response.status_code"
        "ssl_cipher" => "tls.cipher"
        "ssl_protocol" => "tls.protocol_version"
        "x_host_header" => "url.domain"
      }
      gsub => [
        "http.version", "HTTP/", "",
        "tls.protocol_version", "TLSv", ""
      ]
      remove_field => [
        "c_port",
        "cs_cookie",
        "cs_host",
        "fle_encrypted_fields",
        "fle_status",
        "sc_range_end",
        "sc_range_start",
        "x_forwarded_for"
      ]
    }
  }
}

output {
  stdout { codec => "rubydebug" }
}

You should see a bunch of entries in the Logstash logs like the following, one for each entry from your sample log file (note the fields will appear in a different order every time you run this):

Jan 02 03:04:05 logs1 logstash[12345]: {
Jan 02 03:04:05 logs1 logstash[12345]:     "x_edge_detailed_result_type" => "Hit",
Jan 02 03:04:05 logs1 logstash[12345]:     "@timestamp" => 2020-01-02T03:01:02.000Z,
Jan 02 03:04:05 logs1 logstash[12345]:     "user_agent.device.name" => "EML-AL00",
Jan 02 03:04:05 logs1 logstash[12345]:     "time_taken" => 0.001,
Jan 02 03:04:05 logs1 logstash[12345]:     "http.version" => "2.0",
Jan 02 03:04:05 logs1 logstash[12345]:     "user_agent.os.version" => "8",
Jan 02 03:04:05 logs1 logstash[12345]:     "http.response.body.bytes" => nil,
Jan 02 03:04:05 logs1 logstash[12345]:     "tls.cipher" => "ECDHE-RSA-AES128-GCM-SHA256",
Jan 02 03:04:05 logs1 logstash[12345]:     "http.response.bytes" => 2318,
Jan 02 03:04:05 logs1 logstash[12345]:     "@version" => "1",
Jan 02 03:04:05 logs1 logstash[12345]:     "time_to_first_byte" => 0.001,
Jan 02 03:04:05 logs1 logstash[12345]:     "http.request.method" => "GET",
Jan 02 03:04:05 logs1 logstash[12345]:     "x_edge_request_id" => "s7lmJasUXiAm7w2oR34Gfg5zTgeQSTkYwiYV1pnz5Hzv8mRmBzyGrw==",
Jan 02 03:04:05 logs1 logstash[12345]:     "log.file.path" => "EML9FBPJY2494.2020-01-02-03.abcd1234.gz",
Jan 02 03:04:05 logs1 logstash[12345]:     "x_edge_result_type" => "Hit",
Jan 02 03:04:05 logs1 logstash[12345]:     "http.request.bytes" => 388,
Jan 02 03:04:05 logs1 logstash[12345]:     "http.request.referrer" => "http://baidu.com/",
Jan 02 03:04:05 logs1 logstash[12345]:     "client.ip" => "192.0.2.0",
Jan 02 03:04:05 logs1 logstash[12345]:     "user_agent.name" => "UC Browser",
Jan 02 03:04:05 logs1 logstash[12345]:     "user_agent.version" => "11",
Jan 02 03:04:05 logs1 logstash[12345]:     "url.query" => nil,
Jan 02 03:04:05 logs1 logstash[12345]:     "http.response.body.type" => "text/html",
Jan 02 03:04:05 logs1 logstash[12345]:     "url.domain" => "www.example.com",
Jan 02 03:04:05 logs1 logstash[12345]:     "x_edge_location" => "LAX50-C3",
Jan 02 03:04:05 logs1 logstash[12345]:     "http.response.status_code" => 200,
Jan 02 03:04:05 logs1 logstash[12345]:     "geo" => {
Jan 02 03:04:05 logs1 logstash[12345]:         "ip" => "192.0.2.0",
Jan 02 03:04:05 logs1 logstash[12345]:         "region_name" => "Shanghai",
Jan 02 03:04:05 logs1 logstash[12345]:         "country_name" => "China",
Jan 02 03:04:05 logs1 logstash[12345]:         "timezone" => "Asia/Shanghai",
Jan 02 03:04:05 logs1 logstash[12345]:         "longitude" => 121.4012,
Jan 02 03:04:05 logs1 logstash[12345]:         "country_code3" => "CN",
Jan 02 03:04:05 logs1 logstash[12345]:         "location" => {
Jan 02 03:04:05 logs1 logstash[12345]:             "lon" => 121.4012,
Jan 02 03:04:05 logs1 logstash[12345]:             "lat" => 31.0449
Jan 02 03:04:05 logs1 logstash[12345]:         },
Jan 02 03:04:05 logs1 logstash[12345]:         "region_code" => "SH",
Jan 02 03:04:05 logs1 logstash[12345]:         "country_code2" => "CN",
Jan 02 03:04:05 logs1 logstash[12345]:         "continent_code" => "AS",
Jan 02 03:04:05 logs1 logstash[12345]:         "latitude" => 31.0449
Jan 02 03:04:05 logs1 logstash[12345]:     },
Jan 02 03:04:05 logs1 logstash[12345]:     "url.scheme" => "https",
Jan 02 03:04:05 logs1 logstash[12345]:     "tls.protocol_version" => "1.2",
Jan 02 03:04:05 logs1 logstash[12345]:     "user_agent.os.name" => "Android",
Jan 02 03:04:05 logs1 logstash[12345]:     "x_edge_response_result_type" => "Hit",
Jan 02 03:04:05 logs1 logstash[12345]:     "url.path" => "/"
Jan 02 03:04:05 logs1 logstash[12345]: }

These entries show you what Logstash will push to Elasticsearch, once you hook it up. You can adjust this my-cloudfront-pipeline.conf file and restart Logstash again and again until you get the exact field names and values that you want to push to Elasticsearch.

Let's look at each part of the pipeline individually.

In the input section, we're using the file input to read just our one sample file:

input {
  file {
    file_completed_action => "log"
    file_completed_log_path => "/var/lib/logstash/cloudfront-completed.log"
    mode => "read"
    path => "/var/log/my-cloudfront-logs/www.example.com/E123456789ABCD.2020-01-02-03.abcd1234.gz"
    sincedb_path => "/var/lib/logstash/cloudfront-since.db"
    type => "cloudfront"
  }
}

The key bit here is that we set the type field to cloudfront, which we'll use in the filter section below to apply our filtering logic only to entries of this type. If you're only going to process CloudFront log files in this pipeline, you can omit all the bits of the pipeline that deal with "type", which would simplify it some.

In the filter section, the first step is to check if the type field was set to "cloudfront", and only execute the rest of the filter block if so:

filter { if [type] == "cloudfront" {

Then the next step in filter section is to drop the two header lines in each CloudFront log file, the first beginning with #Version, and the second beginning with #Fields:

if (("#Version: 1.0" in [message]) or ("#Fields: date" in [message])) { drop {} }

After that, the next step renames the type field to [@metadata][type], so that it won't be pushed to the Elasticsearch index. I've opted to use Elasticsearch indexes that are for my CloudFront logs only; however, if you want to push your CloudFront logs into indexes that are shared with other data, you may want to keep the type field.

mutate { rename => { "type" => "[@metadata][type]" }

The second half of this mutate filter strips out the - characters that indicate empty field values from all the columns in the log entry. Note that the last argument of this gsub function is a literal tab character — make sure your text editor does not convert it to spaces!

      # strip dashes that indicate empty fields
      gsub => ["message", "\t-(?=\t)", "	"] # literal tab
    }

For example, it will convert an entry like this:

2020-01-02 03:03:03 HIO50-C1 6564 192.0.2.0 GET d2c4n4ttot8c65.cloudfront.net / 200 - Mozilla/5.0%20(Windows%20NT%206.1;%20WOW64;%20rv:40.0)%20Gecko/20100101%20Firefox/40.1 - - Miss nY0knXse4vDxS5uOBe3YAhDpH809bqhsILUUFAtE_4ZLlfXCiYcD0A== www.example.com https 170 0.164 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Miss HTTP/1.1 - - 62684 0.164 Miss text/html 6111 - -

Into this (removing the dashes that indicate empty values, but not the dashes in non-empty values like the date or ciphersuite):

2020-01-02 03:03:03 HIO50-C1 6564 192.0.2.0 GET d2c4n4ttot8c65.cloudfront.net / 200 Mozilla/5.0%20(Windows%20NT%206.1;%20WOW64;%20rv:40.0)%20Gecko/20100101%20Firefox/40.1 Miss nY0knXse4vDxS5uOBe3YAhDpH809bqhsILUUFAtE_4ZLlfXCiYcD0A== www.example.com https 170 0.164 TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Miss HTTP/1.1 62684 0.164 Miss text/html 6111

The next step is the meat of the process, using the csv filter to convert each tab-separated log line into named fields. Note that the separator property value is also a literal tab character:

    #Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
    csv {
      separator => "	" # literal tab
      columns => [
        "date", "time", "x_edge_location", "sc_bytes", "c_ip", "cs_method",
        "cs_host", "cs_uri_stem", "sc_status", "cs_referer", "cs_user_agent",
        "cs_uri_query", "cs_cookie", "x_edge_result_type", "x_edge_request_id",
        "x_host_header", "cs_protocol", "cs_bytes", "time_taken",
        "x_forwarded_for", "ssl_protocol", "ssl_cipher",
        "x_edge_response_result_type", "cs_protocol_version", "fle_status",
        "fle_encrypted_fields", "c_port", "time_to_first_byte",
        "x_edge_detailed_result_type", "sc_content_type", "sc_content_len",
        "sc_range_start", "sc_range_end"
      ]
    }

The columns property lists out each field name, in order. Later on in this pipeline, we'll rename many of these fields to use the ECS nomenclature, but this step uses the field names as defined by CloudFront, for clarity.

The middle part of the csv filter converts the numeric fields to actual numbers, via the convert property mapping:

convert => { "c_port" => "integer" "cs_bytes" => "integer" "sc_bytes" => "integer" "sc_content_len" => "integer" "sc_range_end" => "integer" "sc_range_start" => "integer" "sc_status" => "integer" "time_taken" => "float" "time_to_first_byte" => "float" }

The add_field part of the csv filter combines the individual date and time fields into a combined datetime field (to be converted to a timestamp object later); and also copies the x_edge_request_id field value as the [@metadata][document_id] field:

add_field => { "datetime" => "%{date} %{time}" "[@metadata][document_id]" => "%{x_edge_request_id}" }

The [@metadata][document_id] field will be used later on when we push the record to Elasticsearch (to be used as the record's ID). Like with the [@metadata][type] field, this is another case where if you're only going to process CloudFront log files in this pipeline, you could omit this extra metadata field, and just use the x_edge_request_id directly when configuring the Elasticsearch record ID.

The final part of the csv filter removes some fields that are redundant once the log entry has been parsed: message (the full log entry text itself), and cloudfront_fields and cloudfront_version (which the s3snssqs input we'll add later automatically includes):

remove_field => ["cloudfront_fields", "cloudfront_version", "message"] }

The next filter step is to convert the datetime field (created from the date and time fields above) into a proper datetime object:

    # parse datetime
    date {
      match => ["datetime", "yy-MM-dd HH:mm:ss"]
      remove_field => ["datetime", "date", "time"]
    }

This sets the datetime as the value of the @timestamp field. We'll also remove the datetime, date, and time fields, since we won't need them now that we have the parsed datetime in the @timestamp field.

The next filter uses the client IP address to look up a probable physical location for the client:

    # lookup geolocation of client ip address
    geoip {
      source => "c_ip"
      target => "geo"
    }

This creates a geo field with a bunch of subfields (like [geo][country_name], [geo][city_name], etc) containing the probable location details. Note that many IP addresses won't have values for some of these subfields; see the Geoip filter docs for more details.

The next filter decodes the user-agent field, and the filter after that parses it. The useragent filter parses the cs_user_agent field into the ua field, which, like the geo field, will contain a bunch of subfields. We'll pull out a few of those subfields, and add fields with ECS names for them:

    # parse user-agent into subfields
    urldecode {
      field => "cs_user_agent"
    }
    useragent {
      source => "cs_user_agent"
      target => "ua"
      add_field => {
        "user_agent.name" => "%{[ua][name]}"
        "user_agent.version" => "%{[ua][major]}"
        "user_agent.device.name" => "%{[ua][device]}"
        "user_agent.os.name" => "%{[ua][os_name]}"
        "user_agent.os.version" => "%{[ua][os_major]}"
      }
      remove_field => ["cs_user_agent", "ua"]
    }

Since the user-agent info we want is now in those newly added user_agent.* fields, the last part of the useragent filter removes the cs_user_agent field and the intermediate ua field.

When using the file input, like we are while testing this pipeline, the file input will add a path field to each record, containing the path to the file it's reading. Later on, when we use the s3snssqs input, that input will instead pass the same path as the [@metadata][s3][object_key] field. So that we can access this value uniformly, regardless of which input we used, the next filter step sets the path field to the value of the [@metadata][s3][object_key] field whenever that field is present:

    # pull logfile path from s3 metadata, if present
    if [@metadata][s3][object_key] {
      mutate {
        add_field => { "path" => "%{[@metadata][s3][object_key]}" }
      }
    }

With the path field now containing the file path, regardless of input, we use the next filter to chop the path down to just the log file name (like E123456789ABCD.2020-01-02-03.abcd1234.gz):

    # strip directory path from logfile path, and canonicalize field name
    mutate {
      rename => { "path" => "log.file.path" }
      gsub => ["log.file.path", ".*/", ""]
      remove_field => "host"
    }

We also have the filter rename the path field to log.file.path (the canonical ECS name for it); and have the filter remove the host field (added by the file input along with the path field, based on the host Logstash is running on — which we don't really care to have as part of our log record in Elasticsearch).

The last filter in our pipeline renames all CloudFront fields that have equivalent ECS (Elastic Common Schema) field names:

    # canonicalize field names, and drop unwanted fields
    mutate {
      rename => {
        "c_ip" => "client.ip"
        "cs_bytes" => "http.request.bytes"
        "sc_content_len" => "http.response.body.bytes"
        "sc_content_type" => "http.response.body.type"
        "cs_method" => "http.request.method"
        "cs_protocol" => "url.scheme"
        "cs_protocol_version" => "http.version"
        "cs_referer" => "http.request.referrer"
        "cs_uri_query" => "url.query"
        "cs_uri_stem" => "url.path"
        "sc_bytes" => "http.response.bytes"
        "sc_status" => "http.response.status_code"
        "ssl_cipher" => "tls.cipher"
        "ssl_protocol" => "tls.protocol_version"
        "x_host_header" => "url.domain"
      }

To match the ECS field specs, the middle part of the filter removes the HTTP/ prefix from the http.version field values (converting values like HTTP/2.0 to just 2.0); and removes the TLSv prefix from the tls.protocol_version field values (converting values like TLSv1.2 to just 1.2):

gsub => [ "http.version", "HTTP/", "", "tls.protocol_version", "TLSv", "" ]

And finally, the last part of the filter removes miscellaneous CloudFront fields that we don't care about:

remove_field => [ "c_port", "cs_cookie", "cs_host", "fle_encrypted_fields", "fle_status", "sc_range_end", "sc_range_start", "x_forwarded_for" ] } } }

The output section of the pipeline simply outputs each log record to Logstash's own log output — which is what you see when you tail Logstash's logs:

output {
  stdout { codec => "rubydebug" }
}

Set up main Logstash pipeline

Once you have this test pipeline working to your satisfaction, it's time to change the output section of the pipeline to push the output to Elasticsearch. Replace the output block of the /etc/logstash/conf.d/my-cloudfront-pipeline.conf file with this block (substituting your own host, user, and password settings, as well as any custom SSL settings you need — see the Elasticsearch output plugin docs for details):

output {
  # don't try to index anything that didn't get a document_id
  if [@metadata][document_id] {
    elasticsearch {
      hosts => ["https://elasticsearch.example.com:9243"]
      user => "elastic"
      password => "password123"
      document_id => "%{[@metadata][document_id]}"
      ecs_compatibility => "v1"
      index => "ecs-logstash-%{[@metadata][type]}-%{+YYYY.MM.dd}"
    }
  }
}

The following line in this block serves as one more guard to avoid indexing anything that didn't get parsed properly (you may want to send such log entries to a dedicated errors index, to keep an eye on entries that failed to parse):

if [@metadata][document_id] {

And this line uses the [@metadata][document_id] field to set the record ID for each entry (recall in the pipeline filters, we copied the value of the CloudFront x_edge_request_id, which should be unique for each request, to the [@metadata][document_id] field):

document_id => "%{[@metadata][document_id]}"

And since our output block includes setting ecs_compatibility to v1, which directs Logstash to use ECS-compatible index templates, this line directs Logstash to create a separate index for each day and type of log entry we process:

index => "ecs-logstash-%{[@metadata][type]}-%{+YYYY.MM.dd}"

For example, Logstash will create an index named ecs-logstash-cloudfront-2020.01.02 if we process a CloudFront log entry for January 2, 2020 (or use the existing index with that name, if it already exists).

Restart Logstash once you change the output block. In Logstash's own log output, you should see entries indicating successful connections to your Elasticsearch host, as well as a ginormous entry for the index template it installs in Elasticsearch. Once you see that, check your Elasticsearch instance — you should see a new ecs-logstash-cloudfront-YYYY.MM.DD index created, with entries from your sample CloudFront log file.
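
A quick way to check for the new index from the command line (substituting your own Elasticsearch host and credentials) is the _cat indices API:

# list the CloudFront indexes and their document counts
curl -u elastic:password123 \
    'https://elasticsearch.example.com:9243/_cat/indices/ecs-logstash-cloudfront-*?v'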

You can use this same mechanism to backfill your existing CloudFront log files to Elasticsearch — manually download the log files to backfill to your Logstash machine (like via the sync command of the s3 CLI), and customize the file input block's path property (with wildcards) to direct Logstash to read them in.
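
For example, a backfill might look roughly like this (using this article's example bucket and directories); after syncing, you'd point the file input's path at something like "/var/log/my-cloudfront-logs/**/*.gz":

# download the existing log files to the directory the file input reads from
aws s3 sync \
    s3://my-log-bucket/my-cloudfront-logs/www.example.com/ \
    /var/log/my-cloudfront-logs/www.example.com/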

For future CloudFront log files, however, we're going to make one more change to our pipeline, and use the S3 via SNS/SQS input (aka s3snssqs) to pull CloudFront log files from S3 as soon as CloudFront publishes them.

First, create a new IAM policy for your Logstash machine to use that will allow it to both read from your logging bucket, and to read and delete items from the SQS queue we set up above. The policy should look like this (change the Resource elements to point to your own S3 log bucket and SQS log queue, set up in the first two sections of this article):

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::my-log-bucket" }, { "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::my-log-bucket/my-cloudfront-logs/*" }, { "Effect": "Allow", "Action": [ "sqs:Get*", "sqs:List*", "sqs:ReceiveMessage", "sqs:ChangeMessageVisibility", "sqs:DeleteMessage" ], "Resource": [ "arn:aws:sqs:us-east-1:123456789012:my-cloudfront-log-notifications" ] } ] }

Then install the logstash-input-s3-sns-sqs plugin on your Logstash machine:

cd /usr/share/logstash
sudo -u logstash bin/logstash-plugin install logstash-input-s3-sns-sqs

Then update the input section of your pipeline to be the following (substituting your own SQS queue name and its AWS region):

input {
  # pull new logfiles from s3 when notified
  s3snssqs {
    region => "us-east-1"
    queue => "my-cloudfront-log-notifications"
    from_sns => false
    type => "cloudfront"
  }
}

If you're running the Logstash machine in AWS, you can use the usual EC2 instance profiles or IAM roles for tasks to grant the machine access to the policy you created above. Otherwise, you'll need to add some AWS credential settings to the s3snssqs input as well; consult the S3 input plugin's docs for options (the s3snssqs input allows for the same AWS credential options as the s3 input does, but the s3 input has better documentation for them).

Now restart Logstash. You should see the same output in Logstash's own log as before; but if you check Elasticsearch, you should see new records being added.

View logs in Kibana

Eventually you'll want to create fancy dashboards in Kibana for your new CloudFront data; but for now we'll just get started by setting up a listing where you can view them in the "Discover" section of Kibana.

First, log into Kibana, and navigate to the "Management" > "Stack Management" section of Kibana. Within the "Stack Management" section, if you navigate to the "Data" > "Index management" subsection, you should see a bunch of new indexes named in the form of ecs-logstash-cloudfront-YYYY.MM.DD (like ecs-logstash-cloudfront-2020.01.01 and so on).

Once you've verified Kibana is seeing the indexes, navigate to the "Kibana" > "Index Patterns" subsection, and click the "Create index pattern" button. Specify ecs-logstash-cloudfront-* as the pattern, and select @timestamp as the time field.

With the new index pattern created, navigate out of the "Stack Management" section of Kibana into the main "Kibana" > "Discover" section. This will show your most recent "Discover" search. On the left side of the page, change the selected index pattern to the pattern you just created (ecs-logstash-cloudfront-*). You should now see your most recent CloudFront entries listed (if not, use the time window selector in the top right of the page to expand the time window to include a range you know should include some entries). You can use this page to create a list with custom columns and custom filter settings for your CloudFront logs.

Friday, December 4, 2020

Building a Logstash Offline Plugin Pack with Docker

If you run a Logstash node in an environment where it doesn't have access to the public Internet, and need to install some extra plugins, you have to build an "offline plugin pack" (a zip containing the plugins and their dependencies) on a machine that does have public Internet access. You can then copy the pack to your Logstash node, and install the plugins from it directly.

Here's a quick little script I whipped up to build the offline plugin pack using the official Logstash docker container:

#!/bin/sh -e
logstash_version=7.10.0
logstash_plugins=$(echo '
    logstash-codec-cloudfront
    logstash-input-s3-sns-sqs
' | xargs)

echo "
    bin/logstash-plugin install $logstash_plugins
    bin/logstash-plugin prepare-offline-pack \
        --output /srv/logstash/logstash-plugins.zip \
        $logstash_plugins
" | docker run -i -u $(id -u) -v $(pwd):/srv/logstash --rm \
    docker.elastic.co/logstash/logstash:$logstash_version /bin/sh

Set the script's logstash_version variable to the version of Logstash you're using, and set the (whitespace-separated) list of plugins in the logstash_plugins variable to the plugins you need. Run the script, and it will output a logstash-plugins.zip into your working directory.

You can then copy the logstash-plugins.zip file to your Logstash node (for example, to the /usr/share/logstash directory of the machine), and install the contained plugins like this:
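
For example, a rough sketch of copying the pack over (the logs1.example.com hostname here is just a placeholder for your Logstash node):

# copy the plugin pack to the Logstash node and move it into place
scp logstash-plugins.zip logs1.example.com:/tmp/
ssh logs1.example.com 'sudo mv /tmp/logstash-plugins.zip /usr/share/logstash/'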

cd /usr/share/logstash
sudo -u logstash bin/logstash-plugin install file://logstash-plugins.zip

Make sure you run the logstash-plugin command as the same user you use to run Logstash itself (typically the logstash user) — otherwise the plugins will be installed with the wrong filesystem permissions (and you'll see errors about it when you run the main Logstash process).
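
If you do accidentally install the plugins as the wrong user, a blunt but effective fix is to reset ownership of the Logstash installation directory (this assumes Logstash is installed under /usr/share/logstash and runs as the logstash user):

# reset ownership so the logstash user can read the installed plugins
sudo chown -R logstash:logstash /usr/share/logstash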