Han Jianchun个人博客 Beautiful Architecture in Software Design

如何编写logstash-filter插件

2016-12-30
hanjianchun

本文教你如何编写logstash-filter插件,是我在实际的开发经验中汲取的一些经验,现在分享一下!

怎么安装

先来说说自定义插件怎么安装

假如我自定义一个插件名字叫做test,版本是1.0.0,那么这个插件的目录结构是

logstash-filter-test-1.0.0
|__lib
|  |__logstash
|     |__filter
|        |__test.rb
|__logstash-filter-test.gemspec

定义好之后将此文件夹放到此目录下

logstash/vendor/bundle/jruby/1.9/gems/

再到logstash/Gemfile添加一行数据,声明此插件的位置

gem "logstash-filter-test", :path => "vendor/bundle/jruby/1.9/gems/logstash-filter-test-1.0.0"

到这里插件就已经完成了,在shipper.conf里面使用即可!

怎么编写

logstash-filter-test.gemspec

这个是一个配置文件,可以参考geoip这个插件:

logstash-5.1.1/vendor/bundle/jruby/1.9/gems/logstash-filter-geoip-4.0.4-java/logstash-filter-geoip.gemspec
Gem::Specification.new do |s|

  s.name            = 'logstash-filter-geoip'
  s.version         = '4.0.4'
  s.licenses        = ['Apache License (2.0)']
  s.summary         = "$summary"
  s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
  s.authors         = ["Elastic"]
  s.email           = 'info@elastic.co'
  s.homepage        = "http://www.elastic.co/guide/en/logstash/current/index.html"
  s.require_paths = ["lib"]
  s.platform      = "java"

  # Files
  s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT', 'maxmind-db-NOTICE.txt']

  # Tests
  s.test_files = s.files.grep(%r{^(test|spec|features)/})

  # Special flag to let us know this is actually a logstash plugin
  s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" }

  # Gem dependencies
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"

  s.requirements << "jar com.maxmind.geoip2:geoip2, 2.5.0, :exclusions=> [com.google.http-client:google-http-client]"

这里只需要修改s.name和s.version其它的按照自己的需求可以相应的修改,

	s.name		=	'logstash-filter-test'
	s.version	=	'1.0.0'

编写 logstash-filter-test-1.0.0/lib/logstash/filter/test.rb

需求:在logstash shipper里传入remote_ip,自定义插件实现根据IP配置表判断内网IP所属的地区

IP配置表的格式:[10.11.1/200,10.11.1]

这里需要用到json,如果安装了rubygems可以直接使用命令安装

gem install json

如果没有安装rubygems或者gem source在国内失效无法安装json,那就可以使用我这个办法

[root@localhost logstash-5.1.1]# find vendor/* -name json.rb
vendor/bundle/jruby/1.9/gems/json-1.8.3-java/lib/json.rb
vendor/bundle/jruby/1.9/gems/http-0.9.9/lib/http/mime_type/json.rb
vendor/bundle/jruby/1.9/gems/logstash-codec-json-3.0.2/lib/logstash/codecs/json.rb
vendor/bundle/jruby/1.9/gems/logstash-filter-json-3.0.2/lib/logstash/filters/json.rb
vendor/bundle/jruby/1.9/gems/aws-sdk-core-2.3.22/lib/aws-sdk-core/json.rb
vendor/bundle/jruby/1.9/gems/aws-sdk-core-2.3.22/lib/aws-sdk-core/stubbing/protocols/json.rb
vendor/bundle/jruby/1.9/gems/coderay-1.1.1/lib/coderay/scanners/json.rb
vendor/bundle/jruby/1.9/gems/coderay-1.1.1/lib/coderay/encoders/json.rb
vendor/jruby/lib/ruby/shared/json.rb

可以看到在logstash的vendor目录下有很多json.rb文件,我选择的是

/logstash/vendor/bundle/jruby/1.9/gems/json-1.8.3-java/lib/json.rb
#encoding:utf-8

require "logstash/filters/base"

require "logstash/namespace"

require 'net/https'
require '/logstash/vendor/bundle/jruby/1.9/gems/json-1.8.3-java/lib/json.rb'
require 'digest/md5'
require 'uri'

class LogStash::Filters::Test < LogStash::Filters::Base

  config_name "test"

  config :remote_ip, :validate => :string
  config :tag_ip_failure, :validate => :array, :default => ["_test_parseip_failure"]


  public
  def register
	begin
		uri = '获取IP配置表接口的URL地址'
		$strIpInfor = JSON.parse httpPostMethod(uri)
	rescue
		puts "get ininfor error..."
	end

  end

  public

  def filter(event)
	begin
		if nil != event[@remote_ip] && event[@remote_ip].scan(".").length == 3 
			ipRange = getIpRange(event[@remote_ip])
			event.set("source",ipRange[0])
			event.set("sourceType",ipRange[1])
		else
			event.set("source",'other')
			event.set("sourceType",'other')
		end
	rescue
		event.set("source",'other')
		event.set("sourceType",'other')
		@tag_ip_failure.each{|tag| event.tag(tag)}
	end	
  end # def filter

#get ip range
public
def getIpRange(ip)
    for strIp in $strIpInfor
	puts strIp['ipinfor']
	ipinfor = strIp['ipinfor'].split('/')	#get ipinfor
	ipdot = ipinfor.at(0).split('.')	#split with .
	@ipmin = ipdot.at(ipdot.size-1)
	if ipinfor.size == 1
		@ipmax = @ipmin
	else
		@ipmax = ipinfor.at(1)
	end
	client_ip = ip.split('.')	#split with .
	if ipdot.at(0).to_i == client_ip.at(0).to_i && ipdot.at(1).to_i == client_ip.at(1).to_i
		if client_ip.at(2).to_i >= @ipmin.to_i && client_ip.at(2).to_i <= @ipmax.to_i
			return strIp['enname'],strIp['sourcetype']
		end
	end
    end
    return 'other','other'
end

#get http json
public
def httpPostMethod(uri)
    params = {}
    uri = URI.parse(uri)
    res = Net::HTTP.post_form(uri, params)
    return res.body
end

end # class LogStash::Filters::Test

结合上篇日志nginx logstash redis elasticsearch kibana搭建日志平台的shipper.conf使用这个插件:

input {
    file {
      type => "nginx_access_log"
      path => "/var/log/nginx/access.log"
    }
}
 
filter {

    grok {
      match => [ "message", "%{IPV4:remote_ip}, (%{IPV4:http_x_forwarded_for}|-) (%{IPV4:remote_user}|-) \[%{MONTHDAY:day}\/%{MONTH:month}\/%{YEAR}:%{HOUR:hour}:%{MINUTE:min}:%{SECOND:sec} %{ISO8601_TIMEZONE:tz}\] %{BASE16FLOAT:request_time} %{HOSTNAME:domain} \"%{WORD:httpmethod} %{URIPATH:uripath} %{URIPROTO:uriproto1}\/%{BASE16FLOAT:httpversion} %{URIPROTO:uriproto2}\" %{INT:http_status} - (%{INT:body_bytes_sent}|\-) (%{INT:bytes_sent}|\-) (%{INT:sent_http_content_length}|\-)  \"(%{INT:sent_http_content_Range}|\-)\"   \"(%{URI:http_refer}|\-)\" \"%{DATA:user_agent}\" (%{DATA:sent_http_x_cacche}|\-) (%{DATA:sent_http_content_type}|\-) (up_addr:%{URIHOST:upstream_urihost}|\-) (up_resp:%{DATA:request_time2}s|\-) (up_status:%{INT:upstream_status}|\-)" ]
    }
    date {
      match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z","dd/MMM/YYYY:HH:mm:ss ZZ","dd/MMM/YYYY:HH:mm:ss ZZZ","YYYY-MM-dd HH:mm:ss"]
      locale => "en"
    }
    if "_grokparsefailure" in [tags] { drop { } }
 
    test {
	     remote_ip => "remote_ip"
    } 
}
 
output {
  redis {
    host => '192.168.201.132'
    data_type => 'list'
    key => 'logstash'
  }
}

最后查看效果

注意问题

在自定义插件里添加字段的时候有一个坑,那就是logstash的版本问题,我自己在测试的时候使用的最新版的5.0,所以正如我自己写的那个插件那样 event.set(“source”,’other’) ,但是到了我们生产环境里就不好使了,生产使用的是1.5.3版本,所以当时还没有这个方法;然后就去找logstash自带的插件 geoip ,目录是

vendor/bundle/jruby/1.9/gems/logstash-filter-mutate-1.0.0/lib/logstash/filters

在这里面的写法是 event[key]=value ,然后问题就解决了; 所以在logstash filter插件资料很少的情况下,要有新的思路去寻找解决办法,参考自带的插件就是一个最好的办法。


Similar Posts

Comments