Member since
01-15-2019
49
Posts
31
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1204 | 07-20-2021 01:05 AM | |
11366 | 11-28-2019 06:59 AM |
03-25-2024
02:43 AM
Purpose
Detect updates to S3 files and insert the updated files into Aurora PostgreSQL with NiFi
Data flow
The finished dataflow
Process
1.) Download the dataflow file (JSON file Import_S3_To_Aurora_PostgreSQL.json)
2.) Create a new processor group. When creating this processor group, choose the following JSON file to upload.
Step 1: Choose processor group Step 2 : Step 3 : Finish upload.
3.) Install JDBC Driver
wget https://jdbc.postgresql.org/download/postgresql-42.7.3.jar
mkdir /tmp/nifi
mv postgresql-42.7.3.jar /tmp/nifi/
4.) Set parameters in NiFi
Set ListS3 parameters
S3 Access Key set
The values input was protected as access key/values are sensitive values. Only "Sensitive value set" be shown.
4.2) Start the controllver service AWSCredentialsProviderControllerService (for saving AWS sensitive values)
4.3) Start the CSVReader Controller service
4.4) Start the JDBC Connection pool(DBCPConnectionPool-postgreSQL)service
5.1) Fix the Bucket,Prefix values
5.2) Fix the PostgreSQL table name for INSERT PutDatabaseRecord setting:
6.) Start the processors
7.) Check the provenance:
... View more
Labels:
03-14-2024
07:44 PM
1 Kudo
Purpose: Access CDW Iceberg table with Snowflake
Architecture
Image source: Configure an external volume for Iceberg tables
Initiate
Configure an External Volume in Snowflake
Step 1: Configure access permissions for the S3 bucket
Create a policy in AWS IAM. I created a policy called zzeng-Snowflake-ext: {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": "arn:aws:s3:::<my-bucket>/data/zzeng/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::<my-bucket>",
"Condition": {
"StringLike": {
"s3:prefix": [
"data/zzeng/*"
]
}
}
}
]
}
Set it in the AWS console
Step 2: Create an IAM role in AWS
zzeng-Snowflake-ext-role:
Step 3: Grant privileges required for SSE-KMS encryption to the IAM role (optional)
(Ref: Grant privileges required for SSE-KMS encryption to the IAM role (optional))
Skip.
Step 4: Create an external volume in Snowflake
SQL in Snowflake:
CREATE OR REPLACE EXTERNAL VOLUME extIcebergVolC
STORAGE_LOCATIONS =
(
(
NAME = 'zzeng-iceberg-se-s3-ap-northeast-1'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://<my-bucket>/data/zzeng/'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<my-AWS-id*****>:role/zzeng-Snowflake-ext-role'
)
);
Step 5: Retrieve the AWS IAM user for your Snowflake account
DESC EXTERNAL VOLUME extIcebergVolC;
Result:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>DESC EXTERNAL VOLUME extIcebergVolC;
+-------------------+--------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+
| parent_property | property | property_type | property_value | property_default |
|-------------------+--------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------|
| | ALLOW_WRITES | Boolean | true | true |
| STORAGE_LOCATIONS | STORAGE_LOCATION_1 | String | {"NAME":"zzeng-iceberg-se-s3-ap-northeast-1","STORAGE_PROVIDER":"S3","STORAGE_BASE_URL":"s3://<my-bucket-id>/data/zzeng/","STORAGE_ALLOWED_LOCATIONS":["s3://<my-bucket-id>/data/zzeng/*"],"STORAGE_REGION":"us-east-2","PRIVILEGES_VERIFIED":true,"STORAGE_AWS_ROLE_ARN":"<STORAGE_AWS_ROLE_ARN>","STORAGE_AWS_IAM_USER_ARN":"<STORAGE_AWS_ROLE_ARN>","STORAGE_AWS_EXTERNAL_ID":"<a long string for STORAGE_AWS_EXTERNAL_ID>","ENCRYPTION_TYPE":"NONE","ENCRYPTION_KMS_KEY_ID":""} | |
| STORAGE_LOCATIONS | ACTIVE | String | zzeng-iceberg-se-s3-ap-northeast-1 | |
+-------------------+--------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+
3 Row(s) produced. Time Elapsed: 0.949s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
Take a memo for STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
Step 6: Grant the IAM user permissions to access bucket objects
Modify the trustship in IAM, use the value in Step 5: {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<snowflake_user_arn>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<snowflake_external_id>"
}
}
}
]
}
Create an Iceberg Table in CDW
Create folder: s3a://${my-test-bucket}/data/${user_id}/airlines/airlines
use my own value as the ${my-test-bucket} and ${user_id}: CREATE DATABASE ${user_id}_airlines_ice;
drop table if exists ${user_id}_airlines_ice.airlines;
CREATE EXTERNAL TABLE ${user_id}_airlines_ice.airlines (code string, description string)
STORED BY ICEBERG
STORED AS PARQUET
LOCATION 's3a://${cdp_env_bucket}/data/${user_id}/airlines/airlines'
tblproperties("format-version"="2",'external.table.purge'='true');
INSERT INTO ${user_id}_airlines_ice.airlines
SELECT * FROM ${user_id}_airlines_csv.airlines_csv;
select * from ${user_id}_airlines_ice.airlines;
select count(*) from ${user_id}_airlines_ice.airlines;
Check my DDL:
Create an Iceberg Table in Snowflake, pointing to the exist table in CDW
SQL:
CREATE OR REPLACE ICEBERG TABLE airlines
CATALOG='zzengIcebergCatalogInt'
EXTERNAL_VOLUME='extIcebergVolC'
BASE_LOCATION='airlines/airlines'
METADATA_FILE_PATH='metadata/00001-7e28a998-42f9-4466-8884-32d450af5c85.metadata.json'
;
Check the value
In Snowflake::snowflake: zzeng#COMPUTE_WH@ZZENG.PUBLIC>select count(*) from AIRLINES;
+----------+
| COUNT(*) |
|----------|
| 1491 |
+----------+
1 Row(s) produced. Time Elapsed: 0.393s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>select * from AIRLINES limit 3;
+------+--------------------+
| CODE | DESCRIPTION |
|------+--------------------|
| 02Q | Titan Airways |
| 04Q | Tradewind Aviation |
| 05Q | "Comlux Aviation |
+------+--------------------+
3 Row(s) produced. Time Elapsed: 4.705s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
In CDW Hive HUE: select count(*) from ${user_id}_airlines_ice.airlines;
Modify Data
INSERT DATA
Snowflake can’t insert into an external Iceberg table
Manage an Iceberg table
Iceberg tables
It is mentioned that You can use INSERT and UPDATE statements to modify an Iceberg table that uses Snowflake as the catalog, I got an Error in Snowflake when inserting it into the Iceberg table:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>INSERT INTO airlines (code, description) VALUES
('A1', 'Airline 1 Description'),
('A2', 'Airline 2 Description'),
('A3', 'Airline 3 Description'),
('A4', 'Airline 4 Description'),
('A5', 'Airline 5 Description'),
('A6', 'Airline 6 Description'),
('A7', 'Airline 7 Description'),
('A8', 'Airline 8 Description'),
('A9', 'Airline 9 Description'),
('A10', 'Airline 10 Description'),
('A11', 'Airline 11 Description'),
('A12', 'Airline 12 Description'),
('A13', 'Airline 13 Description'),
('A14', 'Airline 14 Description'),
('A15', 'Airline 15 Description'),
('A16', 'Airline 16 Description'),
('A17', 'Airline 17 Description'),
('A18', 'Airline 18 Description'),
('A19', 'Airline 19 Description'),
('A20', 'Airline 20 Description');
091357 (42601): SQL Compilation error: Iceberg table AIRLINES with an external catalog integration is a read-only table and cannot be modified
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
Hive Insert
INSERT INTO `zzeng_airlines_ice`.`airlines` (code, description) VALUES
('A1', 'Airline 1 Description'),
('A2', 'Airline 2 Description'),
('A3', 'Airline 3 Description'),
('A4', 'Airline 4 Description'),
('A5', 'Airline 5 Description'),
('A6', 'Airline 6 Description'),
('A7', 'Airline 7 Description'),
('A8', 'Airline 8 Description'),
('A9', 'Airline 9 Description'),
('A10', 'Airline 10 Description'),
('A11', 'Airline 11 Description'),
('A12', 'Airline 12 Description'),
('A13', 'Airline 13 Description'),
('A14', 'Airline 14 Description'),
('A15', 'Airline 15 Description'),
('A16', 'Airline 16 Description'),
('A17', 'Airline 17 Description'),
('A18', 'Airline 18 Description'),
('A19', 'Airline 19 Description'),
('A20', 'Airline 20 Description');
Refresh the metadata for an Iceberg table created from files in object storage
Issue: Hive changed the Iceberg table, but after that, even when I ran the Snowflake query, the data was not updated:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>select count(*) from AIRLINES;
+----------+
| COUNT(*) |
|----------|
| 1491 |
+----------+
1 Row(s) produced. Time Elapsed: 0.678s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
In Snowflake, The metadata files do not identify the most recent snapshot of an Iceberg table.
Ref URL: Iceberg tables
I have to alter my Snowflake Iceberg table definition manually:
Then refresh the metadata:
ALTER ICEBERG TABLE AIRLINES REFRESH 'metadata/00002-c33ae888-1af3-4f64-830b-eac9e0a95983.metadata.json';
Result:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>ALTER ICEBERG TABLE AIRLINES REFRESH 'metadata/00002-c33ae888-1af3-4f64-830b-eac9e0a95983.metadata.json';
+----------------------------------+
| status |
|----------------------------------|
| Statement executed successfully. |
+----------------------------------+
1 Row(s) produced. Time Elapsed: 10.199s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>select count(*) from AIRLINES;
+----------+
| COUNT(*) |
|----------|
| 1511 |
+----------+
1 Row(s) produced. Time Elapsed: 0.204s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
Considerations and limitations
Iceberg tables
The following is an excerpt from Snowflake documents, where considerations and limitations apply to Iceberg tables, and are subject to change:
Iceberg
Versions 1 and 2 of the Apache Iceberg specification are supported, excluding the following features:
Row-level deletes (either position deletes or equality deletes).
Using the history.expire.min-snapshots-to-keep table property to specify the default minimum number of snapshots to keep. For more information, see Metadata and snapshots.
Iceberg partitioning with the bucket transform function impacts performance for queries that use conditional clauses to filter results.
Iceberg tables created from files in object storage aren’t supported if the following conditions are true:
The table contains a partition spec that defines an identity transform.
The source column of the partition spec does not exist in a Parquet file.
For Iceberg tables that are not managed by Snowflake, time travel to any snapshot generated after table creation is supported as long as you periodically refresh the table before the snapshot expires.
File formats
Support is limited to Apache Parquet files.
Parquet files that use the unsigned integer logical type are not supported.
External volumes
You must access the cloud storage locations in external volumes using direct credentials. Storage integrations are not supported.
The trust relationship must be configured separately for each external volume that you create.
Metadata files
The metadata files do not identify the most recent snapshot of an Iceberg table.
You cannot modify the location of the data files or snapshot using the ALTER ICEBERG TABLE command. To modify either of these settings, you must recreate the table (using the CREATE OR REPLACE ICEBERG TABLE syntax).
... View more
03-05-2024
02:09 AM
Goal : Use CLI to create / import a dataflow in CDF-PC
Then, run Teraform to create/import Dataflow into CDF-PC.
Process :
Install CDP CLI (I am using MacOS, so I install cdpcli in this way. pip3 install cdpcli Then confirm it's installed correctly [zzeng@zeng-mbp ~]$ ll ~/Library/Python/3.9/bin/ | grep cdp
-rwxr-xr-x@ 1 zzeng staff 250 Mar 4 12:22 cdp
-rwxr-xr-x@ 1 zzeng staff 250 Mar 4 12:22 cdp_completer
[zzeng@zeng-mbp ~]$ export PATH="$HOME/Library/Python/3.9/bin:$PATH"
[zzeng@zeng-mbp ~]$ cdp --version
0.9.107
[zzeng@zeng-mbp ~]$
Configure CLI: (Reference Link) from [Management Console] ->[User] -> [Profile]
Then I can find your API key here. If it does not exist, create one.
Then, configure the API key in my MacOS (Reference): cdp configure
After this config, check the latest status: [zzeng@zeng-mbp ~]$ cdp iam get-user
{
"user": {
"userId": *****
"status": "ACTIVE",
"workloadPasswordDetails": {
"isPasswordSet": true
}
}
}
Use CLI to create dataflow: cdp df import-flow-definition \
--name "zzeng2-fetch_from_S3_folder" \
--file "/<<PATH_TO_UPDATE>>/fetch_from_S3_folder.json" \
--comments "Initial Version" Example: $ cdp df import-flow-definition --name "zzeng-fetch_from_S3_folder" --description "Description for this flow" --file "/Users/zzeng/Library/CloudStorage/OneDrive-Personal/38_CLDR_Docs/50_demo/FetchFromS3Folder/fetch_from_S3_folder.json" --comments "Initial Version"
{
"crn": "crn:cdp:df:us-west-1:******:flow:zzeng-fetch_from_S3_folder",
"name": "zzeng-fetch_from_S3_folder",
"versionCount": 1,
"createdTimestamp": 1709632435790,
"description": "Description for this flow",
"modifiedTimestamp": 1709632435790,
"versions": [
{
"crn": "crn:cdp:df:us-west-1:******:flow:zzeng-fetch_from_S3_folder/v.1",
"bucketIdentifier": "https://s3.us-west-2.amazonaws.com/*****.cloudera.com/******",
"author": "Zhen Zeng",
"version": 1,
"timestamp": 1709632435792,
"deploymentCount": 0,
"comments": "Initial Version",
"draftCount": 0,
"tags": []
}
]
}
Deploy the flow to an environment:
We can use a wizard to do this or use the wizard to generate the command.
Result:
$ cdp df create-deployment \
> --service-crn crn:cdp:df:us-west-1:******:service:***** \
> --flow-version-crn "crn:cdp:df:us-west-1:*****:flow:zzeng-fetch_from_S3_folder/v.1" \
> --deployment-name "zzeng-deploy-01" \
> --project-crn "crn:cdp:df:us-west-1:****:project:*****" \
> --cfm-nifi-version 1.24.0.2.3.13.0-9 \
> --auto-start-flow \
> --cluster-size-name EXTRA_SMALL \
> --static-node-count 1 \
> --no-auto-scaling-enabled
{
"deploymentCrn": "crn:cdp:df:us-west-1:******:deployment:*****/*****"
}
Reference:
Update Deployment
mc cli client setup
cdf deploy flow cli
... View more
01-15-2024
02:11 AM
Update: In the latest NiFi, now we can directly connect "GenerateTableFetch" and and "ExecuteSQL" with a connection. NiFi is evolving 🙂
... View more
11-15-2023
06:28 PM
What's your OS version? If you use CentOS Stream release, you will get this error.
... View more
05-10-2023
05:56 AM
@zzeng Great article. Reach out to me on internal channels. I would love to show you my oracle to kudu demo, using kafka and schema registry.
... View more
02-03-2023
09:10 PM
After following above steps I'm still not able to start hiveserver2
... View more
01-12-2023
02:16 AM
2 Kudos
To install Apache NiFi on Macbook, do the following:
Download the Zip file from Apache nifi Downloads. Ensure that you download the latest version zip file Apache NiFi Binary 1.19.1 [OpenPGP] [SHA-256] [SHA-512].
Download it to MBP, and then unzip it:
chmod +x bin/nifi.sh
Run the following command for User Generate:
bin/nifi.sh set-single-user-credentials zzeng <my long password>
Run the following command to Start:
bin/nifi.sh start bin/nifi.sh status netstat -an | grep LIST
Check the 8443 port is LISTEN:
tcp4 0 0 127.0.0.1.8443 *.* LISTEN
Login from https://localhost:8443/nifi/
... View more
Labels:
11-13-2022
08:07 AM
https://lists.apache.org/thread/907n11xlvdmckp1045bspzloclthfqsh As NiFi is a pure Java/JVM application, we use Jython rather than
Python for ExecuteScript. This means that you can't import native
(CPython, e.g.) modules into your Jython scripts in ExecuteScript consider using ExecuteStreamCommand with a real Python
interpreter and script. I'm looking at Py4J to try and bridge the gap
... View more
04-19-2022
01:07 AM
Hello @SVK If your queries concerning Apache Airflow has been addressed, Feel free to mark the Post as Solved. If you have any further ask, Kindly share the same & we shall get back to you accordingly. Regards, Smarak
... View more