Created on 05-19-2024 12:43 AM - edited on 05-21-2024 11:59 PM by VidyaSargur
Use NiFi to run a SELECT query against Oracle 19c and save the result to Azure Data Lake Storage (ADLS) Gen2 in Parquet format.
Make sure the Oracle 19c environment is running and reachable.
Prepare an Oracle table:
CREATE TABLE demo_sample (
    column1  NUMBER,
    column2  NUMBER,
    column3  NUMBER,
    column4  VARCHAR2(10),
    column5  VARCHAR2(10),
    column6  VARCHAR2(10),
    column7  VARCHAR2(10),
    column8  VARCHAR2(10),
    column9  VARCHAR2(10),
    column10 VARCHAR2(10),
    column11 VARCHAR2(10),
    column12 VARCHAR2(10),
    CONSTRAINT pk_demo_sample PRIMARY KEY (column1, column2, column3, column4, column5, column6, column7, column8, column9)
);
Insert 20,000 records of test data with a Python script:
import cx_Oracle
import random

# Oracle database connection settings
dsn = cx_Oracle.makedsn("<your Oracle database>", 1521, service_name="PDB1")
connection = cx_Oracle.connect(user="<your user name>", password="<your password>", dsn=dsn)

# Insert random test data in batches
def insert_data():
    cursor = connection.cursor()
    sql = """
        INSERT INTO demo_sample (
            column1, column2, column3, column4, column5, column6,
            column7, column8, column9, column10, column11, column12
        ) VALUES (
            :1, :2, :3, :4, :5, :6, :7, :8, :9, :10, :11, :12
        )
    """
    batch_size = 10000
    data = []
    for i in range(20000):  # 20,000 records
        record = (
            random.randint(1, 1000),
            random.randint(1, 1000),
            random.randint(1, 1000),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10)),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10)),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10)),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10)),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10)),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10)),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10)),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10)),
            ''.join(random.choices('ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10))
        )
        data.append(record)
        # Flush a full batch to the database
        if len(data) == batch_size:
            cursor.executemany(sql, data)
            connection.commit()
            data = []
    # Flush any remaining records
    if data:
        cursor.executemany(sql, data)
        connection.commit()
    cursor.close()

# Main
try:
    insert_data()
finally:
    connection.close()
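To confirm that the load finished, the row count can be checked before the connection is closed. The following is a minimal sketch and not part of the original procedure: count_rows is a hypothetical helper that accepts any Python DB-API connection (the cx_Oracle connection from the script would qualify), and it assumes the table was empty before the insert.

```python
def count_rows(connection, table_name="demo_sample"):
    """Return the number of rows in table_name (hypothetical helper)."""
    # Table names cannot be bound as parameters, so accept simple identifiers only
    if not table_name.replace("_", "").isalnum():
        raise ValueError(f"unexpected table name: {table_name!r}")
    cursor = connection.cursor()
    try:
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        return cursor.fetchone()[0]
    finally:
        cursor.close()
```

Calling count_rows(connection) right after insert_data() should then report 20000 if every batch was committed.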
The ExecuteSQLRecord processor uses two controller services.
Download the Oracle JDBC driver from https://www.oracle.com/jp/database/technologies/appdev/jdbc-downloads.html
Save the JDBC driver here (or anywhere your NiFi instance can access):
/Users/zzeng/Downloads/tools/Oracle_JDBC/ojdbc8-full/ojdbc8.jar
DBCPConnectionPool Properties:
We can use default settings here.
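The connection-related properties of DBCPConnectionPool still have to point at the database and the driver JAR saved earlier. The sketch below shows one way those values could be derived from the connection details used in the Python script. The property names are the display names as I understand them from the NiFi UI and may differ between NiFi versions; dbcp_properties is a hypothetical helper, and the URL assumes the Oracle thin driver's service-name form.

```python
def dbcp_properties(host, port, service_name, driver_jar, user, password):
    """Build illustrative DBCPConnectionPool property values (a sketch)."""
    return {
        # Oracle thin-driver URL, service-name form (assumed)
        "Database Connection URL": f"jdbc:oracle:thin:@//{host}:{port}/{service_name}",
        "Database Driver Class Name": "oracle.jdbc.OracleDriver",
        "Database Driver Location(s)": driver_jar,
        "Database User": user,
        "Password": password,
    }

# Placeholders and paths are the ones used earlier in this article
props = dbcp_properties(
    host="<your Oracle database>",
    port=1521,
    service_name="PDB1",
    driver_jar="/Users/zzeng/Downloads/tools/Oracle_JDBC/ojdbc8-full/ojdbc8.jar",
    user="<your user name>",
    password="<your password>",
)
for name, value in props.items():
    print(f"{name}: {value}")
```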
Key: azure.filename, Value: ${uuid:append('.ext')}
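The expression above takes the FlowFile's uuid attribute and appends the literal suffix '.ext', so every file written to ADLS gets a unique name. As a rough illustration of what it evaluates to, here is a Python equivalent. azure_filename is a hypothetical helper, and a random UUID stands in for the FlowFile's uuid attribute.

```python
import uuid

def azure_filename(flowfile_uuid, extension=".ext"):
    """Mimic ${uuid:append('.ext')}: the uuid attribute plus a literal suffix."""
    return f"{flowfile_uuid}{extension}"

# A random UUID stands in for the FlowFile's uuid attribute
example = azure_filename(str(uuid.uuid4()))
print(example)
```

Since the flow writes Parquet, a suffix such as '.parquet' may be more descriptive, though that would be a change from the value shown here.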