[Data Engineering-1] Toy level test (KAFKA-Flink pipeline)

Lio Grande 2024. 7. 11. 16:46

I recently needed to use Apache Flink, so I built a toy pipeline to get a feel for how to implement one.

This post summarizes that work.

# Test Environment

## OS: Windows 11 Pro
## VM: WSL2

 

To run Ubuntu in the virtual environment (WSL), install it by running the following command in Windows PowerShell:

wsl --install

# Install Ubuntu with this command; the version at the time of this test was 22.04.3 LTS

If the virtual environment needs to be reset, see -> https://webnautes.tistory.com/2013


Pipeline components (DB, message broker, data processing tool)

 

1. DB (postgreSQL)

# 1. INSTALL
$ sudo apt update
$ sudo apt upgrade
$ sudo apt install postgresql

Additional configuration to work around the localhost loopback issue between the Windows PC and WSL Ubuntu:

# 2. Database external access configure
$ cd /etc/postgresql/{version}/{class}			# For example, version is 14, class is main
$ sudo vi postgresql.conf

# Edit line 60 of 'postgresql.conf'
#listen_addresses = 'localhost'
listen_addresses = '*'

# (After save in 'postgresql.conf')
$ sudo vi pg_hba.conf

# Add the following entry at the end of the configuration file
#-------------------------------------------------------
# TYPE	DATABASE	USER	ADDRESS		METHOD
  host	all			all		0.0.0.0/0   md5
#-------------------------------------------------------

Start the database and check the process status:

# Run database process
$ sudo service postgresql start

# Check status of database process
$ sudo service postgresql status

Connect to the DB and set up the test environment:

# Access database (postgreSQL)
$ sudo -u postgres psql

# Create user
postgres=# CREATE USER flink_user WITH PASSWORD 'flink_password';
CREATE ROLE

# Create database
postgres=# CREATE DATABASE testdb OWNER flink_user;
CREATE DATABASE

# Grant the user privileges on the database
postgres=# GRANT ALL PRIVILEGES ON DATABASE testdb TO flink_user;
GRANT

# Create table
postgres=# \c testdb
You are now connected to database "testdb" as user "postgres".

testdb=# CREATE TABLE test_table(id SERIAL PRIMARY KEY, obj JSONB);
CREATE TABLE

Grant additional privileges on the DB table and column:

# Add INSERT privilege for specific user
testdb=# GRANT INSERT ON TABLE test_table TO flink_user;

## To check the privileges on a specific table
testdb=# \dp test_table

                              Access privileges
 Schema |    Name    | Type  |     Access privileges     | Column privileges | Policies 
--------+------------+-------+---------------------------+-------------------+----------
 public | test_table | table | postgres=arwdDxt/postgres+|                   | 
        |            |       | =arwdDxt/postgres         |                   | 
(1 row)

# Add column privilege about auto increment for specific user
testdb=# GRANT USAGE ON SEQUENCE test_table_id_seq TO flink_user;

### Without the grant above, external INSERT requests cannot omit the id column ###

## To check the privileges on the sequence object behind the id column
testdb=# \dp test_table_id_seq

                                    Access privileges
 Schema |       Name        | Type |   Access privileges   | Column privileges | Policies 
--------+-------------------+------+-----------------------+-------------------+----------
 public | test_table_id_seq | seq  | postgres=rwU/postgres |                   | 
(1 row)
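Before wiring up Flink, the grants can be exercised end to end with a short JDBC check. This is only a hedged sketch, not part of the original setup: the class name DbSmokeTest is made up, and it assumes it runs inside WSL with the same database, table, and credentials used by the Flink job later in this post (from Windows, use the WSL IP address instead of localhost).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class DbSmokeTest {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://localhost:5432/testdb";

        try (Connection conn = DriverManager.getConnection(url, "flink_user", "flink_password");
             PreparedStatement ps = conn.prepareStatement(
                     "INSERT INTO test_table (obj) VALUES (?::jsonb)")) {
            // id is omitted on purpose: this only succeeds if USAGE on test_table_id_seq was granted
            ps.setString(1, "{\"message\": \"privilege check\"}");
            ps.executeUpdate();
        }
    }
}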

2. Message broker (KAFKA)

# GET KAFKA
$ wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
$ tar -xvf kafka_2.13-2.8.2.tgz

## --- To avoid WSL localhost loopback, edit 'server.properties' --- ##

$ cd ${KAFKA_HOME}/config
$ sudo vi server.properties

# Edit line 36 of 'server.properties'
# (172.31.20.82 is this WSL instance's IP address; check yours with 'hostname -I' and substitute it)
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://172.31.20.82:9092

Start Kafka:

# 1) Start zookeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

# 2) Start kafka service
$ bin/kafka-server-start.sh config/server.properties

Create and verify a Kafka topic:

# Create topic as named 'test-topic'
$ bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
Created topic test-topic.

# Check topic
$ bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092

## Example output (based on a result from ChatGPT) ##
Topic: test-topic	PartitionCount: 3	ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: test-topic	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: test-topic	Partition: 1	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: test-topic	Partition: 2	Leader: 3	Replicas: 3,1	Isr: 3,1
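The same create/describe steps can also be done from Java with the Kafka AdminClient instead of the CLI scripts. A hedged sketch, not from the original setup: the class name is made up, and 1 partition with replication factor 1 is assumed for this single-broker test (topic creation fails if the topic already exists).

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.20.82:9092");  // WSL IP

        try (AdminClient admin = AdminClient.create(props)) {
            // Create 'test-topic' with 1 partition and replication factor 1 (single broker)
            admin.createTopics(Collections.singleton(new NewTopic("test-topic", 1, (short) 1))).all().get();

            // Describe the topic, similar to kafka-topics.sh --describe
            TopicDescription desc = admin.describeTopics(Collections.singleton("test-topic"))
                    .all().get().get("test-topic");
            System.out.println(desc);
        }
    }
}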

3. Data processing tool (Apache Flink)

# 1. GET Flink
$ wget -c https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-bin-scala_2.12.tgz
$ tar -xvzf flink-1.15.4-bin-scala_2.12.tgz

# 2. Set up environment variables for Apache Flink
$ sudo vi /etc/environment

# -----------------------------------
export FLINK_HOME=/path/to/flink-1.15.4
export PATH=$PATH:$FLINK_HOME/bin
# -----------------------------------

# 3. Apply the Apache Flink environment settings
$ source /etc/environment

Additional configuration to work around the localhost loopback issue between the Windows PC and WSL Ubuntu:

$ cd ${FLINK_HOME}/conf
$ sudo vi flink-conf.yaml

# Edit line 33 of 'flink-conf.yaml'
# ---------------------------------
jobmanager.rpc.address: localhost
jobmanager.rpc.address: 172.31.20.82
# ---------------------------------

# Edit line 47 of 'flink-conf.yaml'
# ---------------------------------
jobmanager.bind-host: localhost
jobmanager.bind-host: 172.31.20.82
# ---------------------------------

# Edit line 64 of 'flink-conf.yaml'
# ---------------------------------
taskmanager.bind-host: localhost
taskmanager.bind-host: 0.0.0.0
# ---------------------------------

# Edit line 76 of 'flink-conf.yaml'
# ---------------------------------
taskmanager.host: localhost
taskmanager.host: 172.31.20.82
# ---------------------------------

# Edit line 190 of 'flink-conf.yaml'
# ---------------------------------
rest.address: localhost
rest.address: 172.31.20.82
# ---------------------------------

# Edit line 203 of 'flink-conf.yaml'
# ---------------------------------
rest.bind-address: localhost
rest.bind-address: 0.0.0.0
# ---------------------------------

Start and stop the Flink process:

# Start
$ ${FLINK_HOME}/bin/start-cluster.sh

# Stop
$ ${FLINK_HOME}/bin/stop-cluster.sh

Test

1. Producer for publishing messages

# Create sample data
$ sudo vi topic-input.txt

#--------------------
Hello World
My name is blabla
#--------------------

# Kafka producer CLI
$ bin/kafka-console-producer.sh --bootstrap-server 172.31.20.82:9092 --topic test-topic < topic-input.txt

## With the Kafka consumer (the Flink job below) running, run the CLI above to publish the data ##
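The messages can also be published from Java instead of the console producer. A hedged sketch (the class name is made up; the broker address, topic name, and topic-input.txt file come from the steps above):

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FileProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.20.82:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader reader = new BufferedReader(new FileReader("topic-input.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Send each line of the sample file as one message, like the console producer does
                producer.send(new ProducerRecord<>("test-topic", line)).get();  // block until acknowledged
            }
        }
    }
}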

2. Consumer for reading and processing messages

private void kafkaFlinkPostgreSQL() throws Exception {
        // Arguments for Kafka, PostgreSQL
        String kafkaTopic = "test-topic";
        String kafkaURL = "172.31.20.82:9092";  // WSL ip value
        String kafkaGroup = "test-consumer-group";

        // Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka properties
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURL);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroup);
        kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Kafka consumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(kafkaTopic, new SimpleStringSchema(), kafkaProps);

        // Add Kafka consumer to Flink environment
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        kafkaStream.map(value -> {
            // Wrap each Kafka record value in a JSON object: {"message": "<value>"}
            JsonObject json = new JsonObject();
            json.addProperty("message", value);
            return new Gson().toJson(json);
        }).addSink(new PostgreSQLSink());	// PostgreSQL JDBC sink defined below

        // Execute Flink Job
        env.execute("Kafka to PostgreSQL Flink Job");
    }
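One note on the source: FlinkKafkaConsumer is deprecated as of Flink 1.15 (it still works in this job). As a hedged sketch, the same source could be expressed with the newer KafkaSource API, reusing the kafkaURL/kafkaTopic/kafkaGroup variables defined above:

// Sketch only: possible replacement for the FlinkKafkaConsumer lines above.
// Needs: org.apache.flink.connector.kafka.source.KafkaSource,
//        org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer,
//        org.apache.flink.api.common.eventtime.WatermarkStrategy
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(kafkaURL)
        .setTopics(kafkaTopic)
        .setGroupId(kafkaGroup)
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> kafkaStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");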

3. JDBC handling for the database INSERT

class PostgreSQLSink extends RichSinkFunction<String> {
    private static final String postgresUrl = "jdbc:postgresql://172.31.20.82:5432/testdb";
    private static final String postgresUser = "flink_user";
    private static final String postgresPassword = "flink_password";

    @Override
    public void invoke(String value, Context context) throws Exception {
        // A new connection is opened per record, which keeps the toy example simple
        try (Connection connection = DriverManager.getConnection(postgresUrl, postgresUser, postgresPassword)) {
            String sql = "INSERT INTO test_table (obj) VALUES (?::jsonb)";
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, value);
                statement.executeUpdate();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
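Opening a connection per record is fine for this toy test, but RichSinkFunction also exposes open()/close() hooks, so the connection and prepared statement can be created once per parallel task instead. A hedged sketch of that variant (the class name is made up; table, URL, and credentials are the same as above):

// Sketch only: a variant of PostgreSQLSink that reuses one connection per parallel task.
class ReusablePostgreSQLSink extends RichSinkFunction<String> {
    private static final String postgresUrl = "jdbc:postgresql://172.31.20.82:5432/testdb";
    private static final String postgresUser = "flink_user";
    private static final String postgresPassword = "flink_password";

    private transient Connection connection;
    private transient PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per task before any records arrive
        connection = DriverManager.getConnection(postgresUrl, postgresUser, postgresPassword);
        preparedStatement = connection.prepareStatement("INSERT INTO test_table (obj) VALUES (?::jsonb)");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        preparedStatement.setString(1, value);
        preparedStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (preparedStatement != null) preparedStatement.close();
        if (connection != null) connection.close();
    }
}

This variant uses org.apache.flink.configuration.Configuration, which already appears among the imports in the full code listing below.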

# All code

/* JSON Handle */
import org.apache.kafka.clients.producer.*;
import java.io.FileReader;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import java.sql.DriverManager;
/* Kafka producer */
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
/* Kafka consumer with Flink */
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/* gson */
import com.google.gson.JsonObject;
import com.google.gson.Gson;

public class JKP {
    public static void main(String[] args) throws Exception {
        JKP producerInstance = new JKP();
        producerInstance.kafkaFlinkPostgreSQL();	// Kafka consumer + postgreSQL INSERT part
    }

    // kafkaFlinkPostgreSQL() is the instance method shown in step 2 ("Consumer") above
}

class PostgreSQLSink extends RichSinkFunction<String> {
    private static final String postgresUrl = "jdbc:postgresql://172.31.20.82:5432/testdb";
    private static final String postgresUser = "flink_user";
    private static final String postgresPassword = "flink_password";

    @Override
    public void invoke(String value, Context context) throws Exception {
        // A new connection is opened per record, which keeps the toy example simple
        try (Connection connection = DriverManager.getConnection(postgresUrl, postgresUser, postgresPassword)) {
            String sql = "INSERT INTO test_table (obj) VALUES (?::jsonb)";
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, value);
                statement.executeUpdate();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

pom.xml

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>11</java.version>
    <flink.version>1.15.4</flink.version>
    <kafka.version>2.8.1</kafka.version>
    <postgres.version>42.2.24</postgres.version>
    <spring.boot.version>2.6.4</spring.boot.version>
  </properties>

  <repositories>
    <repository>
      <id>apache-flink</id>
      <name>Apache Flink Repository</name>
      <url>https://repo.maven.apache.org/maven2/</url>
    </repository>
    <repository>
      <id>central</id>
      <name>Maven Central Repository</name>
      <url>https://repo.maven.apache.org/maven2</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <dependencies>
    <!-- slf4j -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.32</version>
    </dependency>
    <dependency>
      <groupId>javax.annotation</groupId>
      <artifactId>javax.annotation-api</artifactId>
      <version>1.2</version>
    </dependency>
    <!-- Spring Boot Dependencies -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>${spring.boot.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
      <version>${spring.boot.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <version>${spring.boot.version}</version>
    </dependency>
    <!-- Apache Flink Dependencies -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-client</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Apache Kafka Dependencies -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- PostgreSQL Dependency -->
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>${postgres.version}</version>
    </dependency>
    <!-- JSON -->
    <dependency>
      <groupId>com.googlecode.json-simple</groupId>
      <artifactId>json-simple</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.5</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- Maven Compiler Plugin -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <!-- Spring Boot Maven Plugin -->
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <version>${spring.boot.version}</version>
      </plugin>
    </plugins>
  </build>

DB table result

 testdb=# select * from test_table;
 
 id |               obj
----+----------------------------------
  1 | {"message": "Hello World"}
  2 | {"message": "My name is blabla"}
(2 rows)