
The Grok plugin is one of the most useful plugins. It parses unstructured log data into structured, queryable fields. Grok works by matching patterns in the incoming data, so we have to configure it to recognize the patterns that interest us. Grok ships with a set of built-in patterns; the one used here, %{COMBINEDAPACHELOG}, matches log data in the Apache combined log format.
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
}
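For example, a combined-format access log line like the one below (values are illustrative) is parsed into named fields; an abridged rubydebug rendering of the resulting event follows:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 ..."

{
     "clientip" => "83.149.9.216",
    "timestamp" => "04/Jan/2015:05:13:42 +0000",
         "verb" => "GET",
      "request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png",
     "response" => "200",
        "bytes" => "203023",
    ...
}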
The following pipeline listens on port 5044 for Beats input and parses each line as a syslog message.
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
}
output {
  stdout { codec => rubydebug }
}
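As an illustration, a line such as the following (hypothetical) would be broken by the SYSLOGLINE pattern into fields like timestamp, logsource, program, pid, and message:

Jan 12 06:30:01 web01 CRON[2995]: (root) CMD (/usr/local/bin/backup.sh)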
Short Example of Logstash Multiple Pipelines
http://shinaisan.github.io/2018/08/25/short-example-of-logstash-multiple-pipelines.html
https://gist.github.com/shinaisan/78f3a3ad1ab50cab1d3ff32983454987
input {
  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  geoip {
    source => "clientip"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}
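To run the two configurations above as separate pipelines in one Logstash instance, each gets its own entry in config/pipelines.yml. A minimal sketch (the pipeline IDs and config file paths below are assumptions):

# config/pipelines.yml
- pipeline.id: apache-from-file
  path.config: "/etc/logstash/conf.d/apache-file.conf"
- pipeline.id: apache-from-beats
  path.config: "/etc/logstash/conf.d/apache-beats.conf"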
Example of Elastic Logstash pipeline input, filter and output
==============================================
Example 1: File → Logstash → Elasticsearch
input {
  file {
    path => "/var/log/apache2/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}
==============================================
Example 2: Filebeat → Logstash → Kafka
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
  }
}
output {
  kafka {
    # Kafka clients expect host:port, so the port is included here
    bootstrap_servers => "localhost:9092"
    codec => plain {
      format => "%{message}"
    }
    topic_id => "apache"
  }
}
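To verify that events are reaching the topic, they can be read back with Kafka's console consumer (assuming a local broker on the default port 9092):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic apache --from-beginning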
==============================================
Example 3: Beats → Logstash → Logz.io (TCP)
input {
  beats {
    port => "5044"
    type => "apache_access"
  }
}
filter {
  mutate {
    add_field => { "token" => "aaWTINmMspBUetRoGUrxEApzQkkoMWMn" }
  }
}
output {
  tcp {
    host => "listener.logz.io"
    port => 5050
    codec => json_lines
  }
}
==============================================
Example 4: Beats → Logstash → Logz.io (SSL)
input {
  beats {
    port => "5044"
    type => "apache_access"
  }
}
filter {
  mutate {
    add_field => { "token" => "aaWTINmMspBUetRoGUrxEApzQkkoMWMn" }
  }
}
output {
  lumberjack {
    hosts => ["listener.logz.io"]
    port => 5006
    ssl_certificate => "/usr/share/logstash/keys/TrustExternalCARoot.crt"
    codec => json_lines
  }
}
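Note that the lumberjack output is distributed as a Logstash plugin; if it is not already present in your installation, it can typically be added with the plugin CLI:

bin/logstash-plugin install logstash-output-lumberjack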
==============================================
# Output: export selected fields as CSV (field1 and field2 are placeholder field names)
output {
  file {
    codec => line { format => "%{field1},%{field2}" }
    path => "/path/to/data_export.csv"
  }
}
==============================================
filebeat.inputs:
- type: log
  paths:
    - /path/to/file/logstash-tutorial.log
output.logstash:
  hosts: ["localhost:5044"]
==============================================
input {
  beats {
    port => "5044"
  }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
  stdout { codec => rubydebug }
}
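To try this pipeline, save it to a file (first-pipeline.conf below is just an example name) and start Logstash with automatic config reloading, so the filter section can be edited later without a restart:

bin/logstash -f first-pipeline.conf --config.reload.automatic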
==============================================
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
==============================================
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
output {
  stdout { codec => rubydebug }
}
==============================================
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  geoip {
    source => "clientip"
  }
}
output {
  stdout { codec => rubydebug }
}
==============================================
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  geoip {
    source => "clientip"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}
==============================================
input {
  file {
    path => ["/home/logstash/testdata.log"]
    sincedb_path => "/dev/null"
    start_position => "beginning"
  }
}
filter {
}
output {
  stdout {
    codec => rubydebug
  }
}
==============================================
filter {
  dissect {
    mapping => {
      "message" => "%{timestamp->} %{duration} %{client_address} %{cache_result}/%{status_code} %{bytes} %{request_method} %{url} %{user} %{hierarchy_code}/%{server} %{content_type}"
    }
    remove_field => ["message"]
  }
}
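This mapping targets Squid-style access log lines of the following shape (an illustrative example; the -> modifier after timestamp absorbs the padding spaces that follow the field):

1524206424.034   19395 207.96.0.0 TCP_MISS/304 15363 GET http://elastic.co/android-chrome-192x192.gif - DIRECT/10.0.5.120 -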
==============================================
filter {
  grok {
    match => {
      "message" => "%{NUMBER:timestamp}%{SPACE}%{GREEDYDATA:rest}"
    }
  }
}
==============================================
filter {
  grok {
    match => {
      "message" => "%{NUMBER:timestamp}%{SPACE}%{NUMBER:duration}\s%{IP:client_address}\s%{WORD:cache_result}/%{POSINT:status_code}\s%{NUMBER:bytes}\s%{WORD:request_method}\s%{NOTSPACE:url}\s%{NOTSPACE:user}\s%{WORD:hierarchy_code}/%{NOTSPACE:server}\s%{NOTSPACE:content_type}"
    }
    remove_field => ["message"]
  }
}
==============================================
# Add inside the filter block, after grok:
mutate {
  convert => {
    "bytes" => "integer"
    "duration" => "integer"
    "status_code" => "integer"
    "timestamp" => "float"
  }
}
==============================================
filter {
  dissect {
    mapping => {
      "message" => "%{timestamp->} %{duration} %{client_address} %{cache_result}/%{status_code} %{bytes} %{request_method} %{url} %{user} %{hierarchy_code}/%{server} %{content_type}"
    }
    remove_field => ["message"]
    convert_datatype => {
      "bytes" => "int"
      "duration" => "int"
      "status_code" => "int"
      "timestamp" => "float"
    }
  }
}
==============================================
filter {
  grok {
    match => {
      "message" => "%{NUMBER:timestamp:float}%{SPACE}%{NUMBER:duration:int}\s%{IP:client_address}\s%{WORD:cache_result}/%{POSINT:status_code:int}\s%{NUMBER:bytes:int}\s%{WORD:request_method}\s%{NOTSPACE:url}\s%{NOTSPACE:user}\s%{WORD:hierarchy_code}/%{NOTSPACE:server}\s%{NOTSPACE:content_type}"
    }
    remove_field => ["message"]
  }
}
==============================================
# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
      if [type] == "apache" {
        pipeline { send_to => weblogs }
      } else if [type] == "system" {
        pipeline { send_to => syslog }
      } else {
        pipeline { send_to => fallback }
      }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
      # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
      # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
  config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
==============================================
# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }
==============================================
# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket
==============================================
# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
==============================================
Reference
https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html
https://www.elastic.co/blog/a-practical-introduction-to-logstash
https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html