CDC Connectors for Apache Flink® is a set of source connectors for Apache Flink® that ingest changes from different databases using change data capture (CDC). The connectors integrate Debezium as the engine that captures data changes, so they can fully leverage Debezium's capabilities. See more about what Debezium is.
This README is a brief walkthrough of the core features of CDC Connectors for Apache Flink®. For fully detailed documentation, please see Documentation.
Connector | Database | Driver |
---|---|---|
mongodb-cdc | MongoDB: 3.6, 4.x, 5.0, 6.0 | MongoDB Driver: 4.3.4 |
mysql-cdc | MySQL: 5.6, 5.7, 8.0.x; RDS MySQL: 5.6, 5.7, 8.0.x; PolarDB MySQL: 5.6, 5.7, 8.0.x; Aurora MySQL: 5.6, 5.7, 8.0.x; MariaDB: 10.x; PolarDB X: 2.0.1 | JDBC Driver: 8.0.28 |
oceanbase-cdc | OceanBase CE: 3.1.x, 4.x; OceanBase EE: 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
oracle-cdc | Oracle: 11, 12, 19, 21 | Oracle Driver: 19.3.0.0 |
postgres-cdc | PostgreSQL: 9.6, 10, 11, 12, 13, 14 | JDBC Driver: 42.5.1 |
sqlserver-cdc | SQL Server: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 9.4.1.jre8 |
tidb-cdc | TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
Db2-cdc | Db2: 11.5 | Db2 Driver: 11.5.0.0 |
Vitess-cdc | Vitess: 8.0.x, 9.0.x | MySQL JDBC Driver: 8.0.26 |
This example shows how to continuously synchronize data, both snapshot and incremental, from multiple business tables in a MySQL database to Doris in order to build the ODS layer.
export FLINK_HOME=/path/to/your/flink/home
source:
  type: mysql
  host: localhost
  port: 3306
  username: admin
  password: pass
  tables: db0.commodity, db1.user_table_[0-9]+, [app|web]_order_\.*
sink:
  type: doris
  fenodes: FE_IP:HTTP_PORT
  username: admin
  password: pass
pipeline:
  name: mysql-sync-doris
  parallelism: 4
# Submit Pipeline
$ ./bin/flink-cdc.sh mysql-to-doris.yaml
Pipeline "mysql-sync-doris" is submitted with Job ID "DEADBEEF".
During the execution of the flink-cdc.sh script, the CDC task configuration is parsed and translated into a DataStream job, which is then submitted to the specified Flink cluster.
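To make the comma-separated `tables` value in the pipeline definition more concrete, here is a minimal sketch of how such a list could be matched against fully qualified table names. It is not the connector's implementation: the class `TablePatternSketch` and its methods are hypothetical, and it assumes each entry is treated as a plain Java regular expression over the whole `db.table` name. The real pipeline applies its own rules (for example, for the dot that separates database and table), so only the first two patterns from the example are used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not part of Flink CDC.
class TablePatternSketch {

    // Split a comma-separated "tables" value into compiled patterns.
    static List<Pattern> compile(String tables) {
        List<Pattern> patterns = new ArrayList<>();
        for (String part : tables.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
        return patterns;
    }

    // A table counts as captured if any pattern matches its full "db.table" name.
    static boolean isCaptured(List<Pattern> patterns, String qualifiedName) {
        for (Pattern pattern : patterns) {
            if (pattern.matcher(qualifiedName).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumption: plain regex semantics, which simplifies the real matching rules.
        List<Pattern> patterns = compile("db0.commodity, db1.user_table_[0-9]+");
        String[] names = {"db0.commodity", "db1.user_table_7", "db1.user_table_x", "db2.orders"};
        for (String name : names) {
            System.out.println(name + " captured: " + isCaptured(patterns, name));
        }
    }
}
```

Under these assumptions, the sharded tables `db1.user_table_0`, `db1.user_table_1`, and so on are all picked up by a single entry, which is why the example can describe several business tables in one line.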
Setting up a Flink cluster with the provided connector takes several steps, including placing the connector jar under FLINK_HOME/lib/.
The example shows how to create a MySQL CDC source in the Flink SQL Client and execute queries on it.
-- create a MySQL CDC table source
CREATE TABLE mysql_binlog (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'inventory',
  'table-name' = 'products'
);
-- read snapshot and binlog data from MySQL, apply a transformation, and show the result on the client
SELECT id, UPPER(name), description, weight FROM mysql_binlog;
Include the following Maven dependency (available through Maven Central):
<dependency>
  <groupId>com.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <!-- The dependency is available only for stable releases; SNAPSHOT dependencies need to be built by yourself from the master or release- branches. -->
  <version>2.5-SNAPSHOT</version>
</dependency>
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);

    env
        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
        // set 4 parallel source tasks
        .setParallelism(4)
        .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

    env.execute("Print MySQL Snapshot + Binlog");
  }
}
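The job above prints each change as a JSON string. As a rough illustration of what a downstream step might do with such a string, the sketch below reads the Debezium `op` field and maps it to a change type (`r` for snapshot read, `c` for create, `u` for update, `d` for delete). The class `ChangeEventOpSketch`, its methods, and the sample record are hypothetical and heavily trimmed; a real job would more likely use a JSON library than a regular expression, which can be fooled by a column that happens to be named `op`.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not part of Flink CDC or Debezium.
class ChangeEventOpSketch {

    // Matches a top-level-looking "op":"x" pair; a simplification of real JSON parsing.
    private static final Pattern OP_FIELD = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    // Return the operation code if the string contains an "op" field.
    static Optional<String> extractOp(String json) {
        Matcher matcher = OP_FIELD.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    // Translate a Debezium operation code into a readable change type.
    static String describe(String op) {
        switch (op) {
            case "r":
                return "snapshot read";
            case "c":
                return "insert";
            case "u":
                return "update";
            case "d":
                return "delete";
            default:
                return "unknown";
        }
    }

    public static void main(String[] args) {
        // Hand-written, trimmed sample; actual records carry more fields (source, ts_ms, ...).
        String sample = "{\"before\":null,\"after\":{\"id\":101,\"name\":\"scooter\"},\"op\":\"c\"}";
        String op = extractOp(sample).orElse("?");
        System.out.println(op + " -> " + describe(op));
    }
}
```

Distinguishing `r` from `c` is one way to tell rows that came from the initial snapshot apart from rows inserted afterwards and read from the binlog.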
git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
The dependencies are now available in your local .m2 repository.
The code in this repository is licensed under the Apache Software License 2.
CDC Connectors for Apache Flink® welcomes anyone who wants to help out in any way, whether by reporting problems, helping with documentation, or contributing code changes to fix bugs, add tests, or implement new features. You can report problems or request features in GitHub Issues.
git clone https://github.com/your_name/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests
git checkout -b my_feature
-- develop and commit
git push origin my_feature
You need to install the google-java-format plugin. Spotless, together with google-java-format, is used to format the code.
It is recommended to automatically format your code by applying the following settings:
For earlier IntelliJ IDEA versions, steps 4 to 7 change as follows.
Restrict formatting to files matching .*\.java to avoid formatting other file types.
The whole project can then be formatted with the command mvn spotless:apply.
Checkstyle is used to enforce static coding guidelines.
Use the Checkstyle configuration tools/maven/checkstyle.xml, which is located within your cloned repository. Set the property checkstyle.suppressions.file with the value suppressions.xml and click "Next".
You can now import the Checkstyle configuration for the Java code formatter from tools/maven/checkstyle.xml, located within your cloned repository.
Then click "View" → "Tool Windows" → "Checkstyle" and use the "Check Module" button in the opened tool window to validate against Checkstyle. Alternatively, run the command mvn clean compile checkstyle:checkstyle to validate.
The Flink CDC documentation is located at docs/content.
The contribution steps are the same as for code contributions. The documentation sources are written in Markdown.
DingTalk Chinese User Group
You can search for the group number [33121212] or scan the QR code to join the group.
To get started, please see https://ververica.github.io/flink-cdc-connectors/