Spring Boot + Kafka + ELK foolproof installation

Installation using docker-compose

1. Install ELK + Kafka

Note that several directories and files need to be created up front, because they are mounted into the containers.

#Files and directories to be created
D:\docker\elasticsearch\data
D:\docker\elasticsearch\logs
D:\docker\elasticsearch\plugins
D:\docker\logstash\pipeline\logstash.conf
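Create these directories and the logstash.conf file (its contents are given in the Logstash section below) before bringing the stack up, since the compose file mounts them into the containers.
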
version: '3.7'
services:
  elasticsearch:
    image: elasticsearch:7.6.2
    container_name: elasticsearch
    privileged: true
    user: root
    environment:
      #Set the cluster name to elasticsearch
      - cluster.name=elasticsearch
      #Start in single node mode
      - discovery.type=single-node
      #Set the jvm memory size used
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    volumes:
      - D:\docker\elasticsearch\plugins:/usr/share/elasticsearch/plugins
      - D:\docker\elasticsearch\data:/usr/share/elasticsearch/data
      - D:\docker\elasticsearch\logs:/usr/share/elasticsearch/logs
    ports:
      - 9200:9200
      - 9300:9300

  logstash:
    image: logstash:7.6.2
    container_name: logstash
    ports:
       - 4560:4560
    privileged: true
    environment:
      - TZ=Asia/Shanghai
    logging:
      driver: "local"
      options:
        max-size: "10m" # Control the maximum size of a single log file
        max-file: "3" # Control the number of log files retained
    volumes:
      #Mount logstash configuration file
      - D:\docker\logstash\pipeline\logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - D:\docker\logstash\logs:/var/log/logstash
    depends_on:
      - elasticsearch
    links:
      #You can use the es domain name to access the elasticsearch service
      - elasticsearch:es
    

  kibana:
    image: kibana:7.6.2
    container_name: kibana
    ports:
        - 5601:5601
    privileged: true
    links:
      #You can use the es domain name to access the elasticsearch service
      - elasticsearch:es
    depends_on:
      - elasticsearch
    environment:
      #Set the address used to access elasticsearch
      - ELASTICSEARCH_HOSTS=http://es:9200
      #Set the UI language to Chinese
      - I18N_LOCALE=zh-CN
    volumes:
      #Mount the kibana log directory
      - D:\docker\kibana\logs:/var/log/kibana

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      #Do not use 127.0.0.1 here, otherwise clients will most likely fail to connect to kafka
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.7:9092
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
  kafka-manager:
    image: sheepkiller/kafka-manager
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181" # Change to your ZooKeeper address
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - 2181:2181
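
Save the above as docker-compose.yml and start everything with docker-compose up -d. Once the containers are running, Elasticsearch listens on port 9200, Kibana on 5601, Kafka on 9092 and Kafka Manager on 9000, matching the port mappings above.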

   

2. Spring Boot configuration

1. Add the dependencies

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- logstash integration with logback -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>4.11</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- logback integration with kafka -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
    <scope>runtime</scope>
</dependency>

<!-- lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
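
The logback-core exclusion on logstash-logback-encoder appears to be there to avoid dragging in a second Logback copy next to the version already managed by Spring Boot.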

2. Config class that resolves the host IP address so that logback.xml can pick it up


import ch.qos.logback.core.PropertyDefinerBase;
import lombok.extern.log4j.Log4j2;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Get the ip address and send it to logback
 */
@Log4j2
public class LogIpPropertyConfig extends PropertyDefinerBase {

    private static String ip;

    static {
        try {
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            log.error("Exception in obtaining log IP address",e);
            ip = null;
        }
    }
    @Override
    public String getPropertyValue() {
        return ip;
    }
}
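
Logback calls getPropertyValue() when it evaluates the <define> element in logback.xml (step 3 below), so the resolved address becomes available to the rest of the configuration as ${ip}.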

3. logback.xml configuration

<?xml version="1.0" encoding="UTF-8"?>
<configuration>




    <property name="pattern" value="[?te{yyyy-MM-dd HH:mm:ss.SSS}] %X{logthreadId} %-5level %logger{80} %method %line - % msg%n"/>
    <property name="charsetEncoding" value="UTF-8"/>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${pattern}</pattern>
            <charset>${charsetEncoding}</charset>
        </encoder>
    </appender>

    <springProperty scope="context" name="service" source="spring.application.name" defaultValue="UnknownService"/>
    <!-- Note: use your own package path for the LogIpPropertyConfig class here -->
    <define name="ip" class="pile.merchant.config.kafka.LogIpPropertyConfig"></define>

    <appender name="KAFKA_APPENDER" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
            <layout class="net.logstash.logback.layout.LogstashLayout">
                <!--If enabled, it will include hostname and other logback context information-->
                <includeContext>true</includeContext>
                <!--Whether to include the log source-->
                <includeCallerData>true</includeCallerData>
                <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
                <customFields>{"ip": "${ip}"}</customFields>
            </layout>
            <charset>UTF-8</charset>
        </encoder>

        <!--kafka topic needs to be consistent with the topic in the configuration file, otherwise kafka will not recognize it-->

        <topic>kafka-elk-logg</topic>

        <!--Primary key partition strategy-->

        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>

        <!--kafka message delivery strategy; logback-kafka-appender provides two:
            AsynchronousDeliveryStrategy
            BlockingDeliveryStrategy
        -->

        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>

        <!--bootstrap.servers is the Kafka broker address. Use the host's real IP address here; localhost cannot be used -->

        <producerConfig>bootstrap.servers=192.168.2.7:9092</producerConfig>

    </appender>

    <appender name="kafkaAppenderAsync" class="ch.qos.logback.classic.AsyncAppender">

        <appender-ref ref="KAFKA_APPENDER"/>

    </appender>

    <!--Record behavior logs to kafka-->

    <logger name="KafkaPipeline" level="INFO">

        <appender-ref ref="kafkaAppenderAsync"/>

    </logger>


    <!-- Basic log level -->
    <root level="INFO">
        <appender-ref ref="console"/>
        <appender-ref ref="kafkaAppenderAsync"/>
    </root>

</configuration>


4. application.yml configuration

#Log configuration
logging:
  config: classpath:logback.xml
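
With this in place no Kafka-specific code is needed in the application: every statement logged at INFO or above goes through the root logger to kafkaAppenderAsync and ends up on the kafka-elk-logg topic. A minimal sketch, assuming spring-boot-starter-web is on the classpath (the controller and message below are illustrative, not part of the original project):

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Illustrative only: a plain log call is shipped to the kafka-elk-logg topic
 * because the root logger in logback.xml references kafkaAppenderAsync.
 */
@Slf4j
@RestController
public class LogDemoController {

    @GetMapping("/ping")
    public String ping() {
        log.info("ping received - this line is forwarded to Kafka and indexed by ELK");
        return "pong";
    }
}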

3. Logstash configuration

1. Edit the logstash.conf file

input {
   kafka {
    id => "spring_kafka_elk"
    bootstrap_servers => "kafka:9092"
    topics => ["kafka-elk-logg"]
    auto_offset_reset => "latest"
  }
}

filter {
  #The messages received from kafka are JSON and need some processing
  json {
    source => "message"
  }

   grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} %{DATA:logger_name} %{DATA:thread_name} %{NUMBER:line} - %{GREEDYDATA:message}" }
  }

  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@timestamp"
  }
  #Generate different index names based on different services
  ruby {
    code => "
      case event.get('service')
      when 'merchant', 'business', 'admin', 'logic', 'interconnection'
        index_prefix = event.get('service') + '-'
      else
        index_prefix = 'unknown_project-'
      end

      case event.get('level')
      when 'INFO', 'WARN', 'ERROR'
        level_suffix = event.get('level').downcase
      else
        level_suffix = 'unknown_level'
      end


      event.set('index_name', index_prefix + level_suffix + '-' + Time.now.strftime('%Y.%m.%d'))

  "
}

}


output {
  #Send to es
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => "es:9200"
    index => "%{index_name}"
    #Set template name for scheduled deletion
    template_name => "%{service}"
    #Duplicate template names will not be overwritten
    template_overwrite => false
  }
  
}
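
With the ruby filter above, an INFO-level log from the merchant service produced on 1 May 2024, for example, is written to an index named merchant-info-2024.05.01; logs from an unrecognised service or level fall back to the unknown_project- prefix and unknown_level suffix.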

4. Configure ES templates

1. Configure the ES template

#Create template
# admin business merchant nettyLogic interconnection
PUT _template/merchant
{
  "index_patterns": ["merchant*"],
  "settings": {
    "number_of_replicas": 0,
    "index.lifecycle.name": "test_policy"
  }
}
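
Repeat the same PUT for each of the other services listed in the comment (admin, business, nettyLogic, interconnection), changing the template name and index_patterns accordingly.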

2. Set the lifecycle policy

# admin business merchant nettyLogic interconnection
PUT _ilm/policy/test_policy
{
  "policy": {
    "phases": {
      "delete": {
        "min_age": "1m",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
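
A min_age of 1m means indices enter the delete phase almost immediately, which is only useful for verifying that the policy works; for real retention, raise it to the period the logs should be kept (for example 7d or 30d).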