class Fluent::ElasticsearchOutput

Constants

DEFAULT_RELOAD_AFTER

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch.rb, line 96
# Creates the output plugin instance. No extra state is set up here; the
# method only forwards to the superclass initializer.
def initialize
  super
end

Public Instance Methods

append_record_to_messages(op, meta, header, record, msgs) click to toggle source

#append_record_to_messages adds a record to the bulk message payload to be submitted to Elasticsearch. Records that do not include an '_id' field are skipped when 'write_operation' is configured as 'create', 'update' or 'upsert'.

returns 'true' if record was appended to the bulk message

and 'false' otherwise
# File lib/fluent/plugin/out_elasticsearch.rb, line 297
# Appends one record to the bulk payload as two serialized lines (an action
# header followed by a body), each terminated by BODY_DELIMITER.
#
# op     - write operation, matched against UPDATE_OP, UPSERT_OP, CREATE_OP
#          and INDEX_OP
# meta   - metadata hash stored in the header under the action key
# header - hash serialized as the action line; it is mutated in place
# record - the document; for update/upsert it is wrapped via update_body
# msgs   - buffer the serialized lines are appended to with <<
#
# Returns true when the record was appended. Returns false when op is not
# recognised, or when op is update/upsert/create and meta has no ID_FIELD.
def append_record_to_messages(op, meta, header, record, msgs)
  # Map the requested operation to the action key used in the header and to
  # whether that action needs a document id.
  action, id_required =
    case op
    when UPDATE_OP, UPSERT_OP then [UPDATE_OP, true]
    when CREATE_OP then [CREATE_OP, true]
    when INDEX_OP then [INDEX_OP, false]
    end

  return false if action.nil?
  return false if id_required && !meta.has_key?(ID_FIELD)

  header[action] = meta
  msgs << @dump_proc.call(header) << BODY_DELIMITER
  # Updates and upserts share the UPDATE_OP action but carry a wrapped body.
  payload = action == UPDATE_OP ? update_body(record, op) : record
  msgs << @dump_proc.call(payload) << BODY_DELIMITER
  true
end
client() click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 198
# Returns the Elasticsearch client, building it on first use and caching it
# in @_es.
#
# The client uses a Faraday transport with the excon adapter. After it is
# built, the cluster is pinged once.
#
# Raises ConnectionFailure when the ping returns a falsy value or when one of
# the transport's host-unreachable exceptions is raised while pinging.
def client
  @_es ||= begin
    # When connection reloading is enabled and a reload_after value above the
    # default is configured, that value is passed instead of the plain flag.
    reload_setting = @reload_connections
    reload_setting = @reload_after if reload_setting && @reload_after > DEFAULT_RELOAD_AFTER

    excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
    configure_adapter = lambda { |faraday| faraday.adapter :excon, excon_options }

    transport_settings = {
      reload_connections: reload_setting,
      reload_on_failure: @reload_on_failure,
      resurrect_after: @resurrect_after,
      retry_on_failure: 5,
      logger: @transport_logger,
      transport_options: {
        headers: { 'Content-Type' => 'application/json' },
        request: { timeout: @request_timeout },
        ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
      },
      http: {
        user: @user,
        password: @password
      },
      sniffer_class: @sniffer_class,
    }

    transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(
      get_connection_options.merge(options: transport_settings),
      &configure_adapter
    )
    es = Elasticsearch::Client.new transport: transport

    # Fail fast: refuse to cache a client that cannot reach the cluster.
    begin
      raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description})!" unless es.ping
    rescue *es.transport.host_unreachable_exceptions => e
      raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description})! #{e.message}"
    end

    log.info "Connection opened to Elasticsearch cluster => #{connection_options_description}"
    es
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elasticsearch.rb, line 100
# Validates and normalizes the plugin configuration.
#
# conf - the configuration element; it is passed on to the superclass and
#        read for '@log_level' / 'log_level'.
#
# Delimited string settings are split into arrays, the JSON dump proc is
# chosen (Oj when it can be required, Yajl otherwise), `%{...}`-wrapped
# credentials are URL-encoded, and the transport logger and sniffer class are
# resolved. Template installation runs last.
#
# Raises Fluent::ConfigError when the hash id key and id_key differ, or when
# the configured sniffer class cannot be loaded.
def configure(conf)
  super
  @time_parser = create_time_parser

  # Guarded with is_a?(String) like the other list settings below, so a value
  # that is already an array is left untouched.
  if @remove_keys && @remove_keys.is_a?(String)
    @remove_keys = @remove_keys.split(/\s*,\s*/)
  end

  if @target_index_key && @target_index_key.is_a?(String)
    @target_index_key = @target_index_key.split '.'
  end

  if @target_type_key && @target_type_key.is_a?(String)
    @target_type_key = @target_type_key.split '.'
  end

  if @remove_keys_on_update && @remove_keys_on_update.is_a?(String)
    @remove_keys_on_update = @remove_keys_on_update.split ','
  end

  @meta_config_map = create_meta_config_map

  # Deliberate lazy require: Oj is optional, Yajl is the fallback serializer.
  begin
    require 'oj'
    @dump_proc = Oj.method(:dump)
  rescue LoadError
    @dump_proc = Yajl.method(:dump)
  end

  # Credentials written as %{...} are URL-encoded.
  if @user && m = @user.match(/%{(?<user>.*)}/)
    @user = URI.encode_www_form_component(m["user"])
  end
  if @password && m = @password.match(/%{(?<password>.*)}/)
    @password = URI.encode_www_form_component(m["password"])
  end

  if @hash_config
    raise Fluent::ConfigError, "@hash_config.hash_id_key and id_key must be equal." unless @hash_config.hash_id_key == @id_key
  end

  @transport_logger = nil
  if @with_transporter_log
    @transport_logger = log
    log_level = conf['@log_level'] || conf['log_level']
    log.warn "Consider to specify log_level with @log_level." unless log_level
  end

  @sniffer_class = nil
  begin
    @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
  rescue StandardError, ScriptError => ex
    # Only lookup/load failures are converted; signals and exits are not
    # turned into configuration errors.
    raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
  end

  # Templates are installed last so that anything the installers read
  # (escaped credentials, transport logger, sniffer class) is already final.
  # NOTE(review): presumably the installers open the memoized Elasticsearch
  # client, which would otherwise be cached with half-configured settings --
  # confirm against template_install / templates_hash_install.
  if @template_name && @template_file
    template_install(@template_name, @template_file, @template_overwrite)
  elsif @templates
    templates_hash_install(@templates, @template_overwrite)
  end
end
connection_options_description() click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 282
# Builds a log-safe, comma-separated description of the configured hosts.
#
# Each host hash from get_connection_options[:hosts] is rendered with
# #inspect; when a :password key is present its value is shown as
# 'obfuscated'. The host hashes themselves are not modified.
def connection_options_description
  descriptions = get_connection_options[:hosts].map do |host_info|
    safe_copy = host_info.dup
    safe_copy[:password] = 'obfuscated' if safe_copy.key?(:password)
    safe_copy.inspect
  end
  descriptions.join(', ')
end
create_meta_config_map() click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 162
# Builds the list of [record_key, meta_key] pairs for the configured id,
# parent and routing keys. Settings that are nil/false are left out; the
# order is always _id, _parent, _routing.
def create_meta_config_map
  candidates = [
    [@id_key, '_id'],
    [@parent_key, '_parent'],
    [@routing_key, '_routing']
  ]
  candidates.select { |record_key, _meta_key| record_key }
end
create_time_parser() click to toggle source

Once fluent v0.14 is released we might be able to use Fluent::Parser::TimeParser, but it doesn't quite do what we want - it gives

sec,nsec

whereas we want something we can call `strftime` on…

# File lib/fluent/plugin/out_elasticsearch.rb, line 173
# Returns a Proc that turns a time string into a DateTime.
#
# Without @time_key_format the Proc uses DateTime.parse. With a format it
# prefers Strptime and falls back to DateTime.strptime when building the
# Strptime parser raises.
def create_time_parser
  return proc { |value| DateTime.parse(value) } unless @time_key_format

  begin
    # Strptime doesn't support every format, but it is very fast for the
    # ones it does support.
    fast_parser = Strptime.new(@time_key_format)
    proc { |value| fast_parser.exec(value).to_datetime }
  rescue
    # Reached when Strptime rejects the format, or when Strptime is not
    # available at all (it's not installed -- it's ruby 2 only).
    proc { |value| DateTime.strptime(value, @time_key_format) }
  end
end
flatten_record(record, prefix=[]) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 344
# Flattens nested hashes into a single-level hash whose keys are the path
# segments joined with @flatten_hashes_separator.
#
# record - the value to flatten; only Hash values are descended into
# prefix - path segments collected so far (used by the recursion)
#
# Arrays and all other values are stored unchanged under the joined path;
# arrays are deliberately not processed. An empty hash contributes nothing.
def flatten_record(record, prefix=[])
  unless record.is_a?(Hash)
    return { prefix.join(@flatten_hashes_separator) => record }
  end

  record.each_with_object({}) do |(key, value), flattened|
    flattened.merge!(flatten_record(value, prefix + [key.to_s]))
  end
end
get_connection_options() click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 249
# Builds the connection options from the host settings.
#
# @hosts, when set, is a comma-separated list whose entries are either
# legacy `host` / `host:port` strings or full URLs. Whitespace around
# entries is ignored and empty entries (e.g. from a trailing comma) are
# skipped. Without @hosts a single entry is built from @host, @port and
# @scheme.
#
# Entries without their own :user get the global @user/@password; entries
# without their own :path get the global @path.
#
# Returns a hash of the form { hosts: [ {host:, port:, scheme:, ...}, ... ] }.
# Raises RuntimeError when @user is set but @password is not.
def get_connection_options
  raise "`password` must be present if `user` is present" if @user && !@password

  hosts = if @hosts
    @hosts.split(',').map(&:strip).reject(&:empty?).map do |host_str|
      # Support legacy hosts format host:port,host:port,host:port...
      if host_str.match(/\A[^:]+(\:\d+)?\z/)
        name, port = host_str.split(':')
        {
          host:   name,
          port:   (port || @port).to_i,
          scheme: @scheme.to_s
        }
      else
        # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
        uri = URI(get_escaped_userinfo(host_str))
        base = { host: uri.host, port: uri.port, scheme: uri.scheme }
        %w(user password path).each_with_object(base) do |key, hash|
          value = uri.public_send(key)
          hash[key.to_sym] = value unless value.nil? || value == ''
        end
      end
    end
  else
    [{host: @host, port: @port, scheme: @scheme.to_s}]
  end

  # Fill in global credentials and path where a host did not bring its own.
  hosts.each do |host|
    host.merge!(user: @user, password: @password) if !host[:user] && @user
    host.merge!(path: @path) if !host[:path] && @path
  end

  {
    hosts: hosts
  }
end
get_escaped_userinfo(host_str) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 237
# URL-encodes the user and password of a host string whose credentials are
# written as `%{user}:%{password}` in front of the `@`.
#
# host_str - the host entry to inspect
#
# Returns the string with both credentials form-component encoded and the
# `%{}` wrappers removed, or host_str unchanged when the pattern is absent.
def get_escaped_userinfo(host_str)
  parts = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
  return host_str unless parts

  safe_user = URI.encode_www_form_component(parts["user"])
  safe_password = URI.encode_www_form_component(parts["password"])
  "#{parts["scheme"]}#{safe_user}:#{safe_password}#{parts["path"]}"
end
get_parent_of(record, path) click to toggle source

Returns [parent, child_key] of the child described by the path array in the record's tree; returns [nil, child_key] if the path doesn't exist in the record.

# File lib/fluent/plugin/out_elasticsearch.rb, line 454
# Walks record along every element of path except the last one.
#
# record - the (possibly nested) record
# path   - array of keys; the last element names the child
#
# Returns [parent, child_key]. parent is nil as soon as an intermediate
# value on the way is not a Hash or a key is missing.
def get_parent_of(record, path)
  parent = record
  path[0...-1].each do |segment|
    parent = parent.is_a?(Hash) ? parent[segment] : nil
  end
  [parent, path[-1]]
end
parse_time(value, event_time, tag) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 191
# Parses value with the configured @time_parser.
#
# value      - the raw time value taken from the record
# event_time - the event's own time, used as the fallback
# tag        - the event tag, reported with parse failures
#
# When parsing raises a StandardError, an error event describing the failure
# is emitted under @time_parse_error_tag and the event time converted to a
# DateTime is returned instead.
def parse_time(value, event_time, tag)
  begin
    @time_parser.call(value)
  rescue => parse_error
    failure = {'tag' => tag, 'time' => event_time, 'format' => @time_key_format, 'value' => value}
    router.emit_error_event(@time_parse_error_tag, Fluent::Engine.now, failure, parse_error)
    Time.at(event_time).to_datetime
  end
end
process_message(tag, meta, header, time, record, bulk_message) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 385
# Prepares one event and appends it to the bulk payload.
#
# tag          - the event tag
# meta         - reusable metadata hash; cleared and refilled on every call
# header       - reusable header hash handed on to append_record_to_messages
# time         - the event time
# record       - the event record; it is modified in place unless flattening
#                or hash-id generation returns a new object
# bulk_message - buffer the serialized lines are appended to
#
# Steps: optional flattening and hash-id generation, timestamp handling for
# logstash_format/include_timestamp, selection of the target index and type
# (a value found in the record wins and is removed from it), metadata
# population, removal of @remove_keys, and finally serialization.
#
# Returns the result of append_record_to_messages.
def process_message(tag, meta, header, time, record, bulk_message)
  if @flatten_hashes
    record = flatten_record(record)
  end

  if @hash_config
    record = generate_hash_id_key(record)
  end

  dt = nil
  if @logstash_format || @include_timestamp
    if record.has_key?(TIMESTAMP_FIELD)
      rts = record[TIMESTAMP_FIELD]
      dt = parse_time(rts, time, tag)
    elsif record.has_key?(@time_key)
      rts = record[@time_key]
      dt = parse_time(rts, time, tag)
      record[TIMESTAMP_FIELD] = rts unless @time_key_exclude_timestamp
    else
      dt = Time.at(time).to_datetime
      record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision)
    end
  end

  target_index_parent, target_index_child_key = @target_index_key ? get_parent_of(record, @target_index_key) : nil
  # get_parent_of may hand back a non-Hash parent (e.g. an Array or String
  # sitting where a nested hash was expected). Indexing into those would
  # raise or do substring matching, so only a Hash parent counts as a hit.
  if target_index_parent.is_a?(Hash) && target_index_parent[target_index_child_key]
    target_index = target_index_parent.delete(target_index_child_key)
  elsif @logstash_format
    dt = dt.new_offset(0) if @utc_index
    target_index = "#{@logstash_prefix}#{@logstash_prefix_separator}#{dt.strftime(@logstash_dateformat)}"
  else
    target_index = @index_name
  end

  # Change target_index to lower-case since Elasticsearch doesn't
  # allow upper-case characters in index names.
  target_index = target_index.downcase
  if @include_tag_key
    record[@tag_key] = tag
  end

  target_type_parent, target_type_child_key = @target_type_key ? get_parent_of(record, @target_type_key) : nil
  # Same Hash-only rule as for the target index above.
  if target_type_parent.is_a?(Hash) && target_type_parent[target_type_child_key]
    target_type = target_type_parent.delete(target_type_child_key)
  else
    target_type = @type_name
  end

  meta.clear
  meta["_index".freeze] = target_index
  meta["_type".freeze] = target_type

  if @pipeline
    meta["pipeline".freeze] = @pipeline
  end

  # Copy configured record fields (id, parent, routing, ...) into the metadata.
  @meta_config_map.each do |record_key, meta_key|
    meta[meta_key] = record[record_key] if record[record_key]
  end

  if @remove_keys
    @remove_keys.each { |key| record.delete(key) }
  end

  append_record_to_messages(@write_operation, meta, header, record, bulk_message)
end
remove_keys(record) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 335
# Produces the document used as the "doc" part of an update.
#
# The keys to strip come from the record's own @remove_keys_on_update_key
# field when it holds a value, otherwise from @remove_keys_on_update. The
# control field itself is always deleted from the passed-in record.
#
# Returns record itself when there is nothing to strip, otherwise a shallow
# copy without the listed keys (the passed-in record keeps them).
def remove_keys(record)
  per_record_keys = record[@remove_keys_on_update_key]
  doomed = per_record_keys || @remove_keys_on_update || []
  record.delete(@remove_keys_on_update_key)
  return record unless doomed.any?

  doomed.each_with_object(record.dup) { |key, trimmed| trimmed.delete(key) }
end
send_bulk(data, tag, chunk, bulk_message_count) click to toggle source

#send_bulk submits a specific bulk request; it is given the request data, the original tag, chunk, and bulk_message_count

# File lib/fluent/plugin/out_elasticsearch.rb, line 461
# Submits one bulk request through the client and reacts to failures.
#
# data               - the bulk request body passed to client.bulk
# tag                - the original event tag
# chunk              - the chunk the request was built from
# bulk_message_count - number of records contained in data
#
# Raises ConnectionFailure when the hosts stay unreachable after the retries.
# Any other exception is re-raised unchanged.
def send_bulk(data, tag, chunk, bulk_message_count)
  retries = 0
  begin

    log.on_trace { log.trace "bulk request: #{data}" }
    response = client.bulk body: data
    log.on_trace { log.trace "bulk response: #{response}" }

    # Item-level failures are reported in the response body; the error
    # handler decides what to do with them.
    if response['errors']
      error = Fluent::ElasticsearchErrorHandler.new(self)
      error.handle_error(response, tag, chunk, bulk_message_count)
    end
  rescue RetryStreamError => e
    # Re-emit the records carried by the error, under @retry_tag when one is
    # configured and under the original tag otherwise.
    emit_tag = @retry_tag ? @retry_tag : tag
    router.emit_stream(emit_tag, e.retry_stream)
  # NOTE(review): the exception list is obtained by calling `client`, which
  # may itself build a connection when @_es is unset -- confirm that is intended.
  rescue *client.transport.host_unreachable_exceptions => e
    # Up to two retries: drop the cached client so the next attempt
    # reconnects, wait 2**retries seconds (2, then 4), then rerun the block.
    if retries < 2
      retries += 1
      @_es = nil
      log.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
      sleep 2**retries
      retry
    end
    raise ConnectionFailure, "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
  rescue Exception
    # Everything else is re-raised; the cached client is dropped first when
    # @reconnect_on_error is set.
    @_es = nil if @reconnect_on_error
    raise
  end
end
update_body(record, op) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 322
# Builds the body of an update or upsert action.
#
# record - the full document
# op     - the write operation; only UPSERT_OP adds upsert information
#
# The "doc" entry holds the record after remove_keys. For an upsert,
# "doc_as_upsert" is set when nothing was stripped; otherwise the full
# record is supplied under UPSERT_OP.
def update_body(record, op)
  trimmed = remove_keys(record)
  payload = {"doc".freeze => trimmed}
  return payload unless op == UPSERT_OP

  if trimmed == record
    payload["doc_as_upsert".freeze] = true
  else
    payload[UPSERT_OP] = record
  end
  payload
end
write_objects(tag, chunk) click to toggle source
# File lib/fluent/plugin/out_elasticsearch.rb, line 359
# Serializes every record of a chunk into one bulk payload and sends it.
#
# tag   - the tag the chunk belongs to
# chunk - the buffer chunk; iterated with msgpack_each
#
# Entries whose record is not a Hash are ignored. A record that
# process_message declines either raises MissingIdFieldError (when
# @emit_error_for_missing_id is set) or is dropped with a debug message.
# Any StandardError raised while handling a record is routed to the error
# stream and processing continues with the next record.
def write_objects(tag, chunk)
  appended = 0
  payload = ''
  # header and meta are shared across records to avoid reallocating them.
  shared_header = {}
  shared_meta = {}

  chunk.msgpack_each do |time, record|
    next unless record.is_a?(Hash)

    begin
      if process_message(tag, shared_meta, shared_header, time, record, payload)
        appended += 1
      elsif @emit_error_for_missing_id
        raise MissingIdFieldError, "Missing '_id' field. Write operation is #{@write_operation}"
      else
        log.on_debug { log.debug("Dropping record because its missing an '_id' field and write_operation is #{@write_operation}: #{record}") }
      end
    rescue => failure
      router.emit_error_event(tag, time, record, failure)
    end
  end

  send_bulk(payload, tag, chunk, appended) unless payload.empty?
  payload.clear
end