Member since: 04-11-2016
Posts: 471
Kudos Received: 325
Solutions: 118

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2129 | 03-09-2018 05:31 PM |
| | 2694 | 03-07-2018 09:45 AM |
| | 2593 | 03-07-2018 09:31 AM |
| | 4466 | 03-03-2018 01:37 PM |
| | 2511 | 10-17-2017 02:15 PM |
08-09-2016
11:18 AM
This should be quite straightforward. If your processor is configured with the same properties as in your script, there is no reason it should not work. Could you share any stack traces you find in the ./log/nifi-app.log file?
08-08-2016
01:24 PM
The job is written in Scala and uses the default serialization parameters. The object has no reference to the RDD itself. What do you mean by "broadcast join"? Here is the current DAG:
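For reference, a "broadcast join" usually means collecting the smaller dataset, broadcasting it, and joining against it map-side instead of shuffling both sides. A minimal sketch, assuming an existing SparkContext `sc` and two hypothetical pair RDDs `largeRdd` and `smallRdd`:

```scala
// Sketch of a broadcast (map-side) join; largeRdd and smallRdd are hypothetical
// pair RDDs, e.g. RDD[(String, String)], and smallRdd fits in driver memory.
val smallAsMap = smallRdd.collectAsMap()   // pull the small side to the driver
val smallBc = sc.broadcast(smallAsMap)     // ship it once to every executor

// Join map-side: no shuffle of largeRdd; keys missing from the small side are dropped.
val joined = largeRdd.flatMap { case (key, value) =>
  smallBc.value.get(key).map(other => (key, (value, other)))
}
```

This avoids shuffling the large RDD, at the cost of holding the small side in memory on every executor.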
08-05-2016
08:21 AM
Hello, I am a bit surprised that you receive duplicate tweets. Do you know why? How is your GetTwitter processor configured?
08-04-2016
02:42 PM
I am not sure I understand your point. I am probably missing something in how Spark handles broadcasts, but my understanding is that broadcasts are immutable, read-only objects, so why would one keep growing? What is Spark doing with broadcasts? I used SizeEstimator (thanks for the tip) and what I am broadcasting is only a couple of MB.
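For reference, a minimal way to double-check that measurement in spark-shell; `myCollection` is just a stand-in for the actual object being broadcast:

```scala
import org.apache.spark.util.SizeEstimator

// Stand-in for the real collection that gets broadcast.
val myCollection: List[Int] = List(1, 2, 3)

// SizeEstimator reports the estimated in-memory footprint in bytes.
val estimatedBytes: Long = SizeEstimator.estimate(myCollection)
println(f"Estimated broadcast size: ${estimatedBytes / 1024.0 / 1024.0}%.2f MB")
```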
08-04-2016
01:01 PM
Hi, I'm running a job on Spark 1.5.2 and I get an OutOfMemoryError when accessing a broadcast variable. The thing is, I am not sure I understand why the broadcast keeps growing, nor why it fails at this point in the code. Basically, I have a large input file in which each line has a key. I group my lines by key so that I get one object with all the data related to a given key. Then I do a map to iterate over my objects and, for each object, I iterate over a collection which is a broadcast variable. The exception is thrown while iterating over the broadcast variable. Here is a quick example (a runnable sketch of this pattern follows the log extract below).

Input file:
key1,row1
key2,row1
key1,row2
key2,row2

The broadcast variable is a list: List(1,2,3)

I group my input by key:
key1, (row1,row2)
key2, (row1,row2)

Then I do a map:
myRdd.map( object -> for(item <- myBroadcast.value) executeFunction(object, item) )

I know groupByKey is a very costly operation, but I am not sure I can avoid it since 'executeFunction' needs all the lines for a given key to be executed. Besides, the stage where the groupByKey is performed has already completed successfully when the exception is thrown. Here is an extract from the logs:

16/08/04 03:17:50 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:18:37 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.3 GB so far)
16/08/04 03:19:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.4 GB so far)
16/08/04 03:20:07 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:20:53 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.6 GB so far)
16/08/04 03:21:11 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.7 GB so far)
16/08/04 03:21:15 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 2.5 GB so far)
16/08/04 03:44:22 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.4 GB so far)
16/08/04 03:53:03 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 3.8 GB so far)
16/08/04 04:02:00 WARN storage.MemoryStore: Not enough space to cache broadcast_90 in memory! (computed 4.1 GB so far)
16/08/04 04:20:52 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/08/04 04:20:52 ERROR executor.Executor: Exception in task 1.0 in stage 62.1 (TID 1109)
java.lang.OutOfMemoryError: Java heap space
at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:229)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:194)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:165)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:550)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:168)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) ...
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at org.apache.spark.util.collection.CompactBuffer.map(CompactBuffer.scala:30)

Any suggestion regarding what could explain this behavior? Thanks!
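For completeness, a minimal, self-contained sketch of the pattern described above. The input path and executeFunction are placeholders, not the actual job:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupByKeyWithBroadcast {

  // Placeholder for the real per-key processing (hypothetical).
  def executeFunction(rows: Iterable[String], item: Int): Unit = ()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("groupByKey-broadcast-sketch"))

    // Small, read-only collection shipped once to every executor.
    val myBroadcast = sc.broadcast(List(1, 2, 3))

    // Each input line is "key,row"; the path is a placeholder.
    val pairs = sc.textFile("hdfs:///path/to/input").map { line =>
      val Array(key, row) = line.split(",", 2)
      (key, row)
    }

    // Gather all rows of a key into a single record (the costly groupByKey).
    val grouped = pairs.groupByKey()

    // For each key, iterate over the broadcast value; the executor first
    // materializes myBroadcast.value here, which is where the OOM is reported.
    val results = grouped.map { case (_, rows) =>
      for (item <- myBroadcast.value) executeFunction(rows, item)
    }

    results.count() // an action so the map actually runs
    sc.stop()
  }
}
```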
Labels:
- Apache Spark
08-03-2016
11:17 AM
Hi @Darek Krysmann, Could you try something like: ^(?!S_)((?!\$).)*
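A quick way to sanity-check that expression, assuming the intent is to match values that do not start with "S_" and contain no "$" (the sample values below are made up):

```scala
// Check the suggested pattern against a few made-up sample values.
val regex = """^(?!S_)((?!\$).)*""".r

Seq("TABLE_A", "S_TABLE", "TABLE$BAK").foreach { sample =>
  // matches() requires the whole value to satisfy the pattern.
  val ok = regex.pattern.matcher(sample).matches()
  println(s"$sample -> $ok")
}
```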
08-03-2016
10:10 AM
Hi @Jaime, Spark 2.0 is not supported in HDP 2.4.x. Have a look here: https://hortonworks.com/blog/welcome-apache-spark-2-0/ Hope this helps.
08-01-2016
09:34 PM
1 Kudo
Hi @Ed Prout, if you don't mind ending up with one FlowFile per line of your input file, I'd suggest using the RouteText processor with the matching strategy 'Starts With' and a custom property, for example 'prefix', with the value '#'. This will create a 'prefix' relationship carrying all lines starting with '#' (you'll then want to route the 'unmatched' relationship as well if you need the lines that do not start with '#'). A sketch of the routing logic follows. Hope this helps.
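For comparison only, the routing itself boils down to partitioning lines on their prefix; a sketch outside NiFi, with a hypothetical input file:

```scala
// Outside NiFi, the same split is just a partition on the line prefix.
val lines = scala.io.Source.fromFile("input.txt").getLines().toList  // hypothetical file
val (prefixLines, unmatchedLines) = lines.partition(_.startsWith("#"))
```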
08-01-2016
03:10 PM
2 Kudos
Since you are running a standalone NiFi instance, the MapCacheServer is running locally, so you should set the 'Server Hostname' property of the client service to 'localhost'. Let me know if it helps.