Member since: 06-17-2016
Posts: 56
Kudos Received: 6
Solutions: 0
08-29-2018
07:32 AM
Hi everyone, we are using NiFi 1.4 and the ConsumeMQTT processor to subscribe to topics published by a message broker from webMethods called Universal Messaging. For some channels we experience unexpected behavior. When I start the ConsumeMQTT processor I get the following error message:
2018-08-29 09:05:16,615 ERROR [Timer-Driven Process Thread-1] o.a.nifi.processors.mqtt.ConsumeMQTT ConsumeMQTT[id=01651015-56c3-1f18-9c66-35570bec2f7a] Connection to tcp://10.170.232.13:1883 lost (or was never connected) and ontrigger connect failed. Yielding processor:
org.eclipse.paho.client.mqttv3.MqttException: MqttException
at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:327)
at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:313)
at org.apache.nifi.processors.mqtt.ConsumeMQTT.reconnect(ConsumeMQTT.java:347)
at org.apache.nifi.processors.mqtt.ConsumeMQTT.onTrigger(ConsumeMQTT.java:255)
at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onTrigger(AbstractMQTTProcessor.java:355)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I have attached a screenshot with the processor settings. The error is thrown only once, and the people who administer the broker told us they cannot see the connection on their side. We are not receiving any messages. If I had to guess, I would say the problem is related to the QoS value, but I have no clue. Any idea or comment would be appreciated. Kind regards, Paul
Labels: Apache NiFi
06-19-2018
09:28 AM
1 Kudo
Hi @rama, from the official documentation: An external table describes the metadata / schema on external files. External table files can be accessed and managed by processes outside of Hive. External tables can access data stored in sources such as Azure Storage Volumes (ASV) or remote HDFS locations. If the structure or partitioning of an external table is changed, an MSCK REPAIR TABLE table_name statement can be used to refresh metadata information. To make the metastore aware of partitions that were added directly to HDFS, you can use the metastore check command (MSCK). Since you are adding and dropping partitions on an external table, you need to make the metastore aware of it. Hope that clarifies the issue. Kind regards, Paul
06-18-2018
01:19 PM
1 Kudo
Hi @rama, have you issued this command after creating a partition? MSCK REPAIR TABLE <tablename>; Are you working with internal or external tables? Kind regards, Paul
05-29-2018
09:09 PM
Hi guys, thanks so much for the fast support. Thanks to the Matts team, @Matt Burgess and @Matt Clarke, I finally understood how the processor works. It emits a flow file with no payload; the file details, such as path and filename, are carried in the attributes. Those are used by FetchHDFS to fetch the corresponding files. Kind regards, Paul
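As a rough illustration of why such a queue can hold many flow files with a total size of 0 bytes, here is a minimal Scala sketch of the list-then-fetch pattern described above. `FlowFile`, `ListFetchSketch`, `list` and `resolve` are hypothetical names used only for illustration, not NiFi's API; the attribute names `path` and `filename` are the ones mentioned in the post.

```scala
// Hypothetical model of a flow file: attributes plus content. Not NiFi's API.
final case class FlowFile(attributes: Map[String, String], content: Array[Byte]) {
  def sizeBytes: Long = content.length.toLong
}

object ListFetchSketch {
  // Listing step: one flow file per (path, filename) pair, attributes only, empty content.
  def list(files: Seq[(String, String)]): Seq[FlowFile] =
    files.map { case (path, filename) =>
      FlowFile(Map("path" -> path, "filename" -> filename), Array.empty[Byte])
    }

  // Fetch step: build the file location from the attributes (path + "/" + filename).
  def resolve(ff: FlowFile): String =
    ff.attributes("path") + "/" + ff.attributes("filename")
}
```

In this model every listed flow file has `sizeBytes == 0` until the fetch step reads the actual file, so a large count with zero total size is what one would expect between the two steps.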
05-29-2018
04:02 PM
Hi @Sungwoo Park, thanks for the input. Could you please elaborate a little more on why the symlink could cause problems, and which ones? I am very interested, since we have this setting in a demo cluster at a customer site. BR. Paul
05-29-2018
03:57 PM
Hi everyone, for the last few days I have been facing a problem with a NiFi flow that uses the HDFS List and Fetch processors. The queue between them shows more than one million flow files with a total size of 0 MB, which is very confusing. If I try to inspect the queue I can list the flow files, and if I click on the info button I can see the file size, but the content seems to be empty. Back pressure is set to 100K, so I do not understand the number of flow files. I tried restarting NiFi and dropping the flow files, but the problem returns. I have attached a screenshot of part of the flow. Any idea would be appreciated. Best regards, Paul
Labels: Apache NiFi
05-28-2018
01:03 PM
1 Kudo
Hi everyone, I solved it after a deep analysis of the code. As you can see in the code I posted above, I am repartitioning the data. As background, the regular process transforms small files, and I want to collect the partial results and create a single file, which is then written into HDFS. That is a desired feature, since HDFS works better with bigger files. To be more precise, because "small" and "big" can be very fuzzy: our HDFS has a standard configuration of 128 MB blocks, so 2 or 3 MB files make no sense and also hurt performance. This is the regular situation, but now a backlog of around 1 TB needs to be processed, and the repartition causes a shuffle operation. As far as I understand, repartition(1) requires collecting all the parts in one worker to create a single partition. Since the original RDD is bigger than the memory available in the workers, this collapses everything and throws the errors I reported above.
aswdirCsvDf.repartition(1).write
I just removed the ".repartition(1)" from the code and now everything is working. The program writes several files, that is, one file per partition, and in this context that is quite OK. Kind regards, Paul
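If fewer, larger files are still wanted once the backlog is processed, one possible middle ground between `repartition(1)` and no repartitioning at all is to aim for roughly one HDFS block per output file. The sketch below only does the arithmetic; `OutputPartitions`, `targetPartitions` and the idea of estimating the output size up front are assumptions, not part of the original job.

```scala
object OutputPartitions {
  // 128 MB, the HDFS block size mentioned in the post.
  val BlockSizeBytes: Long = 128L * 1024 * 1024

  // Number of output files so that each is at most about one block; always at least 1.
  // totalBytes is an estimate of the output size (hypothetical input, not in the post).
  def targetPartitions(totalBytes: Long, blockSizeBytes: Long = BlockSizeBytes): Int = {
    require(totalBytes >= 0 && blockSizeBytes > 0, "sizes must be non-negative / positive")
    // Ceiling division, clamped to a minimum of one partition.
    math.max(1L, (totalBytes + blockSizeBytes - 1) / blockSizeBytes).toInt
  }
}
```

The result could be passed to `aswdirCsvDf.coalesce(n)`, which merges existing partitions without a full shuffle and never increases their number. Whether that fits depends on how well the output size can be estimated.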
05-24-2018
02:12 PM
Hi everyone, this week the amount of data our Spark ETL job needs to process increased. We were able to successfully process up to 120 GB, but due to some changes and a backlog, around 1 TB now needs to be processed. We have a cluster with 18 Spark2 clients, and I have to use a YARN queue that has 30% assigned. Every box has 56 CPUs and 256 GB RAM.
HDP Version = 2.6.4
Spark = 2.2.0
I was running the job with:
Number of executors: 73
Executor Memory: 3 GB
Executor Cores: 5
Driver Memory: 5 GB
Spark Master: yarn
spark.driver.maxResultSize: 5 GB
Since we have this amount of data (1 TB) we are getting the following error: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
These are the details of the failing job (see attachments):
And the complete error trace:
org.apache.spark.shuffle.MetadataFetchFailedException:Missing an output location for shuffle 11
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
This is a piece of the Spark program:
// Define json processing path
val jsonProcessingPath = "hdfs:///" + env +"/etl/dwd/forecast/processing/" + filesPath + "*"
// Target directories
val dwdForecastDataPath = "hdfs:///" + env +"/data/dwd/forecast"
val dwdForecastStagePath = "hdfs:///" + env +"/etl/dwd/forecast/stage"
val jsonRDD = spark.sparkContext.wholeTextFiles(jsonProcessingPath, 72)
val jsonDF = spark.read.json(jsonRDD.map(f => f._2))
val dwdJsonRecords = jsonDF.select($"meta.filename"
, $"meta.numberOfRecords", $"meta.recordNumber", explode($"records").as("records"))
.select($"filename", $"numberOfRecords", $"recordNumber", $"records.time", explode($"records.grids").as("grids"))
.select($"filename", $"numberOfRecords", $"recordNumber", $"time", $"grids.gPt".getItem(0).as("lat1"), $"grids.gPt".getItem(1).as("long1"), $"grids.gPt".getItem(2).as("lat2"), $"grids.gPt".getItem(3).as("long2"), $"grids.gPt".getItem(4).as("value"))
// Create Temporary View for data processing
dwdJsonRecords.createOrReplaceTempView("dwd")
// Create table from View
spark.table("dwd")
// Load the table in memory
spark.table("dwd").cache
/* --------------------------------------
-------- ASWDIR_S ----------------
------------------------------------ */
val aswdirCsvDf = spark.sql("""SELECT regexp_extract(filename, 'cosmo.+_([0-9]{10}_[0-9]{3})_.+(?=.grib2.bz2)', 1) as `msg_id`
, to_utc_timestamp(CAST(from_unixtime(unix_timestamp(time,"yyyy-MM-dd'T'HH:mm:ssXXX")) as timestamp),'Europe/Berlin') as `msg_ts`
, lat1, long1, lat2, long2, value FROM dwd WHERE filename like '%ASWDIR%' and value != 'NaN'""")
aswdirCsvDf.repartition(1).write.format("com.databricks.spark.csv").option("timestampFormat", "yyyy-MM-dd HH:mm:ss").mode("append").save(aswdirPathCsv.concat("ASWDIR_S"))
I am using wholeTextFiles to load several JSON files at once into 72 partitions. I selected this value after some trial and error some months ago. Then I want to cache the DataFrame in memory. Finally I run some queries to flatten out the JSON content and write it back to HDFS as a single CSV file. Attached you can find a screenshot of the stages. Any help will be highly appreciated. Kind regards, Paul
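To put the settings above in proportion to the 1 TB backlog, the following sketch adds up what the job requests and what the queue could offer. The job settings and hardware figures are taken from the post; that all 18 boxes serve YARN and that the 30% share applies equally to memory and cores are assumptions, and `ExecutorBudget` is a hypothetical name.

```scala
object ExecutorBudget {
  // Job settings quoted in the post.
  val executors = 73
  val executorMemoryGb = 3
  val executorCores = 5

  // Cluster figures quoted in the post.
  // Assumption: all 18 boxes serve YARN and the 30% queue share covers memory and cores alike.
  val nodes = 18
  val memoryPerNodeGb = 256
  val cpusPerNode = 56
  val queueShare = 0.30

  // Total executor heap and cores requested by the job.
  def requestedHeapGb: Int = executors * executorMemoryGb
  def requestedCores: Int = executors * executorCores

  // Upper bounds implied by the queue share under the assumptions above.
  def queueMemoryGb: Double = nodes * memoryPerNodeGb * queueShare
  def queueCores: Double = nodes * cpusPerNode * queueShare
}
```

With these numbers the executors request 219 GB of heap in total, well below the 1 TB of input, so any step that concentrates the data in a few tasks may run out of memory.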
Labels: Apache Spark
04-28-2018
09:49 AM
Hi @Sungwoo Park, you can have a look at this question. I think it will help you: https://stackoverflow.com/questions/47198678/zeppelin-python-conda-and-python-sql-interpreters-do-not-work-without-adding-a Best regards, Paul
04-04-2018
12:10 PM
Hi @David Streever, thanks for your reply. I think I found the problem, but I still have to test the solution. It seems that Spark is not able to traverse my directory structure unless I create partitions, even though I defined these properties:
.config("mapreduce.input.fileinputformat.input.dir.recursive","true")
.config("mapred.input.dir.recursive","true")
.config("hive.mapred.supports.subdirectories","true")