Cloud native has become one of the major trends in the industry, and migrating Flink from YARN to Kubernetes brings many advantages. Under this architecture, compute and storage are decoupled: the compute layer runs on Kubernetes, while storage uses a distributed system such as HDFS. The benefit is that compute and storage resources can be scaled independently according to actual demand, improving overall efficiency and elasticity.
This article introduces four deployment modes of Flink on Kubernetes: Session mode and Application mode on Native Kubernetes, and Session mode and Application mode via the Flink Kubernetes Operator.
First, the Application mode deployed with the Flink Kubernetes Operator. To run a jar you wrote yourself, you need to build an image first. If you use components such as HDFS or Hudi, the Dockerfile must also copy the Hadoop client and its configuration files into the image and set the corresponding environment variables. At the same time, copy all dependency jars into the lib directory under Flink Home.
FROM flink:1.16.1-scala_2.12
USER root
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
ADD hadoop-3.1.1.tar.gz /opt
ADD jdk1.8.0_121.tar.gz /opt
RUN rm /opt/hadoop-3.1.1/share/hadoop/common/lib/commons-math3-3.1.1.jar
RUN rm /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/commons-math3-3.1.1.jar
COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/common/lib/
COPY commons-math3-3.6.1.jar /opt/hadoop-3.1.1/share/hadoop/hdfs/lib/
RUN chmod -R 777 /opt/hadoop-3.1.1/share/hadoop/common/lib/
RUN mkdir -p /opt/hadoop/conf/
COPY yarn-site.xml /opt/hadoop/conf/
COPY core-site.xml /opt/hadoop/conf/
COPY hdfs-site.xml /opt/hadoop/conf/
COPY flink-shaded-hadoop-3-uber-3.1.1.7.0.3.0-79-7.0.jar $FLINK_HOME/lib/
COPY commons-cli-1.5.0.jar $FLINK_HOME/lib/
RUN mkdir $FLINK_HOME/mylib
COPY xxx-1.0-SNAPSHOT.jar $FLINK_HOME/mylib
RUN chown -R flink:flink $FLINK_HOME/mylib
RUN echo 'export JAVA_HOME=/opt/jdk1.8.0_121' >> ~/.bashrc && \
    echo 'export HADOOP_HOME=/opt/hadoop-3.1.1' >> ~/.bashrc && \
    echo 'export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin' >> ~/.bashrc
EXPOSE 8081
To build the image, run the following command in the directory containing the Dockerfile, making sure all files referenced by the Dockerfile are present in that directory.
docker build -t flink-native/flink-on-k8s-xxxx .
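If your Kubernetes nodes pull images from a private registry rather than sharing the local Docker daemon, the image built above also needs to be tagged and pushed there. A minimal sketch; the registry address registry.example.com:5000 is an assumption to be replaced with your own:

```shell
# Assumed private registry address -- replace with your own.
REGISTRY="registry.example.com:5000"
IMAGE="flink-native/flink-on-k8s-xxxx"

# Compose the fully qualified name the cluster nodes will pull.
echo "${REGISTRY}/${IMAGE}"

# Then tag and push (commented out here; requires a running Docker daemon):
# docker tag "${IMAGE}" "${REGISTRY}/${IMAGE}"
# docker push "${REGISTRY}/${IMAGE}"
```

The same fully qualified name would then be used in `kubernetes.container.image` and in the FlinkDeployment YAML below.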
Install Helm
curl https://baltocdn.com/helm/signing.asc | sudo apt-key add -
sudo apt-get install apt-transport-https --yes
echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list
sudo apt-get update
sudo apt-get install helm
Install the cert-manager component, which provides certificate services.
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
Install Flink Kubernetes Operator
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --namespace flink --create-namespace
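After the install you can confirm the operator pod is up with `kubectl get pods -n flink`. As a sketch of what to look for (the line below is illustrative, with a made-up pod-name suffix, not real output):

```shell
# Illustrative line from `kubectl get pods -n flink` after a successful
# install; the pod-name suffix is made up.
pods='flink-kubernetes-operator-6c5d9b7f9b-abcde   2/2   Running   0   1m'

# A healthy operator pod reports Running in the STATUS column (field 3).
status=$(echo "$pods" | awk '{print $3}')
echo "$status"
```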
The above command pulls the image ghcr.io/apache/flink-kubernetes-operator:7fc23a1. If downloading from ghcr.io is slow, you can pull apache/flink-kubernetes-operator:7fc23a1 instead and then retag it:
docker tag apache/flink-kubernetes-operator:7fc23a1 ghcr.io/apache/flink-kubernetes-operator:7fc23a1
If, during reinstallation, a resource cannot be deleted with kubectl delete (typically because of leftover finalizers), you can try the following command:
kubectl patch crd/flinksessionjobs.flink.apache.org -p '{"metadata":{"finalizers":[]}}' --type=merge
By executing the above command, the resource can be successfully deleted.
View the custom resource definitions:
kubectl get customresourcedefinition
Create a YAML file to submit the job. Here image specifies the image, jarURI the location of the jar inside the image, entryClass the main class to execute, and args the arguments that class requires:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-application-xxx
spec:
  image: flink-native/flink-on-k8s-xxxx
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "n"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "nm"
      cpu: n
  taskManager:
    resource:
      memory: "nm"
      cpu: n
  job:
    jarURI: local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar
    entryClass: com.xxx.run.XXXJob
    parallelism: n
    upgradeMode: stateless
    args: ["hdfs://host:9000/data/input","hdfs://host:9000/data/output","n"]
Submit the job:
kubectl create -f xxx.yaml
View the FlinkDeployment:
kubectl get flinkdeployment
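The command prints printer columns for the job status and lifecycle state; a deployment is healthy once those reach RUNNING and STABLE. A sketch over illustrative output (not captured from a real cluster):

```shell
# Illustrative output line from `kubectl get flinkdeployment`; a healthy
# deployment eventually shows job status RUNNING and lifecycle state STABLE.
line='flink-application-xxx   RUNNING   STABLE'

job_status=$(echo "$line" | awk '{print $2}')
lifecycle=$(echo "$line" | awk '{print $3}')
echo "$job_status $lifecycle"
```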
View the logs:
kubectl logs -f deploy/flink-application-xxx
Flink on Native Kubernetes: Session mode and Application mode
You need to install the Flink client: download the Flink distribution and unpack it.
Create the namespace and set up the service account and permissions:
kubectl create ns flink-native
kubectl config set-context --current --namespace=flink-native
kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=cluster-admin --serviceaccount=flink-native:flink --namespace=flink-native
Session mode: start the Flink cluster
bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=xxx \
  -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \
  -Dkubernetes.namespace=flink-native \
  -Dkubernetes.service-account=flink \
  -Dclassloader.check-leaked-classloader=false \
  -Dkubernetes.rest-service.exposed.type=ClusterIP \
  -Dtaskmanager.memory.process.size=4096m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=60000 \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
port forwarding
nohup kubectl -n flink-native port-forward --address 0.0.0.0 service/my-first-flink-cluster-rest 8081:8081 >port-forward.log &
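Once the port-forward is up, the REST API answers on localhost:8081; for example, `curl -s http://localhost:8081/overview` returns a small JSON summary of the cluster. A sketch of pulling the available-slot count out of such a response (the JSON below is an illustrative sample, not live output):

```shell
# Illustrative /overview response; the field names follow the Flink REST
# API, the values are made up.
overview='{"taskmanagers":1,"slots-total":4,"slots-available":4,"jobs-running":1}'

# Extract slots-available without jq: grep the key/value pair, cut the value.
slots=$(echo "$overview" | grep -oE '"slots-available":[0-9]+' | cut -d: -f2)
echo "available slots: $slots"
```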
Open the Flink Web UI; at this point only the JobManager is running.
Submit a test job to the cluster:
bin/flink run -e kubernetes-session \
  -Dkubernetes.namespace=flink-native \
  -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.cluster-id=my-first-flink-cluster \
  examples/streaming/TopSpeedWindowing.jar
Run your own jar:
bin/flink run -e kubernetes-session \
  -Dkubernetes.namespace=flink-native \
  -Dkubernetes.rest-service.exposed.type=NodePort \
  -Dkubernetes.cluster-id=xxx \
  -c com.xxx.run.XXXJob \
  mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2
Check the pods; you can see that a TaskManager pod has been created:
kubectl get pod -o wide -A
View the logs; the following command shows the output of the test program TopSpeedWindowing:
kubectl logs my-first-flink-cluster-taskmanager-1-1 -n flink-native
View task list:
bin/flink list --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxx
Cancel a job by ID:
bin/flink cancel --target kubernetes-session -Dkubernetes.namespace=flink-native -Dkubernetes.jobmanager.service-account=flink -Dkubernetes.cluster-id=xxxx 3ff3c5a5e3c2f47e024e2771dc108f77
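The 32-character job ID passed to flink cancel can be scripted out of the flink list output instead of copied by hand. A sketch over a sample line (the timestamp and job name are made up; the ID is the one from the example above):

```shell
# Sample line in the format printed by `bin/flink list`; the date and
# job name are illustrative.
listing='30.05.2023 12:00:00 : 3ff3c5a5e3c2f47e024e2771dc108f77 : TopSpeedWindowing (RUNNING)'

# Flink job IDs are 32 lowercase hex characters; nothing else on the
# line is that long, so a simple pattern match isolates the ID.
job_id=$(echo "$listing" | grep -oE '[0-9a-f]{32}')
echo "$job_id"

# The extracted ID could then be passed to the cancel command above:
# bin/flink cancel --target kubernetes-session ... "$job_id"
```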
Application mode
bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=xxx \
  -Dkubernetes.container.image=flink-native/flink-on-k8s-xxxx \
  -Dkubernetes.namespace=flink-native \
  -Dkubernetes.service-account=flink \
  -Dclassloader.check-leaked-classloader=false \
  -Dkubernetes.rest-service.exposed.type=ClusterIP \
  -c com.sohu.longuserprofile.run.TestJob \
  local:///opt/flink/mylib/xxx-1.0-SNAPSHOT.jar hdfs://host:9000/data/input hdfs://host:9000/data/output 2
Note the difference: in Session mode the jar is submitted from the local client host, while in Application mode the jar must already be inside the image and is referenced with a local:/// path.
Common commands
Get the Kubernetes dashboard login token:
kubectl -n kubernetes-dashboard describe secret $(kubectl -n kubernetes-dashboard get secret | grep dashboard-admin | awk '{print $1}') | grep token:
List all pods:
kubectl get pod -o wide -A
View pod details:
kubectl describe pod pod_name -n flink-native
Delete a deployment:
kubectl delete deployment/my-first-flink-cluster
Enter a pod:
kubectl exec -it -n flink-native pod_name -- /bin/bash
List all namespaces:
kubectl get namespace
Copy a file out of a pod:
kubectl cp -n application psqls-0:/var/lib/postgresql/data/pg_wal /home
Copy a file into a pod:
kubectl cp /home/dades/pg_wal -n application psqls-0:/var/lib/postgresql/data/pg_wal