Skip to content

Commit

Permalink
ECS Compatibility (elastic#12305)
Browse files Browse the repository at this point in the history
Implements a plugin `ecs_compatibility` option, whose default value is powered
by the pipeline-level setting `pipeline.ecs_compatibility`, in line with the
proposal in elastic#11623:

In order to increase the confidence a user has when upgrading Logstash, this
implementation uses the deprecation logger to warn when `ecs_compatibility` is
used without an explicit directive.

For now, as we continue to add ECS Compatibility Modes, an opting into a
specific ECS Compatibility mode at a pipeline level is considered a BETA
feature. All plugins using the [ECS Compatibility Support][] adapter will
use the setting correctly, but pipelines configured in this way do not
guarantee consistent behaviour across minor versions of Logstash or the
plugins it bundles (e.g., upgraded plugins that have newly-implemented an ECS
Compatibility mode will use the pipeline-level setting as a default, causing
them to potentially behave differently after the upgrade).

This change-set also includes a significant amount of work within the
`PluginFactory`, which allows us to ensure that pipeline-level settings are
available to a Logstash plugin _before_ its `initialize` is executed,
including the maintaining of context for codecs that are routinely cloned.

* JEE: instantiate codecs only once
* PluginFactory: use passed FilterDelegator class
* PluginFactory: require engine name in init
* NOOP: remove useless secondary plugin factory interface
* PluginFactory: simplify, compute java args only when necessary
* PluginFactory: accept explicit id when vertex unavailable
* PluginFactory: make source optional, args required
* PluginFactory: threadsafe refactor of id duplicate tracking
* PluginFactory: make id extraction/geration more abstract/understandable
* PluginFactory: extract or generate ID when source not available
* PluginFactory: inject ExecutionContext before initializing plugins
* Codec: propagate execution_context and metric to clones
* Plugin: intercept string-specified codecs and propagate execution_context
* Plugin: implement `ecs_compatibility` for all plugins
* Plugin: deprecate use of `Config::Mixin::DSL::validate_value(String, :codec)`
  • Loading branch information
yaauie committed Oct 6, 2020
1 parent ccbc569 commit e11b6d3
Show file tree
Hide file tree
Showing 36 changed files with 754 additions and 372 deletions.
1 change: 1 addition & 0 deletions docker/data/logstash/env2yaml/env2yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func normalizeSetting(setting string) (string, error) {
"pipeline.batch.delay",
"pipeline.unsafe_shutdown",
"pipeline.java_execution",
"pipeline.ecs_compatibility"
"pipeline.plugin_classloaders",
"path.config",
"config.string",
Expand Down
8 changes: 8 additions & 0 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
logger.warn("deprecated setting `config.field_reference.parser` set; field reference parsing is strict by default")
end

if @settings.set?('pipeline.ecs_compatibility')
ecs_compatibility_value = settings.get('pipeline.ecs_compatibility')
if ecs_compatibility_value != 'disabled'
logger.warn("Setting `pipeline.ecs_compatibility` given as `#{ecs_compatibility_value}`; " +
"values other than `disabled` are currently considered BETA and may have unintended consequences when upgrading minor versions of Logstash.")
end
end

# This is for backward compatibility in the tests
if source_loader.nil?
@source_loader = LogStash::Config::SourceLoader.new
Expand Down
4 changes: 3 additions & 1 deletion logstash-core/lib/logstash/codecs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ def flush(&block)

public
def clone
return self.class.new(params)
LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, self.class, params).tap do |klone|
klone.metric = @metric if klone.instance_variable_get(:@metric).nil?
end
end
end; end # class LogStash::Codecs::Base
6 changes: 3 additions & 3 deletions logstash-core/lib/logstash/config/config_ast.rb
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ def compile_initializer
# If any parent is a Plugin, this must be a codec.

if attributes.elements.nil?
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, {}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
else
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)

attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})" << (plugin_type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))" << (plugin_type == "codec" ? "" : "\n")
end
end

Expand All @@ -271,7 +271,7 @@ def compile
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, line_to_source(#{source_meta.line}, #{source_meta.column}), #{attributes_code})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code}, line_to_source(#{source_meta.line}, #{source_meta.column}))"
end
end

Expand Down
19 changes: 18 additions & 1 deletion logstash-core/lib/logstash/config/mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
module LogStash::Config::Mixin

include LogStash::Util::SubstitutionVariables
include LogStash::Util::Loggable

attr_accessor :config
attr_accessor :original_params
Expand Down Expand Up @@ -99,6 +100,17 @@ def config_init(params)
params[name.to_s] = deep_replace(value)
end

# Intercept codecs that have not been instantiated
params.each do |name, value|
validator = self.class.validator_find(name)
next unless validator && validator[:validate] == :codec && value.kind_of?(String)

codec_klass = LogStash::Plugin.lookup("codec", value)
codec_instance = LogStash::Plugins::Contextualizer.initialize_plugin(execution_context, codec_klass)

params[name.to_s] = LogStash::Codecs::Delegator.new(codec_instance)
end

if !self.class.validate(params)
raise LogStash::ConfigurationError,
I18n.t("logstash.runner.configuration.invalid_plugin_settings")
Expand Down Expand Up @@ -190,7 +202,7 @@ def config(name, opts={})
name = name.to_s if name.is_a?(Symbol)
@config[name] = opts # ok if this is empty

if name.is_a?(String)
if name.is_a?(String) && opts.fetch(:attr_accessor, true)
define_method(name) { instance_variable_get("@#{name}") }
define_method("#{name}=") { |v| instance_variable_set("@#{name}", v) }
end
Expand Down Expand Up @@ -429,6 +441,11 @@ def validate_value(value, validator)
case validator
when :codec
if value.first.is_a?(String)
# A plugin's codecs should be instantiated by `PluginFactory` or in `Config::Mixin#config_init(Hash)`,
# which ensure the inner plugin has access to the outer's execution context and metric store.
# This deprecation exists to warn plugins that call `Config::Mixin::validate_value` directly.
self.deprecation_logger.deprecated("Codec instantiated by `Config::Mixin::DSL::validate_value(String, :codec)` which cannot propagate parent plugin's execution context or metrics. ",
self.logger.debug? ? {:backtrace => caller} : {})
value = LogStash::Codecs::Delegator.new LogStash::Plugin.lookup("codec", value.first).new
return true, value
else
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ module Environment
Setting::Boolean.new("pipeline.plugin_classloaders", false),
Setting::Boolean.new("pipeline.separate_logs", false),
Setting::CoercibleString.new("pipeline.ordered", "auto", true, ["auto", "true", "false"]),
Setting::CoercibleString.new("pipeline.ecs_compatibility", "disabled", true, %w(disabled v1 v2)),
Setting.new("path.plugins", Array, []),
Setting::NullableString.new("interactive", nil, false),
Setting::Boolean.new("config.debug", false),
Expand Down
8 changes: 4 additions & 4 deletions logstash-core/lib/logstash/inputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ def metric=(metric)

def execution_context=(context)
super
# There is no easy way to propage an instance variable into the codec, because the codec
# are created at the class level
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
# parent plugin's
# Setting the execution context after initialization is deprecated and will be removed in
# a future release of Logstash. While this code is no longer executed from Logstash core,
# we continue to propagate a set execution context to an input's codec, and rely on super's
# deprecation warning.
@codec.execution_context = context
context
end
Expand Down
8 changes: 4 additions & 4 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ def metric=(metric)

def execution_context=(context)
super
# There is no easy way to propage an instance variable into the codec, because the codec
# are created at the class level
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
# parent plugin's
# Setting the execution context after initialization is deprecated and will be removed in
# a future release of Logstash. While this code is no longer executed from Logstash core,
# we continue to propagate a set execution context to an output's codec, and rely on super's
# deprecation warning.
@codec.execution_context = context
context
end
Expand Down
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ def non_reloadable_plugins
private


def plugin(plugin_type, name, source, *args)
@plugin_factory.plugin(plugin_type, name, source, *args)
def plugin(plugin_type, name, args, source)
@plugin_factory.plugin(plugin_type, name, args, source)
end

def default_logging_keys(other_keys = {})
Expand Down
16 changes: 14 additions & 2 deletions logstash-core/lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

require "logstash/config/mixin"
require "logstash/plugins/ecs_compatibility_support"
require "concurrent"
require "securerandom"

Expand All @@ -24,11 +25,12 @@
class LogStash::Plugin
include LogStash::Util::Loggable

attr_accessor :params, :execution_context
attr_accessor :params

NL = "\n"

include LogStash::Config::Mixin
include LogStash::Plugins::ECSCompatibilitySupport

# Disable or enable metric logging for this specific plugin instance
# by default we record all the metrics we can, but you can disable metrics collection
Expand Down Expand Up @@ -60,7 +62,7 @@ def eql?(other)
self.class.name == other.class.name && @params == other.params
end

def initialize(params=nil)
def initialize(params={})
@logger = self.logger
@deprecation_logger = self.deprecation_logger
# need to access settings statically because plugins are initialized in config_ast with no context.
Expand Down Expand Up @@ -177,4 +179,14 @@ def self.lookup(type, name)
def plugin_metadata
LogStash::PluginMetadata.for_plugin(self.id)
end

# Deprecated attr_writer for execution_context
def execution_context=(new_context)
@deprecation_logger.deprecated("LogStash::Plugin#execution_context=(new_ctx) is deprecated. Use LogStash::Plugins::Contextualizer#initialize_plugin(new_ctx, klass, args) instead", :caller => caller.first)
@execution_context = new_context
end

def execution_context
@execution_context || LogStash::ExecutionContext::Empty
end
end # class LogStash::Plugin
53 changes: 53 additions & 0 deletions logstash-core/lib/logstash/plugins/ecs_compatibility_support.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module LogStash
module Plugins
module ECSCompatibilitySupport
def self.included(base)
base.extend(ArgumentValidator)
base.config(:ecs_compatibility, :validate => :ecs_compatibility_argument,
:attr_accessor => false)
end

MUTEX = Mutex.new
private_constant :MUTEX

def ecs_compatibility
@_ecs_compatibility || MUTEX.synchronize do
@_ecs_compatibility ||= begin
# use config_init-set value if present
break @ecs_compatibility unless @ecs_compatibility.nil?

pipeline = execution_context.pipeline
pipeline_settings = pipeline && pipeline.settings
pipeline_settings ||= LogStash::SETTINGS

if !pipeline_settings.set?('pipeline.ecs_compatibility')
deprecation_logger.deprecated("Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. " +
"To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.")
end

pipeline_settings.get_value('pipeline.ecs_compatibility').to_sym
end
end
end

module ArgumentValidator
V_PREFIXED_INTEGER_PATTERN = %r(\Av[1-9][0-9]?\Z).freeze
private_constant :V_PREFIXED_INTEGER_PATTERN

def validate_value(value, validator)
return super unless validator == :ecs_compatibility_argument

value = deep_replace(value)
value = hash_or_array(value)

if value.size == 1
return true, :disabled if value.first.to_s == 'disabled'
return true, value.first.to_sym if value.first.to_s =~ V_PREFIXED_INTEGER_PATTERN
end

return false, "Expected a v-prefixed integer major-version number (e.g., `v1`) or the literal `disabled`, got #{value.inspect}"
end
end
end
end
end
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ class LogStash::Runner < Clamp::StrictCommand
:attribute_name => "pipeline.unsafe_shutdown",
:default => LogStash::SETTINGS.get_default("pipeline.unsafe_shutdown")

option ["--pipeline.ecs_compatibility"], "STRING",
I18n.t("logstash.runner.flag.ecs_compatibility"),
:attribute_name => "pipeline.ecs_compatibility",
:default => LogStash::SETTINGS.get_default('pipeline.ecs_compatibility')

# Data Path Setting
option ["--path.data"] , "PATH",
I18n.t("logstash.runner.flag.datapath"),
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Settings
"pipeline.system",
"pipeline.workers",
"pipeline.ordered",
"pipeline.ecs_compatibility",
"queue.checkpoint.acks",
"queue.checkpoint.interval",
"queue.checkpoint.writes",
Expand Down
15 changes: 15 additions & 0 deletions logstash-core/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,21 @@ en:
if there are still inflight events in memory.
By default, logstash will refuse to quit until all
received events have been pushed to the outputs.
ecs_compatibility: |+
Sets the pipeline's default value for `ecs_compatibility`,
a setting that is available to plugins that implement
an ECS Compatibility mode for use with the Elastic Common
Schema.
Possible values are:
- disabled (default)
- v1
- v2
This option allows the early opt-in (or preemptive opt-out)
of ECS Compatibility modes in plugins, which is scheduled to
be on-by-default in a future major release of Logstash.
Values other than `disabled` are currently considered BETA,
and may produce unintended consequences when upgrading Logstash.
rubyshell: |+
Drop to shell instead of running as normal.
Valid shells are "irb" and "pry"
Expand Down
25 changes: 25 additions & 0 deletions logstash-core/spec/logstash/config/mixin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,31 @@
end
end

context 'DSL::validate_value(String, :codec)' do
subject(:plugin_class) { Class.new(LogStash::Filters::Base) { config_name "test_deprecated_two" } }
let(:codec_class) { Class.new(LogStash::Codecs::Base) { config_name 'dummy' } }
let(:deprecation_logger) { double("DeprecationLogger").as_null_object }

before(:each) do
allow(plugin_class).to receive(:deprecation_logger).and_return(deprecation_logger)
allow(LogStash::Plugin).to receive(:lookup).with("codec", codec_class.config_name).and_return(codec_class)
end

it 'instantiates the codec' do
success, codec = plugin_class.validate_value(codec_class.config_name, :codec)

expect(success).to be true
expect(codec.class).to eq(codec_class)
end

it 'logs a deprecation' do
plugin_class.validate_value(codec_class.config_name, :codec)
expect(deprecation_logger).to have_received(:deprecated) do |message|
expect(message).to include("validate_value(String, :codec)")
end
end
end

context "when validating :bytes successfully" do
subject do
local_num_bytes = num_bytes # needs to be locally scoped :(
Expand Down
59 changes: 59 additions & 0 deletions logstash-core/spec/logstash/execution_context_factory_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

require "spec_helper"

describe LogStash::Plugins::ExecutionContextFactory do
let(:pipeline) { double('Pipeline') }
let(:agent) { double('Agent') }
let(:inner_dlq_writer) { nil }

subject(:factory) { described_class.new(agent, pipeline, inner_dlq_writer) }

context '#create' do
let(:plugin_id) { SecureRandom.uuid }
let(:plugin_type) { 'input' }

context 'the resulting instance' do
subject(:instance) { factory.create(plugin_id, plugin_type) }

it 'retains the pipeline from the factory' do
expect(instance.pipeline).to be(pipeline)
end

it 'retains the agent from the factory' do
expect(instance.agent).to be(agent)
end

it 'has a dlq_writer' do
expect(instance.dlq_writer).to_not be_nil
end

context 'dlq_writer' do
subject(:instance_dlq_writer) { instance.dlq_writer }

it 'retains the plugin id' do
expect(instance_dlq_writer.plugin_id).to eq(plugin_id)
end

it 'retains the plugin type' do
expect(instance_dlq_writer.plugin_type).to eq(plugin_type)
end
end
end
end
end
Loading

0 comments on commit e11b6d3

Please sign in to comment.