Member since: 08-21-2018 · Posts: 8 · Kudos Received: 0 · Solutions: 0
11-03-2019
05:29 PM
Hi,

Sadly I have still not solved this issue, so hopefully some more information and a list of what I have been trying without success will help. Below is the JSON in my flowfile:

{
"Revenue_Label": "Dining Room",
"StoreName": "STORE A",
"Revenue_Id": "1",
"Alteration_Flag": "False",
"Order_Mode_Label": "DriveThru",
"checkClosetime": "2019-10-24T13:43:19+13:00",
"Alterations": [ {
"Alteration_Product_Code": "211136",
"Alteration_Product_Net_Amount": 0.0,
"Altered_Product_Code": "211135",
"Alteration_Product_Amount": 0.0,
"Altered_Product_Name": "Burger",
"Alteration_Product_Name": "Add Sauce",
"Alteration_Product_Qty": 1.0 } ],
"StoreId": "1234",
"dob": "20191024",
"Order_Mode_Id": "3",
"checknumber": "54321"} Below is the schema I am now using [
{ "mode" : "NULLABLE" , "name" : "Revenue_Label" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "StoreName" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "Revenue_Id" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "Alteration_Flag" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "Order_Mode_Label" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "checkClosetime" , "type" : "TIMESTAMP" },
{ "mode" : "REPEATED" ,
"name" : "Altertions" ,
"type" : "RECORD" ,
"fields" : [
{ "mode" : "NULLABLE" , "name" : "Alteration_Product_Code" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "Alteration_Product_Net_Amount" , "type" : "FLOAT" },
{ "mode" : "NULLABLE" , "name" : "Altered_Product_Code" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "Alteration_Product_Amount" , "type" : "FLOAT" },
{ "mode" : "NULLABLE" , "name" : "Altered_Product_Name" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "Alteration_Product_Name" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "Alteration_Product_Qty" , "type" : "FLOAT" }
]
},
{ "mode" : "NULLABLE" , "name" : "StoreId" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "dob" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "Order_Mode_Id" , "type" : "STRING" },
{ "mode" : "NULLABLE" , "name" : "checknumber" , "type" : "STRING" }
]

What I have tried:
- Removed all the whitespace and carriage-return characters from the JSON flowfile.
- Tried the array at the start and at the end of the JSON file and matched the BigQuery table to this.
- Changed the names of the items to not include underscores.
- Built the insert one column at a time; it only fails once the array is present.
- Used the schema definition generated from Google Cloud BigQuery directly via the utility "bq show --schema --format=prettyjson".
- Defined the mode for each of the array items as REQUIRED instead of NULLABLE.
- Removed all the whitespace and carriage-return characters from the schema definition in the properties of the PutBigQueryBatch processor in NiFi.
- Within Google BigQuery, created a table manually by importing the above JSON file and having it auto-detect the schema, then pointed to this table in NiFi.
- Within Google BigQuery, created a table manually by importing the above JSON file and defining the schema myself using the above definition, then pointed to this table in NiFi.

All of these resulted in the same error as stated above.

Note: elsewhere in my NiFi flow I successfully use the PutBigQueryBatch processor and update the table. The difference is that the JSON there does not have an array present. I really am at a loss on how to solve this issue.

Thanks for any help forthcoming,
Tim
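One quick sanity check worth running before blaming the array handling: diff the flowfile's top-level keys against the field names in the schema. Note that the schema above spells the array field "Altertions" while the flowfile has "Alterations". The sketch below (with both structures trimmed to field names only) surfaces exactly that kind of mismatch:

```python
import json

# Sanity-check sketch: compare the flowfile's top-level keys against the field
# names declared in the PutBigQueryBatch schema. The structures below are
# trimmed copies of the JSON and schema from this post (names only).
flowfile_json = """{"Revenue_Label": "Dining Room", "StoreName": "STORE A",
"Revenue_Id": "1", "Alteration_Flag": "False", "Order_Mode_Label": "DriveThru",
"checkClosetime": "2019-10-24T13:43:19+13:00",
"Alterations": [{"Alteration_Product_Code": "211136"}],
"StoreId": "1234", "dob": "20191024", "Order_Mode_Id": "3", "checknumber": "54321"}"""

schema_json = """[
{"name": "Revenue_Label"}, {"name": "StoreName"}, {"name": "Revenue_Id"},
{"name": "Alteration_Flag"}, {"name": "Order_Mode_Label"},
{"name": "checkClosetime"}, {"name": "Altertions"}, {"name": "StoreId"},
{"name": "dob"}, {"name": "Order_Mode_Id"}, {"name": "checknumber"}]"""

record_keys = set(json.loads(flowfile_json))                 # keys of the record
schema_names = {f["name"] for f in json.loads(schema_json)}  # declared fields

# Any name appearing on only one side will make BigQuery reject the row.
print("in record but not in schema:", record_keys - schema_names)
print("in schema but not in record:", schema_names - record_keys)
```

Field names in BigQuery are matched exactly, so a one-letter difference in a nested field name fails the whole load.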
10-21-2019
03:44 PM
Hi,

I am using the NiFi processor PutBigQueryBatch to put data into a Google Cloud BigQuery table. However, I am getting the below error with regard to the schema I have defined in the properties. Below is the schema definition I have in the properties of the processor:

[
{ "name": "checknumber", "type": "INTEGER", "mode": "NULLABLE" },
{ "name": "Altertions", "type": "RECORD", "mode": "REPEATED", "fields": [
  { "name": "Alteration_Product_Qty", "type": "FLOAT", "mode": "NULLABLE" },
  { "name": "Alteration_Product_Name", "type": "STRING", "mode": "NULLABLE" },
  { "name": "Altered_Product_Name", "type": "STRING", "mode": "NULLABLE" },
  { "name": "Alteration_Product_Code", "type": "INTEGER", "mode": "NULLABLE" },
  { "name": "Altered_Product_Code", "type": "INTEGER", "mode": "NULLABLE" },
  { "name": "Alteration_Product_Net_Amount", "type": "FLOAT", "mode": "NULLABLE" },
  { "name": "Alteration_Product_Amount", "type": "FLOAT", "mode": "NULLABLE" }
] },
{ "name": "dob", "type": "INTEGER", "mode": "NULLABLE" },
{ "name": "StoreId", "type": "STRING", "mode": "NULLABLE" },
{ "name": "Order_Mode_Id", "type": "INTEGER", "mode": "NULLABLE" },
{ "name": "checkClosetime", "type": "TIMESTAMP", "mode": "NULLABLE" },
{ "name": "Revenue_Id", "type": "INTEGER", "mode": "NULLABLE" },
{ "name": "Alteration_Flag", "type": "STRING", "mode": "NULLABLE" },
{ "name": "Revenue_Label", "type": "STRING", "mode": "NULLABLE" },
{ "name": "Order_Mode_Label", "type": "STRING", "mode": "NULLABLE" },
{ "name": "StoreName", "type": "STRING", "mode": "NULLABLE" }
]

and here is the definition of the table in BigQuery. I think it has something to do with how I have defined the Alterations array in the properties but can't figure out what. Does anyone know how to define the array correctly in the schema definition in the properties of the PutBigQueryBatch processor?

Thanks,
Tim
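For reference, a REPEATED RECORD field in a BigQuery JSON load expects the value to be a JSON array of objects whose keys come from the RECORD's subfields (names are case-sensitive). A minimal stdlib-only sketch of that shape check, using a trimmed version of the Alterations field from this post:

```python
import json

# Hedged sketch (not the NiFi code path): what a REPEATED RECORD field expects
# in the load JSON -- an array of objects whose keys come from the RECORD's
# subfields. "Alterations" and its subfields are trimmed from this post.
schema_field = {
    "name": "Alterations", "type": "RECORD", "mode": "REPEATED",
    "fields": [{"name": "Alteration_Product_Qty", "type": "FLOAT"},
               {"name": "Alteration_Product_Name", "type": "STRING"}],
}

record = json.loads('{"Alterations": [{"Alteration_Product_Qty": 1.0, '
                    '"Alteration_Product_Name": "Add Sauce"}]}')

value = record[schema_field["name"]]
subfields = {f["name"] for f in schema_field["fields"]}

assert isinstance(value, list)                        # REPEATED -> JSON array
assert all(isinstance(item, dict) for item in value)  # RECORD -> JSON objects
unknown = {key for item in value for key in item} - subfields
print("keys not declared in the RECORD:", unknown)    # empty when they line up
```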
Labels:
- Apache NiFi
05-15-2019
02:25 AM
Hi All,

I managed to solve this issue by manually adding the "User-Agent" property to the InvokeHTTP processor.

Regards,
Tim
03-12-2019
10:45 PM
Hi,

I have an issue with an InvokeHTTP processor.

Overview: I have the below NiFi flow, whereby I create dynamic URLs and call them using an InvokeHTTP processor.

Issue: However, I am getting the below error from the InvokeHTTP processor, resulting in the JSON file not being retrieved.

Note: I got this error when I first called the master URL from the GetHTTP processor at the start. I was able to solve that by defining the User Agent property within the GetHTTP processor to be Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36

Question: Is it possible to define a User Agent property in the InvokeHTTP processor? Or is there some other way I can solve this issue?

Hope that made sense. Many thanks,
Tim
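As the later reply in this thread confirms, InvokeHTTP sends a dynamic property named User-Agent as a plain HTTP request header (the same mechanism GetHTTP's User Agent property uses). At the HTTP level the effect is just an extra header, as this Python sketch illustrates (no request is actually sent; the URL is the example one from this thread):

```python
import urllib.request

# Illustration only: a dynamic property named "User-Agent" on InvokeHTTP is
# sent as a plain HTTP request header. This builds (but does not send) a
# request carrying the same browser User-Agent that fixed GetHTTP.
UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
      "(KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36")

req = urllib.request.Request("https://abcd.com/136678/20180122/20971531",
                             headers={"User-Agent": UA})

# urllib normalizes header names to "Xxxx-xxxx" capitalization.
print(req.get_header("User-agent"))
```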
Labels:
- Apache NiFi
09-24-2018
12:21 PM
Hi All,

Being new to NiFi it has taken me a while to come up with a solution to this issue. Seeing as there have been a few views on this question, I thought I would update members on what I have come up with, and see if there is any feedback on how I could improve the solution regarding design and robustness.

There are two parts to the design:
Part one: create a logging solution.
Part two: invoke the URL from the new logging solution.

Part One: Create a logging solution

When I first call the master URL I get the below JSON file:

{"storeId": "136678",
"dob": "20180122",
"checks": [{"id": "20971531",
"printableId": "80001",
"IsClosed": "true",
"marker": 636704886835750777,
"link": "https://abcd.com/136678/20180122/20971531"}
]}
Part One steps:
1. InvokeHTTP: call the master URL.
2. EvaluateJsonPath: populate the StoreID and DOB variables.
3. SplitJson: split on the $.checks array.
4. EvaluateJsonPath: populate the variables printableId, link and IsClosed.
5. UpdateAttribute: build the filename StoreID_DOB_printableId.json; create the variable ProcessedFlag and set it to "N".
6. RouteOnAttribute: only process where IsClosed = "true".
7. PutHDFS: put the resulting JSON file in the directory /apps/LinksProcessed with the file name 136678_20180122_80001.json. Note: Conflict Resolution Strategy = ignore.

Resulting JSON file:

{"Link": "https://abcd.com/136678/20180122/20971531",
"PrintableId": "80001",
"StoreId": "136678",
"DOB": "20180122",
"ProcessedFlag": "N"}
Part Two steps:
1. ListHDFS: returns the filenames of the latest files added to the /apps/LinksProcessed directory; in this example, 136678_20180122_80001.json.
2. FetchHDFS: fetches the JSON files for the filenames returned from step 1.
3. EvaluateJsonPath: populate the parameters DOB, StoreId, PrintableId, Link and ProcessedFlag.
4. RouteOnAttribute: only process when ProcessedFlag = "N".
5. InvokeHTTP: invoke the URL in the Link variable; in this example, https://abcd.com/136678/20180122/20971531.
6. PutHDFS: place the transaction-detail JSON file in the directory /apps/SalesTransactions with the file name 136678_20180122_80001.json.
7. UpdateAttribute: set ProcessedFlag = "Y".
8. AttributesToJSON: get the variables DOB, StoreId, PrintableId, Link and ProcessedFlag.
9. PutHDFS: replace the file 136678_20180122_80001.json in /apps/LinksProcessed with the data from the previous step.

The file 136678_20180122_80001.json in the directory /apps/LinksProcessed now has the resulting data, and the detail transaction JSON file has been placed in the HDFS directory /apps/SalesTransactions:

{"Link": "https://abcd.com/136678/20180122/20971531",
"PrintableId": "80001",
"StoreId": "136678",
"DOB": "20180122",
"ProcessedFlag": "Y"}

What happens when we call the master URL again? We get the data:

{"storeId": "136678",
"dob":"20180122",
"checks":[{"id": "20971531",
"printableId": "80001",
“IsClosed”= “true”,
"marker": 636704886835750777,
link": "https://abcd.com/136678/20180122/20971531"},
{"id": "20971531",
"printableId": "80001",
“IsClosed”= “true”,
"marker": 636704886835750777,
link": "https://abcd.com/136678/20180122/20971531"},
{"id": "20971531",
"printableId": "80001",
“IsClosed”= “true”,
"marker": 636704886835750777,
link": "https://abcd.com/136678/20180122/20971531"}
]} Two new transactions have been appended to the file, we now have three transactions, one of which we processed in the first pass. Part
one This will result in placing two new json files in the apps/LinksProcessed
directory. The first file is not updated nor does the PUTHDFS create an error as the Conflict Resolution Strategy= ignore. Therefore apps/LinksProcessed will look like Apps/ProcessLinks/136678_20180122_80001.json Apps/ProcessLinks/136678_20180122_10001.json Apps/ProcessLinks/136678_20180122_10002.json Part two The ListHDFS process will only call the latest files added to the directory since the last call. Therefore only the files 136678_20180122_10001.json and 136678_20180122_10002.json (the two new ones). This means that only the URLs for these transactions will be invoked and not
the first one that we have already processed in the first pass Resulting HDFS Invoke URL jsons Apps/ProcessLinks/136678_20180122_80001.json Apps/ProcessLinks/136678_20180122_10001.json Apps/ProcessLinks/136678_20180122_10002.json Detail sales transcation
json Apps/SalesTransactions/136678_20180122_80001.json Apps/SalesTransactions/136678_20180122_10001.json Apps/SalesTransactions/136678_20180122_10002.json I hope that is of use to people. Like I said I am new to Nifi so happy to receive any feedback that will improve the solution Tim
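The core dedup idea above (write one marker file per check and let Conflict Resolution Strategy = ignore skip filenames that already exist) can be sketched outside NiFi like this. The file layout is hypothetical and plain Python stands in for the PutHDFS steps:

```python
import json
import os
import tempfile

# Sketch of the dedup idea: write one marker file per check, and skip any
# filename that already exists -- the effect of PutHDFS with Conflict
# Resolution Strategy = ignore. Local temp files stand in for HDFS.
def land_checks(master, out_dir):
    written = []
    for check in master["checks"]:
        name = "{}_{}_{}.json".format(master["storeId"], master["dob"],
                                      check["printableId"])
        path = os.path.join(out_dir, name)
        if os.path.exists(path):   # already landed on a previous pass: ignore
            continue
        with open(path, "w") as fh:
            json.dump({"Link": check["link"],
                       "PrintableId": check["printableId"],
                       "StoreId": master["storeId"], "DOB": master["dob"],
                       "ProcessedFlag": "N"}, fh)
        written.append(name)
    return written

out_dir = tempfile.mkdtemp()
first = {"storeId": "136678", "dob": "20180122",
         "checks": [{"printableId": "80001",
                     "link": "https://abcd.com/136678/20180122/20971531"}]}
second = {"storeId": "136678", "dob": "20180122",
          "checks": [{"printableId": "80001",
                      "link": "https://abcd.com/136678/20180122/20971531"},
                     {"printableId": "10001",
                      "link": "https://abcd.com/136678/20180122/20971535"}]}

print(land_checks(first, out_dir))   # first pass lands 80001
print(land_checks(second, out_dir))  # second pass lands only the new 10001
```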
09-05-2018
10:45 PM
Hi Bryan,

Thank you for your help. I was getting confused between the different schema access strategies and your explanation helped a lot. I tried altering the strategy to "Schema Name", but sadly this resulted in a different error: "Failed to determine schema for writing: connection refused". I tried altering the HWX Schema Registry and Kafka URLs, but without success. I then decided to alter the UpdateAttribute processor, set avro.schema to the schema definition itself, and use the "Schema Text" strategy, and this worked. This will allow me to continue development, and I shall look at implementing the "Schema Name" strategy at a later date.

Many thanks
09-05-2018
03:21 AM
Hi,

I am getting the below error when trying to pass a JSON record into a PublishKafkaRecord processor:

Unexpected character ('A' (code 65)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [source: java.io.StringReader@7070ee3e; line: 1, column: 2]

See attached image error.png.

Explanation: I am trying to pass the below JSON file into PublishKafkaRecord.

JSON file:
{"storeId":"psi119745","dob":"20180905","printedCheckId":"90001","typeId":"212919","label":"Product A","id":"9437243","createdOn":"2018-09-05T04:20:49+12:00","quantity":1.0,"UniqueItemKey":"psi119745_20180905_90001_9437243"}

I have created a schema in the schema registry called AllItems with the below definition.

AllItems schema:
{
"type": "record",
"namespace": "AllItems.trans",
"name": "AllItems",
"fields": [
{
"name": "storeId",
"type": "string"
},
{
"name": "dob",
"type": "string"
},
{
"name": "printedCheckId",
"type": "string"
},
{
"name": "typeId",
"type": "string"
},
{
"name": "label",
"type": "string"
},
{
"name": "id",
"type": "string"
},
{
"name": "createdOn",
"type": "string"
},
{
"name": "quantity",
"type": "double"
},
{
"name": "UniqueItemKey",
"type": "string"
}
]
}

NiFi data flow:
1. UpdateAttribute: set the attributes avro.schema = AllItems, kafka.topic = All_Items_Transactions and schema.name = AllItems. See the attached image for the properties: process-update-attributes-properties.png
2. PublishKafkaRecord: Topic Name = ${kafka.topic}, Record Reader = JsonTreeReader and Record Writer = AvroRecordSetWriter. See the attached image for the properties: process-publishkafkarecord-properties.png

JsonTreeReader, AvroRecordSetWriter and HortonworksSchemaRegistry properties: serviceproperties.png

Thanks for any help,
Tim
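A guess at the cause, consistent with the fix described later in this thread: "Unexpected character ('A')" at line 1, column 2 suggests a reader configured for "Schema Text" is trying to parse the literal attribute value "AllItems" as schema JSON, when that strategy expects the full schema text rather than a name. Separately, here is a rough stdlib-only check that the record matches the schema's fields and primitive types (only "string"/"double" handled; the real flow lets the Avro library inside JsonTreeReader/AvroRecordSetWriter do this):

```python
import json

# Rough stdlib-only check that the record fits the Avro schema's fields and
# primitive types. The schema and record are trimmed from this post; only
# "string" and "double" are mapped to Python types here.
avro_schema = {"type": "record", "name": "AllItems",
               "fields": [{"name": "storeId", "type": "string"},
                          {"name": "quantity", "type": "double"},
                          {"name": "UniqueItemKey", "type": "string"}]}

record = json.loads('{"storeId": "psi119745", "quantity": 1.0, '
                    '"UniqueItemKey": "psi119745_20180905_90001_9437243"}')

py_types = {"string": str, "double": float}
problems = [f["name"] for f in avro_schema["fields"]
            if not isinstance(record.get(f["name"]), py_types[f["type"]])]
print("fields that fail the schema:", problems)   # [] means the record fits
```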
Labels:
- Apache Kafka
- Schema Registry
08-22-2018
03:50 AM
Hi,

I have a question about checking whether a URL has already been called by the InvokeHTTP processor. Below is the NiFi data flow:

Step 1: GetHTTP processor
Step 2: SplitJson on $.checks
Step 3: EvaluateJsonPath on $.link
Step 4: InvokeHTTP based on $.link
Step 5: PutHDFS and PublishKafkaRecord

The issue I have is that when I first call the GetHTTP processor, the JSON file looks like this.

First call returns:
{"storeId": "136678", "dob": "20180122", "checks": [ { "id": "20971531", "printableId": "80001", "marker": 636704886835750777, "link": "https://abcd.com/136678/20180122/20971531" } ]}

The next time I call it, new information is appended to the bottom of the file.

Second call returns:
{"storeId": "136678", "dob": "20180122", "checks": [ { "id": "20971531", "printableId": "80001", "marker": 636704886835750777, "link": "https://abcd.com/136678/20180122/20971531" }, { "id": "20971535", "printableId": "10001", "marker": 636704886835789652, "link": "https://abcd.com/136678/20180122/20971535" } ]}

Issue: The second call invokes the link from the first call again, which has already been called and placed in HDFS and the PublishKafkaRecord queue.

Question: Is there any way I can check that a link called in the first call has been successfully called, so that it is not invoked again in the second call?

Hope that made sense. Many thanks,
Tim
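One common NiFi-native answer here is DetectDuplicate backed by a DistributedMapCache, keyed on the link (or the check id), so an already-processed link is routed away on the second call. The logic that needs to implement is roughly this, with a Python set standing in for the persistent cache and the JSON taken from this post:

```python
import json

# Sketch of the dedup check: remember which links were already invoked and
# pass on only new ones. The "seen" set stands in for a persistent store
# such as NiFi's DistributedMapCache.
seen = set()

def new_links(response_json, seen):
    links = [c["link"] for c in json.loads(response_json)["checks"]]
    fresh = [link for link in links if link not in seen]
    seen.update(fresh)   # mark them as processed for the next call
    return fresh

first = ('{"storeId": "136678", "dob": "20180122", "checks": ['
         '{"id": "20971531", "link": "https://abcd.com/136678/20180122/20971531"}]}')
second = ('{"storeId": "136678", "dob": "20180122", "checks": ['
          '{"id": "20971531", "link": "https://abcd.com/136678/20180122/20971531"},'
          '{"id": "20971535", "link": "https://abcd.com/136678/20180122/20971535"}]}')

print(new_links(first, seen))    # first call: its one link is new
print(new_links(second, seen))   # second call: only the appended link is new
```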
Labels:
- Apache Kafka
- Apache NiFi
- HDFS