Installation using docker-compose
1. Install ELK + Kafka
Note: docker-compose bind-mounts the directories and the file listed below into the containers, so create them before starting the stack.
# Directories and file to create: D:\docker\elasticsearch\data D:\docker\elasticsearch\logs d:\docker\elasticsearch\plugins d:\docker\logstash\pipeline/logstash.conf
# docker-compose stack: Elasticsearch + Logstash + Kibana (7.6.2), Kafka, ZooKeeper and kafka-manager.
version: '3.7'
services:
  elasticsearch:
    image: elasticsearch:7.6.2
    container_name: elasticsearch
    privileged: true
    user: root
    environment:
      # Set the cluster name to elasticsearch
      - cluster.name=elasticsearch
      # Start in single-node mode
      - discovery.type=single-node
      # JVM heap size used by Elasticsearch
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
    volumes:
      - d:\docker\elasticsearch\plugins:/usr/share/elasticsearch/plugins
      - d:\docker\elasticsearch\data:/usr/share/elasticsearch/data
      - d:\docker\elasticsearch\logs:/usr/share/elasticsearch/logs
    ports:
      - 9200:9200
      - 9300:9300

  logstash:
    image: logstash:7.6.2
    container_name: logstash
    ports:
      - 4560:4560
    privileged: true
    environment:
      - TZ=Asia/Shanghai
    logging:
      driver: "local"
      options:
        max-size: "10m"  # Maximum size of a single log file
        max-file: "3"    # Number of log files retained
    volumes:
      # Mount the logstash pipeline configuration file
      - d:\docker\logstash\pipeline/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - d:\docker\logstash\logs:/var/log/logstash
    depends_on:
      - elasticsearch
    links:
      # Makes the elasticsearch service reachable under the host name "es"
      - elasticsearch:es

  kibana:
    image: kibana:7.6.2
    container_name: kibana
    ports:
      - 5601:5601
    privileged: true
    links:
      # Makes the elasticsearch service reachable under the host name "es"
      - elasticsearch:es
    depends_on:
      - elasticsearch
    environment:
      # Address Kibana uses to reach Elasticsearch. The Kibana image maps the
      # upper-case, underscore-separated variable onto the elasticsearch.hosts setting.
      - ELASTICSEARCH_HOSTS=http://es:9200
      # Set the UI language to Chinese
      - I18N_LOCALE=zh-CN
    volumes:
      # Mount the Kibana log directory
      - d:\docker\kibana\logs:/var/log/kibana

  kafka:
    # NOTE(review): "latest" is unpinned - confirm the pulled image still supports
    # ZooKeeper mode, or pin a specific version.
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # Do not use 127.0.0.1 here; clients will most likely be unable to connect to Kafka.
      # Use the IP address of the Docker host instead.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.2.7:9092
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
    ports:
      - 9092:9092
    depends_on:
      - zookeeper

  kafka-manager:
    image: sheepkiller/kafka-manager
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zookeeper:2181"  # Change to your ZooKeeper address
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - 2181:2181
2. Spring Boot configuration
1. Download dependencies
<!-- Spring for Apache Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- Logstash encoder/layout for Logback (logback-core is excluded from its transitive dependencies) -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>4.11</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Logback appender that publishes log events to Kafka -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
    <scope>runtime</scope>
</dependency>

<!-- Lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
2. Add a config class that resolves the host's IP address so that logback.xml can read it
import ch.qos.logback.core.PropertyDefinerBase;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Logback property definer that exposes the local host's IP address, so that
 * logback.xml can reference it through a {@code <define>} element.
 *
 * <p>Failures are reported through Logback's own status API instead of an
 * application logger: this class runs while Logback is still being configured,
 * so the logging system it would log to may not be ready yet.
 */
public class LogIpPropertyConfig extends PropertyDefinerBase {

    /**
     * Resolves the address of the local host.
     *
     * @return the textual IP address of the local host, or {@code null} when the
     *         local host name cannot be resolved (the failure is recorded as a
     *         Logback status error)
     */
    @Override
    public String getPropertyValue() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            // Keep the cause so it shows up in Logback's status output.
            addError("Exception in obtaining log IP address", e);
            return null;
        }
    }
}
3. logback.xml configuration
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: logs to the console and, asynchronously, to Kafka as Logstash JSON. -->
<configuration>
    <!-- Console line format: [date] MDC thread id, level, logger, method, line - message -->
    <property name="pattern" value="[%date{yyyy-MM-dd HH:mm:ss.SSS}] %X{logthreadId} %-5level %logger{80} %method %line - %msg%n"/>
    <property name="charsetEncoding" value="UTF-8"/>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${pattern}</pattern>
            <charset>${charsetEncoding}</charset>
        </encoder>
    </appender>

    <!-- Exposes spring.application.name to the logging context as "service" -->
    <springProperty scope="context" name="service" source="spring.application.name" defaultValue="UnknownService"/>

    <!-- Note: replace the class attribute with the fully qualified name of your own LogIpPropertyConfig -->
    <define name="ip" class="pile.merchant.config.kafka.LogIpPropertyConfig"></define>

    <appender name="KAFKA_APPENDER" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
            <layout class="net.logstash.logback.layout.LogstashLayout">
                <!-- If enabled, includes the host name and other Logback context information -->
                <includeContext>true</includeContext>
                <!-- Whether to include the log source (caller data) -->
                <includeCallerData>true</includeCallerData>
                <fieldNames class="net.logstash.logback.fieldnames.ShortenedFieldNames"/>
                <customFields>{"ip": "${ip}"}</customFields>
            </layout>
            <charset>UTF-8</charset>
        </encoder>
        <!-- The Kafka topic must match the topic the consumer (the logstash kafka input) subscribes to -->
        <topic>kafka-elk-logg</topic>
        <!-- Message key / partitioning strategy -->
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>
        <!-- Delivery strategy. logback-kafka-appender provides two:
             AsynchronousDeliveryStrategy and BlockingDeliveryStrategy -->
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <!-- bootstrap.servers is the Kafka deployment address. Use the real IP address of the
             server; localhost cannot be used -->
        <producerConfig>bootstrap.servers=192.168.2.7:9092</producerConfig>
    </appender>

    <appender name="kafkaAppenderAsync" class="ch.qos.logback.classic.AsyncAppender">
        <!-- AsyncAppender does not capture caller data by default; the layout above asks for
             it, so it has to be captured before the event is queued -->
        <includeCallerData>true</includeCallerData>
        <appender-ref ref="KAFKA_APPENDER"/>
    </appender>

    <!-- Record behavior logs to Kafka -->
    <!-- NOTE(review): additivity is left at its default, so events from this logger also reach
         the root appenders, which include kafkaAppenderAsync again; confirm this is intended -->
    <logger name="KafkaPipeline" level="INFO">
        <appender-ref ref="kafkaAppenderAsync"/>
    </logger>

    <!-- Basic log level -->
    <root level="INFO">
        <appender-ref ref="console"/>
        <appender-ref ref="kafkaAppenderAsync"/>
    </root>
</configuration>
4. YAML file configuration
# Log configuration: load the Logback settings from logback.xml on the classpath
logging:
  config: classpath:logback.xml
3. Logstash configuration
1. Edit the logstash.conf file
# Pipeline: consume log events from Kafka, derive a per-service / per-level index name,
# and write the events to Elasticsearch (and to stdout for debugging).
input {
  kafka {
    id => "spring_kafka_elk"
    bootstrap_servers => "kafka:9092"
    topics => ["kafka-elk-logg"]
    auto_offset_reset => "latest"
  }
}

filter {
  # The payload received from Kafka is JSON, so parse it into event fields first
  json {
    source => "message"
  }

  # Fallback for plain-text lines of the form "[timestamp] LEVEL logger thread line - message"
  grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} %{DATA:logger_name} %{DATA:thread_name} %{NUMBER:line} - %{GREEDYDATA:message}" }
    # "message" already exists on the event, so replace it instead of appending to it
    overwrite => ["message"]
  }

  # Use the timestamp captured by grok (when present) as the event time
  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@timestamp"
  }

  # Generate different index names based on the service and the log level
  ruby {
    code => "
      case event.get('service')
      when 'merchant', 'business', 'admin', 'logic', 'interconnection'
        index_prefix = event.get('service') + '-'
      else
        index_prefix = 'unknown_project-'
      end

      case event.get('level')
      when 'INFO', 'WARN', 'ERROR'
        level_suffix = event.get('level').downcase
      else
        level_suffix = 'unknown_level'
      end

      event.set('index_name', index_prefix + level_suffix + '-' + Time.now.strftime('%Y.%m.%d'))
    "
  }
}

output {
  # Print every event for debugging
  stdout {
    codec => rubydebug
  }

  # Send to Elasticsearch
  elasticsearch {
    hosts => "es:9200"
    index => "%{index_name}"
    # Set template name for scheduled deletion
    # NOTE(review): confirm that a per-event %{service} reference is expanded in template_name
    template_name => "%{service}"
    # Templates that already exist under the same name are not overwritten
    template_overwrite => false
  }
}
4. Configure the Elasticsearch index template and lifecycle
1. Create the Elasticsearch index template
# Create the index template
# Services: admin business merchant nettyLogic interconnection
# (presumably one template per service - this example covers "merchant")
PUT _template/merchant
{
  "index_patterns": ["merchant*"],
  "settings": {
    "number_of_replicas": 0,
    "index.lifecycle.name": "test_policy"
  }
}
2. Set the index lifecycle policy
# admin bussiness merchant nettyLogic interconnection
# Lifecycle policy with a single "delete" phase.
# NOTE(review): "1m" is one minute in Elasticsearch time units - confirm this is intended outside of testing.
PUT _ilm/policy/test_policy
{
  "policy": {
    "phases": {
      "delete": {
        "min_age": "1m",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}