Skip to content

Commit

Permalink
Merge pull request #80 from cloud-gov/pull-from-cloudwatch
Browse files Browse the repository at this point in the history
Pull from cloudwatch
  • Loading branch information
JasonTheMain authored Sep 17, 2024
2 parents b701a37 + 58cc6b7 commit 5d83e68
Show file tree
Hide file tree
Showing 15 changed files with 500 additions and 3 deletions.
8 changes: 8 additions & 0 deletions config/blobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ logstash/logstash-input-tcp-6.4.1.zip:
size: 14501935
object_id: 77443abb-adce-433c-4789-bc507504e596
sha: sha256:ca440f035617138c53dc40450802ff058187a4734e177d3f68944bade68acdd4
logstash/logstash-integration-aws-7.1.6.zip:
size: 2143813
object_id: da9212f7-36ef-46b6-5480-8f5eaeb9fa9b
sha: sha256:b80e9f5812aa07c472782193e61ce402b30fa965440e25f002a5af2cf10261b1
logstash/logstash-output-opensearch-2.0.2.zip:
size: 24204545
object_id: f15ae09c-ac8c-4dba-51ce-952917dedad6
Expand All @@ -242,6 +246,10 @@ logstash/logstash-output-syslog-3.0.5.zip:
size: 41727
object_id: 21474ee6-824a-4547-7f45-b236864358ab
sha: sha256:80a2b08f2b58ae81c823cc0bb1dd7956fbc173037fc621bfa31af34140724e02
logstash/logstash-input-cloudwatch_logs-1.0.4.zip:
size: 73273631
object_id: 6f80a2b6-c3bf-4c2e-6c4e-5b99eee310b3
sha: sha256:2d80363ae50bbec723d6424f23f722b887105d4aded09189f5cbe3fef85b8a8a
opensearch-2.14.0-linux-x64.tar.gz:
size: 887872915
object_id: 46c6a9e0-7ae0-4bf0-4bbb-524ddaed1941
Expand Down
5 changes: 5 additions & 0 deletions jobs/ingestor_cloudwatch/monit
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
check process ingestor_cloudwatch
with pidfile /var/vcap/sys/run/bpm/ingestor_cloudwatch/ingestor_cloudwatch.pid
start program "/var/vcap/jobs/bpm/bin/bpm start ingestor_cloudwatch"
stop program "/var/vcap/jobs/bpm/bin/bpm stop ingestor_cloudwatch"
group vcap
192 changes: 192 additions & 0 deletions jobs/ingestor_cloudwatch/spec
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
---
name: ingestor_cloudwatch

description: |
This job runs Logstash process which ingests data by from cloudwatch

packages:
- logstash
- base-logstash-filters
- openjdk-17

templates:
bin/ingestor_cloudwatch: bin/ingestor_cloudwatch.sh
bin/pre-start: bin/pre-start
config/bpm.yml.erb: config/bpm.yml
config/input_and_output.conf.erb: config/input_and_output.conf
config/logstash.yml.erb: config/logstash.yml
config/jvm.options.erb: config/jvm.options
config/ingestor-crt.erb: config/ssl/ingestor.crt
config/ingestor-pem.erb: config/ssl/ingestor.pem
config/ca.erb: config/ssl/opensearch.ca

provides:
- name: ingestor-cloudwatch
type: ingestor-cloudwatch
properties:
- logstash_ingestor.cloudwatch.port
- logstash_ingestor.cloudwatch.transport
- logstash_ingestor.cloudwatch_tls.port
- logstash_ingestor.relp.port
- name: cloudwatch_forwarder
type: cloudwatch_forwarder
properties:
- logstash_ingestor.cloudwatch.port
consumes:
- name: opensearch
type: opensearch
optional: true

properties:
logstash.ssl_client_authentication:
description: Controls the servers behavior in regard to requesting a certificate from client connections
default: required
logstash.heap_size:
description: sets jvm heap sized
logstash.heap_percentage:
description: The percentage value used in the calculation to set the heap size.
default: 46
logstash.jvm_options:
description: additional jvm options
default: []
logstash.metadata_level:
description: "Whether to include additional metadata throughout the event lifecycle. NONE = disabled, DEBUG = fully enabled"
default: "NONE"
logstash.log_level:
description: The default logging level (e.g. WARN, DEBUG, INFO)
default: info
logstash.plugins:
description: "Array of hashes describing logstash plugins to install"
example:
- {name: logstash-output-cloudwatchlogs, version: 2.0.0}
default: []

logstash.ecs_compatibility:
description: Whether to enable ECS compatibility for geoip filters. See https://www.elastic.co/guide/en/logstash/current/plugins-filters-geoip.html#plugins-filters-geoip-ecs_compatibility
default: "disabled"

logstash.env:
description: "a list of arbitrary key-value pairs to be passed on as process environment variables. eg: FOO: 123"
default: []

logstash.queue.type:
description: Internal queuing model, "memory" for legacy in-memory based queuing and "persisted" for disk-based acked queueing.
default: persisted
logstash.queue.page_capacity:
description: The page data files size. The queue data consists of append-only data files separated into pages.
default: 250mb
logstash.queue.max_events:
description: The maximum number of unread events in the queue.
default: 0
logstash.queue.max_bytes:
description: The total capacity of the queue in number of bytes.
default: 1024mb
logstash.queue.checkpoint.acks:
description: The maximum number of acked events before forcing a checkpoint.
default: 1024
logstash.queue.checkpoint.writes:
description: The maximum number of written events before forcing a checkpoint.
default: 1024
logstash.queue.checkpoint.interval:
description: The interval in milliseconds when a checkpoint is forced on the head page.
default: 1000

logstash_ingestor.filters:
description: Filters to execute on the ingestors
default: ""

logstash_ingestor.cloudwatch.port:
description: Port to listen for cloudwatch messages
logstash_ingestor.cloudwatch.transport:
description: Transport protocol to use
default: "tcp"
logstash_ingestor.cloudwatch.use_keepalive:
description: Instruct the socket to use TCP keep alives

logstash_ingestor.health.disable_post_start:
description: Skip post-start health checks?
default: false
logstash_ingestor.health.interval:
description: Logstash cloudwatch health check interval (seconds)
default: 5
logstash_ingestor.health.timeout:
description: Logstash cloudwatch health check number of attempts (seconds)
default: 300

logstash_ingestor.cloudwatch_tls.port:
description: Port to listen for cloudwatch-TLS messages (omit to disable)
logstash_ingestor.cloudwatch_tls.ssl_cert:
description: cloudwatch-TLS SSL certificate (file contents, not a path) - required if logstash_ingestor.cloudwatch_tls.port set
logstash_ingestor.cloudwatch_tls.ssl_key:
description: cloudwatch-TLS SSL key (file contents, not a path) - required if logstash_ingestor.cloudwatch_tls.port set
logstash_ingestor.cloudwatch_tls.skip_ssl_validation:
description: Verify the identity of the other end of the SSL connection against the CA.
default: false
logstash_ingestor.cloudwatch_tls.use_keepalive:
description: Instruct the socket to use TCP keep alives
logstash_ingestor.cloudwatch.prefix:
description: "What prefix in cloudwatch you want to pull in"
default: ["/aws/rds/instance/"]
logstash_ingestor.cloudwatch.region:
description: "Region for aws"
logstash_ingestor.relp.port:
description: Port to listen for RELP messages
default: 2514

logstash_parser.debug:
description: Debug level logging
default: false
logstash_parser.message_max_size:
description: "Maximum log message length. Anything larger is truncated (TODO: move this to ingestor?)"
default: 1048576
logstash_parser.filters:
description: "The configuration to embed into the logstash filters section. Can either be a set of parsing rules as a string or a list of hashes in the form of [{name: path_to_parsing_rules.conf}]"
default: ''
logstash_parser.outputs:
description: |
A list of output plugins, with a hash of options for each of them. Please refer to example below.
example:
inputs:
- plugin: mongodb
options:
uri: 192.168.1.1
database: logsearch
collection: logs
default: [ { plugin: "opensearch", options: {} } ]
logstash_parser.workers:
description: "The number of worker threads that logstash should use (default: auto = one per CPU)"
default: auto
logstash_parser.opensearch.idle_flush_time:
description: "How frequently to flush events if the output queue is not full."
logstash_parser.opensearch.document_id:
description: "Use a specific, dynamic ID rather than an auto-generated identifier."
default: ~
logstash_parser.opensearch.index:
description: "The specific, dynamic index name to write events to."
default: "logstash-%{+YYYY.MM.dd}"
logstash_parser.opensearch.index_type:
description: "The specific, dynamic index type name to write events to."
default: "%{@type}"
logstash_parser.opensearch.routing:
description: "The routing to be used when indexing a document."
logstash_parser.opensearch.ssl.certificate:
description: Node certificate for communication between logstash_parser and Opensearch
logstash_parser.opensearch.ssl.private_key:
description: Private key for communication between logstash_parser and Opensearch
logstash_parser.opensearch.data_hosts:
description: The list of opensearch data node IPs
logstash_parser.opensearch.verification_mode:
description: the verification mode, can be full or none
default: "full"
logstash_parser.timecop.reject_greater_than_hours:
description: "Logs with timestamps greater than this many hours in the future won't be parsed and will get tagged with fail/timecop"
default: 1
logstash_parser.timecop.reject_less_than_hours:
description: "Logs with timestamps less than this many hours in the past won't be parsed and will get tagged with fail/timecop"
default: 24
logstash_parser.enable_json_filter:
description: "Toggles the if_it_looks_like_json.conf filter rule"
default: false
logstash_parser.wait_for_templates:
description: "A list of index templates that need to be present in opensearch before the process starts"
default: ["index_template"]
83 changes: 83 additions & 0 deletions jobs/ingestor_cloudwatch/templates/bin/ingestor_cloudwatch
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/bin/bash

set -e # exit immediately if a simple command exits with a non-zero status
set -u # report the usage of uninitialized variables

# Setup env vars and folders for the webapp_ctl script
#source /var/vcap/jobs/ingestor_cloudwatch/helpers/ctl_setup.sh 'ingestor_cloudwatch'
JOB_NAME=ingestor_cloudwatch
export LOG_DIR=/var/vcap/sys/log/$JOB_NAME
export STORE_DIR=/var/vcap/store/$JOB_NAME
export JOB_DIR=/var/vcap/jobs/$JOB_NAME
source /var/vcap/packages/openjdk-17/bosh/runtime.env

<%
es_host = nil
if_p("logstash_parser.opensearch.data_hosts") { |hosts| es_host = hosts.first }
unless es_host
es_host = link("opensearch").instances.first.address
end
%>

function wait_for_template {
local template_name="$1"
local MASTER_URL="<%= es_host %>:9200"

set +e
while true; do
echo "Waiting for index template to be uploaded: $template_name"
curl \
--key ${JOB_DIR}/ssl/ingestor.key \
--cert ${JOB_DIR}/ssl/ingestor.crt \
--cacert ${JOB_DIR}/ssl/opensearch.ca \
-I -f -i "$MASTER_URL"/_template/$template_name > /dev/null 2>&1
[ $? ] && break
sleep 5
done
set -e
echo "Found $template_name"
}

export PORT=${PORT:-5000}
export LANG=en_US.UTF-8
<% if 'auto' == p('logstash_parser.workers') %>
# 1 logstash worker / CPU core
export LOGSTASH_WORKERS=`grep -c ^processor /proc/cpuinfo`
<% else %>
export LOGSTASH_WORKERS=<%= p('logstash_parser.workers') %>
<% end %>
export TIMECOP_REJECT_GREATER_THAN_HOURS=<%= p('logstash_parser.timecop.reject_greater_than_hours') %>
export TIMECOP_REJECT_LESS_THAN_HOURS=<%= p('logstash_parser.timecop.reject_less_than_hours') %>
export HEAP_SIZE=$((( $( cat /proc/meminfo | grep MemTotal | awk '{ print $2 }' ) * <%= p("logstash.heap_percentage") %> ) / 100 ))K
<% if_p('logstash.heap_size') do |heap_size| %>
HEAP_SIZE=<%= heap_size %>
<% end %>
<% p("logstash.env").each do |env| %>
export <%= env.keys[0] %>="<%= env.values[0] %>"
<% end %>


# These are what changes between ingestors
<% p("logstash_parser.wait_for_templates").each do |template| %>
wait_for_template "<%= template %>"
<% end %>

export LS_JAVA_OPTS="-Xms$HEAP_SIZE -Xmx$HEAP_SIZE -DPID=$$"

# construct a complete config file from all the fragments
cat ${JOB_DIR}/config/input_and_output.conf > ${JOB_DIR}/config/logstash.conf

# # clear persistent queue if the upgrade failed last run
if cat $LOG_DIR/$JOB_NAME.stdout.log | grep -a 'QueueUpgrade - Logstash was unable to upgrade your persistent queue data' >/dev/null ; then
mkdir ${STORE_DIR}/oldqueue.$$
mv ${STORE_DIR}/queue ${STORE_DIR}/.lock ${STORE_DIR}/dead_letter_queue ${STORE_DIR}/uuid ${STORE_DIR}/oldqueue.$$
mv $LOG_DIR/$JOB_NAME.stdout.log $LOG_DIR/$JOB_NAME.stdout.log.old
fi

/var/vcap/packages/logstash/bin/logstash \
--path.data ${STORE_DIR} \
--path.config ${JOB_DIR}/config/logstash.conf \
--path.settings ${JOB_DIR}/config \
--pipeline.ecs_compatibility <%= p("logstash.ecs_compatibility") %> \
--pipeline.workers ${LOGSTASH_WORKERS} \
--log.format=json --log.level=<%= p("logstash.log_level") %>
22 changes: 22 additions & 0 deletions jobs/ingestor_cloudwatch/templates/bin/pre-start
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash
source /var/vcap/packages/openjdk-17/bosh/runtime.env

export JOB_NAME=ingestor_cloudwatch
export JOB_DIR=/var/vcap/jobs/$JOB_NAME

<% p("logstash.plugins").each do |plugin| %>
/var/vcap/packages/logstash/bin/logstash-plugin install \
<%= plugin.except("name").map { |key, value| "--#{key}=#{value}" }.join(" ") %> \
<%= plugin["name"] %>
<% end %>

<% if_link('opensearch') do |ingestor_cloudwatch| %>
openssl pkcs8 -v1 "PBE-SHA1-3DES" \
-in "${JOB_DIR}/config/ssl/ingestor.pem" -topk8 \
-out "${JOB_DIR}/config/ssl/ingestor.key" -nocrypt
chmod 600 ${JOB_DIR}/config/ssl/ingestor.key
<% end %>

if [ -d ${JOB_DIR}/config/ssl ]; then
chown -R vcap:vcap ${JOB_DIR}/config/ssl
fi
13 changes: 13 additions & 0 deletions jobs/ingestor_cloudwatch/templates/config/bpm.yml.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
processes:
- name: ingestor_cloudwatch
hooks:
pre_start: /var/vcap/jobs/ingestor_cloudwatch/bin/pre-start
executable: /var/vcap/jobs/ingestor_cloudwatch/bin/ingestor_cloudwatch.sh
ephemeral_disk: true
persistent_disk: true
additional_volumes:
- path: /var/vcap/sys/tmp/ingestor_cloudwatch
writable: true
allow_executions: true
- path: /var/vcap/jobs/ingestor_cloudwatch/config
writable: true
5 changes: 5 additions & 0 deletions jobs/ingestor_cloudwatch/templates/config/ca.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<% if_link("opensearch") do |opensearch_config| %>
<% opensearch_config.if_p('opensearch.node.ssl.ca') do %>
<%= opensearch_config.p('opensearch.node.ssl.ca', '') %>
<% end %>
<% end %>
1 change: 1 addition & 0 deletions jobs/ingestor_cloudwatch/templates/config/ingestor-crt.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<%= p('logstash_parser.opensearch.ssl.certificate', '') %>
1 change: 1 addition & 0 deletions jobs/ingestor_cloudwatch/templates/config/ingestor-pem.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<%= p('logstash_parser.opensearch.ssl.private_key', '') %>
Loading

0 comments on commit 5d83e68

Please sign in to comment.