[Data Engineering-1] Toy level test (KAFKA-Flink pipeline)
Lio Grande · 2024. 7. 11. 16:46
I recently needed to use Apache Flink, so I ran this exercise to get a feel for implementing a pipeline.
This post summarizes the relevant work.
# Test Environment
## OS: Windows 11 pro
## VM: WSL2
To run Ubuntu in the virtual environment (hereafter WSL), install Ubuntu by running the following command in Windows PowerShell:
wsl --install
# Installs Ubuntu; the version was 22.04.3 LTS at the time of this test
If the virtual environment needs to be reset, see -> https://webnautes.tistory.com/2013
Pipeline components (DB, message broker, data processing tool)
1. DB (PostgreSQL)
# 1. INSTALL
$ sudo apt update
$ sudo apt upgrade
$ sudo apt install postgresql
Additional settings to avoid the localhost loopback issue between the Windows PC and WSL Ubuntu
# 2. Database external access configure
$ cd /etc/postgresql/{version}/{class} # For example, version is 14, class is main
$ sudo vi postgresql.conf
# Edit line 60 of 'postgresql.conf'
#listen_addresses = 'localhost'
listen_addresses = '*'
# (After saving 'postgresql.conf')
$ sudo vi pg_hba.conf
# Add the following entry at the end of the config file
#-------------------------------------------------------
# TYPE DATABASE USER ADDRESS METHOD
host all all 0.0.0.0/0 md5
#-------------------------------------------------------
Start the database and check that the process is running
# Run database process
$ sudo service postgresql start
# Check status of database process
$ sudo service postgresql status
Connect to the DB and set up the test environment
# Access database (postgreSQL)
$ sudo -u postgres psql
# Create user
postgres=# CREATE USER flink_user WITH PASSWORD 'flink_password';
CREATE ROLE
# Create database
postgres=# CREATE DATABASE testdb OWNER flink_user;
CREATE DATABASE
# Set user privileges for the database
postgres=# GRANT ALL PRIVILEGES ON DATABASE testdb TO flink_user;
GRANT
# Create table
postgres=# \c testdb
You are now connected to database "testdb" as user "postgres".
testdb=# CREATE TABLE test_table(id SERIAL PRIMARY KEY, obj JSONB);
CREATE TABLE
Grant additional privileges on the DB table and column
# Add INSERT privilege for specific user
testdb=# GRANT INSERT ON TABLE test_table TO flink_user;
## To check the privileges on a specific table
testdb=# \dp test_table
                              Access privileges
 Schema |    Name    | Type  |     Access privileges     | Column privileges | Policies
--------+------------+-------+---------------------------+-------------------+----------
 public | test_table | table | postgres=arwdDxt/postgres+|                   |
        |            |       | =arwdDxt/postgres         |                   |
(1 row)
# Grant the sequence (auto-increment) privilege for the id column to the user
testdb=# GRANT USAGE ON SEQUENCE test_table_id_seq TO flink_user;
### Without this grant, external INSERT requests cannot omit the id column ###
## To check the privileges on the sequence object behind the id column
testdb=# \dp test_table_id_seq
                                 Access privileges
 Schema |       Name        | Type | Access privileges     | Column privileges | Policies
--------+-------------------+------+-----------------------+-------------------+----------
 public | test_table_id_seq | seq  | postgres=rwU/postgres |                   |
(1 row)
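As a quick sanity check (a hypothetical example, assuming pg_hba.conf allows the connection as configured above), connecting as flink_user and inserting a row without an id should now succeed, because the id default draws the next value from test_table_id_seq:
# Connect as the test user and insert a row without specifying id
$ psql -h localhost -U flink_user -d testdb
testdb=> INSERT INTO test_table (obj) VALUES ('{"message": "grant check"}'::jsonb);
INSERT 0 1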
2. Message broker (KAFKA)
# GET KAFKA
$ wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
$ tar -xvf kafka_2.13-2.8.2.tgz
## --- To avoid WSL localhost loopback, edit 'server.properties' --- ##
$ cd ${KAFKA_HOME}/config
$ sudo vi server.properties
# Edit line 36 of 'server.properties'
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://172.31.20.82:9092
Start Kafka
# 1) Start zookeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
# 2) Start kafka service
$ bin/kafka-server-start.sh config/server.properties
Create and check a Kafka topic
# Create topic as named 'test-topic'
$ bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
Created topic test-topic.
# Check topic
$ bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
## Example output (generated by ChatGPT); a local single-broker setup will report ReplicationFactor: 1 ##
Topic: test-topic PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test-topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test-topic Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
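To double-check that messages actually arrive on the topic, the console consumer bundled with Kafka can be run from the Kafka directory (adjust the bootstrap server to your own setup):
# Read test-topic from the beginning
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning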
3. Data processing tool (Apache Flink)
# 1. GET Flink
$ wget -c https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-bin-scala_2.12.tgz
$ tar -xvzf flink-1.15.4-bin-scala_2.12.tgz
# 2. Set the environment variables for Apache Flink
$ sudo vi /etc/environment
# -----------------------------------
export FLINK_HOME=/path/to/flink-1.15.4
export PATH=$PATH:$FLINK_HOME/bin
# -----------------------------------
# 3. Apply the Apache Flink environment settings
$ source /etc/environment
Additional settings to avoid the localhost loopback issue between the Windows PC and WSL Ubuntu
$ cd ${FLINK_HOME}/conf
$ sudo vi flink-conf.yaml
# Edit line 33 of 'flink-conf.yaml'
# ---------------------------------
#jobmanager.rpc.address: localhost
jobmanager.rpc.address: 172.31.20.82
# ---------------------------------
# Edit line 47 of 'flink-conf.yaml'
# ---------------------------------
#jobmanager.bind-host: localhost
jobmanager.bind-host: 172.31.20.82
# ---------------------------------
# Edit line 64 of 'flink-conf.yaml'
# ---------------------------------
#taskmanager.bind-host: localhost
taskmanager.bind-host: 0.0.0.0
# ---------------------------------
# Edit line 76 of 'flink-conf.yaml'
# ---------------------------------
#taskmanager.host: localhost
taskmanager.host: 172.31.20.82
# ---------------------------------
# Edit line 190 of 'flink-conf.yaml'
# ---------------------------------
#rest.address: localhost
rest.address: 172.31.20.82
# ---------------------------------
# Edit line 203 of 'flink-conf.yaml'
# ---------------------------------
#rest.bind-address: localhost
rest.bind-address: 0.0.0.0
# ---------------------------------
Start & stop the Flink process
# Start
$ ${FLINK_HOME}/bin/start-cluster.sh
# Stop
$ ${FLINK_HOME}/bin/stop-cluster.sh
Test
1. Producer for publishing messages
# Create sample data
$ sudo vi topic-input.txt
#--------------------
Hello World
My name is blabla
#--------------------
# Kafka producer CLI
$ bin/kafka-console-producer.sh --bootstrap-server 172.31.20.82:9092 --topic test-topic < topic-input.txt
## Publish the data by running the CLI above while the Kafka consumer (the Flink job below) is running ##
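The console producer is all this test needs; for reference, a minimal Java producer doing the same thing might look like the sketch below. The topic and broker address follow the values used in this post, the class name SimpleProducer is just a placeholder, and this is separate from the Flink job.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.20.82:9092"); // WSL IP
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send the same two sample lines as topic-input.txt
            producer.send(new ProducerRecord<>("test-topic", "Hello World")).get();
            producer.send(new ProducerRecord<>("test-topic", "My name is blabla")).get();
        }
    }
}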
2. Consumer for reading and processing messages
private void kafkaFlinkPostgreSQL() throws Exception {
    // Arguments for Kafka, PostgreSQL
    String kafkaTopic = "test-topic";
    String kafkaURL = "172.31.20.82:9092"; // WSL IP value
    String kafkaGroup = "test-consumer-group";

    // Flink execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Kafka properties
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaURL);
    kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroup);
    kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // Kafka consumer
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(kafkaTopic, new SimpleStringSchema(), kafkaProps);

    // Add the Kafka consumer to the Flink environment
    DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

    kafkaStream.map(value -> {
        // Wrap each Kafka record in a JSON object: {"message": "<record>"}
        JsonObject json = new JsonObject();
        json.addProperty("message", value);
        return new Gson().toJson(json);
    }).addSink(new PostgreSQLSink()); // PostgreSQL JDBC sink

    // Execute the Flink job
    env.execute("Kafka to PostgreSQL Flink Job");
}
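One note: in Flink 1.15 FlinkKafkaConsumer is deprecated in favor of the KafkaSource API. A rough sketch of the equivalent source setup, reusing kafkaTopic/kafkaURL/kafkaGroup from the method above and replacing the addSource(...) line (not what was run for this test):
// Requires: org.apache.flink.connector.kafka.source.KafkaSource,
//           org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer,
//           org.apache.flink.api.common.eventtime.WatermarkStrategy
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(kafkaURL)
        .setTopics(kafkaTopic)
        .setGroupId(kafkaGroup)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> kafkaStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");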
3. JDBC handling for the database INSERT
class PostgreSQLSink extends RichSinkFunction<String> {
    private static final String postgresUrl = "jdbc:postgresql://172.31.20.82:5432/testdb";
    private static final String postgresUser = "flink_user";
    private static final String postgresPassword = "flink_password";

    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Open a connection and insert the JSON string into the jsonb column
        try (Connection connection = DriverManager.getConnection(postgresUrl, postgresUser, postgresPassword)) {
            String sql = "INSERT INTO test_table (obj) VALUES (?::jsonb)";
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, value);
                statement.executeUpdate();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
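Opening a new JDBC connection on every invoke() is fine for this toy test. As a side note, a RichSinkFunction can also reuse a single connection by overriding open() and close(); a minimal sketch of that variant is shown below (PooledPostgreSQLSink is a hypothetical name, same connection settings as above, not what was used here):
class PooledPostgreSQLSink extends RichSinkFunction<String> {
    private static final String postgresUrl = "jdbc:postgresql://172.31.20.82:5432/testdb";
    private static final String postgresUser = "flink_user";
    private static final String postgresPassword = "flink_password";

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Create the connection and statement once per task instead of per record
        connection = DriverManager.getConnection(postgresUrl, postgresUser, postgresPassword);
        statement = connection.prepareStatement("INSERT INTO test_table (obj) VALUES (?::jsonb)");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        statement.setString(1, value);
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}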
# All code
/* JSON Handle */
import org.apache.kafka.clients.producer.*;
import java.io.FileReader;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import java.sql.DriverManager;
/* Kafka producer */
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
/* Kafka consumer with Flink */
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/* gson */
import com.google.gson.JsonObject;
import com.google.gson.Gson;
public class JKP {
    public static void main(String[] args) throws Exception {
        JKP producerInstance = new JKP();
        producerInstance.kafkaFlinkPostgreSQL(); // Kafka consumer + postgreSQL INSERT part
    }

    // The kafkaFlinkPostgreSQL() method shown in section 2 above belongs here.
}

class PostgreSQLSink extends RichSinkFunction<String> {
    private static final String postgresUrl = "jdbc:postgresql://172.31.20.82:5432/testdb";
    private static final String postgresUser = "flink_user";
    private static final String postgresPassword = "flink_password";

    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void invoke(String value, Context context) throws Exception {
        try (Connection connection = DriverManager.getConnection(postgresUrl, postgresUser, postgresPassword)) {
            String sql = "INSERT INTO test_table (obj) VALUES (?::jsonb)";
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, value);
                statement.executeUpdate();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
<flink.version>1.15.4</flink.version>
<kafka.version>2.8.1</kafka.version>
<postgres.version>42.2.24</postgres.version>
<spring.boot.version>2.6.4</spring.boot.version>
</properties>
<repositories>
<repository>
<id>apache-flink</id>
<name>Apache Flink Repository</name>
<url>https://repo.maven.apache.org/maven2/</url>
</repository>
<repository>
<id>central</id>
<name>Maven Central Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.2</version>
</dependency>
<!-- Spring Boot Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- Apache Flink Dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Apache Kafka Dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- PostgreSQL Dependency -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgres.version}</version>
</dependency>
<!-- JSON -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Maven Compiler Plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- Spring Boot Maven Plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
</plugin>
</plugins>
</build>
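For reference, the project is built with Maven and the resulting jar can be submitted to the running cluster with the flink CLI. A rough sketch (the jar name is a placeholder that depends on your artifactId/version; depending on how dependencies are packaged, a shaded jar or extra jars on the Flink classpath may be needed):
# Build the project
$ mvn clean package
# Submit the job to the cluster started earlier (main class: JKP)
$ ${FLINK_HOME}/bin/flink run -c JKP target/<your-artifact>.jar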
DB table results
testdb=# select * from test_table;
id | obj
----+----------------------------------
1 | {"message": "Hello World"}
2 | {"message": "My name is blabla"}
(2 rows)