SplitJson for ConsumeKafka

New Contributor

Hi guys,

I'm new to NiFi and I need some help, if possible.

I am consuming from Kafka and, when the consumer has been paused for some time, it returns JSON containing several records. I would like to split them into separate files, 1:1.

ConsumeKafka result set:

KleytonMayer_0-1688070422378.png

Here is the JSON from one result (it contains three records):

{
   "before":null,
   "after":{
      "uid":"80faad95-3383-4093-b4c1-e1840f076eef",
      "nome":"yuri",
      "sobrenome":"feminino",
      "email":"yuri.feminino@garupa.co",
      "celular":"53999122007",
      "cpf":"3",
      "avaliacao":"AfQ=",
      "created_at":1687173349000,
      "updated_at":1687423717000,
      "promocode":null,
      "fcm_registration_id":"edzz9eK3QDOUx-jsmpix4K:APA91bFPjXEsD7h-fYzih-pbr-TMfnF21Qk6sPDQYoa52etUwuIn3jCv1nM7ts4QYQ6f2Wcg-S88KdZsAqcEk98ZUAle-xnvFreavHUjB9_JBjj5wKDfjIYx6Z0en65Po_9knVGpz1u9",
      "paypal_client_id":null,
      "paypal_refresh_token":null,
      "foto":"https://garupa-dev.s3.amazonaws.com/207e9b35-c4b2-4660-af4f-cc20a629b581.jpg",
      "stripe_customer_id":null,
      "stripe_card_id":null,
      "cidade":null,
      "verificado":0,
      "cidade_cadastro":"Bagé",
      "estado_cadastro":"RS",
      "pais_cadastro":"BR",
      "uid_cidade":"5526ac90-b4b3-5c9d-f644-10dae4fcb79d",
      "sexo":"F",
      "observacoes":null,
      "uid_usuario_sigg":null,
      "telefone_verificado":true,
      "chamar_mulher":0,
      "situacao":1,
      "cadastro_ativo":true,
      "estudante":null,
      "observacoes_estudante":null,
      "professor":null,
      "gov_car":null,
      "data_nascimento":null,
      "cat_desconto":"NORMAL",
      "bairro":null,
      "numero":null,
      "cep":null,
      "complemento":null,
      "logradouro":null,
      "ddi":"55",
      "app_version":"3.2.2 24/05/2023",
      "corridas_finalizadas":3,
      "nota_app":null,
      "data_nota_app":null
   },
   "source":{
      "version":"2.1.3.Final",
      "connector":"postgresql",
      "name":"garupa",
      "ts_ms":1688069986956,
      "snapshot":"false",
      "db":"dfn2li0o7jp2l0",
      "sequence":"[\"831864725408\",\"831864731400\"]",
      "schema":"public",
      "table":"passageiros",
      "txId":3008149,
      "lsn":831864731400,
      "xmin":null
   },
   "op":"u",
   "ts_ms":1688069987250,
   "transaction":null
}{
   "before":null,
   "after":{
      "uid":"538377eb-2241-4bbe-aa8f-adf2432b2cf8",
      "nome":"yuri",
      "sobrenome":"refatoracao",
      "email":"yuri.refatoracao@garupa.co",
      "celular":"51992393118",
      "cpf":"3",
      "avaliacao":"AfQ=",
      "created_at":1687175340000,
      "updated_at":1687189491000,
      "promocode":null,
      "fcm_registration_id":null,
      "paypal_client_id":null,
      "paypal_refresh_token":null,
      "foto":"https://garupa-dev.s3.amazonaws.com/17c7bfc4-c3bd-486d-882c-17cf14e94c66.jpg",
      "stripe_customer_id":null,
      "stripe_card_id":null,
      "cidade":null,
      "verificado":0,
      "cidade_cadastro":"Bagé",
      "estado_cadastro":"RS",
      "pais_cadastro":"BR",
      "uid_cidade":"5526ac90-b4b3-5c9d-f644-10dae4fcb79d",
      "sexo":"F",
      "observacoes":null,
      "uid_usuario_sigg":null,
      "telefone_verificado":true,
      "chamar_mulher":0,
      "situacao":1,
      "cadastro_ativo":true,
      "estudante":null,
      "observacoes_estudante":null,
      "professor":null,
      "gov_car":null,
      "data_nascimento":null,
      "cat_desconto":"NORMAL",
      "bairro":null,
      "numero":null,
      "cep":null,
      "complemento":null,
      "logradouro":null,
      "ddi":"55",
      "app_version":null,
      "corridas_finalizadas":1,
      "nota_app":null,
      "data_nota_app":null
   },
   "source":{
      "version":"2.1.3.Final",
      "connector":"postgresql",
      "name":"garupa",
      "ts_ms":1688069987110,
      "snapshot":"false",
      "db":"dfn2li0o7jp2l0",
      "sequence":"[\"831864771784\",\"831864771784\"]",
      "schema":"public",
      "table":"pax",
      "txId":3008150,
      "lsn":831864771784,
      "xmin":null
   },
   "op":"u",
   "ts_ms":1688069987251,
   "transaction":null
}{
   "before":null,
   "after":{
      "uid":"e49c67e7-214c-4a19-923a-5188db938e14",
      "nome":"Yuri",
      "sobrenome":"Novo Pax Apple",
      "email":"yuru.pplae@hotmail.co",
      "celular":"99634382",
      "cpf":"3",
      "avaliacao":"AfQ=",
      "created_at":1684248098000,
      "updated_at":1688036553000,
      "promocode":null,
      "fcm_registration_id":"dOGi__Id0EosgQqjmd0IUX:APA91bHpJfIWoi1e9liV2iui6VfucFZNm2fC66axm2frf_XUPWkI07G4i8wVgyYAnAPSsnfAia_ATJGHpfJXRi5HOZPy6kIaVd8tN7vcoeBhS7Jj4Rcce03NyF785XYfGGMi88ekkx1f",
      "paypal_client_id":null,
      "paypal_refresh_token":null,
      "foto":"www.foto.com.br",
      "stripe_customer_id":null,
      "stripe_card_id":null,
      "cidade":null,
      "verificado":0,
      "cidade_cadastro":"Bagé",
      "estado_cadastro":"RS",
      "pais_cadastro":"BR",
      "uid_cidade":"5526ac90-b4b3-5c9d-f644-10dae4fcb79d",
      "sexo":"M",
      "observacoes":null,
      "uid_usuario_sigg":null,
      "telefone_verificado":true,
      "chamar_mulher":0,
      "situacao":1,
      "cadastro_ativo":true,
      "estudante":null,
      "observacoes_estudante":null,
      "professor":null,
      "gov_car":null,
      "data_nascimento":null,
      "cat_desconto":"NORMAL",
      "bairro":null,
      "numero":null,
      "cep":null,
      "complemento":null,
      "logradouro":null,
      "ddi":"598",
      "app_version":"3.2.1",
      "corridas_finalizadas":2,
      "nota_app":null,
      "data_nota_app":null
   },
   "source":{
      "version":"2.1.3.Final",
      "connector":"postgresql",
      "name":"garupa",
      "ts_ms":1688069987263,
      "snapshot":"false",
      "db":"dfn2li0o7jp2l0",
      "sequence":"[\"831864799856\",\"831864799856\"]",
      "schema":"public",
      "table":"passageiros",
      "txId":3008151,
      "lsn":831864799856,
      "xmin":null
   },
   "op":"u",
   "ts_ms":1688069987757,
   "transaction":null
}

I tried SplitJson with these settings:

KleytonMayer_1-1688070565093.png


However, I got 6 results back, and I need 3, each containing one complete JSON record.

KleytonMayer_2-1688070692393.png

I just want each line from ConsumeKafka to be treated as one record, so that the JoltTransform can then be applied to each one normally.

What should I do?
Thanks.



1 ACCEPTED SOLUTION

Master Mentor

@KleytonMayer 

It would help to know the version of NiFi you are using, along with the specific ConsumeKafka/ConsumeKafkaRecord processor and its configuration.
I'd expect your ConsumeKafka to output one FlowFile per consumed Kafka record unless you have changed the default settings or you are using a ConsumeKafkaRecord processor.

If you don't need to split your FlowFile into separate FlowFiles for processing, I'd recommend looking into the various "record" based processors NiFi offers. Working with larger multi-record FlowFiles is more efficient and uses fewer resources.

The output you shared looks like it may be a single complete JSON per line. If so, you could simply use the SplitText processor with Line Split Count set to 1.
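
For readers who want to check outside NiFi whether their payload really is one JSON per line, here is a rough Python sketch of what a line-by-line split amounts to. It is not NiFi code: the function names are made up for illustration, and the second function is only an assumed fallback for the case where the objects turn out to be written back to back (`}{`) instead of one per line.

```python
import json
from typing import List


def split_json_lines(payload: str) -> List[str]:
    """Return one string per non-empty line, checking that each line is complete JSON."""
    records = []
    for line in payload.splitlines():
        line = line.strip()
        if not line:
            continue  # ignore blank lines between records
        json.loads(line)  # raises ValueError if the line is not a complete JSON document
        records.append(line)
    return records


def split_concatenated_json(payload: str) -> List[dict]:
    """Fallback (assumption): split JSON objects written back to back, with or without newlines."""
    decoder = json.JSONDecoder()
    records = []
    pos = 0
    while pos < len(payload):
        # raw_decode does not skip leading whitespace, so do it here
        while pos < len(payload) and payload[pos].isspace():
            pos += 1
        if pos >= len(payload):
            break
        obj, pos = decoder.raw_decode(payload, pos)  # returns (object, end index)
        records.append(obj)
    return records


if __name__ == "__main__":
    sample = '{"op":"u","ts_ms":1}\n{"op":"u","ts_ms":2}\n{"op":"u","ts_ms":3}\n'
    for record in split_json_lines(sample):
        print(record)
```

If `split_json_lines` raises on your data, the records are probably not one per line, and a plain line split would cut a record in the middle.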

If you found that the provided solution(s) assisted you with your query, please take a moment to log in and click Accept as Solution below each response that helped.

Thank you,

Matt


3 REPLIES 3

Community Manager

@KleytonMayer Welcome to the Cloudera Community!

To help you get the best possible solution, I have tagged our NiFi experts @steven-matison and @SAMSAL  who may be able to assist you further.

Please keep us updated on your post, and we hope you find a satisfactory solution to your query.


Regards,

Diana Torres,
Community Moderator



New Contributor

@MattWho

Thanks for the help, and yes, I will provide versions in my upcoming posts.
That solved my issue.