EFLK and logstash filtering

Table of Contents

1. How Filebeat works:

2. Why use Filebeat:

3. The difference between Filebeat and Logstash:

4. Logstash filter plug-in:

5. Filebeat + ELK deployment:

1. Install filebeat:

2. Set the main configuration file of filebeat:

3. Start filebeat:

4. Create a new Logstash configuration file:

6. Use of grok:

1. Built-in regular expression call:

2. Custom expression call:

2.1 Format:

7. multiline multi-line merge plug-in:

1. Install the multiline plug-in:

2. Use the multiline plug-in:

8. mutate data modification plug-in:

1.Commonly used configuration options for Mutate filters:

2. Example:

2.1 Rename the field old_field to new_field:

2.2 Add fields:

2.3 Delete the field:

2.4 Convert field type:

2.5 Replacement characters:

9. Date time processing plug-in:

1. Detailed explanation of timestamp:

2.Case:


1. Working principle of Filebeat:

Filebeat can maintain the status of each file and frequently update the file status from the registry to disk. The file status mentioned here is used to record the position read when Harvster read the file last time, to ensure that all log data can be read out and then sent to output. If at a certain moment, ElasticSearch or Logstash as the output becomes unavailable, Filebeat will save the last file reading position until the output is available again, and quickly resume reading the file data. During the running of Filebaet, the status of each Prospector
All information is stored in memory. If Filebeat is restarted, after the restart is completed, the state information before the restart will be restored from the registry file, allowing FIlebeat to continue reading data from the previously known location.

2. Why use Filebeat:

Because logstash is run by jvm, it consumes a lot of resources. Starting a logstash requires about 500M of memory (this is why logstash starts very slowly), while filebeat only requires about 10M of memory resources. Among the commonly used ELK log collection solutions, most of the methods are to send the log content of all nodes to logstash through filebeat, and logstash filters according to the configuration file. Then transfer the filtered files to elasticsearch and display them through kibana.

3. The difference between Filebeat and Logstash:

Memory Large Small
CPU Big Small
Plugins Many Many
Function Collect and analyze and convert data in real time from various input terminals and output to various output terminals Transmission
Filtering ability Powerful filtering ability Filtering is possible but weak
Severity Relatively heavy Lightweight binary file
Process A server only allows one logstash process, which will hang. You need to start it manually afterward
Cluster Single node Single node

The logs collected by filebeat are uniformly filtered by logstash, then stored on es, and finally displayed graphically through kibana web

4. Logstash filter plug-in:

  • grok Regular capture plug-in
  • multiline Multiple line merge plug-in
  • mutate data modification plug-in
  • date time processing plug-in

5. Filebeat + ELK deployment:

node1 node (2C/4G): node1/192.168.88.101 Elasticsearch
node2 node (2C/4G): node2/192.168.80.103 Elasticsearch
Apache node: apache/192.168.88.100 Logstash Kibana Apache
Filebeat node: filebeat/192.168.88.104 Filebeat

1. Install filebeat:

#Upload the software package filebeat-6.7.2-linux-x86_64.tar.gz to the /opt directory
tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64/ /usr/local/filebeat

2. Set the main configuration file of filebeat:

cd /usr/local/filebeat

vim filebeat.yml
filebeat.inputs:
- type: log #Specify the log type and read messages from the log file
  enabled: true
  paths:
    -/var/log/nginx/access.log #Specify the monitored log file
  tags: ["sys"] #Set index tags
fields: #You can use the fields configuration option to set some parameter fields to add to the output.
    service_name: filebeat
    log_type: syslog
    from: 192.168.88.104

---------------Elasticsearch output------------------
(Comment out all)

----------------Logstash output---------------------
output.logstash:
  hosts: ["192.168.88.100:5044"] #Specify the IP and port of logstash

3. Start filebeat:

nohup ./filebeat -e -c filebeat.yml > filebeat.out & amp;
#-e: Output to standard output, disable syslog/file output
#-c: Specify configuration file
#nohup: Run commands in the background of the system without hanging up. Exiting the terminal will not affect the running of the program

4. Create a new Logstash configuration file:

input {
    beats {
        port => "5044"
    }
}

output {
    elasticsearch {
        hosts => ["192.168.88.101:9200","192.168.88.103:9200"]
        index => "nginx_access.log-%{ + YYYY.MM.dd}"
    }
}

logstash -f filebeat.conf ##Start

Visit http://192.168.88.100:5601 with the browser. Log in to Kibana, click the “Create Index Pattern” button to add the index “nginx_access*”, click the “create” button to create, and click the “Discover” button to view chart information and logs. information.

6. grok Use:

Use text fragments to segment log events

1. Built-in regular expression call:

%{SYNTAX:SEMANTIC}

●SYNTAX represents the type of matching value. For example, 0.11 can be matched by NUMBER type, and 10.222.22.25 can be matched by IP.

●SEMANTIC represents a variable declaration that stores the value. It will be stored in elastic search to facilitate field search and statistics by kibana. You can define an IP as the client IP address client_ip_address, such as %{IP:client_ip_address}, which matches The value will be stored in the client_ip_address field, which is similar to the column name of the database. You can also store the numbers in the event log as a numeric type in a specified variable, such as the response time http_response_time. Assume that the event log record is as follows:

message: 192.168.80.10 GET /index.html 15824 0.043

You can use the following grok pattern to match this kind of record
%{IP:client_id_address} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:http_response_time}
Create a filter conf file under the logstash conf.d folder with the following content
# /etc/logstash/conf.d/01-filter.conf
filter {
  grok {
    match => { "message" => "%{IP:client_id_address} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:http_response_time}" }
  }
}

The following are the filter results:

client_id_address: 192.168.80.10
method: GET
request: /index.html
bytes: 15824
http_response_time: 0.043

Logstash officially also provides some commonly used constants to express those regular expressions. You can go to this Github address to view the commonly used constants:
https://github.com/logstash-plugins/logstash-patterns-core/blob/main/patterns/ecs-v1/grok-patterns

2. Custom expression call:

2.1 format:

(?<field_name>pattern)

message: 192.168.80.10 GET /index.html 15824 0.043

Use regular matching and custom expressions to split the above log file:

(?<remote_addr>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) (?<http_method>[A-Z ] + ) (?<request_uri>/.*) (?<response_bytes>[0-9] + ) (?<response_time>[0-9\.] + )

filter {
  grok {
    match => { "message" => "(?<remote_addr>%{IP}) (?<http_method>[A-Z] + ) (?<request_uri>/.*) (?<response_bytes>[0-9] + ) (?<response_time>[0-9\.] + )"}
  }
}

Seven. multiline multi-line merge plug-in:

Java error logs generally have many lines in one log, and stack information will be printed out. After being parsed by logstash, each line will be stored in ES as a record, so this situation definitely needs to be dealt with. Here you need to use the multiline plug-in to splice records belonging to the same log.

2022-11-11 17:09:19.774[XNIo-1 task-1]ERROR com.passjava.controlle .NembercController-Failed to query user activity data, the exception information is:
com.passjava.exception.MemberException: No active rules are currently configured
at com.passjava.service.impL.queryAdmin(DailyServiceImpl.java:1444)
at com.passjava.service.impl.dailyserviceImpL$$FastcLass
2022-11-11 17:10:56.256][KxNIo-1 task-1] ERROR com.passjava.controlle .NemberControl1er-Failed to query employee meal activity data, the exception information is:
com.passjava.exception.MemberException: No active rules are currently configured
at com.passjava.service.impL.queryAdmin(DailyServiceImpl.java:1444)
at com.passjava.service.impL.daiLyserviceImpL$$FastcLass

1. Install multiline plug-in:

Install plug-ins online

cd /usr/share/logstash
bin/logstash-plugin install logstash-filter-multiline

Install plug-ins offline

First install the plug-in online on a machine with Internet access, then package it, copy it to the server, and execute the installation command
bin/logstash-plugin prepare-offline-pack --overwrite --output logstash-filter-multiline.zip logstash-filter-multiline

bin/logstash-plugin install file:///usr/share/logstash/logstash-filter-multiline.zip

Check whether the plug-in is installed successfully. You can execute the following command to view the plug-in list.
bin/logstash-plugin list

2. Use multiline plug-in:

Step 1: The first line of each log starts with a time. You can use the regular expression of time to match the first line.
Step 2: Then merge the logs of each subsequent line with the first line.
Step 3: When you encounter a time when the beginning of a certain line can match the regular expression, stop merging the first log and start merging the second log.
Step 4: Repeat steps 2 and 3.

filter {
  multiline {
    pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2 }.\d{3}"
    negate => true
    what => "previous"
  }
}

●pattern: an expression used to match text, or a grok expression

●what: If pattern matching is successful, then the matching row belongs to the previous event or the next event. previous: Belongs to the previous event and merges upward. next: Belongs to the next event and merges downwards

●negate: Whether to negate the result of pattern. false: Not negated, the default value. true: negate. Invert the line matching logic during multi-line event scanning (if pattern matching fails, the current line is considered to be a component of the multi-line event)

8. Mutate data modification plug-in:

It provides rich basic type data processing capabilities. Fields in events can be renamed, deleted, replaced and modified.

1.Commonly used configuration options for Mutate filters:

add_field adds a new field to the event, or multiple fields can be added
remove_field removes any field from the event
add_tag adds any tag to the event and adds a custom content in the tag field. When there is more than one content in the tag field, it will become an array.
remove_tag removes the tag from the event if it exists
convert Convert a field value to another data type
id adds a unique ID to the live event
lowercase converts a string field to its lowercase form
replace replaces a field with a new value
strip removes leading and trailing spaces
uppercase converts a string field to its uppercase equivalent
update updates an existing field with a new value
rename renames the field in the event
gsub replaces matching values in fields with regular expressions
merge merges arrays or hash events
split splits the string in the field into an array by specifying the delimiter

2.Example:

2.1 Rename the field old_field to new_field:

filter {
    mutate {
#Writing method 1, use square brackets to enclose
        rename => ["old_field" => "new_field"]

        #Writing method 2, use curly brackets {} to enclose
rename => { "old_field" => "new_field" }
    }
}

2.2 Add fields:

filter {
    mutate {
        add_field => {
        "f1" => "field1"
        "f2" => "field2"
        }
    }
}

2.3 Delete the field:

filter {
    mutate {
        remove_field => ["message", "@version", "tags"]
    }
}

2.4 Convert field type:

Convert the filedName1 field data type to string type and the filedName2 field data type to float type

filter {
    mutate {
        #Writing method 1, use square brackets to enclose
        convert => ["filedName1", "string"]
\t\t
        #Writing method 2, use curly brackets {} to enclose
convert => { "filedName2" => "float" }
    }
}

2.5 Replacement character:

filter {
    mutate {
        gsub => ["filedName", "/" , "_"]
    }
}

9. Date time processing plug-in:

Used to parse the date in a field and then use that date or timestamp as the logstash timestamp of the event.

When Logstash generates an Event object, it will set a time for the Event with the field “@timestamp”. At the same time, our log content generally also has time, but the two times are different because the log content The time is the time when the log is printed, and the time in the “@timestamp” field is the time when the input plug-in receives a piece of data and creates an Event. Generally speaking, the time of “@timestamp” is later than the time of the log content. , because Logstash monitors data changes, data input, and time delays caused by event creation. Both times can be used, depending on your needs.

Used with gork

filter {
    date {
        match => ["access_time", "dd/MMM/YYYY:HH:mm:ss Z", "UNIX", "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm: ss"]
target => "@timestamp"
timezone => "Asia/Shanghai"
    }
}

●match: used to configure specific matching content rules. The first half of the content indicates the name that matches the timestamp in the actual log, and the second half is used to match the timestamp format in the actual log. This place is the core content of the entire configuration. , if the rule matching here is invalid, the generated log timestamp will be replaced by the time read by the input plugin.
If the time format matching fails, a tags field will be generated with the field value _dateparsefailure. You need to recheck whether the above match configuration parsing is correct.

●target: Store matching timestamps into the given target field. If not provided, the @timestamp field of the update event is defaulted.

●timezone: When the date to be configured does not contain time zone information and is not UTC time, the timezone parameter needs to be set.

1. Detailed explanation of timestamp:

●Year
yyyy #year-round number. For example: 2015.
yy #Two-digit year. For example: 15 in 2015.

●Month
M #minimum numeric month. For example: 1 for January and 12 for December.
MM #two-digit month. Fill with zeros if necessary. For example: 01 for January and 12 for December
MMM #Shortened month text. For example: Jan for January. Note: The language used depends on your locale. See Regional Settings to learn how to change the language.
MMMM #Full month text, for example: January. Note: The language used depends on your locale.

●Day
d #Minimal number of days. For example: the first day of January 1.
dd #Two-digit day, you can fill in zeros if necessary. For example: 01 for the 1st of the month.

●When
H # Minimum numeric hour. For example: 0 means midnight.
HH # Two digit hours, fill in zero if necessary. For example: midnight 00.

●Min.
m #The smallest numeric minute. For example: 0.
mm # Two-digit minutes, fill in zeros if necessary. For example: 00.

●Seconds
s #minimum numeric seconds. For example: 0.
ss #Two digits, fill in zeros if necessary. For example: 00.

●Milliseconds (The maximum precision for fractional seconds is milliseconds (SSS). Otherwise, zero is appended.)
S # Tenth of a second. For example: 0 is the sub-second value 012
SS #Hundredth of a second For example: 01 is the sub-second value 01
SSS #One thousandth of a second For example: 012 is the sub-second value 012

●Time zone offset or identity
Z # Time zone offset, structured as HHmm (hour and minute offset from Zulu/UTC). For example: -0700.
ZZ #The time zone offset structure is HH:mm (the colon between the hour offset and the minute offset). For example: -07:00.
ZZZ #TimeZoneIdentity. For example: America/Los_Angeles. NOTE: Valid IDs are listed in the list http://joda-time.sourceforge.net/timezones.html

2. Case:

192.168.80.10 – – [07/Feb/2022:16:24:19 + 0800] “GET /HTTP/1.1” 403 5039

Now we want to convert the time, then we need to write “dd/MMM/yyyy:HH:mm:ss Z”
You find that there are three M’s in the middle. It won’t work if you write out two, because the two capital M’s represent the two-digit month. However, in the text we want to parse, the month is in abbreviated English, so we can only go Find three M’s. And why do we need to add a capital letter Z at the end? Because the text to be parsed contains the “+0800” time zone offset, so we have to add it, otherwise the filter will not be able to parse the text data correctly, and the timestamp conversion will fail.

filter{
      grok{
           match => {"message" => ".* -\ -\ \[%{HTTPDATE:timestamp}\]"}
      }
      date{
           match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    }
}

operation result:
{
          "host" => "localhost",
     "timestamp" => "07/Feb/2022:16:24:19 + 0800",
    "@timestamp" => 2022-02-07T08:24:19.000Z,
       "message" => "192.168.80.10 - - [07/Feb/2022:16:24:19 + 0800] "GET /HTTP/1.1" 403 5039",
      "@version" => "1"
}

In the rubydebug encoding format output above, although the @timestamp field has obtained the time of the timestamp field, it is still 8 hours later than Beijing time. This is because within Elasticsearch, UTC is uniformly used for time type fields. Time, and logs are stored uniformly in UTC time, which is a consensus in the international security and operation and maintenance circles. In fact, this does not affect anything, because ELK has already provided a solution, that is, on the Kibana platform, the program will automatically read the current time zone of the browser, and then automatically convert the UTC time to the current time zone on the web page.