Flink on Kubernetes Computing and Storage Separation Practice

Cloud native has become a major industry trend, and migrating Flink from YARN to Kubernetes brings several advantages. In this architecture computing and storage are decoupled: the computing layer runs on Kubernetes, while storage stays on a distributed storage system such as HDFS. Computing and storage resources can therefore be scaled independently according to actual demand, which improves overall efficiency and elasticity.

This article introduces four ways to deploy Flink on Kubernetes. Two are based on Native Kubernetes, in Session mode and Application mode. The other two are based on the Flink Kubernetes Operator, again in Session mode and Application mode.

We start with Application mode on the Flink Kubernetes Operator. To run a jar that you have written yourself, you first need to build an image. If the job uses HDFS, Hudi, or similar components, the Dockerfile must also copy the Hadoop client and its configuration files into the image and set the corresponding environment variables. All dependency jars must be copied to the lib directory under FLINK_HOME as well.

FROM flink:1.16.1-scala_2.12

USER root
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
ADD hadoop-3.1.1.tar.gz /opt
ADD jdk1.8.0_121.tar.gz /opt

RUN rm /opt/hadoop-3.1.1/share/hadoop/common/lib/commons-math3-3.1.1.jar
RUN rm /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/commons-math3-3.1.1.jar
COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/common/lib/
COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/

RUN chmod -R 777 /opt/hadoop-3.1.1/share/hadoop/common/lib/

RUN mkdir -p /opt/hadoop/conf/
COPY yarn-site.xml /opt/hadoop/conf/
COPY core-site.xml /opt/hadoop/conf/
COPY hdfs-site.xml /opt/hadoop/conf/

COPY flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar $FLINK_HOME/lib/
COPY commons-cli-1.5.0.jar $FLINK_HOME/lib/

RUN mkdir $FLINK_HOME/mylib
COPY xxx-1.0-SNAPSHOT.jar $FLINK_HOME/mylib

RUN chown -R flink:flink $FLINK_HOME/mylib

# Append the variables to root's ~/.bashrc (read by interactive bash shells only)
RUN printf '%s\n' \
    'export JAVA_HOME=/opt/jdk1.8.0_121' \
    'export HADOOP_HOME=/opt/hadoop-3.1.1' \
    'export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin' \
    >> ~/.bashrc

EXPOSE 8081

To build the image, run the following command in the directory that contains the Dockerfile, and make sure that directory also contains every file the Dockerfile references:

docker build -t flink-native/flink-on-k8s-xxxx .
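A build fails when a file named in an ADD or COPY instruction is missing from the build context, so a quick pre-check can save a failed build. The following is a minimal sketch, not part of the original setup: check_build_context is a hypothetical helper, and its file list is simply copied from the Dockerfile above.

```shell
# Hypothetical helper (not from the original article): report files that the
# Dockerfile's ADD/COPY steps expect but that are absent from the build context.
check_build_context() {
    local context_dir="${1:-.}"
    local missing=0
    local f
    # File names taken from the ADD/COPY instructions in the Dockerfile above.
    for f in \
        Dockerfile \
        hadoop-3.1.1.tar.gz \
        jdk1.8.0_121.tar.gz \
        commons-math3-3.6.1.jar \
        yarn-site.xml core-site.xml hdfs-site.xml \
        flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar \
        commons-cli-1.5.0.jar \
        xxx-1.0-SNAPSHOT.jar
    do
        if [ ! -f "$context_dir/$f" ]; then
            echo "missing: $f" >&2
            missing=1
        fi
    done
    # 0 when every file is present, 1 otherwise
    return "$missing"
}
```

It could be chained in front of the build, for example `check_build_context . && docker build -t flink-native/flink-on-k8s-xxxx .`, so that the build only starts when the context is complete.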

Install Helm:

curl https://baltocdn.com/helm/signing.asc | sudo apt-key add -
sudo apt-get install apt-transport-https --yes
echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list
sudo apt-get update
sudo apt-get install helm

Install the cert-manager component, which provides certificate services.

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

Install the Flink Kubernetes Operator:

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace

The above command pulls the operator image ghcr.io/apache/flink-kubernetes-operator:7fc23a1 from the ghcr.io registry. If that download is too slow, you can pull the image from the apache/flink-kubernetes-operator repository instead and then retag it:

docker pull apache/flink-kubernetes-operator:7fc23a1
docker tag apache/flink-kubernetes-operator:7fc23a1 ghcr.io/apache/flink-kubernetes-operator:7fc23a1

If kubectl delete hangs and a resource cannot be removed during reinstallation, you can clear its finalizers with the following command:

kubectl patch crd/flinksessionjobs.flink.apache.org -p '{"metadata":{"finalizers":[]}}' --type=merge

Once the finalizers are cleared, the resource can be deleted.

View the custom resource definitions:

kubectl get customresourcedefinition

Write a YAML file to submit the job. In it, image specifies the image, jarURI the location of the jar inside the image, entryClass the class to run, and args the arguments passed to that class:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-application-xxx
spec:
  image: flink-native/flink-on-k8s-xxxx
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "n"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "nm"
      cpu: n
  taskManager:
    resource:
      memory: "nm"
      cpu: n
  job:
    jarURI: local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar
    entryClass: com.xxx.run.XXXJob
    parallelism: n
    upgradeMode: stateless
    args: ["hdfs://host:9000/data/input","hdfs://host:9000/data/output","n"]

Submit the job:

kubectl create -f xxx.yaml

View the FlinkDeployment:

kubectl get flinkdeployment

View the logs:

kubectl logs -f deploy/flink-application-xxx
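Instead of re-running kubectl get by hand, the job state can be polled until it reaches the expected value. The sketch below is an illustration rather than part of the original procedure: wait_for_job_state is a hypothetical helper, and it assumes the operator reports the job state in the .status.jobStatus.state field of the FlinkDeployment (the value shown in the JOB STATUS column). It queries the namespace of the current kubectl context.

```shell
# Hypothetical helper (not from the original article): poll a FlinkDeployment
# until its job reaches the desired state or the attempts run out.
wait_for_job_state() {
    local name="$1"
    local desired="${2:-RUNNING}"
    local attempts="${3:-30}"
    local interval="${4:-10}"
    local state=""
    local i=0
    while [ "$i" -lt "$attempts" ]; do
        # Assumption: the operator exposes the job state at .status.jobStatus.state
        state=$(kubectl get flinkdeployment "$name" \
            -o jsonpath='{.status.jobStatus.state}' 2>/dev/null) || state=""
        if [ "$state" = "$desired" ]; then
            echo "$name is $state"
            return 0
        fi
        i=$((i + 1))
        sleep "$interval"
    done
    echo "$name did not reach $desired (last state: ${state:-unknown})" >&2
    return 1
}
```

For the deployment above, `wait_for_job_state flink-application-xxx RUNNING` would check every 10 seconds, up to 30 times, and return a non-zero status if the job never reaches RUNNING.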

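Flink on Native Kubernetes: Session mode and Application mode

First install the Flink client: download the Flink distribution archive and extract it.

Create the namespace, make it the default for the current context, and create a service account with the required permissions: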

kubectl create ns flink-native
kubectl config set-context --current --namespace=flink-native
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink-native

Session mode: start a Flink cluster

bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=xxx \
  -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \
  -Dkubernetes.namespace=flink-native \
  -Dkubernetes.service-account=flink \
  -Dclassloader.check-leaked-classloader=false \
  -Dkubernetes.rest-service.exposed.type=ClusterIP \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=60000 \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"

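Port forwarding. Native Kubernetes exposes the REST service as <cluster-id>-rest; the command below assumes the cluster ID is my-first-flink-cluster: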

nohup kubectl -n flink-native port-forward --address 0.0.0.0 service/my-first-flink-cluster-rest 8081:8081 >port-forward.log &

Open the Flink Web UI. At this point only the JobManager is running; TaskManagers are created once a job is submitted.
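The same check can be made from the command line through the REST endpoint that the port forward exposes. The following is a minimal sketch under stated assumptions: taskmanager_count is a hypothetical helper, it assumes the port forward above is active on localhost:8081, and it reads the taskmanagers field from the JSON returned by the /overview endpoint of Flink's REST API using sed rather than a JSON parser.

```shell
# Hypothetical helper (not from the original article): print the number of
# registered TaskManagers reported by the Flink REST API.
taskmanager_count() {
    local base_url="${1:-http://localhost:8081}"
    # GET /overview returns a JSON summary that includes a "taskmanagers" field;
    # sed extracts the integer that follows it.
    curl -s "$base_url/overview" |
        sed -n 's/.*"taskmanagers":[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}
```

Running `taskmanager_count` right after the session cluster starts and again after a job has been submitted gives a quick way to confirm that TaskManagers have been started.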

Submit a test job to the cluster:

bin/flink run -e kubernetes-session \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    examples/streaming/TopSpeedWindowing.jar

Run your own jar:

bin/flink run -e kubernetes-session \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.cluster-id=xxx \
    -c com.xxx.run.XXXJob \
    mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2

Check the pods; a TaskManager pod has now been created:

kubectl get pod -o wide -A

View the log. The following command shows the output of the test program TopSpeedWindowing:

kubectl logs my-first-flink-cluster-taskmanager-1-1 -n flink-native

List the jobs:

bin/flink list --target kubernetes-session \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=xxx

Cancel a job by ID:

bin/flink cancel --target kubernetes-session \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.cluster-id=xxx \
    3ff3c5a5e3c2f47e024e2771dc108f77

Application mode

bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=xxx \
    -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \
    -Dkubernetes.namespace=flink-native \
    -Dkubernetes.service-account=flink \
    -Dclassloader.check-leaked-classloader=false \
    -Dkubernetes.rest-service.exposed.type=ClusterIP \
    -c com.xxx.run.XXXJob \
    local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2

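In Session mode the jar is submitted from the local file system of the client machine, whereas in Application mode the jar must already be inside the image and must be referenced with the local:// scheme.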
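This rule is easy to get wrong when switching between the two modes, so it can help to validate the jar argument before calling bin/flink. The sketch below only encodes the rule as stated in this article; check_jar_uri is a hypothetical helper, and treating every URI with a scheme as invalid in Session mode is a simplifying assumption.

```shell
# Hypothetical helper (not from the original article): check that a jar
# argument matches the deployment mode, following the rule stated above.
check_jar_uri() {
    local mode="$1"
    local jar="$2"
    case "$mode" in
        application)
            # Application mode: the jar must be inside the image (local://...)
            case "$jar" in
                local://*) return 0 ;;
                *) echo "Application mode expects a local:// URI, got: $jar" >&2
                   return 1 ;;
            esac ;;
        session)
            # Session mode: a plain path on the client machine, no URI scheme
            case "$jar" in
                *://*) echo "Session mode expects a path on the client, got: $jar" >&2
                       return 1 ;;
                *) return 0 ;;
            esac ;;
        *)
            echo "unknown mode: $mode" >&2
            return 2 ;;
    esac
}
```

For example, `check_jar_uri application local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar` succeeds, while passing an hdfs:// URI in either mode returns a non-zero status with a message on stderr.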

Common commands

Get the login token for the Kubernetes dashboard
kubectl -n kubernetes-dashboard describe secret $(kubectl -n kubernetes-dashboard get secret | grep dashboard-admin | awk '{print $1}') | grep token:

List all pods
kubectl get pod -o wide -A

View pod details
kubectl describe pod pod_name -n flink-native

Delete a deployment
kubectl delete deployment/my-first-flink-cluster

Open a shell in a pod
kubectl exec -it -n flink-native pod_name -- /bin/bash

List all namespaces
kubectl get namespace

Copy files out of a pod
kubectl cp -n application psqls-0:/var/lib/postgresql/data/pg_wal /home

Copy files into a pod
kubectl cp /home/dades/pg_wal -n application psqls-0:/var/lib/postgresql/data/pg_wal
