03-14-2024
07:44 PM
Purpose: Access CDW Iceberg table with Snowflake
Architecture
Image source: Configure an external volume for Iceberg tables
Initiate
Configure an External Volume in Snowflake
Step 1: Configure access permissions for the S3 bucket
Create a policy in AWS IAM. I created a policy called zzeng-Snowflake-ext:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": "arn:aws:s3:::<my-bucket>/data/zzeng/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::<my-bucket>",
"Condition": {
"StringLike": {
"s3:prefix": [
"data/zzeng/*"
]
}
}
}
]
}
Set it in the AWS console
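As an alternative to the console (not part of the original steps), the same policy can be created from the AWS CLI; a minimal sketch, assuming the JSON above is saved locally as zzeng-snowflake-ext-policy.json:
# create the IAM policy from the saved JSON document (file name is hypothetical)
aws iam create-policy \
--policy-name zzeng-Snowflake-ext \
--policy-document file://zzeng-snowflake-ext-policy.json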
Step 2: Create an IAM role in AWS
zzeng-Snowflake-ext-role:
Step 3: Grant privileges required for SSE-KMS encryption to the IAM role (optional)
(Ref: Grant privileges required for SSE-KMS encryption to the IAM role (optional))
Skip.
Step 4: Create an external volume in Snowflake
SQL in Snowflake:
CREATE OR REPLACE EXTERNAL VOLUME extIcebergVolC
STORAGE_LOCATIONS =
(
(
NAME = 'zzeng-iceberg-se-s3-ap-northeast-1'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://<my-bucket>/data/zzeng/'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<my-AWS-id*****>:role/zzeng-Snowflake-ext-role'
)
);
Step 5: Retrieve the AWS IAM user for your Snowflake account
DESC EXTERNAL VOLUME extIcebergVolC;
Result:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>DESC EXTERNAL VOLUME extIcebergVolC;
+-------------------+--------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+
| parent_property | property | property_type | property_value | property_default |
|-------------------+--------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------|
| | ALLOW_WRITES | Boolean | true | true |
| STORAGE_LOCATIONS | STORAGE_LOCATION_1 | String | {"NAME":"zzeng-iceberg-se-s3-ap-northeast-1","STORAGE_PROVIDER":"S3","STORAGE_BASE_URL":"s3://<my-bucket-id>/data/zzeng/","STORAGE_ALLOWED_LOCATIONS":["s3://<my-bucket-id>/data/zzeng/*"],"STORAGE_REGION":"us-east-2","PRIVILEGES_VERIFIED":true,"STORAGE_AWS_ROLE_ARN":"<STORAGE_AWS_ROLE_ARN>","STORAGE_AWS_IAM_USER_ARN":"<STORAGE_AWS_ROLE_ARN>","STORAGE_AWS_EXTERNAL_ID":"<a long string for STORAGE_AWS_EXTERNAL_ID>","ENCRYPTION_TYPE":"NONE","ENCRYPTION_KMS_KEY_ID":""} | |
| STORAGE_LOCATIONS | ACTIVE | String | zzeng-iceberg-se-s3-ap-northeast-1 | |
+-------------------+--------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+
3 Row(s) produced. Time Elapsed: 0.949s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
Make a note of STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID.
Step 6: Grant the IAM user permissions to access bucket objects
Modify the trust relationship of the IAM role in AWS IAM, using the values from Step 5:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<snowflake_user_arn>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<snowflake_external_id>"
}
}
}
]
}
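The same trust-relationship edit can also be scripted (my addition, not shown in the original post); a sketch using the AWS CLI, assuming the trust policy JSON above is saved as trust-policy.json:
# replace the role's trust policy so Snowflake's IAM user can assume the role with the external ID
aws iam update-assume-role-policy \
--role-name zzeng-Snowflake-ext-role \
--policy-document file://trust-policy.json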
Create an Iceberg Table in CDW
Create folder: s3a://${my-test-bucket}/data/${user_id}/airlines/airlines
Substitute my own values for ${my-test-bucket} and ${user_id}:
CREATE DATABASE ${user_id}_airlines_ice;
drop table if exists ${user_id}_airlines_ice.airlines;
CREATE EXTERNAL TABLE ${user_id}_airlines_ice.airlines (code string, description string)
STORED BY ICEBERG
STORED AS PARQUET
LOCATION 's3a://${cdp_env_bucket}/data/${user_id}/airlines/airlines'
tblproperties("format-version"="2",'external.table.purge'='true');
INSERT INTO ${user_id}_airlines_ice.airlines
SELECT * FROM ${user_id}_airlines_csv.airlines_csv;
select * from ${user_id}_airlines_ice.airlines;
select count(*) from ${user_id}_airlines_ice.airlines;
Check my DDL:
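One way to check the DDL in Hive (a minimal sketch, assuming Hive's SHOW CREATE TABLE statement; the generated DDL includes the table properties):
-- show the generated Iceberg table DDL
SHOW CREATE TABLE ${user_id}_airlines_ice.airlines;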
Create an Iceberg Table in Snowflake, pointing to the existing table in CDW
SQL:
CREATE OR REPLACE ICEBERG TABLE airlines
CATALOG='zzengIcebergCatalogInt'
EXTERNAL_VOLUME='extIcebergVolC'
BASE_LOCATION='airlines/airlines'
METADATA_FILE_PATH='metadata/00001-7e28a998-42f9-4466-8884-32d450af5c85.metadata.json'
;
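Note that the statement above references a catalog integration named 'zzengIcebergCatalogInt', which must already exist. A minimal sketch of creating it, assuming the object-storage catalog source from Snowflake's CREATE CATALOG INTEGRATION syntax:
-- catalog integration for Iceberg tables created from files in object storage
CREATE CATALOG INTEGRATION zzengIcebergCatalogInt
CATALOG_SOURCE = OBJECT_STORE
TABLE_FORMAT = ICEBERG
ENABLED = TRUE;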
Check the values
In Snowflake:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>select count(*) from AIRLINES;
+----------+
| COUNT(*) |
|----------|
| 1491 |
+----------+
1 Row(s) produced. Time Elapsed: 0.393s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>select * from AIRLINES limit 3;
+------+--------------------+
| CODE | DESCRIPTION |
|------+--------------------|
| 02Q | Titan Airways |
| 04Q | Tradewind Aviation |
| 05Q | "Comlux Aviation |
+------+--------------------+
3 Row(s) produced. Time Elapsed: 4.705s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
In CDW Hive HUE: select count(*) from ${user_id}_airlines_ice.airlines;
Modify Data
INSERT DATA
Snowflake can’t insert into an external Iceberg table
Manage an Iceberg table
Iceberg tables
The documentation mentions that you can use INSERT and UPDATE statements to modify an Iceberg table that uses Snowflake as the catalog. Since this table uses an external catalog, I got an error in Snowflake when inserting into the Iceberg table:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>INSERT INTO airlines (code, description) VALUES
('A1', 'Airline 1 Description'),
('A2', 'Airline 2 Description'),
('A3', 'Airline 3 Description'),
('A4', 'Airline 4 Description'),
('A5', 'Airline 5 Description'),
('A6', 'Airline 6 Description'),
('A7', 'Airline 7 Description'),
('A8', 'Airline 8 Description'),
('A9', 'Airline 9 Description'),
('A10', 'Airline 10 Description'),
('A11', 'Airline 11 Description'),
('A12', 'Airline 12 Description'),
('A13', 'Airline 13 Description'),
('A14', 'Airline 14 Description'),
('A15', 'Airline 15 Description'),
('A16', 'Airline 16 Description'),
('A17', 'Airline 17 Description'),
('A18', 'Airline 18 Description'),
('A19', 'Airline 19 Description'),
('A20', 'Airline 20 Description');
091357 (42601): SQL Compilation error: Iceberg table AIRLINES with an external catalog integration is a read-only table and cannot be modified
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
Hive Insert
INSERT INTO `zzeng_airlines_ice`.`airlines` (code, description) VALUES
('A1', 'Airline 1 Description'),
('A2', 'Airline 2 Description'),
('A3', 'Airline 3 Description'),
('A4', 'Airline 4 Description'),
('A5', 'Airline 5 Description'),
('A6', 'Airline 6 Description'),
('A7', 'Airline 7 Description'),
('A8', 'Airline 8 Description'),
('A9', 'Airline 9 Description'),
('A10', 'Airline 10 Description'),
('A11', 'Airline 11 Description'),
('A12', 'Airline 12 Description'),
('A13', 'Airline 13 Description'),
('A14', 'Airline 14 Description'),
('A15', 'Airline 15 Description'),
('A16', 'Airline 16 Description'),
('A17', 'Airline 17 Description'),
('A18', 'Airline 18 Description'),
('A19', 'Airline 19 Description'),
('A20', 'Airline 20 Description');
Refresh the metadata for an Iceberg table created from files in object storage
Issue: Hive changed the Iceberg table, but afterwards the Snowflake query still returned the old data:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>select count(*) from AIRLINES;
+----------+
| COUNT(*) |
|----------|
| 1491 |
+----------+
1 Row(s) produced. Time Elapsed: 0.678s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
In Snowflake, the metadata files do not identify the most recent snapshot of an Iceberg table.
Ref URL: Iceberg tables
I have to alter my Snowflake Iceberg table definition manually:
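To find the path of the latest metadata file on the CDW side, one option (my assumption; the original post does not show how the path was obtained) is to check the Iceberg table properties in Hive:
-- the metadata_location property points at the current metadata JSON file
SHOW TBLPROPERTIES zzeng_airlines_ice.airlines;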
Then refresh the metadata:
ALTER ICEBERG TABLE AIRLINES REFRESH 'metadata/00002-c33ae888-1af3-4f64-830b-eac9e0a95983.metadata.json';
Result:
zzeng#COMPUTE_WH@ZZENG.PUBLIC>ALTER ICEBERG TABLE AIRLINES REFRESH 'metadata/00002-c33ae888-1af3-4f64-830b-eac9e0a95983.metadata.json';
+----------------------------------+
| status |
|----------------------------------|
| Statement executed successfully. |
+----------------------------------+
1 Row(s) produced. Time Elapsed: 10.199s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>select count(*) from AIRLINES;
+----------+
| COUNT(*) |
|----------|
| 1511 |
+----------+
1 Row(s) produced. Time Elapsed: 0.204s
zzeng#COMPUTE_WH@ZZENG.PUBLIC>
Considerations and limitations
Iceberg tables
The following is an excerpt from the Snowflake documentation; these considerations and limitations apply to Iceberg tables and are subject to change:
Iceberg
Versions 1 and 2 of the Apache Iceberg specification are supported, excluding the following features:
Row-level deletes (either position deletes or equality deletes).
Using the history.expire.min-snapshots-to-keep table property to specify the default minimum number of snapshots to keep. For more information, see Metadata and snapshots.
Iceberg partitioning with the bucket transform function impacts performance for queries that use conditional clauses to filter results.
Iceberg tables created from files in object storage aren’t supported if the following conditions are true:
The table contains a partition spec that defines an identity transform.
The source column of the partition spec does not exist in a Parquet file.
For Iceberg tables that are not managed by Snowflake, time travel to any snapshot generated after table creation is supported as long as you periodically refresh the table before the snapshot expires.
File formats
Support is limited to Apache Parquet files.
Parquet files that use the unsigned integer logical type are not supported.
External volumes
You must access the cloud storage locations in external volumes using direct credentials. Storage integrations are not supported.
The trust relationship must be configured separately for each external volume that you create.
Metadata files
The metadata files do not identify the most recent snapshot of an Iceberg table.
You cannot modify the location of the data files or snapshot using the ALTER ICEBERG TABLE command. To modify either of these settings, you must recreate the table (using the CREATE OR REPLACE ICEBERG TABLE syntax).
03-05-2024
02:09 AM
Goal: Use the CLI to create/import a dataflow in CDF-PC.
Then, run Terraform to create/import the dataflow into CDF-PC.
Process:
Install the CDP CLI. I am using macOS, so I install cdpcli this way:
pip3 install cdpcli
Then confirm it is installed correctly:
[zzeng@zeng-mbp ~]$ ll ~/Library/Python/3.9/bin/ | grep cdp
-rwxr-xr-x@ 1 zzeng staff 250 Mar 4 12:22 cdp
-rwxr-xr-x@ 1 zzeng staff 250 Mar 4 12:22 cdp_completer
[zzeng@zeng-mbp ~]$ export PATH="$HOME/Library/Python/3.9/bin:$PATH"
[zzeng@zeng-mbp ~]$ cdp --version
0.9.107
[zzeng@zeng-mbp ~]$
Configure CLI: (Reference Link) Go to [Management Console] -> [User] -> [Profile].
There I can find my API key. If it does not exist, create one.
Then, configure the API key on my macOS (Reference):
cdp configure
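cdp configure prompts for the CDP access key ID and private key and stores them in a local credentials file; roughly (a sketch of the stored profile, details may vary by CLI version):
# ~/.cdp/credentials
[default]
cdp_access_key_id = <your CDP access key ID>
cdp_private_key = <your CDP private key>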
After this config, check the latest status: [zzeng@zeng-mbp ~]$ cdp iam get-user
{
"user": {
"userId": *****
"status": "ACTIVE",
"workloadPasswordDetails": {
"isPasswordSet": true
}
}
}
Use the CLI to create the dataflow:
cdp df import-flow-definition \
--name "zzeng2-fetch_from_S3_folder" \
--file "/<<PATH_TO_UPDATE>>/fetch_from_S3_folder.json" \
--comments "Initial Version"
Example:
$ cdp df import-flow-definition --name "zzeng-fetch_from_S3_folder" --description "Description for this flow" --file "/Users/zzeng/Library/CloudStorage/OneDrive-Personal/38_CLDR_Docs/50_demo/FetchFromS3Folder/fetch_from_S3_folder.json" --comments "Initial Version"
{
"crn": "crn:cdp:df:us-west-1:******:flow:zzeng-fetch_from_S3_folder",
"name": "zzeng-fetch_from_S3_folder",
"versionCount": 1,
"createdTimestamp": 1709632435790,
"description": "Description for this flow",
"modifiedTimestamp": 1709632435790,
"versions": [
{
"crn": "crn:cdp:df:us-west-1:******:flow:zzeng-fetch_from_S3_folder/v.1",
"bucketIdentifier": "https://s3.us-west-2.amazonaws.com/*****.cloudera.com/******",
"author": "Zhen Zeng",
"version": 1,
"timestamp": 1709632435792,
"deploymentCount": 0,
"comments": "Initial Version",
"draftCount": 0,
"tags": []
}
]
}
Deploy the flow to an environment:
We can use the deployment wizard to do this, or use the wizard to generate the CLI command.
Result:
$ cdp df create-deployment \
> --service-crn crn:cdp:df:us-west-1:******:service:***** \
> --flow-version-crn "crn:cdp:df:us-west-1:*****:flow:zzeng-fetch_from_S3_folder/v.1" \
> --deployment-name "zzeng-deploy-01" \
> --project-crn "crn:cdp:df:us-west-1:****:project:*****" \
> --cfm-nifi-version 1.24.0.2.3.13.0-9 \
> --auto-start-flow \
> --cluster-size-name EXTRA_SMALL \
> --static-node-count 1 \
> --no-auto-scaling-enabled
{
"deploymentCrn": "crn:cdp:df:us-west-1:******:deployment:*****/*****"
}
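To confirm the deployment afterwards (my addition; not shown in the original post, assuming the df list-deployments subcommand):
# list CDF deployments; zzeng-deploy-01 should appear in the output
cdp df list-deployments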
Reference:
Update Deployment
mc cli client setup
cdf deploy flow cli
01-15-2024
02:11 AM
Update: In the latest NiFi, we can now directly connect "GenerateTableFetch" and "ExecuteSQL" with a connection. NiFi is evolving 🙂
11-15-2023
06:28 PM
What's your OS version? If you use CentOS Stream release, you will get this error.
05-09-2023
04:08 PM
NiFi - Oracle Connect
Processor: ExecuteSQL
1) Prepare Oracle Data (Optional)
Prepare the data, and create the Oracle table:
CREATE TABLE USER_MASTER
(
USER_ID VARCHAR2(8) NOT NULL,
DEPT_NO VARCHAR2(8),
USER_NAME VARCHAR2(32),
CREATED_ON DATE DEFAULT SYSDATE,
MODIFIED_ON DATE,
CONSTRAINT pk_USER_MASTER PRIMARY KEY(USER_ID)
);
Insert the data into the Oracle table:
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 1 Ro','1001','1');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 2 Ro','1001','2');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 3 Ro','1001','3');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 4 Ro','1001','4');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 5 Ro','1001','5');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 6 Ro','1001','6');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 7 Ro','1001','7');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 8 Ro','1001','8');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 9 Ro','1001','9');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 10 Ro','1001','10');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 11 Ro','1001','11');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 12 Ro','1001','12');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 13 Ro','1001','13');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 14 Ro','1001','14');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 15 Ro','1001','15');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 16 Ro','1001','16');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 17 Ro','1001','17');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 18 Ro','1001','18');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 19 Ro','1001','19');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 20 Ro','1001','20');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 21 Ro','1001','21');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 22 Ro','1001','22');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 23 Ro','1001','23');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 24 Ro','1001','24');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 25 Ro','1001','25');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 26 Ro','1001','26');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 27 Ro','1001','27');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 28 Ro','1001','28');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 29 Ro','1001','29');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 30 Ro','1001','30');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 31 Ro','1001','31');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 32 Ro','1001','32');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 33 Ro','1001','33');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 34 Ro','1001','34');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 35 Ro','1001','35');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 36 Ro','1001','36');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 37 Ro','1001','37');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 38 Ro','1001','38');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 39 Ro','1001','39');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 40 Ro','1001','40');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 41 Ro','1001','41');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 42 Ro','1001','42');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 43 Ro','1001','43');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 44 Ro','1001','44');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 45 Ro','1001','45');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 46 Ro','1001','46');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 47 Ro','1001','47');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 48 Ro','1001','48');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 49 Ro','1001','49');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 50 Ro','1001','50');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 51 Ro','1001','51');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 52 Ro','1001','52');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 53 Ro','1001','53');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 54 Ro','1001','54');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 55 Ro','1001','55');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 56 Ro','1001','56');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 57 Ro','1001','57');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 58 Ro','1001','58');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 59 Ro','1001','59');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 60 Ro','1001','60');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 61 Ro','1001','61');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 62 Ro','1001','62');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 63 Ro','1001','63');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 64 Ro','1001','64');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 65 Ro','1001','65');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 66 Ro','1001','66');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 67 Ro','1001','67');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 68 Ro','1001','68');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 69 Ro','1001','69');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 70 Ro','1001','70');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 71 Ro','1001','71');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 72 Ro','1001','72');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 73 Ro','1001','73');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 74 Ro','1001','74');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 75 Ro','1001','75');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 76 Ro','1001','76');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 77 Ro','1001','77');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 78 Ro','1001','78');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 79 Ro','1001','79');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 80 Ro','1001','80');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 81 Ro','1001','81');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 82 Ro','1001','82');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 83 Ro','1001','83');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 84 Ro','1001','84');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 85 Ro','1001','85');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 86 Ro','1001','86');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 87 Ro','1001','87');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 88 Ro','1001','88');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 89 Ro','1001','89');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 90 Ro','1001','90');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 91 Ro','1001','91');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 92 Ro','1001','92');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 93 Ro','1001','93');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 94 Ro','1001','94');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 95 Ro','1001','95');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 96 Ro','1001','96');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 97 Ro','1001','97');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 98 Ro','1001','98');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 99 Ro','1001','99');
INSERT INTO USER_MASTER (USER_NAME, DEPT_NO, USER_ID) VALUES ('Hayashi 100 Ro','1001','100');
2) Add Processor
Add an ExecuteSQL Processor :
With the following input parameters:
select * from user_master
Create the DBCPConnectionPool Controller Service:
jdbc:oracle:thin:@zzeng-*******.com:1521:svcname
oracle.jdbc.driver.OracleDriver
/opt/nifi/jdbc/ojdbc8.jar
Be careful about "svcname" above; you must enter the correct Oracle service name.
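Note that the host:port:name form above is Oracle's SID syntax; if "svcname" is actually a service name rather than a SID, the thin-driver URL typically uses the slash-separated form instead (an illustrative variant, not from the original post):
jdbc:oracle:thin:@//zzeng-*******.com:1521/svcname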
Download JDBC Driver:
To get the ojdbc8.jar,
go to https://www.oracle.com/database/technologies/appdev/jdbc-downloads.html
wget https://download.oracle.com/otn-pub/otn_software/jdbc/219/ojdbc8-full.tar.gz
tar xvf ojdbc8-full.tar.gz
sudo mkdir -p /opt/nifi/jdbc/
sudo chown nifi:nifi /opt/nifi/jdbc/
sudo chmod 755 /opt/nifi/jdbc/
cd /opt/nifi/jdbc/
sudo cp /home/centos/oracle-connect/ojdbc8.jar .
3) Run
Run once:
Check Provenance
4) Write to HDFS
Hadoop Configuration Resources:
/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/core-site.xml
Directory:
/tmp/nifi/
Check result:
[centos@cdp conf]$ hdfs dfs -ls /tmp/nifi/
Found 2 items
-rw-r--r-- 1 nifi supergroup 5268 2023-05-10 04:52 /tmp/nifi/78338e0b-27b4-4b44-b406-c5b1cada12eb
-rw-r--r-- 1 nifi supergroup 5268 2023-05-10 05:16 /tmp/nifi/ca5756c4-0e9e-480c-abf5-050b731493fb
[centos@cdp conf]$
5) Write to Hive External Table
1) Get data from Oracle (Avro format output): ExecuteSQL processor
2) Convert from Avro to Parquet: ConvertAvroToParquet processor
3) Write Parquet data to the external Hive table: PutHDFS processor
Prepare Hive Table:
CREATE EXTERNAL TABLE DEMO.USER_MASTER
(
USER_ID STRING,
DEPT_NO STRING,
USER_NAME STRING,
CREATED_ON STRING ,
MODIFIED_ON STRING
)
STORED AS PARQUET
LOCATION '/tmp/data/parquet';
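For the flow to land data where this table expects it, the PutHDFS Directory property should point at the table LOCATION (my assumption based on the DDL above), e.g. /tmp/data/parquet. A quick sanity check in Hive afterwards:
-- confirm the Parquet files written by PutHDFS are visible through the external table
SELECT COUNT(*) FROM DEMO.USER_MASTER;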
01-12-2023
02:16 AM
To install Apache NiFi on a MacBook, do the following:
Download the zip file from the Apache NiFi Downloads page. Ensure that you download the latest binary zip file, e.g. Apache NiFi Binary 1.19.1.
Download it to the MacBook, and then unzip it:
chmod +x bin/nifi.sh
Run the following command for User Generate:
bin/nifi.sh set-single-user-credentials zzeng <my long password>
Run the following commands to start NiFi and check its status:
bin/nifi.sh start
bin/nifi.sh status
netstat -an | grep LIST
Check that port 8443 is in LISTEN state:
tcp4 0 0 127.0.0.1.8443 *.* LISTEN
Log in at https://localhost:8443/nifi/
11-13-2022
08:07 AM
https://lists.apache.org/thread/907n11xlvdmckp1045bspzloclthfqsh
As NiFi is a pure Java/JVM application, we use Jython rather than Python for ExecuteScript. This means that you can't import native (CPython, e.g.) modules into your Jython scripts in ExecuteScript. Consider using ExecuteStreamCommand with a real Python interpreter and script. I'm looking at Py4J to try and bridge the gap.
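A minimal sketch of that ExecuteStreamCommand approach (my own illustration, not from the thread): save a script such as /path/to/transform.py, set Command Path to python3 and Command Arguments to /path/to/transform.py; the processor pipes the flowfile content to the script's stdin and uses its stdout as the new flowfile content.
import sys

# flowfile content arrives on stdin; any CPython module (pandas, numpy, ...) can be imported here
data = sys.stdin.read()

# trivial example transformation: upper-case the content and write it back as the new flowfile content
sys.stdout.write(data.upper())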
04-10-2022
10:47 PM
The latest documentation about Airflow in the Data Engineering Experience (CDE) within CDP Public Cloud and CDP Private Cloud Data Services is here:
CDP Public Cloud: https://docs.cloudera.com/data-engineering/cloud/orchestrate-workflows/topics/cde-airflow-dag-pipeline.html
CDP Private Cloud Data Services: https://docs.cloudera.com/data-engineering/1.3.3/orchestrate-workflows/topics/cde-airflow-dag-pipeline.html
07-20-2021
01:05 AM
Sorry for my late reply.
> what AMI are you using and were there any pre-requisites for OS?
I used CentOS 7.8, as the support matrix shows: https://supportmatrix.cloudera.com/#Cloudera
To download CDP, if you are not paying for CDP, you can try it from here: https://docs.cloudera.com/cdp-private-cloud/latest/release-guide/topics/cdpdc-trial-download-information.html
The trial version has the same functions as normal CDP but can only be used for 60 days. Please have a try and let me know if it works.
07-09-2021
12:01 AM
Hi Sam, I also use CDP on AWS for learning. If you don't have enough AWS credit budget, you can use 3 nodes in this way: use these 3 nodes as both master and worker nodes. That means put the NameNode and the DataNode on these nodes, and put a DataNode on all 3 nodes.