Flink on k8s container log generation principle and comparison with log generation mode during Yarn deployment

Detailed explanation of Flink on k8s deployment logs and comparison of log generation modes during deployment with Yarn

Recently, I need to switch flink from the original deployment to the Yarn cluster to the kubernetes cluster. After the switch, I need to be familiar with the operating mode of flink on k8s. During the use of the log module, I found that in the k8s container, flink only had two system logs: jobmanager.log/taskmanager.log. However, when using Yarn cluster deployment, there would be multiple flink logs, such as: jobmanager .log, jobmanager.err and jobmanager.out, the same applies to TaskManager.

Therefore, some colleagues asked why there is only one .log file when deploying in k8s. Can the log files be distinguished similar to how they are deployed in Yarn? If you just look at the container logs, if you don’t know enough about k8s at the beginning, you will feel that the log collection is not accurate enough.

Therefore, it is up to me to research and solve the above problem. There is also relatively little relevant information on the Internet, so after this overall understanding and analysis of the above issue, I will conduct a study record. If you encounter similar problems, you can also refer to this idea.

1. If you think you need to modify the log4j configuration

The first step to thinking about this problem is that since you want to distinguish log categories, you can modify the configuration of log4j and write the INFO category and ERROR category into different log files. Therefore, first modify conf/log4j-console.properties in the flink path (When deploying flink on k8s, the log4j configuration file used is the flink-console.properties file, not log4j.properties) .

Here we are left with a small question: Why is log4j-console.properties used when deploying to k8s instead of log4j.properties when deploying to Yarn? What’s the difference?

A modified log4j-console.properties example looks like this:

############################################## #################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################ ##############################

# Allows this configuration to be modified at runtime. The file will be checked every 30 seconds.
monitorInterval=30

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
rootLogger.appenderRef.errorLogFile.ref = errorLogFile

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos to the console
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = true
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
appender.rolling.filter.threshold.type = LevelMatchFilter
appender.rolling.filter.threshold.level = INFO
appender.rolling.filter.threshold.onMatch = ACCEPT
appender.rolling.filter.threshold.onMisMatch = DENY

appender.errorFile.name = errorLogFile
appender.errorFile.type = RollingFile
appender.errorFile.append = true
appender.errorFile.fileName = ${sys:log.file}.err
appender.errorFile.filePattern = ${sys:log.file}.err.%i
appender.errorFile.layout.type = PatternLayout
appender.errorFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.errorFile.policies.type = Policies
appender.errorFile.policies.size.type = SizeBasedTriggeringPolicy
appender.errorFile.policies.size.size = 100MB
appender.errorFile.policies.startup.type = OnStartupTriggeringPolicy
appender.errorFile.strategy.type = DefaultRolloverStrategy
appender.errorFile.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}
appender.errorFile.filter.threshold.type = ThresholdFilter
appender.errorFile.filter.threshold.level = ERROR
appender.errorFile.filter.threshold.onMatch = ACCEPT
appender.errorFile.filter.threshold.onMisMatch = DENY

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

Compared with the modifications to the original files, the modifications here mainly focus on the following two aspects:

  1. Increase the threshold parameter of RollingFileAppender. Because I initially hoped that the .log log would only display INFO logs and not other types of logs. However, the threshold.level parameter introduced on the log4j official website filters logs that are lower than the set type. For example, when threshold.level=INFO, logs with a lower level than INFO, such as DEBUG, will be filtered, while logs with a higher level, such as ERROR, will still be retained. After searching for some information, I found the configuration of threshold.type = LevelMatchFilter. This configuration can make the current appender only retain logs of the set log type, thus realizing the requirement of retaining only INFO logs.
  2. Added appender of errorLogFile. The configuration is the same as above, so that the current appender only retains log data of ERROR type.

Observing the log4j-console.properties configuration above, we can find that when setting the file name, a system variable ${sys:log.file} is used. Anyone who has used flink configuration should be familiar with this system variable. It specifies the local flink log. Default path, such as /opt/log/jobmanager.log.

After testing, using the above log4j configuration can realize my original idea, which is to separate INFO logs and ERROR logs and write them to different files. However, after comparing it with the files during Yarn deployment, it was found that it actually did not meet the original requirements.

Because in Yarn, ERROR log type data also exists in the .log log, and it does not seem to be separated using log4j configuration. And I checked the log4j.properties configuration and found no configuration similar to this to distinguish log types. At the same time, in Yarn, the .err log outputs task exception information, such as e.printStackTrace(), and the .out log outputs data similar to System.out.println. The configuration of log4j is actually just for configuring and processing the system log when flink is executed, which seems to be different from the above scenario.

Therefore, we need to look for new ideas. After exploring, we decided to start with the system variables based on this log.file and the source code of flink.

2. Flink source code analysis-Yarn

After local git clone the source code of flink, switch to the flink1.12 version branch, conduct a global search for “log.file”, and find the BootstrapTools class under the flink-runtime module. In this class Below, there is a getTaskManagerShellCommand method. In the method, there is a very useful code, as shown below:

startCommandValues.put(
                "redirects",
                "1> "
                  + logDirectory
                  + "/taskmanager.out "
                  + "2> "
                  + logDirectory
                  + "/taskmanager.err"
           );

As you can see, isn’t this the .out and .err files we originally wanted to generate! ! . So what does redirects here mean?

After observing the source code, we know that flink has set up a template module to start the command line, with a placeholder for redirects, so the above is actually the subsequent replacement of the redirects placeholder with the redirect command.

Next, let’s take a look at where this method is called. We find that in addition to being called in the BootstrarpToolsTest test class, it is only called in the flink-yarn project src/main/java/org/apache/ It is used in the flink/yarn/Utils.java class, as shown below:

String launchCommand =
                BootstrapTools.getTaskManagerShellCommand(
                        flinkConfig,
                        tmParams,
                        ".",
                        ApplicationConstants.LOG_DIR_EXPANSION_VAR,
                        hasLogback,
                        hasLog4j,
                        hasKrb5,
                        taskManagerMainClass,
                        taskManagerDynamicProperties);

        if (log.isDebugEnabled()) {<!-- -->
            log.debug("Starting TaskManagers with command: " + launchCommand);
        } else {<!-- -->
            log.info("Starting TaskManagers");
        }

Therefore, when deploying to a Yarn cluster, the above method will be used when building the TaskManager startup command. At the same time, the above code found that when the log.isDebugEnabled() condition is met, this startup command can be printed out. How can this condition be met? In fact, log.isDebugEnabled() means that the current log4j configuration allows printing DEBUG type logs. Therefore, we go to flink’s conf/log4j.properties and modify rootLogger.level = INFO => rootLogger.level = DEBUG, and then rerun the task, you can see this startup command in the log:

As you can see, at the end of the startup command, there is the redirection command in the above code. This redirection command redirects the standard output and standard error to the .out and .err files respectively.

At this point, we have successfully located the reason why .err and .out logs can be generated in Yarn. In fact, it is because of such a redirection statement that the standard output and standard error during the execution of the flink task are redirected to the .out and .err files respectively. This also explains why during Yarn deployment, the .err log displays abnormal information, such as e.printStackTrace(), and the .out file outputs log data including System.out

After understanding Yarn’s log generation mechanism, let’s take a look at how k8s is implemented?

3. Flink source code analysis-Kubernetes

So when deploying k8s, is there also such a redirect statement? In order to find out, we still analyze the source code of flink version 1.12. Under the flink-kubernetes project, there is a src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java class. Under this class, there is a getCommonStartCommand code> method, this method is similar to the getTaskManagerShellCommand method above, and is also used to construct startup command parameters. However, under this method, I found that there is no such redirection statement:

startCommandValues.put(
                "logging",
                getLogging(
                        logDirectory + "/" + logFileName + ".log",
                        configDirectory,
                        hasLogback,
                        hasLog4j));

There is only such a startup command configuration written to the .log file. Unfortunately, when deploying k8s, there is no way to print out the startup command statement in the DEBUG log type like Yarn above. However, we can still make a preliminary conclusion:

When flink is deployed on k8s, there are no .out and .err files. This is because the source code does not redirect standard output and standard error to the specified .out and .err files in the startup command parameters for starting TaskManager/JobManager. err file. The generated .log file is the rolling system log of RollingFile configured in log4j-console.properties.

At the same time, I found that in version 1.11 of flink, the above method still retains the same redirection statement as Yarn. However, since version 1.12, the redirection statement has been removed. What is the reason?

So far, we have found out the reason why there is only one .log file when flink is deployed to k8s. Next, in order to solve the original original problem, a solution needs to be solved.

4. Design solutions

The first solution that comes to mind is to copy the redirection source code from Yarn to the k8s code above, and then repackage Flink before deploying it. However, after trying this solution, I found that many exceptions would occur when using maven to package flink, such as the package could not be found. Moreover, flink has more than 180 poms to be packaged, which should take a very long time. As this requirement does not require large changes to the flink source code, I feel that this kind of debugging will take too much unnecessary time. So the change plan was abandoned.

Another solution is to find a way to manually add a redirection command in the outer layer after the startup command parameters defined in its original source code when packaging flink into an image. For this reason, observing the yaml of the pod, you can find that the parameters for container startup are under args, and the /docker-entrypoint.sh script is executed when starting the command.

With this information, find the startup script of docker-entrypoint.sh, open it and analyze it. From the log, you can know that during the execution of the script, you will enter the following branch:

The args parameter is the args parameter in the container above. You can see that the last line of the original branch is to execute exec $(drop_privs_cmd) bash -c "${args[@]}". Therefore, we can manually add the redirection of standard output and standard error to the specified file here, which is equivalent to adding a redirection statement to the startup parameters.

Here we also need to use the jobmanager or taskmanager displayed in -Dlog.file in the args parameter to determine whether the redirected file name is jobmanager.err or taskmanager.err. For this purpose, use the sed command to first obtain the content of -Dlog.file in args (i.e., the above parameter logFilePath), and then obtain the file name of jobmanager/taskmanager (i.e., logFileName parameter) from logFilePath.

Then, we add the redirect command:

exec $(drop_privs_cmd) bash -c "${args[@]} 1> /opt/flink/log/${logFileName}.out 2> /opt/flink/log/${logFileName}.err

At this point, when we successfully packaged the outer flink into an image, we manually added the redirection command after the startup command parameters, simulating the command during Yarn execution to generate .err and .out files. The next step is to package it into an image and then test it in k8s. After testing, we found that under the /opt/log/ path, three files: .out, .err and .log were actually generated! ! !

At the same time, it can be found through testing that the .err, .out and .log files correspond to the three parts of standard error, standard output and system files respectively. The same scenario as when deployed on Yarn is realized, and the problem originally raised in our article is solved! ! !

However. . But something went wrong.

5. Questions and understanding of k8s logs

After completing the above test, when I clicked on the pod again, or used the kubectl logs command to view the logs, I found that there were only some logs of the startup script in the logs, and the system logs of flink execution were gone! !

There is no other way but to analyze the reasons. In the official website of kubernetes, in the log architecture chapter, the following paragraph is written:

The container runtime processes and forwards all output written to the stdout and stderr streams of the containerized application. Different container runtimes implement this in different ways; however, their integration with the kubelet is standardized to the CRI log format.

By default, the kubelet retains a terminated container and its logs if the container is restarted. If a Pod is evicted from a node, all corresponding containers and their logs will also be evicted.

The kubelet provides logs for client access through special features of the Kubernetes API. The usual way to access this log is to run kubectl logs.

Although I don’t understand k8s enough now, reading the above paragraph made me realize that the container’s log collection may also be generated by monitoring stdout and stderr. . . Since I used the redirection command above to redirect both standard output and standard error to the specified file, stdout and stderr cannot monitor the log data, so the logs in the container cannot be obtained.

In other words, using the above method of redirecting standard output and standard error to write to specified files is equivalent to mapping the logs in the original container to .err, .out and .log log files according to the log type. Come down and show.

After analyzing it this way, I found that the reason why flink removed the redirection command from the source code after version 1.12 may be to use k8s log aggregation to write both stdout and stderr into the container log to facilitate subsequent follow-up. Monitor and analyze container logs and other operations.

Um. . At this point, I feel that the initial analysis above was in vain, because the log of the container itself actually contains all the log data, and there is no need to distinguish between .out and .err at all.

Insert a sentence here, do you still remember the question raised in the first part of the article? Here, let’s think about another question. At this point, we know that the container will process and forward the stdout and stderr streams. stderr contains the exception information when the flink task is executed, and stdout contains the standard output information when the task is executed. So where do the system logs such as INFO and ERROR log data when flink is executed be obtained from the container? The appender of RollingFile type configured in log4j does not belong to standard output.

So the answer to this question is why flink uses the log4j-console.properties configuration when submitting it to k8s deployment.

Because there is a ConsoleAppender configuration in log4j-console.properties, which prints flink’s system log to CONSOLE (System.out), so it is equivalent to printing the system log to standard output, and then the container listens to stdout Thereby obtaining the system log.

When deployed to Yarn, in the configuration of log4j.properties, you can see that there is no configuration of ConsoleAppender, so all its system logs are printed to the .log file.

Having solved this problem, let’s return to the previous analysis. The redirection operation we added above is equivalent to imitating the deployment method on Yarn. The logs in the original container are mapped to .err, .out and .log log files according to the log type for display. However, the logs in the container are lost at this time, which may have an impact on our subsequent log collection and analysis on the container.

So is there any solution?

Double write. Attempts to redirect both stdout and stderr while redirecting standard output and standard error to the specified file. For this purpose, we conducted a test, which is the following line of code in docker-entrypoint.sh:

exec $(drop_privs_cmd) bash -c "${args[@]} 1> >(tee /opt/flink/log/${logFileName}.out >/dev/stdout) 2> >(tee / opt/flink/log/${logFileName}.err >/dev/stderr)"

1> >(tee /opt/flink/log/${logFileName}.out >/dev/stdout) in the command indicates that the standard output will be redirected to an anonymous pipe and the pipe The content in is simultaneously output to the file /opt/flink/log/${logFileName}.out and the standard output device through the tee command.

After testing, the above function can be achieved, that is, both .out and .err files are available, and at the same time, the container log is restored to its original state.

However, it should be noted that since the log4j-console.properties configuration treats the system log as part of the standard output, the generated .out file actually contains the output of System.out and the system file in the task. The .err file only contains the standard error log content.

So far, the log effect achieved is:

  • Container logs: including system logs, standard output, and standard error
  • .out log: includes system log and standard output
  • .err log: Contains standard error

The above is this time, an in-depth exploration and thinking on the k8s log issue originally raised. During the research process, I also gained a deeper understanding of the log configuration of log4j. Due to the initial lack of understanding of containers and k8s technology, the final results seemed to be unsatisfactory, but isn’t technology a process of continuous exploration?

Regarding the above issues, if you encounter similar issues, you are welcome to discuss them with me. Thanks for reading!