Add an Apache Paimon CDC Ingestion using MSF Example #95

Open · wants to merge 4 commits into main

Conversation

Moonlight-CL (Author):

Purpose of the change

*Add an Example of Apache Paimon CDC Ingestion using MSF*

Verifying this change

  1. Tested on Managed Service for Apache Flink

Significant changes

  • Completely new example
  • Updated an existing example to newer Flink version or dependencies versions
  • Improved an existing example
  • Modified the runtime configuration of an existing example (i.e. added/removed/modified any runtime properties)
  • Modified the expected input or output of an existing example (e.g. modified the source or sink, modified the record schema)

nicusX (Contributor) commented Apr 3, 2025:

Does this work on Amazon Managed Service for Apache Flink?
I see the readme mentions "copying hive-site.xml from your EMR cluster". Flink on EMR is a different service; this repo is exclusively about Managed Service for Apache Flink.

Moonlight-CL (Author) replied:

> Does this work on Amazon Managed Service for Apache Flink? I see the readme mentions "copying hive-site.xml from your EMR cluster". Flink on EMR is a different service; this repo is exclusively about Managed Service for Apache Flink.

Hi nicusX, yes, this works on Amazon Managed Service for Apache Flink (MSF). If we want to sync Paimon metadata to the Glue Data Catalog, we need to copy `hive-site.xml` into the project; the Paimon Hive Catalog reads its connection info from that XML file.
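
For illustration, a minimal `hive-site.xml` along these lines routes Hive metastore calls to the Glue Data Catalog. The factory class below is the one used by EMR's Glue integration; treat the exact property set as an assumption to verify against the installed client jar:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <!-- Assumption: route Hive metastore calls to the AWS Glue Data Catalog.
       The factory class is provided by the aws-glue-datacatalog client jar;
       verify the name against the jar version you installed. -->
  <property>
    <name>hive.metastore.client.factory.class</name>
    <value>com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory</value>
  </property>
</configuration>
```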

nicusX (Contributor) left a comment:

Many thanks for the contribution.
The example is brilliant but a bit hard for a user to deploy and run end-to-end, especially locally.

For the sake of this example, I would suggest using the configuration that requires the fewest external dependencies, which is doing CDC on a database directly from Flink, as opposed to requiring 1/ a database, 2/ a CDC Kafka Connector, and 3/ Kafka.
Also, showing that you can do CDC directly in Flink without external dependencies is useful, as sketched below.
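
To make that concrete, here is a hedged sketch of the equivalent standalone Paimon action invocation: `mysql_sync_database` reads the MySQL binlog directly through Flink CDC, with no Kafka or Connect cluster in between. The jar name, warehouse path, and connection values are placeholders:

```sh
# Hedged sketch: Paimon's CDC action reading the MySQL binlog directly via Flink CDC.
# Adjust the action jar version, warehouse path, and connection values to your setup.
flink run paimon-flink-action-*.jar \
  mysql_sync_database \
  --warehouse s3://my-bucket/paimon/warehouse \
  --database app_db \
  --mysql_conf hostname=mysql.example.internal \
  --mysql_conf port=3306 \
  --mysql_conf username=flink_cdc \
  --mysql_conf password='<placeholder>' \
  --mysql_conf database-name=app_db
```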

I would also recommend dropping the step-by-step instructions to create the app via the CLI. They still require several changes by the user to work. Besides, for the sake of running the example, the user can create the application via the Console or with any other automation tool they normally use.

supported CDC format (Canal CDC, Debezium CDC, Maxwell CDC, OGG CDC, JSON, aws-dms-json) data streamed in it.
* If you want to use the Apache Paimon Hive catalog with the Glue Data Catalog, please install the aws-glue-datacatalog-hive3-client
jar file into your local Maven repo (refer to this [github repo](https://github.com/awslabs/aws-glue-data-catalog-client-for-apache-hive-metastore) for how to install it, or
find the jar file on an EMR cluster and install it into your local Maven repo), then copy your EMR cluster's `hive-site.xml` file into the project and repackage the project.
nicusX (Contributor):

This is confusing for the reader. There is no EMR Cluster to copy the file from. The hive-site.xml is part of this example and that's it
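
As a side note on the install step mentioned in the excerpt above: installing a standalone jar into the local Maven repository is done with `mvn install:install-file`. The coordinates below are illustrative assumptions; match them to whatever the example's pom.xml actually declares:

```sh
# Hedged sketch: install the Glue Data Catalog Hive client jar into ~/.m2.
# groupId/artifactId/version are assumptions — align them with the pom.xml.
mvn install:install-file \
  -Dfile=aws-glue-datacatalog-hive3-client.jar \
  -DgroupId=com.amazonaws.glue \
  -DartifactId=aws-glue-datacatalog-hive3-client \
  -Dversion=3.4.0 \
  -Dpackaging=jar
```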

| Group | Key | Description |
|---|---|---|
| `KafkaConf` | `kafka_conf@_properties.auto.offset.reset` | Offset reset policy of the Kafka consumer |
| `KafkaConf` | `kafka_conf@_properties.group.id` | Consumer group id |
| `CatalogConf` | `catalog_conf@_metastore.client.class` | Paimon Hive Catalog metastore client class name |
| `CatalogConf` | `...` | ... |
nicusX (Contributor):

What does "..." mean here? What else can the user add?

All parameters are case-sensitive.

### Samples
**Create an MSF application**
nicusX (Contributor):

This is not consistent with what we are doing in all the other examples.
The goal of these examples is not to cover the creation of an application step by step. There is nothing specific to this example here, except passing the profile name.

nicusX (Contributor):

These instructions do not really explain what data the Kafka source is expecting. What schema? What serialization?
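
For context, an illustration (not taken from the PR): with the Debezium JSON format, the Kafka topic would carry change envelopes roughly like the one below, where `before`/`after` are the row images and `op` marks the operation type. Field names follow Debezium's documented envelope; the values are made up, and real records carry more `source` metadata:

```json
{
  "before": null,
  "after": { "id": 1001, "name": "alice", "email": "alice@example.com" },
  "source": { "db": "app_db", "table": "users" },
  "op": "c",
  "ts_ms": 1712102400000
}
```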


### Generating data

You can use [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator),
nicusX (Contributor):

I think this is not correct. The example uses Kafka.

To run this example end-to-end, the user would have to set up a database and a CDC Kafka Connector. This is quite heavyweight, particularly for testing the application locally.

Would it be possible to add a docker-compose file the user can run to set up locally what is required?

Also, doing CDC directly in Flink would simplify the requirements for running this example end-to-end, both locally and on AWS.
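
A minimal Compose file along these lines could stand up the external pieces locally. Image tags and settings are assumptions to adapt, and a Kafka Connect worker with a Debezium connector would still need to be added for the full CDC path:

```yaml
# Hedged sketch of a local test stack: MySQL as the CDC source plus a
# single-node Kafka broker. Versions and credentials are placeholders.
services:
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: example        # placeholder credential
      MYSQL_DATABASE: app_db
    # These binlog settings are defaults in MySQL 8.0; spelled out for clarity.
    command: --server-id=1 --log-bin=mysql-bin --binlog-format=ROW
    ports:
      - "3306:3306"
  kafka:
    image: apache/kafka:3.7.0             # single-node KRaft broker, default config
    ports:
      - "9092:9092"
```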


</dependencies>

<profiles>
nicusX (Contributor):

Because the goal of the example is clarity, I would omit using profiles to switch dependencies.
The example is centered on the Kafka source, and only that dependency should be used, without any profile.

The possibility of doing CDC from databases is very important, but it is barely mentioned in the README.
I would suggest expanding on it a bit in the README and keeping the dependencies for CDC in the POM but commented out, with comments like "uncomment this to source data from MySQL CDC" (etc.), as sketched below.
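
Something along these lines in the `<dependencies>` section would match that suggestion. The Flink CDC coordinates are an assumption to verify (older releases were published under the com.ververica groupId, newer ones under org.apache.flink):

```xml
<!-- Uncomment this to source data from MySQL CDC instead of Kafka.
     groupId and version are assumptions: check them against the project's
     Flink version before enabling. -->
<!--
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>3.1.0</version>
</dependency>
-->
```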


This example parses runtime parameters according to the following rules and passes the parsed parameters to Apache Paimon Actions.

- The Paimon CDC ingestion action name is parsed from the key named `action` in the `ActionConf` parameter group.
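
As an illustration of that rule, the action group might look like this (`kafka_sync_database` is one of Paimon's CDC action names; whether it is the right value here is an assumption):

```json
{
  "PropertyGroupId": "ActionConf",
  "PropertyMap": {
    "action": "kafka_sync_database"
  }
}
```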
nicusX (Contributor):

I think it should be explained that this example reads from Kafka, so you need to set up a CDC Kafka Connector.
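
For instance, with Debezium's MySQL connector (one common choice; hostnames, credentials, and topic prefix below are placeholders), registering the connector against Kafka Connect's REST API looks roughly like:

```sh
# Hedged sketch: register a Debezium MySQL source connector with Kafka Connect.
# All connection values are placeholders; property names follow Debezium 2.x.
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "app-db-cdc",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql.example.internal",
      "database.port": "3306",
      "database.user": "flink_cdc",
      "database.password": "<placeholder>",
      "database.server.id": "184054",
      "topic.prefix": "app_db",
      "database.include.list": "app_db",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-changes.app_db"
    }
  }'
```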

{
  "PropertyGroupId": "KafkaConf",
  "PropertyMap": {
    "kafka_conf@_properties.bootstrap.servers": "b-2.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092,b-1.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092",
nicusX (Contributor):

Please do not include a real endpoint. Use a placeholder or an evidently fake one.

"PropertyMap": {
"kafka_conf@_properties.bootstrap.servers": "b-2.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092,b-1.mycluster.bzvtby.c8.kafka.us-west-2.amazonaws.com:9092",
"kafka_conf@_topic": "kafka_topic",
"kafka_conf@_properties.group.id": 1234546,
nicusX (Contributor):

Shouldn't this be a string?
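
(Presumably the fix is just quoting the value, since Kafka's `group.id` is a string property, e.g.:)

```json
"kafka_conf@_properties.group.id": "my-consumer-group"
```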
