So I'm building a NiFi flow that extracts data from Elasticsearch using the SearchElasticsearch processor and loads it into my BigQuery table using the PutBigQuery processor.
The data extracted from Elasticsearch is newline-delimited JSON, like this:
{"_index":"twitter","_id":"123", "_source":{"message":"bla bla bla", type:"tweet"}}
{"_index":"twitter","_id":"124", "_source":{"message":"blalalala", type:"tweet"}}
Then I do some cleaning to rename a few fields, merge all the hits into a single JSON array, and write it out as pretty-printed JSON (see the sketch after the example for one way the merge can be done), like this:
[
  {"_index": "twitter",
   "_id": "123",
   "_source":
     {"tweet": "bla bla bla",
      "type": "tweet"}},
  {"_index": "twitter",
   "_id": "124",
   "_source":
     {"tweet": "blalalala",
      "type": "tweet"}}
]
Then I'm trying to convert the flowfile into JSON with my defined schema, so I used UpdateAttribute to give my flowfile a schema.name attribute.
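For example, the UpdateAttribute property could look like this (the schema name "tweet_schema" is just a placeholder for whatever name the schema is registered under):

UpdateAttribute
    schema.name = tweet_schema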
Then I used ConvertRecord so that every record uses the same Avro schema (the data retrieved from Elasticsearch has varying columns: some fields appear in one document but not in another). Here's the configuration:
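(Roughly, the usual shape of this kind of setup is below; the exact controller services are in the screenshot, so treat this as a sketch:)

ConvertRecord
    Record Reader = JsonTreeReader
    Record Writer = JsonRecordSetWriter

JsonTreeReader / JsonRecordSetWriter
    Schema Access Strategy = Use 'Schema Name' Property
    Schema Registry        = AvroSchemaRegistry
    Schema Name            = ${schema.name}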
Next, I used UpdateRecord and applied the escapeXml() function to the "message" field.
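A minimal sketch of that UpdateRecord configuration, assuming the field sits at /_source/message (adjust the record path if the rename to "tweet" happens earlier in the flow):

UpdateRecord
    Replacement Value Strategy = Literal Value
    /_source/message           = ${field.value:escapeXml()}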
The final processor in this flow is PutBigQuery:
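(Its key properties, roughly; the property names follow recent NiFi GCP bundle versions and the values here are placeholders, so this is a sketch rather than my exact configuration:)

PutBigQuery
    GCP Credentials Provider Service = <credentials service>
    Project ID    = my-project
    Dataset       = my_dataset
    Table Name    = tweets
    Record Reader = JsonTreeReader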
And when I run this processor, it raises an error with this message:
What do you guys think is incorrect about this entire process?
Here are the schemas and an example of the data:
Avro schema:
BigQuery schema:
Example flowfile before PutBigQuery:
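In case the screenshots don't load, here are sketches consistent with the example data above; they are assumptions, not my exact schemas. The Avro schema might look like this, with nullable unions so documents missing a field still validate:

{
  "type": "record",
  "name": "tweet_schema",
  "fields": [
    {"name": "_index", "type": ["null", "string"], "default": null},
    {"name": "_id", "type": ["null", "string"], "default": null},
    {"name": "_source", "type": ["null", {
      "type": "record",
      "name": "source",
      "fields": [
        {"name": "tweet", "type": ["null", "string"], "default": null},
        {"name": "type", "type": ["null", "string"], "default": null}
      ]
    }], "default": null}
  ]
}

And the matching BigQuery table schema might be:

[
  {"name": "_index", "type": "STRING", "mode": "NULLABLE"},
  {"name": "_id", "type": "STRING", "mode": "NULLABLE"},
  {"name": "_source", "type": "RECORD", "mode": "NULLABLE", "fields": [
    {"name": "tweet", "type": "STRING", "mode": "NULLABLE"},
    {"name": "type", "type": "STRING", "mode": "NULLABLE"}
  ]}
]

The flowfile content just before PutBigQuery is the pretty-printed array shown earlier.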
I sincerely appreciate all comments; if more explanation is needed, just leave a comment below.