Member since 09-16-2017
20 Posts, 1 Kudos Received, 2 Solutions
12-01-2017
05:05 PM
All - just an update. The ES-Hadoop connector is, as it should be, built more for the benefit of Elasticsearch than for Spark or Hadoop. It lets me connect to the Elasticsearch cluster from spark-shell or PySpark, which is great for ad-hoc queries. For long-term data movement, however, use Apache NiFi. The setup, if you are interested, is described on Stack Overflow, where I got some great help: https://stackoverflow.com/questions/47399391/using-nifi-to-pull-elasticsearch-indexes?noredirect=1#comment82139433_47399391
One issue I ran into: we have SSL set up on Elasticsearch, and although I was referencing that cert (I had to convert it from PEM to JKS, since Hadoop/Spark only understand JKS), it wasn't working. After I worked with Elasticsearch support, they had me add the cert to the cacerts file in my Java installation, and everything worked after that. When running a job across the cluster, I had to do this on every Spark/Hadoop box; in stand-alone mode, doing it on the single box was enough. Either way, this can save you a lot of trouble: just add your Elasticsearch cert to cacerts using keytool.
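For anyone repeating the cacerts step, here is a minimal sketch of the keytool call, wrapped in Python so it can be scripted per node. The helper name `keytool_import_command`, the alias, and both file paths are made up for illustration; the cacerts location differs between Java installs, and `changeit` is only the stock default password, so treat all of those as assumptions.

```python
import shlex


def keytool_import_command(cert_path, alias, cacerts_path, storepass="changeit"):
    """Build the keytool command that imports a certificate into a Java truststore.

    All arguments are supplied by the caller; nothing here is specific to HDP.
    """
    return [
        "keytool", "-importcert", "-noprompt", "-trustcacerts",
        "-alias", alias,
        "-file", cert_path,
        "-keystore", cacerts_path,
        "-storepass", storepass,
    ]


# Hypothetical paths and alias: adjust to your own cert and Java installation.
cmd = keytool_import_command(
    cert_path="/etc/pki/es/elasticsearch-ca.pem",
    alias="elasticsearch-ca",
    cacerts_path="/usr/lib/jvm/java/jre/lib/security/cacerts",
)
print(" ".join(shlex.quote(part) for part in cmd))
```

The script only prints the command. Running it (for example with `subprocess.run(cmd, check=True)`) needs write access to the cacerts file and has to be repeated on every node that runs Spark/Hadoop tasks.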
12-01-2017
04:59 PM
All - just an update. I was able to get help resolving this on StackOverflow. See the post here: https://stackoverflow.com/questions/47399391/using-nifi-to-pull-elasticsearch-indexes?noredirect=1#comment82139433_47399391
11-28-2017
11:26 PM
All - thanks in advance for any help that can be provided.
Big picture: I have stood up a Hadoop/Spark cluster using Ambari (HDP 2.6.2/Hadoop 2.7.3/Spark 2.1.1) and want to do some advanced machine learning/analytics on some data. The first use case is anomaly detection in syslog, where we get the data from our Elasticsearch cluster. I was pointed to NiFi as a solution for automating our data movement from ES to HDFS. Each day in ES is a unique index (e.g. logstash-syslog-2017.11.28), and my goal is to set up NiFi to grab those indexes and save them to HDFS in Parquet format. Since everything in HDFS is going to be processed with MapReduce or Spark, this is a natural choice.
I have a basic flow set up with some help from StackOverflow and Reddit (see flow.png). For now it uses a GenerateFlowFile processor, since I already know how my data comes back: I took a single returned document and put it in as custom text so I wouldn't have to pound my ES cluster during testing. Note that I have three outputs (PutFile, PutHDFS and PutParquet). The first two work just fine; they output files locally and to HDFS with no problem. The problem is the third output, which is the one I need: PutParquet. I get an error: "Failed to write due to org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException: Unable to create RecordReader: Unable to create RecordReader."
I thought it might be due to all of the nested JSON I get back from Elasticsearch, so I decided to go back to basics and get something working by following the example from: https://community.hortonworks.com/articles/140422/convert-data-from-jsoncsvavro-to-parquet-with-nifi.html I still get the same error, even when I change the JSON example and the Avro schema to the ones specified in that example. So I think I must have something unnecessary in my flow, or some setting that I am unaware I need to configure.
Things must have changed between whatever version of NiFi was used in the above URL and 1.4: the example shows schema settings as part of the PutParquet processor options, whereas in my version they come as part of the JsonTreeReader and its associated AvroSchema. I am new to all of this, and while I get the gist of NiFi, I start to get lost around the JsonTreeReader and AvroSchema pieces.
What I'd like is to read in JSON from Elastic, convert that JSON to Parquet and store it. Do I need to define a schema for this, or is there some automated way to have NiFi convert the JSON I read in into something that can be stored as Parquet?
Here is an example of how my data looks coming back from ES (some fields have been masked for obvious reasons). Any help on sorting this out and getting a working flow from ES to a Parquet file would be amazing. I've been working on this for around a week and am coming to the end of my rope. Thanks so much!
[
  {
    "hits": [
      {
        "app": {
          "threadID": "6DE2CB70",
          "environment": "DEV",
          "service": "commonvpxLro",
          "opID": "opID=HB-host-3009@205149-7520446b-5c",
          "service_info": "VpxLRO"
        },
        "severity": "info",
        "hostIP_geo": {
          "location": {
            "lon": XXXX,
            "lat": XXXX
          },
          "postal_code": "Location 1"
        },
        "hostname": "DEV3-02",
        "@timestamp": "2017-11-27T22:20:51.617Z",
        "hostIP": "10.10.0.1",
        "meta": {
          "grok_match": "grok_match_1",
          "received_at_indexer": "2017-11-27T22:20:51.727Z",
          "received_from": "10.10.0.1",
          "processed_at_indexer": "xvzzpaXXXXc",
          "kafka_topic": "syslog",
          "received_at_shipper": "2017-11-27T22:20:51.661Z",
          "processed_at_shipper": "xvzzpaXXXXb"
        },
        "@version": "1",
        "syslog": {
          "program": "Vpxa",
          "type": "vmware_esxi",
          "priority": "166"
        },
        "message": "-- BEGIN session[938e0611-282b-22c4-8c93-776436e326c7]52dd2640-f406-2da1-6931-24930920b5db -- -- vpxapi.VpxaService.retrieveChanges -- 938e0611-282b-22c4-8c93-776436e326c7\n",
        "type": "syslog",
        "tags": [
          "syslog",
          "vmware",
          "esxi",
          "index_static",
          "geoip"
        ]
      }
    ]
  }
]
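On the schema question, one way to get a starting point is to derive an Avro-style record schema from a sample document. The sketch below is only a rough illustration and not how NiFi itself works: `infer_avro_type` and `avro_name` are hypothetical helpers, every integer is assumed to map to `long`, lists are assumed to be homogeneous, and nested records are named after their path. One thing worth knowing is that Avro names may contain only letters, digits and underscores and must not start with a digit, so fields such as `@timestamp` and `@version` are not legal as they stand. The sketch simply replaces the offending character; whether the record reader then matches the renamed field is something that would have to be checked separately.

```python
import json
import re

# Exact-type lookup, so bool is not mistaken for int.
_PRIMITIVES = {bool: "boolean", int: "long", float: "double", str: "string"}


def avro_name(raw):
    """Turn an arbitrary key into a legal Avro name: [A-Za-z_][A-Za-z0-9_]*."""
    name = re.sub(r"[^A-Za-z0-9_]", "_", raw)
    return name if re.match(r"[A-Za-z_]", name) else "_" + name


def infer_avro_type(value, record_name):
    """Derive an Avro-style type description from one sample JSON value."""
    if isinstance(value, dict):
        return {
            "type": "record",
            "name": avro_name(record_name),
            "fields": [
                {"name": avro_name(key),
                 "type": infer_avro_type(val, record_name + "_" + key)}
                for key, val in value.items()
            ],
        }
    if isinstance(value, list):
        # Assumption: all items share the type of the first one.
        first = value[0] if value else ""
        return {"type": "array",
                "items": infer_avro_type(first, record_name + "_item")}
    if value is None:
        # Assumption: a null in the sample means "optional string".
        return ["null", "string"]
    return _PRIMITIVES[type(value)]


sample = {
    "hostname": "DEV3-02",
    "@timestamp": "2017-11-27T22:20:51.617Z",
    "hostIP_geo": {"location": {"lat": 1.5, "lon": 2.5}},
    "tags": ["syslog", "vmware"],
}
schema = infer_avro_type(sample, "syslog_hit")
print(json.dumps(schema, indent=2))
```

The `lat`/`lon` numbers in `sample` are made-up stand-ins for the masked values. A schema inferred from one document only covers the fields that document happens to contain, so it is a draft to edit by hand, not a finished schema.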
Labels: Apache NiFi
10-31-2017
07:36 PM
Perfect! Thanks so much!
10-31-2017
06:03 PM
So, I think I fixed this. As the 'hdfs' user, I simply ran 'hdfs dfs -chmod -R 777 /spark2-history' and restarted services. I'm no longer seeing the access/permission errors. Let me know if this was the correct fix or if I did something I shouldn't have. Thanks!
10-31-2017
05:21 PM
@Aditya Sirna - I checked this prior to posting. The Spark user does own that directory, but I don't think the issue is with the Spark user. It seems to be with the other user, zx6878a: org.apache.hadoop.security.AccessControlException: Permission denied: user=spark, access=READ, inode="/spark2-history/local-1505774309971":zx6878a:hadoop:-rwxrwx--- I think what is happening is that this user is running PySpark / spark-submit under his own username, not as the Spark user. At least that is my guess. Would a chmod on the /spark2-history folder to give everyone read and write access (chmod 777) be appropriate and fix this?
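To make the inode string in that exception easier to read, here is a rough sketch of the basic owner/group/other check that HDFS applies to a file. The helper `hdfs_access_allowed` is hypothetical, and the model is deliberately simplified: it ignores the HDFS superuser, ACLs, and the execute permission needed on parent directories.

```python
def hdfs_access_allowed(mode, owner, group, user, user_groups, access):
    """Return True if `user` gets `access` ('r', 'w' or 'x') under a POSIX-style mode.

    `mode` is the ten-character string from the log, e.g. '-rwxrwx---'.
    """
    bits = mode[1:10]
    if user == owner:
        triplet = bits[0:3]   # owner permissions
    elif group in user_groups:
        triplet = bits[3:6]   # group permissions
    else:
        triplet = bits[6:9]   # everyone else
    return access in triplet


# Values taken from the exception; the group memberships are assumptions.
denied = hdfs_access_allowed("-rwxrwx---", "zx6878a", "hadoop",
                             user="spark", user_groups={"spark"}, access="r")
allowed = hdfs_access_allowed("-rwxrwx---", "zx6878a", "hadoop",
                              user="spark", user_groups={"spark", "hadoop"}, access="r")
print(denied, allowed)
```

Read this way, the file is owned by zx6878a with group hadoop and no permissions for others, so a `spark` user that is not resolved as a member of `hadoop` falls into the "other" bucket and is refused. That suggests group membership is worth checking as well as the mode bits.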
10-30-2017
10:12 PM
We are just getting underway with Spark and the rest of our HDP 2.6.2 distribution for some machine learning. Late last week I got a ticket from our infrastructure guys stating that disk usage was running high on one of my nodes. This particular node happens to be a Spark2 History Server, so I went to check it out. Sure enough, /var/log/spark2/ had one log that was over 14 GB! I removed that file and restarted the service, and when I came in this morning after the weekend to check on it, it was once again at ~12 GB. So I checked the logs and saw entries like this:
17/10/30 15:00:47 INFO FsHistoryProvider: Replaying log path: hdfs://xczzpa0073.apsc.com:8020/spark2-history/local-1505774309971
17/10/30 15:00:47 ERROR FsHistoryProvider: Exception encountered when attempting to load application log hdfs://xczzpa0073.apsc.com:8020/spark2-history/local-1505774309971
org.apache.hadoop.security.AccessControlException: Permission denied: user=spark, access=READ, inode="/spark2-history/local-1505774309971":zx6878a:hadoop:-rwxrwx---
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:219)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1955)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1939)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPathAccess(FSDirectory.java:1913)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2001)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1970)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1883)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)
at sun.reflect.GeneratedConstructorAccessor5.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1225)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:274)
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:266)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1538)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:331)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:327)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:327)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:786)
at org.apache.spark.scheduler.EventLoggingListener$.openEventLog(EventLoggingListener.scala:312)
at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:647)
at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:464)
at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$3$$anon$4.run(FsHistoryProvider.scala:352)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=spark, access=READ, inode="/spark2-history/local-1505774309971":zx6878a:hadoop:-rwxrwx---
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:219)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1955)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1939)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPathAccess(FSDirectory.java:1913)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2001)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1970)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1883)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
at org.apache.hadoop.ipc.Client.call(Client.java:1498)
at org.apache.hadoop.ipc.Client.call(Client.java:1398)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:272)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1238)
... 20 more
OK, so it appears to be a permissions issue, but I am not sure how to fix it. A little background: I am in an enterprise setting but have set up a vanilla HDP deployment with Ambari, with no AD/Kerberos involved; I am letting local process accounts deal with things. When you see a user like zxXXXX, that is a local user. In the example above, it is one of the contractors we have doing some of the heavy lifting on our machine learning algorithms. It looks like he may be running Spark or PySpark as his own user rather than the Spark user, but I can't really tell. Any idea what is going on here, and how I can fix it to keep so many error logs from building up? Thanks!
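One way to find out which users' event logs the History Server cannot read is to pull the owner out of each "Permission denied" line. The sketch below assumes the message format shown in the trace above; `unreadable_logs_by_owner` is a hypothetical helper and the regular expression is fitted to that one format only.

```python
import re
from collections import defaultdict

# Matches the AccessControlException message format seen in the log above.
DENIED = re.compile(
    r'Permission denied: user=(?P<user>[^,]+), access=(?P<access>\w+), '
    r'inode="(?P<path>[^"]+)":(?P<owner>[^:]+):(?P<group>[^:]+):(?P<mode>\S+)'
)


def unreadable_logs_by_owner(lines):
    """Group the denied inode paths by the user who owns them."""
    by_owner = defaultdict(set)
    for line in lines:
        match = DENIED.search(line)
        if match:
            by_owner[match.group("owner")].add(match.group("path"))
    return {owner: sorted(paths) for owner, paths in by_owner.items()}


sample_lines = [
    'org.apache.hadoop.security.AccessControlException: Permission denied: '
    'user=spark, access=READ, '
    'inode="/spark2-history/local-1505774309971":zx6878a:hadoop:-rwxrwx---',
    '17/10/30 15:00:47 INFO FsHistoryProvider: Replaying log path: '
    'hdfs://xczzpa0073.apsc.com:8020/spark2-history/local-1505774309971',
]
report = unreadable_logs_by_owner(sample_lines)
print(report)
```

To run it against the real log, pass an open file handle instead of `sample_lines`. Paths are collected in a set because the same denial shows up several times per stack trace.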
Labels: Apache Ambari, Apache Spark
10-25-2017
10:01 PM
Since Spark 2.2 is not provided by HDP (yet), and we are trying to use "computeSVD", is there an alternative that provides the same functionality under Spark 2.1? Basically, the code we use to compute the singular value decomposition of a matrix of message identifiers needs "computeSVD". It's provided in the Scala API of Spark 2.1.1, but not in the Python API. Is there something else I can use for this?
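One possible workaround that does not depend on "computeSVD", sketched under assumptions: the singular values of a matrix A are the square roots of the eigenvalues of A^T A, and for a tall, narrow matrix A^T A is small enough to handle on a single machine. The pure-Python sketch below (hypothetical helpers `gramian` and `top_singular_pair`) builds A^T A row by row and uses power iteration to recover the largest singular value and its right singular vector. It is an illustration of the idea, not a replacement for a numerical library's symmetric eigensolver. On the Spark side, `RowMatrix.computeGramianMatrix()` should provide A^T A from Python, but check that it exists in your version before relying on it.

```python
import math


def gramian(rows):
    """Compute G = A^T A for a matrix given as a list of equal-length rows."""
    n = len(rows[0])
    g = [[0.0] * n for _ in range(n)]
    for row in rows:
        for i in range(n):
            for j in range(n):
                g[i][j] += row[i] * row[j]
    return g


def top_singular_pair(g, iterations=200):
    """Power iteration on G; returns (largest singular value of A, right singular vector)."""
    n = len(g)
    v = [1.0 / math.sqrt(n)] * n
    for _ in range(iterations):
        w = [sum(g[i][j] * v[j] for j in range(n)) for i in range(n)]
        norm = math.sqrt(sum(x * x for x in w))
        if norm == 0.0:
            return 0.0, v
        v = [x / norm for x in w]
    # Rayleigh quotient v^T G v approximates the largest eigenvalue of G.
    eigenvalue = sum(v[i] * sum(g[i][j] * v[j] for j in range(n)) for i in range(n))
    return math.sqrt(max(eigenvalue, 0.0)), v


matrix = [[3.0, 0.0], [0.0, 4.0]]
sigma, vector = top_singular_pair(gramian(matrix))
print(sigma, vector)
```

Power iteration converges slowly when the two largest eigenvalues are close, and it only yields one singular pair at a time, so for a full decomposition a proper eigensolver applied to the Gramian is the more practical route.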
10-25-2017
05:55 PM
Thanks in advance on this. I am running Ambari and have deployed HDP-2.6.2.0 on a 12-node cluster (one NameNode, one Secondary NameNode and 10 DataNodes). When I originally did this, I deployed Spark 2.1.1, HDFS 2.7.3 and other dependencies. One of our data scientists says he wants to use "computeSVD", but it is only available via the Python API in Spark 2.2. I'd like to upgrade Spark in place, but I am not sure whether I need to upgrade other things, or whether I can do this via Ambari. Is there a process for doing this? Is Spark 2.2 provided in HDP at all yet? Thanks!
Labels: Apache Ambari, Apache Spark
10-17-2017
11:52 PM
1 Kudo
I have a Hadoop/Spark cluster set up via Ambari (HDP-2.6.2.0). Now that I have my cluster running, I want to feed some data into it. We have an Elasticsearch cluster on premises (version 5.6). I want to set up the ES-Hadoop Connector (https://www.elastic.co/guide/en/elasticsearch/hadoop/current/doc-sections.html) that Elastic provides so I can dump some data from Elastic to HDFS. I grabbed the ZIP file with the JARs and followed the directions in a blog post at CERN: https://db-blog.web.cern.ch/blog/prasanth-kothuri/2016-05-integrating-hadoop-and-elasticsearch-%E2%80%93-part-2-%E2%80%93-writing-and-querying So far, this seems reasonable, but I have some questions:
1. We have SSL/TLS set up on our Elasticsearch cluster, so when I perform a query, I obviously get an error using the example from the blog. What do I need to do on my Hadoop/Spark side and on the Elastic side to make this communication work?
2. I read that I need to add those JARs to the Spark classpath. Is there a rule of thumb as to where I should put them on my cluster? I assume on one of my Spark client nodes, but I am not sure. Also, once I put them there, is there a way to add them to the classpath so that all of my nodes / client nodes have the same classpath? Maybe something in Ambari provides that?
Basically, what I am looking for is to be able to perform a query against ES from Spark that triggers a job telling ES to push "X" amount of data to my HDFS. Based on what I can read on the Elastic site, this is how I think it should work, but I am really confused by the documentation. It's lacking and has confused both me and my Elastic team. Can someone provide some clear directions or some clarity around what I need to do to set this up?
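For the SSL question, the connector is driven by `es.*` configuration keys, so a minimal sketch is simply the set of options a PySpark read would need. The helper `es_read_options`, the host name and the truststore path below are hypothetical; the keys themselves (`es.nodes`, `es.port`, `es.net.ssl`, `es.net.ssl.truststore.location`, `es.net.ssl.truststore.pass`, `es.net.http.auth.user`, `es.net.http.auth.pass`) are ES-Hadoop settings. Check them against the documentation for the connector version you downloaded.

```python
def es_read_options(nodes, port=9200, truststore_url=None, truststore_pass=None,
                    user=None, password=None):
    """Collect ES-Hadoop connector options for an (optionally TLS-secured) cluster."""
    options = {"es.nodes": nodes, "es.port": str(port)}
    if truststore_url:
        options["es.net.ssl"] = "true"
        options["es.net.ssl.truststore.location"] = truststore_url
        if truststore_pass:
            options["es.net.ssl.truststore.pass"] = truststore_pass
    if user:
        options["es.net.http.auth.user"] = user
        options["es.net.http.auth.pass"] = password or ""
    return options


# Hypothetical host and truststore location.
opts = es_read_options(
    nodes="es-node-1.example.com",
    truststore_url="file:///etc/security/es-truststore.jks",
    truststore_pass="changeit",
)
for key in sorted(opts):
    print(key, "=", opts[key])

# With a SparkSession and the connector JAR available, the read would look roughly like:
#   df = spark.read.format("org.elasticsearch.spark.sql").options(**opts).load("<index>/<type>")
```

On the classpath question, passing the connector JAR with `--jars` when starting `pyspark` or `spark-submit` ships it to the executors for that job, which avoids copying it onto every node by hand.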
Labels: Apache Hadoop, Apache Spark