Example of Elastic Logstash pipeline input, filter and output

The Grok plugin is one of the cooler plugins. It enables you to parse unstructured log data into something structured and queryable. Grok looks for patterns in the data it receives, so we have to configure it to identify the patterns that interest us. Grok ships with a set of built-in patterns. The pattern used here, %{COMBINEDAPACHELOG}, matches the Apache combined log format, so it fits when Logstash receives access-log data from the Apache HTTP Server.

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
output {
    elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    }
    stdout { 
        codec => rubydebug 
      }
}
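The index name in the elasticsearch output above is built from fields under @metadata, which the rubydebug codec hides by default. As a quick check, you can switch the stdout output to also print them via the codec's metadata flag:

output {
    # Also print @metadata (beat name and version) with each event
    stdout { codec => rubydebug { metadata => true } }
}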
The following pipeline listens on port 5044 for Beats traffic and parses each incoming event as a syslog line:
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{SYSLOGLINE}" }
    }
}
output {
    stdout { codec => rubydebug }
}
Short Example of Logstash Multiple Pipelines
http://shinaisan.github.io/2018/08/25/short-example-of-logstash-multiple-pipelines.html
https://gist.github.com/shinaisan/78f3a3ad1ab50cab1d3ff32983454987
input {
    file {
        path => "/var/log/apache2/access.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
    }
}
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}
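These two configurations only run side by side if Logstash is told about both of them. A minimal sketch of the wiring in config/pipelines.yml, assuming the configs above are saved under hypothetical paths:

# config/pipelines.yml (file names are assumptions for this sketch)
- pipeline.id: apache-file
  path.config: "/etc/logstash/conf.d/apache-file.conf"
- pipeline.id: apache-beats
  path.config: "/etc/logstash/conf.d/apache-beats.conf"

Each pipeline gets its own queue and worker threads, so a slow output in one does not back-pressure the other.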
==============================================
Example 1: File → Logstash → Elasticsearch
input {
    file {
        path => "/var/log/apache2/access.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
    }
}
==============================================
Example 2: Filebeat → Logstash → Kafka
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
        source => "clientip"
    }
}
output {
    kafka {
        # Kafka bootstrap servers must include the port
        bootstrap_servers => "localhost:9092"
        codec => plain {
            format => "%{message}"
        }
        topic_id => "apache"
    }
}
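To confirm what actually lands on the topic, a minimal consuming pipeline can read it back. This is a sketch that assumes the broker address and topic from the output above:

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["apache"]
    }
}
output {
    stdout { codec => rubydebug }
}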
==============================================
Example 3: Beats → Logstash → Logz.io (TCP)
input {
    beats {
        port => "5044"
        type => "apache_access"
    }
}
filter {
    mutate {
        add_field => { "token" => "aaWTINmMspBUetRoGUrxEApzQkkoMWMn" }
    }
}
output {
    tcp {
        host => "listener.logz.io"
        port => 5050
        codec => json_lines
    }
}
==============================================
Example 4: Beats → Logstash → Logz.io (SSL)
input {
    beats {
        port => "5044"
        type => "apache_access"
    }
}
filter {
    mutate {
        add_field => { "token" => "aaWTINmMspBUetRoGUrxEApzQkkoMWMn" }
    }
}
output {
    lumberjack {
        hosts => ["listener.logz.io"]
        port => 5006
        ssl_certificate => "/usr/share/logstash/keys/TrustExternalCARoot.crt"
        codec => json_lines
    }
}
==============================================
input {
    file {
        path => ["/home/logstash/testdata.log"]
        sincedb_path => "/dev/null"
        start_position => "beginning"
    }
}
filter {
}
output {
    stdout {
        codec => rubydebug
    }
}
==============================================
# Output events as CSV-style lines to a file
output {
    file {
        codec => line { format => "%{field1},%{field2}" }
        path => "/path/to/data_export.csv"
    }
}
==============================================
filebeat.inputs:
- type: log
  paths:
    - /path/to/file/logstash-tutorial.log
output.logstash:
  hosts: ["localhost:5044"]
==============================================
input {
    beats {
        port => "5044"
    }
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
    stdout { codec => rubydebug }
}
==============================================
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}
==============================================
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
}
output {
    stdout { codec => rubydebug }
}
==============================================
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}
output {
    stdout { codec => rubydebug }
}
==============================================
input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}
==============================================
filter {
    dissect {
        mapping => {
            # The "->" suffix tells dissect to skip repeated delimiters (padding) after the timestamp
            "message" => "%{timestamp->} %{duration} %{client_address} %{cache_result}/%{status_code} %{bytes} %{request_method} %{url} %{user} %{hierarchy_code}/%{server} %{content_type}"
        }
        remove_field => ["message"]
    }
}
==============================================
filter {
    grok {
        match => {
            "message" => "%{NUMBER:timestamp}%{SPACE}%{GREEDYDATA:rest}"
        }
    }
}
==============================================
filter {
    grok {
        match => {
            "message" => "%{NUMBER:timestamp}%{SPACE}%{NUMBER:duration}\s%{IP:client_address}\s%{WORD:cache_result}/%{POSINT:status_code}\s%{NUMBER:bytes}\s%{WORD:request_method}\s%{NOTSPACE:url}\s%{NOTSPACE:user}\s%{WORD:hierarchy_code}/%{NOTSPACE:server}\s%{NOTSPACE:content_type}"
        }
        remove_field => ["message"]
    }
}
==============================================
filter {
    mutate {
        convert => {
            "bytes" => "integer"
            "duration" => "integer"
            "status_code" => "integer"
            "timestamp" => "float"
        }
    }
}
==============================================
filter {
    dissect {
        mapping => {
            "message" => "%{timestamp->} %{duration} %{client_address} %{cache_result}/%{status_code} %{bytes} %{request_method} %{url} %{user} %{hierarchy_code}/%{server} %{content_type}"
        }
        remove_field => ["message"]
        convert_datatype => {
            "bytes" => "int"
            "duration" => "int"
            "status_code" => "int"
            "timestamp" => "float"
        }
    }
}
==============================================
filter {
    grok {
        match => {
            "message" => "%{NUMBER:timestamp:float}%{SPACE}%{NUMBER:duration:int}\s%{IP:client_address}\s%{WORD:cache_result}/%{POSINT:status_code:int}\s%{NUMBER:bytes:int}\s%{WORD:request_method}\s%{NOTSPACE:url}\s%{NOTSPACE:user}\s%{WORD:hierarchy_code}/%{NOTSPACE:server}\s%{NOTSPACE:content_type}"
        }
        remove_field => ["message"]
    }
}
==============================================
# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
      if [type] == "apache" {
        pipeline { send_to => weblogs }
      } else if [type] == "system" {
        pipeline { send_to => syslog }
      } else {
        pipeline { send_to => fallback }
      }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
      # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
      # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
  config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
==============================================
# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }
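The bare http { } above is shorthand from the pattern description; to actually start, the http output needs at least a destination and method. A minimal sketch with a placeholder endpoint:

output {
    http {
        url => "https://example.com/ingest"    # placeholder endpoint
        http_method => "post"
        format => "json"
    }
}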
==============================================
# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket
==============================================
# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
==============================================
References
https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html
https://www.elastic.co/blog/a-practical-introduction-to-logstash
https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html
