Member since: 03-28-2018
Posts: 5
Kudos Received: 0
Solutions: 0
06-03-2019
04:17 PM
Problem solved. Here is the post where it was solved, with more details (code, execution plan): https://stackoverflow.com/questions/55919699/spark-2-2-join-fails-with-huge-dataset
04-29-2019
06:16 PM
Hello everyone, I am currently facing issues when trying to join (inner join) a huge dataset (654 GB) with a smaller one (535 MB) using the Spark DataFrame API. I am broadcasting the smaller dataset to the worker nodes using the broadcast() function, but I am unable to complete the join between those two datasets. Here is a sample of the errors I got:
19/04/26 19:39:07 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1315
19/04/26 19:39:07 INFO executor.Executor: Running task 25.1 in stage 13.0 (TID 1315)
19/04/26 19:39:07 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/26 19:39:07 INFO datasources.SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
19/04/26 19:39:07 INFO datasources.FileScanRDD: Reading File path: SOMEFILEPATH, range: 3087007744-3221225472, partition values: [empty row]
19/04/26 19:39:17 INFO datasources.FileScanRDD: Reading File path: SOMEFILEPATH, range: 15971909632-16106127360, partition values: [empty row]
19/04/26 19:39:24 WARN hdfs.DFSClient: DFSOutputStream ResponseProcessor exception for block isi_hdfs_pool:blk_4549851005_134218728
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2280)
at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:244)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:733)
19/04/26 19:39:27 ERROR util.Utils: Aborting task
com.univocity.parsers.common.TextWritingException: Error writing row.
Internal state when error was thrown: recordCount=458089, recordData=["SOMEDATA"]
at com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:916)
at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:706)
at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.write(UnivocityGenerator.scala:82)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.write(CSVFileFormat.scala:139)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:327)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Error closing the output.
at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:861)
at com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:903)
at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:811)
at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:704)
... 15 more
Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage[10.241.209.34:585,null,DISK] are bad. Aborting...
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401)
19/04/26 19:39:27 WARN util.Utils: Suppressing exception in catch: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "SOMENODEHOST"; destination host is: "SOMEDESTINATIONHOST":SOMEPORT;
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "SOMENODEHOST"; destination host is: "SOMEDESTINATIONHOST":SOMEPORT;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
at org.apache.hadoop.ipc.Client.call(Client.java:1479)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy17.delete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy18.delete(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortTask(FileOutputCommitter.java:568)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortTask(FileOutputCommitter.java:557)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortTask(HadoopMapReduceCommitProtocol.scala:159)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:266)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1384)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:520)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1084)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
Before joining the large dataset with the smaller one, I tried joining 10,000 records of the first one with the entire smaller one (535 MB). I got a "Futures timed out [300 s]" error. I then increased the spark.sql.broadcastTimeout setting to 3600 s and it worked fine. But when I try the join with the entire dataset (654 GB), it gives me the error shown above (TextWritingException).
My questions are:
- How can I monitor my Spark jobs more efficiently, and how should I proceed?
- What do you think is causing this error, and how can I solve it?
You will find below some information on the cluster, the execution, and the configuration of the Spark job.
Some information/context: I am working in a production environment (see the cluster configuration below). I cannot upgrade my Spark version. I do not have the Spark UI or the YARN UI to monitor my jobs; all I can retrieve are the YARN logs.
Spark version: 2.2
Cluster configuration:
- 21 compute nodes (workers)
- 8 cores each
- 64 GB RAM per node
Current Spark configuration:
- master: yarn
- executor-memory: 42G
- executor-cores: 5
- driver-memory: 42G
- num-executors: 28
- spark.sql.broadcastTimeout=3600
- spark.kryoserializer.buffer.max=512
- spark.yarn.executor.memoryOverhead=2400
- spark.driver.maxResultSize=500m
- spark.memory.storageFraction=0.9
- spark.memory.fraction=0.9
- spark.hadoop.fs.permissions.umask-mode=007
How the job is executed: we build an artifact (jar) with IntelliJ and send it to a server, where a bash script is executed. This script:
- exports some environment variables (SPARK_HOME, HADOOP_CONF_DIR, PATH and SPARK_LOCAL_DIRS),
- launches the spark-submit command with all the parameters defined in the Spark configuration above,
- retrieves the YARN logs of the application.
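For illustration, here is a minimal sketch of the job described above, written against the Spark 2.2 DataFrame API. It assumes CSV input and output (the stack trace shows the CSV writer failing); SOMEOTHERFILEPATH, SOMEOUTPUTPATH and the join column "join_key" are placeholders, not the real names, and the settings shown in code are in practice passed to spark-submit.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Sketch only: paths, the join column and the output location are placeholders.
object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-join-sketch")
      // Same setting as listed above; normally supplied on the spark-submit command line.
      .config("spark.sql.broadcastTimeout", "3600")
      .getOrCreate()

    val large = spark.read.option("header", "true").csv("SOMEFILEPATH")       // ~654 GB dataset
    val small = spark.read.option("header", "true").csv("SOMEOTHERFILEPATH")  // ~535 MB dataset

    // Explicitly broadcast the smaller side before the inner join, as described in the post.
    val joined = large.join(broadcast(small), Seq("join_key"), "inner")

    // The failing stage in the logs above is the CSV write of the join result.
    joined.write.mode("overwrite").csv("SOMEOUTPUTPATH")

    spark.stop()
  }
}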
Labels:
- Apache Spark
04-04-2018
07:03 AM
Thank you @Bryan Bende! It worked 🙂
03-30-2018
07:38 AM
Thanks @Bryan Bende for the quick response.
Here is the full stack trace from nifi-app.log:
2018-03-30 07:30:37,664 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@14927cd4 checkpointed with 3 Records and 0 Swap Files in 7 milliseconds (Stop-the-world time = 1 milliseconds, Clear Edit Logs time = 1 millis), max Transaction ID 8
2018-03-30 07:30:39,798 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2018-03-30 07:30:39,846 INFO [pool-10-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@66694050 checkpointed with 0 Records and 0 Swap Files in 48 milliseconds (Stop-the-world time = 21 milliseconds, Clear Edit Logs time = 17 millis), max Transaction ID 4
2018-03-30 07:30:39,846 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 48 milliseconds
2018-03-30 07:32:37,675 INFO [Write-Ahead Local State Provider Maintenance] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@14927cd4 checkpointed with 3 Records and 0 Swap Files in 10 milliseconds (Stop-the-world time = 2 milliseconds, Clear Edit Logs time = 2 millis), max Transaction ID 8
2018-03-30 07:32:39,846 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2018-03-30 07:32:39,918 INFO [pool-10-thread-1] org.wali.MinimalLockingWriteAheadLog org.wali.MinimalLockingWriteAheadLog@66694050 checkpointed with 0 Records and 0 Swap Files in 71 milliseconds (Stop-the-world time = 36 milliseconds, Clear Edit Logs time = 24 millis), max Transaction ID 4
2018-03-30 07:32:39,918 INFO [pool-10-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 0 records in 71 milliseconds
2018-03-30 07:33:53,439 INFO [NiFi Web Server-84] o.a.n.c.s.StandardProcessScheduler Starting GenerateFlowFile[id=9b51d55e-84c0-330c-1e4e-09554aac25f9]
2018-03-30 07:33:53,444 INFO [StandardProcessScheduler Thread-5] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled GenerateFlowFile[id=9b51d55e-84c0-330c-1e4e-09554aac25f9] to run with 1 threads
2018-03-30 07:33:53,680 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
2018-03-30 07:33:57,445 INFO [Provenance Maintenance Thread-3] o.a.n.p.PersistentProvenanceRepository Created new Provenance Event Writers for events starting with ID 8
2018-03-30 07:33:57,482 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.lucene.SimpleIndexManager Index Writer for ./provenance_repository/index-1522394737000 has been returned to Index Manager and is no longer in use. Closing Index Writer
2018-03-30 07:33:57,483 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully merged 16 journal files (1 records) into single Provenance Log File ./provenance_repository/7.prov in 40 milliseconds
2018-03-30 07:33:57,483 INFO [Provenance Repository Rollover Thread-1] o.a.n.p.PersistentProvenanceRepository Successfully Rolled over Provenance Event file containing 1 records. In the past 5 minutes, 1 events have been written to the Provenance Repository, totaling 312 bytes
2018-03-30 07:34:02,274 INFO [NiFi Web Server-93] o.a.n.c.s.StandardProcessScheduler Stopping GenerateFlowFile[id=9b51d55e-84c0-330c-1e4e-09554aac25f9]
2018-03-30 07:34:02,274 INFO [NiFi Web Server-93] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.standard.GenerateFlowFile
2018-03-30 07:34:02,274 INFO [StandardProcessScheduler Thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling GenerateFlowFile[id=9b51d55e-84c0-330c-1e4e-09554aac25f9] to run
2018-03-30 07:34:02,822 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
2018-03-30 07:34:03,949 INFO [NiFi Web Server-84] o.a.n.c.s.StandardProcessScheduler Starting UpdateAttribute[id=75baff16-0162-1000-c164-aee99357f494]
2018-03-30 07:34:03,952 INFO [StandardProcessScheduler Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled UpdateAttribute[id=75baff16-0162-1000-c164-aee99357f494] to run with 1 threads
2018-03-30 07:34:04,465 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
2018-03-30 07:34:06,378 INFO [NiFi Web Server-119] o.a.n.c.s.StandardProcessScheduler Starting PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64]
2018-03-30 07:34:06,386 INFO [StandardProcessScheduler Thread-7] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64] to run with 1 threads
2018-03-30 07:34:06,391 INFO [Timer-Driven Process Thread-1] o.a.k.clients.producer.ProducerConfig ProducerConfig values:
acks = 0
batch.size = 16384
bootstrap.servers = [kafka:29092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 5000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2018-03-30 07:34:06,399 INFO [Timer-Driven Process Thread-1] o.a.kafka.common.utils.AppInfoParser Kafka version : 1.0.0
2018-03-30 07:34:06,400 INFO [Timer-Driven Process Thread-1] o.a.kafka.common.utils.AppInfoParser Kafka commitId : aaa7af6d4a11b29d
2018-03-30 07:34:06,403 ERROR [Timer-Driven Process Thread-1] o.a.n.p.k.pubsub.PublishKafkaRecord_1_0 PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64] Failed to send StandardFlowFileRecord[uuid=26306f03-9659-42b3-bbfd-97d19ee1b9b8,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522394734968-1, container=default, section=1], offset=1050, length=525],offset=0,name=3630905036922,size=525] to Kafka: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [36531253] of type class java.lang.String to Object Array for field session_number
org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [36531253] of type class java.lang.String to Object Array for field session_number
at org.apache.nifi.serialization.record.util.DataTypeUtils.toArray(DataTypeUtils.java:270)
at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:136)
at org.apache.nifi.csv.CSVRecordReader.convert(CSVRecordReader.java:177)
at org.apache.nifi.csv.CSVRecordReader.nextRecord(CSVRecordReader.java:117)
at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
at org.apache.nifi.serialization.RecordReader$1.next(RecordReader.java:96)
at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:162)
at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0$1.process(PublishKafkaRecord_1_0.java:411)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2175)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2145)
at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0.onTrigger(PublishKafkaRecord_1_0.java:403)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2018-03-30 07:34:06,403 ERROR [Timer-Driven Process Thread-1] o.a.n.p.k.pubsub.PublishKafkaRecord_1_0 PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64] Failed to send StandardFlowFileRecord[uuid=26306f03-9659-42b3-bbfd-97d19ee1b9b8,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522394734968-1, container=default, section=1], offset=1050, length=525],offset=0,name=3630905036922,size=525] to Kafka: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [36531253] of type class java.lang.String to Object Array for field session_number
org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [36531253] of type class java.lang.String to Object Array for field session_number
at org.apache.nifi.serialization.record.util.DataTypeUtils.toArray(DataTypeUtils.java:270)
at org.apache.nifi.serialization.record.util.DataTypeUtils.convertType(DataTypeUtils.java:136)
at org.apache.nifi.csv.CSVRecordReader.convert(CSVRecordReader.java:177)
at org.apache.nifi.csv.CSVRecordReader.nextRecord(CSVRecordReader.java:117)
at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
at org.apache.nifi.serialization.RecordReader$1.next(RecordReader.java:96)
at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:162)
at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0$1.process(PublishKafkaRecord_1_0.java:411)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2175)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2145)
at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0.onTrigger(PublishKafkaRecord_1_0.java:403)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2018-03-30 07:34:06,404 INFO [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 5000 ms.
2018-03-30 07:34:06,629 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
2018-03-30 07:34:11,256 INFO [NiFi Web Server-125] o.a.n.c.s.StandardProcessScheduler Stopping UpdateAttribute[id=75baff16-0162-1000-c164-aee99357f494]
2018-03-30 07:34:11,256 INFO [NiFi Web Server-125] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.attributes.UpdateAttribute
2018-03-30 07:34:11,257 INFO [StandardProcessScheduler Thread-6] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling UpdateAttribute[id=75baff16-0162-1000-c164-aee99357f494] to run
2018-03-30 07:34:11,257 INFO [NiFi Web Server-125] o.a.n.c.s.StandardProcessScheduler Stopping PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64]
2018-03-30 07:34:11,257 INFO [NiFi Web Server-125] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0
2018-03-30 07:34:11,258 INFO [StandardProcessScheduler Thread-4] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-372e-206a2dd31f64] to run
2018-03-30 07:34:11,762 INFO [Flow Service Tasks Thread-2] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@20fc70aa // Another save pending = false
N.B.: I am using Docker instances for both NiFi and Confluent Kafka. Do not hesitate to ask me for further details on the NiFi flow configuration. Here is the full stack trace (I have run the test twice): nifi-app.txt
03-28-2018
02:47 PM
Hello everyone,
I am currently facing an issue integrating NiFi with Kafka (sending data to Kafka).
I am using the latest version of the NiFi Docker image (apache/nifi, version 1.5) with the Confluent Kafka Docker image (confluentinc/cp-kafka:4.0.0).
I am trying to send specific data to Kafka using the PublishKafkaRecord_1_0 processor. However, I have not yet been able to succeed, due to an IllegalTypeConversion error described further down this post.
Here is the NiFi template with the configuration of all the processors and controller services.
GenerateFlowFile config:
The custom generated text looks like this: col_1;col_2;col_3;col_4; etc.
36531253;4787ea68-4276-4b3b-b154-d70419d23113;https://www.dummyexample.com;My Dummy website, description;?;365311753;2018-01-02T07:08:40Z;https://www.dummyexample.com/fr/openspace/loggin?axes4=priv;1;1;1;_15148769143240.5030172901622478_;?
The header line contains the column names. The "?" fields are unknown fields.
UpdateAttribute config:
I put the schema name in the schema.name attribute.
PublishKafkaRecord_1_0 config:
As for the controller services, I am using a CSVReader for the record reader and a JSONRecordSetWriter for the record writer. Both use the Schema Name property for the schema access strategy (the name is set in the attribute by the UpdateAttribute processor). Both refer to a local Avro Schema Registry set up with the following schema:
{
"type": "record",
"name": "page_record",
"fields" : [
{"name": "session_number", "type": "bytes", "logicalType": "decimal", "precision": 20},
{"name": "tracking_uuid", "type": "bytes"},
{"name": "page_location_domain", "type": "string"},
{"name": "page_title", "type": "string"},
{"name": "referring_page_instance_id", "type": "bytes", "logicalType": "decimal", "precision": 20},
{"name": "page_instance_id", "type": "bytes", "logicalType": "decimal", "precision": 20},
{"name": "event_timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "page_location", "type": "string"},
{"name": "attribution_sequence_in_session", "type": "long"},
{"name": "page_sequence_in_session", "type": "long"},
{"name": "page_sequence_in_attribution", "type": "long"},
{"name": "top_level_window_id", "type": "string"},
{"name": "profile_uuid", "type": "string"}
]
}
N.B.: col_1, col_2, etc. are not the real column names; one example of a real column name is "session_number".
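As context on the logical-type declaration only: in Avro's Java API, a decimal logical type is attached to the underlying bytes schema rather than sitting beside the field's type. The sketch below (plain Avro, not a NiFi configuration, and covering only two of the fields) simply shows what such a schema looks like when built programmatically; it is not presented as the fix for the error described next.

import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}

// Illustration only: builds a record schema where "session_number" is a bytes
// field carrying a decimal(20, 0) logical type, then prints its JSON form.
object DecimalSchemaSketch {
  def main(args: Array[String]): Unit = {
    val decimalBytes: Schema =
      LogicalTypes.decimal(20, 0).addToSchema(Schema.create(Schema.Type.BYTES))

    val record: Schema = SchemaBuilder.record("page_record")
      .fields()
      .name("session_number").`type`(decimalBytes).noDefault()
      .name("page_location_domain").`type`().stringType().noDefault()
      .endRecord()

    println(record.toString(true)) // pretty-printed schema JSON
  }
}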
When going through the PublishKafkaRecord_1_0 processor, the FlowFiles cannot be sent to Kafka. Here is the error displayed by NiFi:
PublishKafkaRecord_1_0[id=22b8f3a8-32cc-39f2-1151-6eb4291aa590] Failed to send StandardFlowFileRecord[uuid=d7ab2519-d8b8-419b-8e0c-6d0704f36a97,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1522227238377-1, container=default, section=1], offset=1338, length=525],offset=0,name=12260221790638,size=525] to Kafka: org.apache.nifi.serialization.record.util.IllegalTypeConversionException: Cannot convert value [session_number] of type class java.lang.String to Object Array for field session_number
The problem is the same when I use the Confluent Schema Registry instead of a local Avro Schema Registry.
I have also tried using an AvroRecordSetWriter for the record writer, but the result is the same: I still get an IllegalTypeConversion error from String to ByteBuffer. Do you have any idea what the issue could be? Thank you in advance.
Labels:
- Apache Kafka
- Apache NiFi
- Schema Registry