Logstash - ingest and parse S3 and CloudFront access logs

Puppet Hiera YAML config for Logstash

---

profile_logstash::logstash_configs:
  input_s3_cassandra: # ingest S3 Access Logs of Cassandra static website
      content: |
          input {
            s3 {
              "region" => "eu-west-1"
              "bucket" => "s3-logs-%{::product}-%{::ecosystem}-cassandra"
              "prefix" => "logs/" # subdir to search for log files
              "backup_to_bucket" => "s3-logs-%{::product}-%{::ecosystem}-cassandra" # to the same bucket, but under subdir
              "backup_add_prefix" => "logs-processed/"
              "delete" => true # delete processed log files
              "interval" => 60 # seconds to wait to check the file list again after a run is finished, default 60
              "type" => "s3_access_logs"
              "add_field" => {
                "role" => "cassandra"
              }
            }
          }
  filter_s3: # parse S3 Access Logs
      content: |
          filter {
            if [type] == "s3_access_logs" {
              grok {
                patterns_dir => "/etc/logstash/patterns/"
                tag_on_failure => ["_grokparsefailure_s3"]
                match => [ "message", "%%{}{S3ACCESSLOG}" ] # %%{} - becomes just %, processed by hiera
                remove_field => [ "message" ] # when successfully parsed
              }
            }
          }
  input_cf_cassandra: # ingest CloudFront Access Logs of Cassandra static website
      content: |
          input {
            s3 {
              "region" => "eu-west-1"
              "bucket" => "cf-logs-%{::product}-%{::ecosystem}-cassandra"
              "prefix" => "logs/" # subdir to search for log files
              "backup_to_bucket" => "cf-logs-%{::product}-%{::ecosystem}-cassandra" # to the same bucket, but under subdir
              "backup_add_prefix" => "logs-processed/"
              "delete" => true # delete processed log files
              "interval" => 60 # seconds to wait to check the file list again after a run is finished, default 60
              "type" => "cf_access_logs"
              "add_field" => {
                "role" => "cassandra"
              }
            }
          }
  filter_cf: # parse CF Access Logs
      content: |
          filter {
            if [cloudfront_version] { # the s3 input itself adds cloudfront_version/cloudfront_fields to events read from CloudFront log files
              grok {
                match => { "message" => "%%{}{CFACCESSLOG}" }
                patterns_dir => "/etc/logstash/patterns/"
                tag_on_failure => ["_grokparsefailure_cf"]
                remove_field => [ "message" ] # dropped only when parsing succeeds
              }
              mutate { # the date filter matches on a field, not a sprintf string, so first assemble date and time into one field
                add_field => { "datetime" => "%%{}{year}-%%{}{month}-%%{}{day} %%{}{time}" }
              }
              date {
                match => [ "datetime", "yyyy-MM-dd HH:mm:ss" ]
                timezone => "UTC" # CloudFront logs timestamps in UTC
              }
              mutate {
                remove_field => ["datetime", "year", "month", "day", "time", "cloudfront_version", "cloudfront_fields"]
              }
            }
          }

profile_base::scripts:
  '/etc/logstash/patterns/s3_access_logs':
    source: 'puppet:///modules/data/logstash/patterns/s3_access_logs'
    owner: root
    group: logstash
    mode: '0640'
    notify: 'Service[logstash]'
  '/etc/logstash/patterns/cf_access_logs':
    source: 'puppet:///modules/data/logstash/patterns/cf_access_logs'
    owner: root
    group: logstash
    mode: '0640'
    notify: 'Service[logstash]'
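
For orientation, here is roughly what the filter_s3 entry above looks like once Hiera has interpolated it and Puppet has written it out as a pipeline file (a sketch; the target path /etc/logstash/conf.d/ is an assumption about how profile_logstash lays out the files):

filter {
  if [type] == "s3_access_logs" {
    grok {
      patterns_dir => "/etc/logstash/patterns/"
      tag_on_failure => ["_grokparsefailure_s3"]
      match => [ "message", "%{S3ACCESSLOG}" ] # %%{} has collapsed to a literal %
      remove_field => [ "message" ]
    }
  }
}

The %{::product} and %{::ecosystem} facts interpolate the same way, so with product=fe and ecosystem=dev the bucket name renders as s3-logs-fe-dev-cassandra.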

Content of the s3_access_logs pattern file

S3ERRORCODE [a-zA-Z]+

S3ACCESSLOG1 %{BASE16NUM:s3_owner} (-|%{HOSTNAME:s3_bucket}) \[%{HTTPDATE:timestamp}\] %{IP:s3_remote_ip} %{NOTSPACE:s3_requester}
S3ACCESSLOG2 %{WORD:s3_request_id} %{NOTSPACE:s3_operation} (-|%{NOTSPACE:s3_key}) (?:"%{S3_REQUEST_LINE}"|-)
S3ACCESSLOG3 %{INT:s3_http_status:int} (-|%{S3ERRORCODE:s3_error_code}) (-|%{INT:s3_bytes_sent:int}) (-|%{INT:s3_object_size:int})
S3ACCESSLOG4 (-|%{INT:s3_total_time:int}) (-|%{INT:s3_turnaround_time:int}) (?:%{QUOTEDSTRING:referrer}|-) (-|%{QUOTEDSTRING:s3_user_agent}) (-|%{NOTSPACE:s3_version_id})
S3ACCESSLOG5 %{NOTSPACE:s3_host_id} (-|%{WORD:s3_signature_version}) (-|%{NOTSPACE:s3_cipher_suite}) (-|%{WORD:s3_authentication_type}) (?:%{HOSTNAME:s3_host_header}) (-|%{NOTSPACE:s3_tls_version})

S3ACCESSLOG %{S3ACCESSLOG1} %{S3ACCESSLOG2} %{S3ACCESSLOG3} %{S3ACCESSLOG4} %{S3ACCESSLOG5}
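
To smoke-test S3ACCESSLOG outside the pipeline, a throwaway Logstash run can parse a single line (a sketch; /usr/share/logstash/bin/logstash is the stock package path, and the log line is a sample in the documented S3 access log format, not real data):

echo '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be awsexamplebucket1 [06/Feb/2019:00:00:38 +0000] 192.0.2.3 79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be 3E57427F3EXAMPLE REST.GET.VERSIONING - "GET /awsexamplebucket1?versioning HTTP/1.1" 200 - 113 - 7 - "-" "S3Console/0.4" - s9lzHYrFp76ZVxRcpX9+5cjAnEH2ROuNkd2BHfIa6UkFVdtjf5mKR3/eTPFvsiP/XV/VLi31234= SigV4 ECDHE-RSA-AES128-GCM-SHA256 AuthHeader awsexamplebucket1.s3.us-west-1.amazonaws.com TLSV1.2' | \
    /usr/share/logstash/bin/logstash -e 'input { stdin {} }
      filter { grok { patterns_dir => "/etc/logstash/patterns/" match => [ "message", "%{S3ACCESSLOG}" ] } }
      output { stdout { codec => rubydebug } }'

Note the plain %{S3ACCESSLOG} here: the %%{}{...} escaping is only needed while the config lives in Hiera.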

Content of the cf_access_logs pattern file

CFACCESSLOG %{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day}\t%{TIME:time}\t%{HOSTNAME:x_edge_location}\t(?:%{NUMBER:sc_bytes:int}|-)\t%{IPORHOST:c_ip}\t%{WORD:cs_method}\t%{HOSTNAME:cs_host}\t%{NOTSPACE:cs_uri_stem}\t%{NUMBER:sc_status:int}\t(-|%{NOTSPACE:cs_referrer})\t%{NOTSPACE:cs_user_agent}\t(-|%{NOTSPACE:cs_uri_query})\t(-|%{WORD:cs_cookie})\t%{WORD:x_edge_result_type}\t%{NOTSPACE:x_edge_request_id}\t%{HOSTNAME:x_host_header}\t%{URIPROTO:cs_protocol}\t%{INT:cs_bytes:int}\t%{NUMBER:time_taken}\t(-|%{NOTSPACE:x_forwarded_for})\t(-|%{NOTSPACE:ssl_protocol})\t(-|%{NOTSPACE:ssl_cipher})\t%{NOTSPACE:x_edge_response_result_type}\t%{NOTSPACE:cs_protocol_version}\t(-|%{NOTSPACE:fle_status})\t(-|%{NOTSPACE:fle_encrypted_fields})\t(-|%{NOTSPACE:c_port})\t%{NUMBER:time_to_first_byte}\t%{NOTSPACE:x_edge_detailed_result_type}\t%{NOTSPACE:sc_content_type}\t(-|%{NUMBER:sc_content_len})\t(-|%{NOTSPACE:sc_range_start})\t(-|%{NOTSPACE:sc_range_end})
  • tool to debug grok patterns: Grok Debugger (autocomplete and live match highlighting)
  • standard grok patterns: hpcugent/logstash-patterns on GitHub (files/grok-patterns)
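
Beyond the interactive debuggers, the whole deployed pipeline (including the patterns_dir references) can be syntax-checked in place before restarting the service; a sketch, assuming the stock package layout:

/usr/share/logstash/bin/logstash --path.settings /etc/logstash --config.test_and_exit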

How to quickly delete lots of log files

(in the AWS Console it takes ages)

export BUCKET=s3-logs-fe-dev-static
export PREFIX=logs-processed/
# list every key under the prefix, drop keys containing single quotes (they would
# break the shorthand syntax below), then delete in two parallel batches of 500 keys
# (delete-objects accepts up to 1000 keys per call)
aws s3api list-objects-v2 --bucket $BUCKET --prefix $PREFIX --output text --query \
    'Contents[].[Key]' | grep -v -e "'" | tr '\n' '\0' | xargs -0 -P2 -n500 bash -c \
    'aws s3api delete-objects --bucket $BUCKET --delete "Objects=[$(printf "{Key=%q}," "$@")],Quiet=true"' _
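
When everything under the prefix can go, a plain recursive delete is simpler but slower, since it issues one DeleteObject call per key instead of batched delete-objects calls:

aws s3 rm "s3://$BUCKET/$PREFIX" --recursive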