feat: Added flink demo
tamis-laan committed Mar 26, 2024
1 parent 17c7f3c commit 49bac18
Showing 10 changed files with 197 additions and 3 deletions.
49 changes: 49 additions & 0 deletions data/flink/Dockerfile
@@ -0,0 +1,49 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Check https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker for more details
FROM flink:1.19

# Install Python 3.7 from source: Debian 11 has moved to Python 3.9, and
# PyFlink officially supports only Python 3.6, 3.7 and 3.8.

RUN apt-get update -y && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
    wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xvf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    ./configure --without-tests --enable-shared && \
    make -j6 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# install PyFlink
RUN pip3 install "apache-flink"

# Switch to the flink user and create the user library directory
USER flink
RUN mkdir /opt/flink/usrlib

# Add all pipelines
ADD pipelines /pipelines

# Set the pipeline directory as the workdir
WORKDIR /pipelines
7 changes: 7 additions & 0 deletions data/flink/Makefile
@@ -0,0 +1,7 @@

build-local:
	docker build . --tag pipelines:latest
	kind load docker-image pipelines:latest

install:
	python -m venv .venv
38 changes: 38 additions & 0 deletions data/flink/README.md
@@ -0,0 +1,38 @@
# Flink on Kubernetes

The `Dockerfile` defines the image that we use to run any pipeline we want.

The `pipelines` folder contains all pipelines and is copied into the image when it is built. The `Makefile` contains handy commands for development.

The `setup.sh` script installs the Flink Kubernetes operator.

## Setup
Start a cluster using kind:
``` bash
kind create cluster
```

Install the Flink operator:
``` bash
./setup.sh
```
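
Before continuing, you can check that the operator came up. A minimal check, assuming the Helm release name `flink-kubernetes-operator` used in `setup.sh`:
``` bash
# Wait for the operator deployment to become available
kubectl wait --for=condition=Available deployment/flink-kubernetes-operator --timeout=300s
```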

Build and load the Docker image containing the pipelines:
``` bash
make build-local
```
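
To confirm the image actually landed on the kind node, you can list the images known to containerd. This assumes the default node name `kind-control-plane`:
``` bash
# List images on the kind node and filter for the pipelines image
docker exec kind-control-plane crictl images | grep pipelines
```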

Apply your pipeline:
``` bash
kubectl apply -f pipelines/python/demo/deployment.yaml
```
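
The operator should now spin up a JobManager and TaskManager for the `demo` deployment. A quick way to check progress and watch the datagen rows being printed; the label selector is an assumption based on the labels the operator attaches to Flink pods:
``` bash
# Check the status of the FlinkDeployment resource
kubectl get flinkdeployment demo

# Tail the TaskManager logs to see the print connector output
kubectl logs -f -l app=demo,component=taskmanager
```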

To stop the pipeline from running:
``` bash
kubectl delete -f pipelines/python/demo/deployment.yaml
```

To stop and wipe the local cluster:
``` bash
kind delete cluster
```
24 changes: 24 additions & 0 deletions data/flink/pipelines/python/demo/deployment.yaml
@@ -0,0 +1,24 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: demo
spec:
  image: pipelines:latest
  flinkVersion: v1_19 # Matches the flink:1.19 base image in the Dockerfile
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    # PythonDriver ships in the Flink image's opt/ directory; the jar must match the image's Flink version
    jarURI: local:///opt/flink/opt/flink-python-1.19.0.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/pipelines/python/demo/main.py"]
    parallelism: 1
    upgradeMode: stateless
49 changes: 49 additions & 0 deletions data/flink/pipelines/python/demo/main.py
@@ -0,0 +1,49 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import logging
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql("""
        CREATE TABLE orders (
            order_number BIGINT,
            price DECIMAL(32,2),
            buyer ROW<first_name STRING, last_name STRING>,
            order_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen'
        )""")

    t_env.execute_sql("""
        CREATE TABLE print_table WITH ('connector' = 'print')
        LIKE orders""")
    t_env.execute_sql("""
        INSERT INTO print_table SELECT * FROM orders""")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    demo()
1 change: 1 addition & 0 deletions data/flink/requirements.txt
@@ -0,0 +1 @@
apache-flink==1.19.0
13 changes: 13 additions & 0 deletions data/flink/setup.sh
@@ -0,0 +1,13 @@
#!/bin/bash

# Add the Apache Flink k8s operator helm repository
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/

# Install certificate manager
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.14.4/cert-manager.yaml

# Wait for cert manager
kubectl wait --for=condition=Ready pods --all --namespace=cert-manager --timeout=300s

# Install Flink k8s operator
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
19 changes: 16 additions & 3 deletions mnist/main.py
@@ -23,7 +23,7 @@ def forward(self, x):
         x = self.fc2(x)
         return torch.log_softmax(x, dim=1)

-def main(epochs:int = 10, batch_size:int=64, learning_rate:float=0.001, model_file="model.onnx"):
+def main(epochs:int = 10, batch_size:int=64, learning_rate:float=0.001, model_filename="model"):
     # Define the device
     device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

@@ -97,15 +97,28 @@ def main(epochs:int = 10, batch_size:int=64, learning_rate:float=0.001, model_fi
     # Evaluation mode
     model.eval()

+    # Define the trace
+    trace = torch.zeros(1, 1, 28, 28, dtype=torch.float)
+
     # Export model in onnx format
     torch.onnx.export(
         model,
-        torch.zeros(1, 1, 28, 28, dtype=torch.float),
-        model_file,
+        trace,
+        model_filename + ".onnx",
         input_names=['input'],
         output_names=['output']
     )

+    # Log
+    print('[*] Export model to torch script')
+    script_module = torch.jit.trace(model, trace)
+
+    # Test the script module
+    script_module(trace)
+
+    # Save the torch script
+    script_module.save(model_filename + ".pt")
+
     # Log
     print('[*] done')

Binary file modified mnist/model.onnx
Binary file added mnist/model.pt
