How to solve a serialization error in PublishKafkaRecord in NiFi?

Hello everyone,

I am currently facing an issue with integrating NiFi and Kafka (sending data to Kafka).

I am using the latest version of the NiFi Docker image (apache/nifi, version 1.5) with the Confluent Kafka Docker image (confluentinc/cp-kafka:4.0.0).

I am trying to send specific data to Kafka using the PublishKafkaRecord_1_0 processor. However, I have not yet succeeded because of an IllegalTypeConversionException, which is described further down in this post.

Here is the NiFi template with all the processors and controller services configuration:

67417-nifi-flow.png


GenerateFlowFile config:

67418-generateflowfile-config.png

The custom generated text looks like this:

col_1;col_2;col_3;col_4; etc.
36531253;4787ea68-4276-4b3b-b154-d70419d23113;https://www.dummyexample.com;My Dummy website, description;?;365311753;2018-01-02T07:08:40Z;https://www.dummyexample.com/fr/openspace/loggin?axes4=priv;1;1;1;_15148769143240.5030172901622478_;?

The header line contains the column names. The "?" fields are unknown values.

UpdateAttribute config:

67419-updateattribute-config.png

I put the schema name in the schema.name attribute.


PublishKafkaRecord_1_0 config:

67421-publishkafkarecordconfig-1.png

As for the controller services, I am using CSVReader as the record reader and JSONRecordSetWriter as the record writer.

Both use the Schema Name property as the schema access strategy (the name comes from the schema.name attribute set by the UpdateAttribute processor).

Both refer to a local Avro Schema Registry set up with the following schema:

{
  "type": "record",
  "name": "page_record",
  "fields" : [
    {"name": "session_number", "type": "bytes", "logicalType": "decimal", "precision": 20},
    {"name": "tracking_uuid", "type": "bytes"},
    {"name": "page_location_domain", "type": "string"},
    {"name": "page_title", "type": "string"},
    {"name": "referring_page_instance_id", "type": "bytes", "logicalType": "decimal", "precision": 20},
    {"name": "page_instance_id", "type": "bytes", "logicalType": "decimal", "precision": 20},
    {"name": "event_timestamp", "type": "long", "logicalType": "timestamp-millis"},
    {"name": "page_location", "type": "string"},
    {"name": "attribution_sequence_in_session", "type": "long"},
    {"name": "page_sequence_in_session", "type": "long"},
    {"name": "page_sequence_in_attribution", "type": "long"},
    {"name": "top_level_window_id", "type": "string"},
    {"name": "profile_uuid", "type": "string"}
  ]
}

N.B.: col_1, col_2, etc. are not the real column names. One real column name, for example, is "session_number".

When the flowfiles go through the PublishKafkaRecord_1_0 processor, they cannot be sent to Kafka. Here is the error displayed by NiFi:

PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-1151-6eb4291aa590] Failed to send StandardFlowFileRecord[uuid=d7ab2519-d8b8-419b-8e0c-6d0704f36a97,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522227238377-1, container=default, section=1], offset=1338, length=525],offset=0,name=12260221790638,size=525] to Kafka: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [session_number] of type class java.lang.String to Object Array for field session_number

The problem is the same when I use the Confluent Schema Registry instead of a local Avro schema registry.


I have also tried an AvroRecordSetWriter as the record writer, but the result is the same: I still get an IllegalTypeConversionException from String to ByteBuffer.

Do you have any idea what the issue might be?

Thank you in advance.

1 ACCEPTED SOLUTION

Master Guru

Was able to make this work with a few modifications...

1) Logical types need to be declared inside a nested type object, like this:

{
  "name": "session_number",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 20
  }
}

Instead of:

{
  "name": "session_number", 
  "type": "bytes", 
  "logicalType": "decimal", 
  "precision": 20
} 
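
As a rough illustration of point 1, the Python sketch below moves logicalType attributes that sit at the field level into a nested type object. The function name nest_logical_types and the list of attributes it relocates are assumptions made for this example; nothing here is part of NiFi or Avro.

```python
import json

# Attributes that belong inside the nested type object rather than on the field.
# (Illustrative list; "scale" is included because a decimal may carry one.)
LOGICAL_ATTRS = ("logicalType", "precision", "scale")


def nest_logical_types(schema: dict) -> dict:
    """Return a copy of a record schema in which field-level logicalType
    attributes are moved into a nested {"type": ...} object."""
    fixed_fields = []
    for field in schema["fields"]:
        field = dict(field)  # shallow copy so the input is left untouched
        if "logicalType" in field and isinstance(field["type"], str):
            nested = {"type": field["type"]}
            for attr in LOGICAL_ATTRS:
                if attr in field:
                    nested[attr] = field.pop(attr)
            field["type"] = nested
        fixed_fields.append(field)
    return {**schema, "fields": fixed_fields}


original = {
    "type": "record",
    "name": "page_record",
    "fields": [
        {"name": "session_number", "type": "bytes",
         "logicalType": "decimal", "precision": 20},
        {"name": "page_title", "type": "string"},
    ],
}

fixed = nest_logical_types(original)
print(json.dumps(fixed, indent=2))
```

Fields without a logicalType, such as page_title, pass through unchanged.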

2) You can't use "?" for missing values, because if the missing value is a number (or anything other than a string) then the reader will try to parse "?" into the given type and fail. To work around this, leave the value empty and make the field nullable in the schema by using a union of "null" and the real type.
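
To make point 2 concrete, here is a small Python sketch. It shows why a "?" placeholder cannot be parsed as a number, and how a field type can be wrapped in a union with "null". The helper names make_nullable and blank_to_none are hypothetical, and treating an empty CSV cell as a missing value is an assumption based on the sample data in this answer, not a statement about NiFi internals.

```python
def make_nullable(field: dict) -> dict:
    """Wrap a field's type in a union with "null" unless it already allows null."""
    field_type = field["type"]
    if isinstance(field_type, list):
        if "null" in field_type:
            return dict(field)
        return {**field, "type": ["null", *field_type]}
    return {**field, "type": ["null", field_type]}


def blank_to_none(value: str):
    """Treat an empty CSV cell as a missing value (assumption for this sketch)."""
    return None if value == "" else value


# A "?" placeholder cannot be converted to a numeric type.
try:
    int("?")
    parsed_placeholder = True
except ValueError:
    parsed_placeholder = False

profile_uuid = make_nullable({"name": "profile_uuid", "type": "string"})
referring_id = make_nullable({
    "name": "referring_page_instance_id",
    "type": {"type": "bytes", "logicalType": "decimal", "precision": 20},
})

print(parsed_placeholder)
print(profile_uuid)
print(referring_id)
print(blank_to_none(""), blank_to_none("365311753"))
```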

3) Your input data has the timestamp in a string format, but the schema specifies it as timestamp-millis, so the CSV Reader needs its Timestamp Format property set to

yyyy-MM-dd'T'HH:mm:ss'Z'
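
For comparison, here is what that conversion looks like in Python. This is only a sketch: the strptime pattern is my own translation of the pattern above, and the quoted 'Z' is a literal character, so the code assumes the timestamps are in UTC.

```python
from datetime import datetime, timezone

# Python strptime translation of yyyy-MM-dd'T'HH:mm:ss'Z'.
# The trailing Z is matched literally, so UTC is an assumption made here.
PY_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def to_epoch_millis(text: str) -> int:
    """Parse a timestamp string and return milliseconds since the Unix epoch."""
    parsed = datetime.strptime(text, PY_FORMAT).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


millis = to_epoch_millis("2018-01-02T07:08:40Z")
print(millis)
```

The resulting long value is what a timestamp-millis field is expected to hold.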

Here is the full schema with the changes mentioned in #1 and #2 (note that tracking_uuid is also declared as string here instead of bytes):

{
  "type": "record",
  "name": "page_record",
  "fields" : [
  {"name": "session_number", "type": { "type": "bytes", "logicalType": "decimal", "precision": 20} },
  {"name": "tracking_uuid", "type": "string"},
  {"name": "page_location_domain", "type": "string"},
  {"name": "page_title", "type": "string"},
  {"name": "referring_page_instance_id", "type": ["null", { "type": "bytes", "logicalType": "decimal", "precision": 20} ]},
  {"name": "page_instance_id", "type": { "type": "bytes", "logicalType": "decimal", "precision": 20}},
  {"name": "event_timestamp", "type": { "type": "long", "logicalType": "timestamp-millis"}},
  {"name": "page_location", "type": "string"},
  {"name": "attribution_sequence_in_session", "type": "long"},
  {"name": "page_sequence_in_session", "type": "long"},
  {"name": "page_sequence_in_attribution", "type": "long"},
  {"name": "top_level_window_id", "type": "string"},
  {"name": "profile_uuid", "type": ["null", "string"]}
  ]
}
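
As background on why the decimal fields sit on top of bytes: the Avro specification stores a decimal as the two's-complement, big-endian bytes of its unscaled integer value. The Python sketch below illustrates that encoding for session_number, assuming a scale of 0 because the schema does not set one. The function names are hypothetical.

```python
def decimal_to_avro_bytes(unscaled: int) -> bytes:
    """Encode an unscaled integer as two's-complement big-endian bytes."""
    length = (unscaled.bit_length() + 8) // 8  # leaves room for the sign bit
    return unscaled.to_bytes(length, byteorder="big", signed=True)


def avro_bytes_to_decimal(raw: bytes) -> int:
    """Decode two's-complement big-endian bytes back to the unscaled integer."""
    return int.from_bytes(raw, byteorder="big", signed=True)


encoded = decimal_to_avro_bytes(36531253)
decoded = avro_bytes_to_decimal(encoded)
print(encoded.hex(), decoded)
```

A plain string such as "36531253" is not in this form, which is why the reader has to know the field is a decimal before it can convert the value.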

And here is the input data with the question marks removed:

col_1;col_2;col_3;col_4;col_5;col_6;col_7;col_8;col_9;col_10;col_11;col_12;col_13
36531253;4787ea68-4276-4b3b-b154-d70419d23113;https://www.dummyexample.com;My Dummy website, description;;365311753;2018-01-02T07:08:40Z;https://www.dummyexample.com/fr/openspace/loggin?axes4=priv;1;1;1;_15148769143240.5030172901622478_;
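
A quick way to sanity-check a sample like this before sending it through NiFi is to parse it locally and confirm that only nullable fields are empty. The Python sketch below does that with the standard csv module. The SCHEMA_FIELDS list is hand-copied from the schema above, and the check is an illustration only, not something NiFi runs.

```python
import csv
import io

# Field names from the corrected schema, paired with whether each is nullable.
SCHEMA_FIELDS = [
    ("session_number", False),
    ("tracking_uuid", False),
    ("page_location_domain", False),
    ("page_title", False),
    ("referring_page_instance_id", True),
    ("page_instance_id", False),
    ("event_timestamp", False),
    ("page_location", False),
    ("attribution_sequence_in_session", False),
    ("page_sequence_in_session", False),
    ("page_sequence_in_attribution", False),
    ("top_level_window_id", False),
    ("profile_uuid", True),
]

SAMPLE = (
    "col_1;col_2;col_3;col_4;col_5;col_6;col_7;col_8;col_9;"
    "col_10;col_11;col_12;col_13\n"
    "36531253;4787ea68-4276-4b3b-b154-d70419d23113;"
    "https://www.dummyexample.com;My Dummy website, description;;"
    "365311753;2018-01-02T07:08:40Z;"
    "https://www.dummyexample.com/fr/openspace/loggin?axes4=priv;"
    "1;1;1;_15148769143240.5030172901622478_;\n"
)

# The sample is semicolon-delimited; the comma in the page title is plain text.
header, record = csv.reader(io.StringIO(SAMPLE), delimiter=";")

# Empty cells, and the subset of them that the schema does not allow to be null.
missing = [name for (name, _), value in zip(SCHEMA_FIELDS, record) if value == ""]
invalid = [name for (name, nullable), value in zip(SCHEMA_FIELDS, record)
           if value == "" and not nullable]

print(len(header), len(record))
print("missing:", missing)
print("invalid:", invalid)
```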

5 REPLIES 5

Master Guru

Can you provide the full stacktrace from nifi-app.log for the above error?

Thanks @Bryan Bende for the quick response.
Here is the full stack trace from nifi-app.log:

2018-03-30 07:30:37,664 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@14927cd4 checkpointed with 3 Records and 0 Swap Files in 7 milliseconds (Stop-the-world time = 1 milliseconds, Clear Edit Logs time = 1 millis), max Transaction ID 8
2018-03-30 07:30:39,798 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2018-03-30 07:30:39,846 INFO [pool-10-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@66694050 checkpointed with 0 Records and 0 Swap Files in 48 milliseconds (Stop-the-world time = 21 milliseconds, Clear Edit Logs time = 17 millis), max Transaction ID 4
2018-03-30 07:30:39,846 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 48 milliseconds
2018-03-30 07:32:37,675 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@14927cd4 checkpointed with 3 Records and 0 Swap Files in 10 milliseconds (Stop-the-world time = 2 milliseconds, Clear Edit Logs time = 2 millis), max Transaction ID 8
2018-03-30 07:32:39,846 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2018-03-30 07:32:39,918 INFO [pool-10-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@66694050 checkpointed with 0 Records and 0 Swap Files in 71 milliseconds (Stop-the-world time = 36 milliseconds, Clear Edit Logs time = 24 millis), max Transaction ID 4
2018-03-30 07:32:39,918 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 71 milliseconds
2018-03-30 07:33:53,439 INFO [NiFi Web Server-84] o.a.n.c.s.StandardProcessScheduler Starting GenerateFlowFile[id=9b51d55e-84c0-330c-1e4e-09554aac25f9]
2018-03-30 07:33:53,444 INFO [StandardProcessScheduler Thread-5] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled GenerateFlowFile[id=9b51d55e-84c0-330c-1e4e-09554aac25f9] to run with 1 threads
2018-03-30 07:33:53,680 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
2018-03-30 07:33:57,445 INFO [Provenance Maintenance Thread-3] o.a.n.p.PersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 8
2018-03-30 07:33:57,482 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.lucene.SimpleIndexManager Index Writer for ./provenance_repository/index-1522394737000 has been returned to Index Manager and is no longer in use. Closing Index Writer
2018-03-30 07:33:57,483 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully merged 16 journal files (1 records) into single Provenance Log File ./provenance_repository/7.prov in 40 milliseconds
2018-03-30 07:33:57,483 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 1 records. In the past 5 minutes, 1 events have been written to the Provenance Repository, totaling 312 bytes
2018-03-30 07:34:02,274 INFO [NiFi Web Server-93] o.a.n.c.s.StandardProcessScheduler Stopping GenerateFlowFile[id=9b51d55e-84c0-330c-1e4e-09554aac25f9]
2018-03-30 07:34:02,274 INFO [NiFi Web Server-93] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.standard.GenerateFlowFile
2018-03-30 07:34:02,274 INFO [StandardProcessScheduler Thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling GenerateFlowFile[id=9b51d55e-84c0-330c-1e4e-09554aac25f9] to run
2018-03-30 07:34:02,822 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
2018-03-30 07:34:03,949 INFO [NiFi Web Server-84] o.a.n.c.s.StandardProcessScheduler Starting UpdateAttribute[id=75baff16-0162-1000-c164-aee99357f494]
2018-03-30 07:34:03,952 INFO [StandardProcessScheduler Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled UpdateAttribute[id=75baff16-0162-1000-c164-aee99357f494] to run with 1 threads
2018-03-30 07:34:04,465 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
2018-03-30 07:34:06,378 INFO [NiFi Web Server-119] o.a.n.c.s.StandardProcessScheduler Starting PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64]
2018-03-30 07:34:06,386 INFO [StandardProcessScheduler Thread-7] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64] to run with 1 threads
2018-03-30 07:34:06,391 INFO [Timer-Driven Process Thread-1] o.a.k.clients.producer.ProducerConfig ProducerConfig values: 
    acks = 0
    batch.size = 16384
    bootstrap.servers = [kafka:29092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 5000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2018-03-30 07:34:06,399 INFO [Timer-Driven Process Thread-1] o.a.kafka.common.utils.AppInfoParser Kafka version : 1.0.0
2018-03-30 07:34:06,400 INFO [Timer-Driven Process Thread-1] o.a.kafka.common.utils.AppInfoParser Kafka commitId : aaa7af6d4a11b29d
2018-03-30 07:34:06,403 ERROR [Timer-Driven Process Thread-1] o.a.n.p.k.pubsub.PublishKafkaRecord_1_0 PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64] Failed to send StandardFlowFileRecord[uuid=26306f03-9659-42b3-bbfd-97d19ee1b9b8,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522394734968-1, container=default, section=1], offset=1050, length=525],offset=0,name=3630905036922,size=525] to Kafka: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [36531253] of type class java.lang.String to Object Array for field session_number
org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [36531253] of type class java.lang.String to Object Array for field session_number
    at org.apache.nifi.serialization.record.util.DataTypeUtils.toArray(DataTypeUtils.java:270)
    at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:136)
    at org.apache.nifi.csv.CSVRecordReader.convert(CSVRecordReader.java:177)
    at org.apache.nifi.csv.CSVRecordReader.nextRecord(CSVRecordReader.java:117)
    at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
    at org.apache.nifi.serialization.RecordReader$1.next(RecordReader.java:96)
    at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:162)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0$1.process(PublishKafkaRecord_1_0.java:411)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2175)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2145)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0.onTrigger(PublishKafkaRecord_1_0.java:403)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2018-03-30 07:34:06,403 ERROR [Timer-Driven Process Thread-1] o.a.n.p.k.pubsub.PublishKafkaRecord_1_0 PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64] Failed to send StandardFlowFileRecord[uuid=26306f03-9659-42b3-bbfd-97d19ee1b9b8,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522394734968-1, container=default, section=1], offset=1050, length=525],offset=0,name=3630905036922,size=525] to Kafka: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [36531253] of type class java.lang.String to Object Array for field session_number
org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [36531253] of type class java.lang.String to Object Array for field session_number
    at org.apache.nifi.serialization.record.util.DataTypeUtils.toArray(DataTypeUtils.java:270)
    at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:136)
    at org.apache.nifi.csv.CSVRecordReader.convert(CSVRecordReader.java:177)
    at org.apache.nifi.csv.CSVRecordReader.nextRecord(CSVRecordReader.java:117)
    at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
    at org.apache.nifi.serialization.RecordReader$1.next(RecordReader.java:96)
    at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:162)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0$1.process(PublishKafkaRecord_1_0.java:411)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2175)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2145)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0.onTrigger(PublishKafkaRecord_1_0.java:403)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2018-03-30 07:34:06,404 INFO [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 5000 ms.
2018-03-30 07:34:06,629 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
2018-03-30 07:34:11,256 INFO [NiFi Web Server-125] o.a.n.c.s.StandardProcessScheduler Stopping UpdateAttribute[id=75baff16-0162-1000-c164-aee99357f494]
2018-03-30 07:34:11,256 INFO [NiFi Web Server-125] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.attributes.UpdateAttribute
2018-03-30 07:34:11,257 INFO [StandardProcessScheduler Thread-6] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling UpdateAttribute[id=75baff16-0162-1000-c164-aee99357f494] to run
2018-03-30 07:34:11,257 INFO [NiFi Web Server-125] o.a.n.c.s.StandardProcessScheduler Stopping PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64]
2018-03-30 07:34:11,257 INFO [NiFi Web Server-125] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0
2018-03-30 07:34:11,258 INFO [StandardProcessScheduler Thread-4] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64] to run
2018-03-30 07:34:11,762 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false

N.B.: I am using Docker containers for both NiFi and Confluent Kafka.

Do not hesitate to ask me for further details on the NiFi flow configuration.

Here is the full stack trace (I ran the test twice):
nifi-app.txt

Thank you @Bryan Bende! It worked 🙂

Master Guru

Glad to hear it! Can you please mark this answer as "accepted"?