Member since
06-17-2016
56
Posts
5
Kudos Received
0
Solutions
09-20-2019
12:46 AM
Hi everyone, I am trying to build a flow that: 1) gets a Bearer token using an API call and stores it either in a variable or in a file, 2) reads a CSV file with a list of parameter values, and 3) calls another API method using the parameter values (one call per parameter) and the generated token. I found some hurdles along the way: I know there are process group variables, but it seems I can only set them manually and not programmatically. If I put the valid token in a file, I don't know exactly how to combine it with the list of parameters. What I am trying to achieve is the Cartesian product of parameters and token. I figured out a flow using Wait and Notify processors, but at the end I am a little bit lost. Some other challenges: if the token becomes invalid, generate a new token and call the API again with the same parameters before the next 5-minute schedule; and how to merge the flow file with the token more than one time. Is there a not-too-nasty way to accomplish this? Any comments will be highly appreciated. Best, Paul
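P.S.: to make the intended logic concrete, here is a minimal sketch in plain Scala (outside NiFi); the file name, CSV layout, and the two call stubs are placeholders for illustration only:
import scala.io.Source
object TokenPerParameterSketch extends App {
  // Placeholder implementations; in NiFi these would be InvokeHTTP calls.
  def fetchToken(): String = "dummy-token"              // e.g. POST to the auth endpoint
  def callApi(token: String, param: String): Int = 200  // e.g. GET with an Authorization header
  var token = fetchToken()
  // Hypothetical CSV with a header row and the parameter in the first column.
  val params = Source.fromFile("params.csv").getLines().drop(1).map(_.split(",")(0)).toList
  for (param <- params) {
    var status = callApi(token, param)
    if (status == 401) {      // token rejected: refresh once and retry the same parameter
      token = fetchToken()
      status = callApi(token, param)
    }
    println(s"$param -> $status")
  }
}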
... View more
01-17-2019
07:35 PM
Hi everyone, we are facing some difficulties. Application description: Spark subscribes to a Kafka topic, flattens out the JSON content, and writes it via the JDBC driver to a Teradata table. The Teradata table is queried by a Talend job that transfers the data to a DWH 3N model. Problem: there are multiple deadlock problems in the Spark app. Is there any best practice or technical solution to tackle this problem? Software components:
Apache Spark 2.2.0, Teradata 16.20, Kafka topic with 5 partitions. Any comment will be appreciated.
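P.S.: for context, a hypothetical sketch of how one (already flattened) micro-batch could be appended to Teradata over JDBC; flattenedDf, the URL, table, and credentials are placeholders, and the coalesce only illustrates limiting the number of concurrent sessions writing to the same table:
import org.apache.spark.sql.DataFrame
def writeBatch(flattenedDf: DataFrame): Unit = {
  flattenedDf
    .coalesce(1)  // fewer partitions means fewer concurrent Teradata sessions on this table
    .write
    .mode("append")
    .format("jdbc")
    .option("url", "jdbc:teradata://td-host/DATABASE=my_db")
    .option("dbtable", "my_db.target_table")
    .option("user", "td_user")
    .option("password", "td_password")
    .option("driver", "com.teradata.jdbc.TeraDriver")
    .save()
}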
... View more
10-04-2018
11:06 AM
Hi @Abdul Rahim, this is not exactly a Dataset, but you can get the idea. I am also not sure exactly what you want to accomplish. val l = List(List("10","14","16","19","52"), List("08","09","12","20","45"), List("55","56","70","78","53"))
l map ( i => i.sortBy(x => x.toInt).reverse)
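If you really need it on a Dataset, a hypothetical equivalent (assuming a SparkSession named spark with its implicits in scope) would be:
import spark.implicits._
val ds = List(Seq("10","14","16","19","52"), Seq("08","09","12","20","45"), Seq("55","56","70","78","53")).toDS()
val sortedDs = ds.map(_.sortBy(_.toInt).reverse)  // sort each row's values numerically, descending
sortedDs.show(false)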
... View more
08-29-2018
07:32 AM
Hi everyone, we are using NiFi 1.4 and the ConsumeMQTT processor to subscribe to topics published by a message broker from webMethods called Universal Messaging. For some channels we experience unexpected behavior. When I start the ConsumeMQTT processor I get the following error message: 2018-08-29 09:05:16,615 ERROR [Timer-Driven Process Thread-1] o.a.nifi.processors.mqtt.ConsumeMQTT ConsumeMQTT[id=01651015-56c3-1f18-9c66-35570bec2f7a] Connection to tcp://10.170.232.13:1883 lost (or was never connected) and ontrigger connect failed. Yielding processor:
org.eclipse.paho.client.mqttv3.MqttException: MqttException
at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:327)
at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:313)
at org.apache.nifi.processors.mqtt.ConsumeMQTT.reconnect(ConsumeMQTT.java:347)
at org.apache.nifi.processors.mqtt.ConsumeMQTT.onTrigger(ConsumeMQTT.java:255)
at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onTrigger(AbstractMQTTProcessor.java:355)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Attached is a screenshot with the processor settings. The error is thrown only once, and the people who administer the broker told us they are not able to see the connection. We are not getting the messages. If I had to guess, I would say the problem is related to the QoS value, but I have no clue. Any idea or comment would be appreciated. Kind regards, Paul
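P.S.: to rule out the broker side, I am thinking of testing with a small standalone Eclipse Paho client (the same library NiFi uses underneath); the topic name here is a placeholder and only the QoS would be varied; this is a sketch, not a verified fix:
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttClient, MqttMessage}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
object MqttSmokeTest extends App {
  val brokerUri = "tcp://10.170.232.13:1883"  // broker address from the error above
  val client = new MqttClient(brokerUri, MqttClient.generateClientId(), new MemoryPersistence)
  client.setCallback(new MqttCallback {
    override def connectionLost(cause: Throwable): Unit = println(s"connection lost: $cause")
    override def messageArrived(topic: String, msg: MqttMessage): Unit =
      println(s"$topic -> ${new String(msg.getPayload)}")
    override def deliveryComplete(token: IMqttDeliveryToken): Unit = ()
  })
  client.connect()
  client.subscribe("some/channel", 0)  // placeholder topic; try QoS 0 first, then 1 and 2
  Thread.sleep(60000)                  // listen for a minute
  client.disconnect()
}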
... View more
06-19-2018
09:28 AM
1 Kudo
Hi @rama, from the official documentation: An external table describes the metadata/schema on external files. External table files can be accessed and managed by processes outside of Hive. External tables can access data stored in sources such as Azure Storage Volumes (ASV) or remote HDFS locations. If the structure or partitioning of an external table is changed, an MSCK REPAIR TABLE table_name statement can be used to refresh metadata information. To make the metastore aware of partitions that were added directly to HDFS, you can use the metastore check command (MSCK). Since you are adding and dropping partitions on an external table, you need to make the metastore aware of it. Hope that clarifies the issue. Kind regards, Paul
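P.S.: as a concrete example (hypothetical table name; from a Hive-enabled Spark session, or the equivalent statements in the Hive CLI):
// After adding partition directories directly on HDFS, refresh the metastore
// so the new partitions become queryable, then verify.
spark.sql("MSCK REPAIR TABLE my_db.my_partitioned_table")
spark.sql("SHOW PARTITIONS my_db.my_partitioned_table").show(false)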
... View more
06-18-2018
01:19 PM
1 Kudo
Hi @rama, have you issued this command after creating a partition? MSCK REPAIR TABLE <tablename>; Are you working with internal or external tables? Kind regards, Paul
... View more
05-29-2018
09:09 PM
Hi guys, thanks so much for the fast support, and thanks to the Matts team, @Matt Burgess and @Matt Clarke, I finally understood how the processor works. It emits a flow file with no payload; the file details, such as path and filename, are carried in the flow file attributes. Those are used by FetchHDFS to fetch the corresponding files. Kind regards, Paul
... View more
05-29-2018
04:02 PM
Hi @Sungwoo Park, thanks for the input. Could you please elaborate a little bit more on why the symlink could cause problems, and which ones? I am very interested, since we have this setting in a demo cluster at a customer. BR. Paul
... View more
05-29-2018
03:57 PM
Hi everyone, during the last few days I have been facing a problem with a NiFi flow using the ListHDFS and FetchHDFS processors. The queue between them shows more than one million flow files and a total size of 0 MB. This is very confusing. If I try to inspect one of the files I am able to list it, and if I click on the info button I can confirm the file size, but the content seems to be empty. Back pressure is set to 100K, so I cannot understand the number of files. I tried restarting NiFi and dropping the files, but the problem comes back again. Attached is a screenshot of part of the flow. Any idea would be appreciated. Best regards, Paul
... View more
05-28-2018
01:03 PM
Hi everyone, I already solved it after a deep analysis of the code. As you can see in the code I posted above, I am repartitioning the data. As background, the regular process transforms small files, and I want to collect the partial results and create a single file, which is then written to HDFS. That is a desired feature, since HDFS works better with bigger files. To be more precise, because "small" and "big" can be fuzzy: our HDFS has a standard configuration of 128 MB blocks, so 2 or 3 MB files make no sense and also hurt performance. That is the regular situation, but now a backlog of around 1 TB needs to be processed, and the repartition causes a shuffle operation. As far as I understand, repartition(1) requires collecting all the parts on one worker to create a single partition. Since the original RDD is bigger than the memory available on the workers, this collapses everything and throws the errors I reported above. aswdirCsvDf.repartition(1).write I just removed the ".repartition(1)" from the code and now everything works. The program writes several files, that is, one file per worker, and in this context that is quite OK. Kind regards, Paul
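P.S.: if a single output file is still needed later, a middle ground I would try (untested sketch; the partition count is a guess to be tuned against the 128 MB block size) is to reduce rather than collapse the partitions:
aswdirCsvDf
  .coalesce(8)  // hypothetical count: few enough for large files, but no single 1 TB task
  .write
  .format("com.databricks.spark.csv")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .mode("append")
  .save(aswdirPathCsv.concat("ASWDIR_S"))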
... View more
05-24-2018
02:12 PM
Hi everyone, this week we got an increase in the amount of data our Spark ETL job needs to process. We were able to successfully process up to 120 GB, and due to some changes and backlog, around 1 TB now needs to be processed. We have a cluster with 18 Spark2 clients, and I have to use a YARN queue that has 30% assigned. Every box has 56 CPUs and 256 GB RAM. HDP version = 2.6.4, Spark = 2.2.0. I was running the job with: Number of executors: 73, Executor memory: 3 GB, Executor cores: 5, Driver memory: 5 GB, Spark master: yarn, spark.driver.maxResultSize: 5 GB. Since we have this amount of data (1 TB), we are getting the following error: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11 These are the details of the failing job (see attachments), and the complete error trace: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 11
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:693)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:147)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
This is a piece of the Spark program:
// Define json processing path
val jsonProcessingPath = "hdfs:///" + env +"/etl/dwd/forecast/processing/" + filesPath + "*"
// Target directories
val dwdForecastDataPath = "hdfs:///" + env +"/data/dwd/forecast"
val dwdForecastStagePath = "hdfs:///" + env +"/etl/dwd/forecast/stage"
val jsonRDD = spark.sparkContext.wholeTextFiles(jsonProcessingPath, 72)
val jsonDF = spark.read.json(jsonRDD.map(f => f._2))
val dwdJsonRecords = jsonDF.select($"meta.filename"
, $"meta.numberOfRecords", $"meta.recordNumber", explode($"records").as("records"))
.select($"filename", $"numberOfRecords", $"recordNumber", $"records.time", explode($"records.grid s").as("grids"))
.select($"filename", $"numberOfRecords", $"recordNumber", $"time", $"grids.gPt".getItem(0).as("lat 1"), $"grids.gPt".getItem(1).as("long1"), $"grids.gPt".getItem(2).as("lat2"), $"grids.gPt".g etItem(3).as("long2"), $"grids.gPt".getItem(4).as("value"))
// Create Temporary View for data processing
dwdJsonRecords.createOrReplaceTempView("dwd")
// Create table from View
spark.table("dwd")
// Load the table in memory
spark.table("dwd").cache
/* --------------------------------------
-------- ASWDIR_S ----------------
------------------------------------ */
val aswdirCsvDf = spark.sql("""SELECT regexp_extract(filename, 'cosmo.+_([0-9]{10}_[0-9]{3})_.+(?=.grib2.bz2)', 1) as `msg_id`
, to_utc_timestamp(CAST(from_unixtime(unix_timestamp(time,"yyyy-MM-dd'T'HH:mm:ssXXX")) as timestamp),'Europe/Berlin') as `msg_ts`
, lat1, long1, lat2, long2, value FROM dwd WHERE filename like '%ASWDIR%' and value != 'NaN'""")
aswdirCsvDf.repartition(1).write.format("com.databricks.spark.csv").option("timestampFormat", "yyyy-MM-dd HH:mm:ss").mode("append").save(aswdirPathCsv.concat("ASWDIR_S"))
I am using wholeTextFiles to load several JSON files at once, with 72 partitions. I selected these values after some trial and error a few months ago. Then I cache the dataframe in memory. Finally I run some queries to flatten out the JSON content and write it back as a single CSV file to HDFS. Attached you can find a screenshot of the stages. Any help will be highly appreciated. Kind regards, Paul
... View more
04-28-2018
09:49 AM
Hi @Sungwoo Park, you can have a look at this question; I think it will help you: https://stackoverflow.com/questions/47198678/zeppelin-python-conda-and-python-sql-interpreters-do-not-work-without-adding-a Best regards, Paul
... View more
04-23-2018
01:38 PM
Hi everyone, we are facing an unexpected behavior using NiFi 1.4 to move new incoming files in HDFS from an “incoming” folder to the “processing” folder. This is the flow design (see attached file: fetch-new-file.png): Not all of the new files are listed and fetched. Every file delivery consists of 37 files. For some reason, sometimes only a group of these files is correctly moved. The files arrive in a specific order, and the gaps are sequential, i.e., files 1 to 8 are listed, fetched, and moved, and then files 20 to 37. Files 9 to 19 are missing in the target folder. The scheduling of the ListHDFS was set to 2 seconds, and I changed it to 30 minutes to see if that helps. I also verified the HDFS audit log. In this image you can see a correctly processed file at the top and an incorrectly processed file at the bottom (see attached file: file-not-moved.png): I was not able to see any error message or warning in the NiFi app log. Any comment will be appreciated. Kind regards, Paul
... View more
04-04-2018
12:10 PM
Hi @David Streever, thanks for your reply. I think I found the problem, but I still have to test the solution. It seems that Spark is not able to traverse my directory structure unless I create partitions, even when I defined the following properties: .config("mapreduce.input.fileinputformat.input.dir.recursive","true")
.config("mapred.input.dir.recursive","true")
.config("hive.mapred.supports.subdirectories","true")
... View more
04-04-2018
09:55 AM
Hi everyone, I am going crazy trying to figure out why I cannot read a Hive external table that points to a directory with parquet files. The parquet files are created with a Spark program like this:
eexTable.repartition(1).write.mode("append").save(dataPath.concat(eexFileName))
I created the external table using this DDL:
CREATE EXTERNAL TABLE my_db.eex_actual_plant_gen_line (
meta_date timestamp,
meta_directory string ,
meta_filename string,
meta_host string,
meta_numberofrecords int,
meta_recordnumber int,
meta_sequenceid string,
country string,
source string,
data_timestamp timestamp,
actual_generation double,
publication_timestamp timestamp,
modification_timestamp timestamp,
created_on timestamp
)
COMMENT 'External table eex transparency actual plant generation line'
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/development/data/EEX/transparency/production/usage'
I am able to query this table using Ambari or the CLI, but when I try to use Spark I can retrieve the table schema, yet no rows are returned:
import org.apache.spark.sql.{ Row, SaveMode, SparkSession }
import org.apache.spark.sql.functions._
val warehouseLocation = "/apps/hive/warehouse"
val spark = SparkSession
.builder()
.appName("EEX_Trans")
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("hive.metastore.uris", "thrift://myserver1:9083,thrift://myserver2:9083")
.enableHiveSupport()
.getOrCreate()
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
spark.sql("Select * from dev_sdsp.facilities").count() I cannot find the error and I already read 1000 posts without luck. Any comment will be appreciated. Kind regards, Paul
... View more
03-15-2018
09:02 AM
Hi @Patrick Young, you need to follow several steps to make this work. About Python: I installed anaconda3, and the critical step is not to let anaconda3 be configured in the environment variables. The HDP platform needs Python 2 for some scripts, and the python path needs to resolve to a Python 2 installation. Since I want to have both spark and spark2 interpreters, I commented out the SPARK_HOME line in the zeppelin-env.sh file. Another configuration I changed in this file: according to the documentation, the variable ZEPPELIN_JAVA_OPTS changed in spark2 to ZEPPELIN_INTP_JAVA_OPTS. Since both versions are active, these two variables are defined:
export ZEPPELIN_JAVA_OPTS="-Dhdp.version=None -Dspark.executor.memory=512m -Dspark.executor.instances=2 -Dspark.yarn.queue=default"
export ZEPPELIN_INTP_JAVA_OPTS="-Dhdp.version=None -Dspark.executor.memory=512m -Dspark.executor.instances=2 -Dspark.yarn.queue=default"
You also need to configure the spark2 interpreter as follows. I also created a Python interpreter. Finally, I created a symbolic link so conda can be found: ln -s /opt/anaconda3/bin/conda /bin/conda Of course, you have to adjust the paths above to your own paths. Hope that helps. Kind regards, Paul
... View more
03-06-2018
08:11 PM
Hi @Dongjoon Hyun, thanks for your answer. We are planning a Spark upgrade in the next sprints. I would like to improve the performance of my current script anyway. BR. Paul
... View more
03-06-2018
08:09 PM
Hi @Timothy Spann, thanks for your answer. We made an effort a couple of months ago to upgrade NiFi 1.0 to 1.4, and now the processor we need is in a newer version 😕 We will consider it. BTW: how is the performance of this processor, in your experience? BR. Paul
... View more
03-06-2018
08:12 AM
I am trying to parse a JSON file as a CSV file. The structure is a little bit complex, and I wrote a Spark program in Scala to accomplish this task. Since the document does not contain one JSON object per line, I decided to use the wholeTextFiles method, as suggested in some answers and posts I've found.
val jsonRDD = spark.sparkContext.wholeTextFiles(fileInPath).map(x => x._2)
Then I read the JSON content into a dataframe:
val dwdJson = spark.read.json(jsonRDD)
Then I would like to navigate the JSON and flatten out the data. This is the schema of dwdJson:
root
|-- meta: struct (nullable = true)
| |-- dimensions: struct (nullable = true)
| | |-- lat: long (nullable = true)
| | |-- lon: long (nullable = true)
| | |-- time: long (nullable = true)
| |-- directory: string (nullable = true)
| |-- filename: string (nullable = true)
|-- records: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- grids: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- gPt: array (nullable = true)
| | | | | |-- element: double (containsNull = true)
| | |-- time: string (nullable = true)
<br> This is my best approach: val dwdJson_e1 = dwdJson.select($"meta.filename", explode($"records").as("records_flat"))
val dwdJson_e2 = dwdJson_e1.select($"filename", $"records_flat.time",explode($"records_flat.grids").as("gPt"))
val dwdJson_e3 = dwdJson_e2.select($"filename", $"time", $"gPt.gPt")
val dwdJson_flat = dwdJson_e3.select($"filename",$"time",$"gPt".getItem(0).as("lat1"),$"gPt".getItem(1).as("long1"),$"gPt".getItem(2).as("lat2"),$"gPt".getItem(3).as("long2"),$"gPt".getItem(4).as("value"))
I am a Scala rookie, and I am wondering if I can avoid creating the intermediate dataframes (dwdJson_e1, dwdJson_e2, dwdJson_e3), which seem to be inefficient, and the program runs very slowly (compared with a Java parser running on a laptop). On the other hand, I could not find a way to unwind these nested arrays. Spark version: 2.0.0, Scala: 2.11.8, Java: 1.8, HDP: 2.5. Sample JSON file I want to convert:
{
"meta" : {
"directory" : "weather/cosmo/de/grib/12/aswdir_s",
"filename" : "COSMODE_single_level_elements_ASWDIR_S_2018022312_000.grib2.bz2",
"dimensions" : {
"lon" : 589,
"time" : 3,
"lat" : 441
}
},
"records" : [ {
"grids" : [ {
"gPt" : [ 45.175, 13.55, 45.2, 13.575, 3.366295E-7 ]
}, {
"gPt" : [ 45.175, 13.575, 45.2, 13.6, 3.366295E-7 ]
}, {
"gPt" : [ 45.175, 13.6, 45.2, 13.625, 3.366295E-7 ]
} ],
"time" : "2018-02-23T12:15:00Z"
}, {
"grids" : [ {
"gPt" : [ 45.175, 13.55, 45.2, 13.575, 4.545918E-7 ]
}, {
"gPt" : [ 45.175, 13.575, 45.2, 13.6, 4.545918E-7 ]
}, {
"gPt" : [ 45.175, 13.6, 45.2, 13.625, 4.545918E-7 ]
}
],
"time" : "2018-02-23T12:30:00Z"
}
]
}
This is a sample output from the JSON above:
filename, time, lat1, long1, lat2, long2, value
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.55,45.2,13.575,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.575,45.2,13.6,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.6,45.2,13.625,3.366295E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.55,45.2,13.575,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.575,45.2,13.6,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.6,45.2,13.625,4.545918E-7
Any help will be appreciated. Kind regards, Paul
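P.S.: to make clearer what I am aiming for, this is how I imagine a chained version without the named intermediates (untested; it reuses the implicits and the explode import from the code above):
val dwdJsonFlat = dwdJson
  .select($"meta.filename", explode($"records").as("rec"))
  .select($"filename", $"rec.time", explode($"rec.grids").as("grid"))
  .select($"filename", $"time",
    $"grid.gPt".getItem(0).as("lat1"),
    $"grid.gPt".getItem(1).as("long1"),
    $"grid.gPt".getItem(2).as("lat2"),
    $"grid.gPt".getItem(3).as("long2"),
    $"grid.gPt".getItem(4).as("value"))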
... View more
02-26-2018
08:07 AM
Hi @Aditya Sirna, thanks for your answer. Independent of the Hive-Phoenix platform support, I would like to know how to centrally add Hive aux libs. I am able to access the Phoenix tables from the Hive CLI in two ways: running "set hive.aux.jars.path=<jar location>" in the CLI, or adding an auxlib folder to Hive. The problem with these two approaches is that the library is only available on the node or box where the CLI is running. It does not work in Hive Ambari views or on other clients/boxes. Customizing the jinja template for hive-env seems extremely complicated to me, and I will find a system engineer to do that. I don't really know if it is worth it, or whether we should just access the Phoenix tables directly without Hive. Kind regards, Paul
... View more
02-23-2018
03:33 PM
Hi everyone, I am trying to add the Apache Phoenix Storage Handler to our HDP 2.5.3 cluster following the directions I found here: Phoenix Storage Handler for Apache Hive. The problem is that I am working in an Ambari-managed cluster and I want to do it using Ambari. I created an auxlib folder in /usr/hdp/current/hive-server2/, but this only makes the jar available on the node where I created it. I could potentially repeat this step for every node, but I also want to access Hive from clients outside the cluster. I tried creating a property in the custom hive-site in Ambari: hive.aux.jars.path = hdfs:///hdp/apps/2.5.3.0-37/hive/auxjars/* But it did not work. I also don't want to edit the jinja template for the advanced hive-env section. Any comment will be appreciated. Kind regards, Paul
... View more
02-23-2018
03:23 PM
Hi @smanjee, hi @Sridhar Reddy, I am trying to add the phoenix-hive.jar to Hive permanently and make it generally available for external applications, but I don't know exactly how to do that with Ambari. I set the hive.aux.jars.path property in the custom hive-site in Ambari to a location in HDFS to reach the jars centrally, but it is not working. If I create an auxlib folder on a node, it works, but only for that node. Any comment will be appreciated. Kind regards, Paul
... View more
02-23-2018
03:12 PM
Hi @bthiyagarajan, thanks for the article. I would like to know why this does not work; I mean, why can I not set this property using the custom hive-site in Ambari? How should I specify the value of HIVE_AUX_JARS_PATH in the jinja template? Many thanks in advance. Kind regards, Paul
... View more
02-01-2018
11:13 AM
Hi @Krishnaswami Rajagopalan, I don't know the exact details of this Sandbox in the Azure cloud. Are you connecting to the Sandbox or to the Docker container inside? The Docker container is where Zeppelin and other services are located. To connect to the Docker container, use port 2222 in your SSH command. Example: ssh root@127.0.0.1 -p 2222 I guess it doesn't matter whether your cluster or sandbox is running in the cloud. You should be able to find Zeppelin under /usr/hdp/current/zeppelin-server. Hope this helps. BR. Paul
... View more
12-26-2017
11:09 PM
Hi @vromeo, I raised this question on Stack Overflow and received an acceptable answer: https://stackoverflow.com/questions/47198678/zeppelin-python-conda-and-python-sql-interpreters-do-not-work-without-adding-a Kind regards, Paul
... View more
12-11-2017
08:05 AM
Hi everyone, I currently experience some difficulties with NiFi. We are running NiFi 1.0 standalone on HDP 2.5. There is an unexpected behavior of some queues, which are no longer moving. I can see the number of flow files queued in the processor, but if I try to list them, nothing is shown. We experience this problem intermittently, but it blocks and affects all the flows. I am not getting any visible warning or error message. I also discovered that my Grafana monitor for NiFi was dead; I just restarted the corresponding Ambari Metrics reporting task and now it is working again. In the monitor I can confirm the presence of the flow files in the queues. After digging in the logs I could find some error messages that explain this behavior.
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:1.8.0_111]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[na:1.8.0_111]
at org.apache.nifi.engine.FlowEngine.afterExecute(FlowEngine.java:100) ~[na:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1150) [na:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.lang.OutOfMemoryError: Java heap space
2017-12-11 07:10:19,523 ERROR [Timer-Driven Process Thread-39] o.a.n.processors.standard.MergeContent
java.lang.OutOfMemoryError: Java heap space: failed reallocation of scalar replaced object
2017-12-11 07:15:03,136 ERROR [Event-Driven Process Thread-112] org.apache.nifi.engine.FlowEngine A flow controller task execution stopped abnormally
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:1.8.0_111]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[na:1.8.0_111]
at org.apache.nifi.engine.FlowEngine.afterExecute(FlowEngine.java:100) ~[na:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1150) [na:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.lang.OutOfMemoryError: Java heap space
But the question is: why is there no bulletin reporting the out-of-memory problems? I'm trying to figure out how I can prevent these errors. I need to deploy the flows to a production system, and I am concerned about this, also because we have integrated 4 data sources and we expect to have more than 50 in the future. Any comment will be appreciated. Paul
... View more
11-25-2017
07:26 AM
Hi @Chiranjeevi Nimmala, you can have a look at my latest blog post; it may help you: Installing Apache Zeppelin 0.7.3 in HDP 2.5.3 with Spark and Spark2 Interpreters
... View more
11-25-2017
07:19 AM
Hi @enzo EL, 1) If you just need pandas with pyspark, test it with the example I provided for the spark interpreter. 2) It seems the python interpreter only became available with Zeppelin 0.7.2. Is an upgrade possible for you? 3) You can add interpreters that are not available by following the official documentation: https://zeppelin.apache.org/docs/0.7.0/manual/interpreterinstallation.html I have never done it before, but it should work.
... View more
11-24-2017
10:32 AM
@enzo EL, what is your Zeppelin version? It seems like a 0.6.x version. You may be able to use Pandas without configuring the python interpreter.
... View more
11-24-2017
10:08 AM
Hi @enzo EL, in your case:
export PYTHONPATH="/root/yes/bin/anaconda/bin:/usr/hdp/current/spark-client/python/lib/py4j-0.8.2.1-src.zip"
export SPARK_YARN_USER_ENV="PYTHONPATH=${PYTHONPATH}"
export PYSPARK_DRIVER_PYTHON="/root/yes/bin/anaconda/bin/python"
export PYSPARK_PYTHON="/root/yes/bin/anaconda/bin/python"
This should work. BTW, why have you installed anaconda in this location? Kind regards, Paul
... View more