Put data from Parquet files into DynamoDB with NiFi

Explorer
Hello,

I want to integrate data into DynamoDB from Parquet files using NiFi (which I run in a Docker container). I fetch my files from AWS S3 using the ListS3 and FetchS3Object processors and then, as I understand it, convert the files to JSON using ConvertRecord and send the data using PutDynamoDB.

I've tried configuring the AvroSchemaRegistry, ParquetReader and JsonRecordSetWriter controllers, but I'm obviously doing it wrong... I've tried using an UpdateAttribute processor too but nothing works. I don't really understand if I have to add the schema and where to add it. Thanks to anyone who can help me!

1 ACCEPTED SOLUTION


ConvertRecord fails while the ParquetReader reads the Parquet file, because some column names contain a character that is illegal in Avro names: "-". I don't know of a way to address this in NiFi itself, so you will probably have to fix the column names before the file reaches the reader. For example, you can use a pandas DataFrame in Python to replace the illegal characters in the column names:

import pandas as pd

df = pd.read_parquet('source.parquet', engine='fastparquet')
# Replace hyphens with underscores in the column names
df.columns = df.columns.str.replace("-", "_", regex=False)
df.to_parquet('target.parquet', engine='fastparquet')

It's also possible to run this script from within NiFi using ExecuteStreamCommand:

https://community.cloudera.com/t5/Support-Questions/Can-anyone-provide-an-example-of-a-python-script...

The steps will be like this:

1- Fetch Parquet from S3

2- Save it to a staging area under a known filename using PutFile

3- Run ExecuteStreamCommand and pass the filename and path to the Python script. The script renames the columns as shown above and saves the final copy to a target folder

4- Use FetchFile to get the final Parquet file from the target folder using the same filename

5- ConvertRecord

....
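
The script called in step 3 could look roughly like the sketch below. It is only an illustration under a few assumptions: the script name (rename_columns.py), the two command-line arguments (source path, then target path) and the helper names are hypothetical, and pandas needs a Parquet engine such as fastparquet or pyarrow installed wherever NiFi runs the command. It also goes a little further than the hyphen replacement above by replacing every character that Avro does not allow in a name.

```python
import re
import sys

# Avro names must start with a letter or underscore and may contain only
# letters, digits and underscores.
_ILLEGAL = re.compile(r"[^A-Za-z0-9_]")


def sanitize(name: str) -> str:
    """Return an Avro-safe version of a column name."""
    cleaned = _ILLEGAL.sub("_", name)
    # A leading digit is also illegal in Avro, so prefix an underscore.
    if cleaned and cleaned[0].isdigit():
        cleaned = "_" + cleaned
    return cleaned


def rename_columns(source_path: str, target_path: str) -> None:
    """Copy a Parquet file, rewriting its column names to be Avro-safe."""
    import pandas as pd  # imported here so sanitize() works without pandas

    df = pd.read_parquet(source_path)
    df.columns = [sanitize(str(c)) for c in df.columns]
    df.to_parquet(target_path)


if __name__ == "__main__" and len(sys.argv) == 3:
    # Hypothetical invocation: python rename_columns.py <source> <target>
    rename_columns(sys.argv[1], sys.argv[2])
```

Note that two different source columns can collapse to the same cleaned name (for example "A-B" and "A_B"), so it may be worth checking for duplicates before writing the target file.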

If that helps, please accept the solution.

Thanks

 

 


7 REPLIES 7


Can you provide more details about the issue: what error message you are getting, if any, and which processor is causing the problem, along with the input you provide and the output you expect?

I have never used PutDynamoDB but here are some links that can help:

https://www.youtube.com/watch?si=ctBH-f-JOzAPgKAJ&embeds_referring_euri=https%3A%2F%2Fwww.google.com...

 

https://stackoverflow.com/questions/45840156/how-putdynamodb-works-in-nifi

 

 

 

Explorer

Thanks for the video! 🙂 It solved one of my problems: in fact, as I have a list of items to insert, I need to use PutDynamoDBRecord and not PutDynamoDB. So I can insert data after converting one of my Parquet files.
But I still have a problem with another file. Here's the error:

UTC ERROR
ConvertRecord[id=92018f18-018b-1000-fd6f-0a3466abe069] Failed to process
FlowFile[filename=mini_de_train.parquet]; will route to failure:
org.apache.avro.SchemaParseException: Illegal character in: A1BG-AS1

There are some characters that are not accepted (like "-" in "A1BG-AS1") so I've changed them all in the schema, the beginning of which is shown below (there are more than 18,000 columns):

AvroSchemaRegistry.png

So I tried to apply it by adding an UpdateAttribute processor before the ConvertRecord, where I set the name of the schema (de_train), and an AvroSchemaRegistry, used by my JsonRecordSetWriter, which references this schema:

UpdateAttribute.png, ConvertRecord.png, JsonRecordSetWriter.png

But after these modifications I still get the same error:

Erreur.png

What am I doing wrong?
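
For context on the error above: the Avro specification requires names to start with a letter or underscore and to contain only letters, digits and underscores, which is why "A1BG-AS1" is rejected. A schema attached to the JsonRecordSetWriter presumably does not change what the reader sees, which would explain why the error persists. The sketch below is a minimal way to list the offending column names; the function name and the sample column list are illustrative only.

```python
import re

# Avro name rule: a letter or underscore first, then letters, digits, underscores.
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def illegal_avro_names(columns):
    """Return the column names that Avro would reject."""
    return [c for c in columns if not AVRO_NAME.fullmatch(c)]


# Sample taken from the column names mentioned in this thread.
columns = ["cell_type", "sm_name", "A1BG", "A1BG-AS1"]
print(illegal_avro_names(columns))  # ['A1BG-AS1']
```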

Community Manager

@yan439 Has the reply helped resolve your issue? If so, please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. If you are still experiencing the issue, can you provide the information @samsal has requested? Thanks.


Regards,

Diana Torres,
Community Moderator


Explorer

Sorry I haven't had much time to visit the site in the last few days. 🙂

Explorer

Thanks for your answer SAMSAL.

I was hoping to be able to use a processor directly to add my schema but if that's not possible, I'll use a script.
As well as changing the names of several columns, I also need to change the type of some of them, as some are of type "large_string" and one is of type "bool". I had this error for example when I tried to add the schema (retrieved with Python code from my Parquet file) to the ConvertRecord processor:

'schema-text' validated against '{
  "type": "record",
  "name": "de_train",
  "fields": [
    {
      "name": "cell_type",
      "type": "string"
    },
    {
      "name": "sm_name",
      "type": "string"
    },
    {
      "name": "sm_lincs_id",
      "type": "string"
    },
    {
      "name": "SMILES",
      "type": "string"
    },
    {
      "name": "control",
      "type": "bool"
    },
    {
      "name": "A1BG",
      "type": "double"
    },
    {
      "name": "A1BG_AS1",
      "type": "double"
    },
    {
      "name": "A2M",
      "type": "double"
    },
    {
      "name": "A2M_AS1",
      "type": "double"
    },
    {
      "name": "A2MP1",
      "type": "double"
    }
  ]
}' is invalid because Not a valid Avro Schema: "bool" is not a defined name. The type of the "control" field must be a defined name or a {"type": ...} expression.

I had to change "large_string" to "string" and "bool" to "boolean" to get no more errors in the AvroSchemaRegistry.
So how do I change the types in a Parquet file? Is it possible to do this from the dataframe as well as for names?
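
The two renames described above ("large_string" to "string", "bool" to "boolean") can be expressed as a lookup table used when generating the Avro schema text. This is a rough sketch rather than a definitive mapping: "large_string" and "bool" look like type names reported by the Python Parquet tooling rather than Avro types, the other table entries are assumptions about what else might show up, and the function name is hypothetical.

```python
import json

# Type names as reported when inspecting the Parquet file from Python,
# mapped to Avro primitive types. Only "large_string" and "bool" come from
# this thread; the remaining entries are assumptions.
ARROW_TO_AVRO = {
    "string": "string",
    "large_string": "string",
    "bool": "boolean",
    "double": "double",
    "float": "float",
    "int32": "int",
    "int64": "long",
}


def build_avro_schema(record_name, columns):
    """Build an Avro record schema from (column name, source type) pairs.

    An unmapped source type raises KeyError, so it gets noticed instead of
    producing an invalid schema.
    """
    fields = [
        {"name": name, "type": ARROW_TO_AVRO[source_type]}
        for name, source_type in columns
    ]
    return {"type": "record", "name": record_name, "fields": fields}


columns = [("cell_type", "large_string"), ("control", "bool"), ("A1BG", "double")]
schema = build_avro_schema("de_train", columns)
print(json.dumps(schema, indent=2))
```

With this approach the Parquet file itself may not need any type changes; only the generated schema text uses the Avro spellings.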


@yan439,

I'm not sure I understand. I thought you already had the schema defined in the registry with the correct column names and data types. Can you elaborate on how the Avro schema came about, and whether it is the same one you are using in the registry?