Member since: 11-07-2016
Posts: 637
Kudos Received: 253
Solutions: 144
07-20-2018
06:20 PM
Windows 10 brings many changes; for example, hard drive storage now appears under This PC. Our tutorial walks you through the easiest way to open and use File Explorer in Windows 10.
06-20-2018
07:21 AM
@Saurabh Ambre, Glad to know that the previous issue is resolved. It is always good to create a separate thread for each issue, so please open a new question for this one so that the main thread doesn't get sidetracked. In the new question, include the complete stack trace and attach the pom.xml file. Feel free to tag me in the question. Please accept the above answer.
02-07-2019
06:18 PM
Awesome!!!
06-15-2018
10:56 AM
1 Kudo
@Alex Witte, According to your question, you want to transform it to the format below:
Col1 Col2
1 [agakhanpark,science centre,sunnybrookpark,laird,leaside,mountpleasant,avenue]
2 [agakhanpark,wynford,sloane,oconnor,pharmacy,hakimilebovic,goldenmile,birchmount]
I have changed your code a little bit and was able to achieve it. Please check this code and the pyspark execution output:
from pyspark.sql.types import *
data_schema = [StructField('id', IntegerType(), False),StructField('route', StringType(),False)]
final_struc = StructType(fields=data_schema)
df = sqlContext.read.option("delimiter", "|").csv('/user/hrt_qa/a.txt',schema=final_struc)
df.show()
from pyspark.sql.functions import udf
def str_to_arr(my_list):
    my_list = my_list.split(",")
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'
str_to_arr_udf = udf(str_to_arr,StringType())
df = df.withColumn('route_arr',str_to_arr_udf(df["route"]))
df = df.drop("route")
df.show()

>>> from pyspark.sql.types import *
>>> data_schema = [StructField('id', IntegerType(), False),StructField('route', StringType(),False)]
>>> final_struc = StructType(fields=data_schema)
>>> df = sqlContext.read.option("delimiter", "|").csv('/user/hrt_qa/a.txt',schema=final_struc)
>>> df.show()
+---+--------------------+
| id| route|
+---+--------------------+
| 1|agakhanpark,scien...|
| 2|agakhanpark,wynfo...|
+---+--------------------+
>>>
>>>
>>> from pyspark.sql.functions import udf
>>> def str_to_arr(my_list):
...     my_list = my_list.split(",")
...     return '[' + ','.join([str(elem) for elem in my_list]) + ']'
...
>>> str_to_arr_udf = udf(str_to_arr,StringType())
>>> df = df.withColumn('route_arr',str_to_arr_udf(df["route"]))
>>> df = df.drop("route")
>>> df.show()
+---+--------------------+
| id| route_arr|
+---+--------------------+
| 1|[agakhanpark,scie...|
| 2|[agakhanpark,wynf...|
+---+--------------------+
Please "Accept" the answer if this helps.
-Aditya
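The per-row logic of the UDF above can be tried in plain Python, without a Spark session. This is only a minimal sketch: the sample route is copied from the expected output in the post, and the parameter and variable names are illustrative.

```python
def str_to_arr(route):
    """Same per-row logic as the UDF: wrap a comma-separated route in brackets."""
    stops = route.split(",")
    return "[" + ",".join(str(stop) for stop in stops) + "]"


# Sample row copied from the expected output in the post.
sample = "agakhanpark,science centre,sunnybrookpark,laird,leaside,mountpleasant,avenue"
result = str_to_arr(sample)
print(result)
```

Note that the result is still a string that merely looks like an array. If a real array column is wanted, the built-in split function in pyspark.sql.functions may be a simpler route than a UDF.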
05-25-2018
09:57 AM
2 Kudos
@Michael Bronson, You can do this in 4 steps.
1) Get the latest version tag for kafka-env. You can do this by issuing the curl request below:
curl -u admin:admin -H "X-Requested-By: ambari" "http://{ambari-host}:{ambari-port}/api/v1/clusters/{cluster-name}?fields=Clusters/desired_configs"
Sample response:
{
"href" : "http://localhost:8080/api/v1/clusters/clustername?fields=Clusters/desired_configs",
"Clusters" : {
"cluster_name" : "clustername",
"version" : "HDP-2.6",
"desired_configs" : {
"accumulo-env" : {
"tag" : "version1525370182117",
"version" : 8
},
"accumulo-log4j" : {
"tag" : "version1525368283467",
"version" : 4
},
"accumulo-logsearch-conf" : {
"tag" : "version1525368283467",
"version" : 4
},
"accumulo-site" : {
"tag" : "version1525987821696",
"version" : 9
},
"kafka-env" : {
"tag" : "version1526330057712",
"version" : 1
},
"admin-properties" : {
"tag" : "version1526330057712",
"version" : 1
},
"ams-env" : {
"tag" : "version1",
"version" : 1
},
"ams-grafana-env" : {
"tag" : "version1",
"version" : 1
}
}
}
}
2) Get the tag for kafka-env from the above response. In the example above, the tag for kafka-env is "version1526330057712". Now get the latest kafka-env config by using that tag in the curl call below:
curl -u admin:admin -H "X-Requested-By: ambari" "http://{ambari-host}:{ambari-port}/api/v1/clusters/{cluster-name}/configurations?type=kafka-env&tag={tag-version}"
Sample response:
{
"href" : "http://localhost:8080/api/v1/clusters/clustername/configurations?type=kafka-env&tag=version1525370182459",
"items" : [
{
"href" : "http://localhost:8080/api/v1/clusters/clustername/configurations?type=kafka-env&tag=version1525370182459",
"tag" : "version1525370182459",
"type" : "kafka-env",
"version" : 10,
"Config" : {
"cluster_name" : "clustername",
"stack_id" : "HDP-2.6"
},
"properties" : {
"content" : "\n#!/bin/bash\n\n# Set KAFKA specific environment variables here.\n\n# The java implementation to use.\nexport JAVA_HOME={{java64_home}}\nexport PATH=$PATH:$JAVA_HOME/bin\nexport PID_DIR={{kafka_pid_dir}}\nexport LOG_DIR={{kafka_log_dir}}\n{% if kerberos_security_enabled or kafka_other_sasl_enabled %}\nexport KAFKA_KERBEROS_PARAMS=\"-Djavax.security.auth.useSubjectCredsOnly=false {{kafka_kerberos_params}}\"\n{% else %}\nexport KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}\n{% endif %}\n# Add kafka sink to classpath and related depenencies\nif [ -e \"/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\" ]; then\n export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\n export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*\nfi\nif [ -f /etc/kafka/conf/kafka-ranger-env.sh ]; then\n. /etc/kafka/conf/kafka-ranger-env.sh\nfi",
"is_supported_kafka_ranger" : "true",
"kafka_keytab" : "/etc/security/keytabs/kafka.service.keytab",
"kafka_log_dir" : "/var/log/kafka",
"kafka_pid_dir" : "/var/run/kafka",
"kafka_principal_name" : "kafka/_HOST@KDC_COLO.COM",
"kafka_user" : "kafka",
"kafka_user_nofile_limit" : "128000",
"kafka_user_nproc_limit" : "65536"
}
}
]
}
3) Copy the properties json from the above response. Append your config export KAFKA_HEAP_OPTS="-Xms3g -Xmx3g" to the content field under the properties json. The new content json should look like this:
"content" : "\n#!/bin/bash\n\n# Set KAFKA specific environment variables here.\n\n# The java implementation to use.\nexport JAVA_HOME={{java64_home}}\nexport PATH=$PATH:$JAVA_HOME/bin\nexport PID_DIR={{kafka_pid_dir}}\nexport LOG_DIR={{kafka_log_dir}}\n{% if kerberos_security_enabled or kafka_other_sasl_enabled %}\nexport KAFKA_KERBEROS_PARAMS=\"-Djavax.security.auth.useSubjectCredsOnly=false {{kafka_kerberos_params}}\"\n{% else %}\nexport KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}\n{% endif %}\n# Add kafka sink to classpath and related depenencies\nif [ -e \"/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\" ]; then\n export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\n export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*\nfi\nif [ -f /etc/kafka/conf/kafka-ranger-env.sh ]; then\n. /etc/kafka/conf/kafka-ranger-env.sh\nfi\nexport KAFKA_HEAP_OPTS=\"-Xms3g -Xmx3g\""
4) Post the new config to Ambari:
curl -u admin:admin -H "X-Requested-By: ambari" -X PUT -d '[
{
"Clusters": {
"desired_config": [
{
"type": "kafka-env",
"tag": "unique value",
"properties": {
"content" : "\n#!/bin/bash\n\n# Set KAFKA specific environment variables here.\n\n# The java implementation to use.\nexport JAVA_HOME={{java64_home}}\nexport PATH=$PATH:$JAVA_HOME/bin\nexport PID_DIR={{kafka_pid_dir}}\nexport LOG_DIR={{kafka_log_dir}}\n{% if kerberos_security_enabled or kafka_other_sasl_enabled %}\nexport KAFKA_KERBEROS_PARAMS=\"-Djavax.security.auth.useSubjectCredsOnly=false {{kafka_kerberos_params}}\"\n{% else %}\nexport KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}\n{% endif %}\n# Add kafka sink to classpath and related depenencies\nif [ -e \"/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\" ]; then\n export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\n export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*\nfi\nif [ -f /etc/kafka/conf/kafka-ranger-env.sh ]; then\n. /etc/kafka/conf/kafka-ranger-env.sh\nfi\nexport KAFKA_HEAP_OPTS=\"-Xms3g -Xmx3g\"",
"is_supported_kafka_ranger": "true",
"kafka_keytab": "/etc/security/keytabs/kafka.service.keytab",
"kafka_log_dir": "/var/log/kafka",
"kafka_pid_dir": "/var/run/kafka",
"kafka_principal_name": "kafka/_HOST@KDC_COLO.COM",
"kafka_user": "kafka",
"kafka_user_nofile_limit": "128000",
"kafka_user_nproc_limit": "65536"
},
"service_config_version_note": "New config version"
}
]
}
}
]' "http://localhost:8080/api/v1/clusters/clustername"
Make sure to give a unique value for the tag key in the above json. Add all the properties obtained from step 3 to the above curl call, along with any extra config values you need. After these steps, the new config will be added to Kafka. Restart Kafka for the changes to take effect.
Reference: https://cwiki.apache.org/confluence/display/AMBARI/Modify+configurations
-Aditya
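Hand-escaping the content string in steps 3 and 4 is error-prone, so the request body could also be generated by a small script. The following is only a sketch under stated assumptions: the function name build_kafka_env_payload is hypothetical, the timestamp-based tag is just one way to get a unique value, and the properties dict is a shortened stand-in for the full object returned in step 2.

```python
import json
import time


def build_kafka_env_payload(properties, extra_line, note="New config version"):
    """Return the PUT body for a new kafka-env config version."""
    new_props = dict(properties)  # copy so the caller's dict is untouched
    new_props["content"] = new_props["content"] + "\n" + extra_line
    # Assumption: a millisecond timestamp is unique enough for the tag.
    tag = "version" + str(int(time.time() * 1000))
    return [{
        "Clusters": {
            "desired_config": [{
                "type": "kafka-env",
                "tag": tag,
                "properties": new_props,
                "service_config_version_note": note,
            }]
        }
    }]


# Shortened stand-in for the "properties" object returned in step 2.
current = {
    "content": "\n#!/bin/bash\nexport LOG_DIR={{kafka_log_dir}}",
    "kafka_user": "kafka",
}
payload = build_kafka_env_payload(current, 'export KAFKA_HEAP_OPTS="-Xms3g -Xmx3g"')
body = json.dumps(payload)  # json.dumps escapes the quotes and newlines
print(body)
```

The printed JSON can be saved to a file and passed to the curl call in step 4 with -d @filename in place of the inline body.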
04-01-2018
05:15 PM
@Aishwarya Sudhakar Could you clarify which user you are running Spark as? Because Spark is distributed, you should copy dataset.csv to an HDFS user directory that is accessible to the user running the Spark job. According to your output above, your file is in HDFS at /demo/demo/dataset.csv, so your load should look like this:
load "hdfs:///demo/demo/dataset.csv"
This is what you said: "The demo is the directory that is inside hadoop. And datset.csv is the file that contains data." Did you mean in HDFS? Does this command print anything?
$ hdfs dfs -cat /demo/demo/dataset.csv
Please let me know!
09-11-2018
03:35 PM
Hi Aditya, You need to quote the schema here. It's a reserved word. This works. select * from "SYSTEM"."FUNCTION";
03-27-2018
04:24 PM
1 Kudo
@Michael Bronson, This is the curl call to start the service:
curl -u $USER:$PASSWD -i -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo":{"context":"_PARSE_.START.AMBARI_METRICS","operation_level":{"level":"SERVICE","cluster_name":"hdp","service_name":"AMBARI_METRICS"}},"Body":{"ServiceInfo":{"state":"STARTED"}}}' http://localhost:8080/api/v1/clusters/hdp/services/AMBARI_METRICS
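If the same call is needed for several services, the JSON body can be generated rather than typed by hand. This is a minimal sketch: the function name service_state_body is hypothetical, and the values passed in are the ones from the curl call above.

```python
import json


def service_state_body(cluster_name, service_name, state, context):
    """Build the request body for changing a service's state."""
    return {
        "RequestInfo": {
            "context": context,
            "operation_level": {
                "level": "SERVICE",
                "cluster_name": cluster_name,
                "service_name": service_name,
            },
        },
        "Body": {"ServiceInfo": {"state": state}},
    }


body = service_state_body(
    "hdp", "AMBARI_METRICS", "STARTED", "_PARSE_.START.AMBARI_METRICS"
)
# Compact separators reproduce the single-line form used with curl -d.
compact = json.dumps(body, separators=(",", ":"))
print(compact)
```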
03-27-2018
08:38 AM
5 Kudos
In this article we will see how to produce messages using a simple Python script, consume them using the ConsumeMQTT processor, and put them in HDFS using PutHDFS.
Note: I'm using CentOS 7 and HDP 2.6.3 for this article.
1) Install MQTT
sudo yum -y install epel-release
sudo yum -y install mosquitto
2) Start MQTT
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
3) Install the paho-mqtt Python library
yum install python-pip
pip install paho-mqtt
4) Configure the MQTT password for the user. I have created a sample user 'aditya' and set the password to 'test'.
[root@test-instance-4 ~]# useradd aditya
[root@test-instance-4 ~]# sudo mosquitto_passwd -c /etc/mosquitto/passwd aditya
Password:
Reenter password:
5) Disable anonymous login to MQTT. Open the file /etc/mosquitto/mosquitto.conf, add the entries below, and restart mosquitto.
allow_anonymous false
password_file /etc/mosquitto/passwd
sudo systemctl restart mosquitto
6) Design the NiFi flow to consume messages and put them into HDFS.
Configure the ConsumeMQTT processor: Right-click on ConsumeMQTT -> Configure -> Properties. Set Broker URI, Client ID, Username, Password, Topic Filter and Max Queue Size.
Configure the PutHDFS processor: Set Hadoop Configuration Resources and Directory (to store messages).
7) Create a sample Python script to publish messages. Use the attached mqttpublish.txt and rename it to MQTTPublish.py to publish messages.
8) Run the NiFi flow.
9) Run the attached Python script.
python MQTTPublish.py
10) Check the directory to see if the messages are put in HDFS.
hdfs dfs -ls /user/aditya/
hdfs dfs -cat /user/aditya/*
Hope this helps 🙂
mqttpublish.txt
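The attached script is not reproduced here, so the following is only a rough sketch of what a publisher along these lines could look like, not the contents of mqttpublish.txt. It assumes the paho-mqtt library from step 3 and the user created in step 4; the topic name "nifi/demo", the payload shape, and the function names are all hypothetical.

```python
import json
import time


def build_messages(count):
    """Create a few small JSON payloads to publish."""
    return [json.dumps({"id": i, "text": "message %d" % i}) for i in range(count)]


def publish_all(messages, topic, host="localhost", port=1883,
                username="aditya", password="test"):
    """Publish each payload to the broker, authenticating as the step 4 user."""
    # Imported lazily so build_messages can be tried without paho-mqtt installed.
    import paho.mqtt.publish as publish
    auth = {"username": username, "password": password}
    for payload in messages:
        publish.single(topic, payload, hostname=host, port=port, auth=auth)
        time.sleep(1)  # short pause between messages


messages = build_messages(3)
print(messages)
# Uncomment with a running broker. "nifi/demo" is a hypothetical topic that
# would have to match the Topic Filter set on ConsumeMQTT.
# publish_all(messages, "nifi/demo")
```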
03-28-2018
06:06 AM
3 Kudos
@Gayathri Devi, This prediction depends on the data you have. Your data may be labelled or unlabelled, and different algorithms apply to each case.
Assuming your data is labelled, you then have to decide whether you are trying to solve a regression problem or a classification problem, and choose the algorithm accordingly. Since you have written that you want to find outliers, I'm assuming that it is a regression problem. In that case you can use algorithms like Linear Regression, Support Vector Regression, Decision Tree Regression, Random Forest Regression, etc.
If your data is unlabelled, you have to use an unsupervised learning method, such as K-Means clustering or Hierarchical clustering.
The main part of solving any machine learning problem is understanding your data and choosing the right algorithm for it, so you may need to spend more time analysing the data before picking an algorithm.
Here are a few links for the concepts mentioned above. You can find these algorithms in Spark.
https://spark.apache.org/docs/latest/ml-guide.html
https://machinelearningmastery.com/classification-versus-regression-in-machine-learning/
https://www.quora.com/What-is-the-main-difference-between-classification-problems-and-regression-problems-in-machine-learning
https://machinelearningmastery.com/supervised-and-unsupervised-machine-learning-algorithms/
https://stackoverflow.com/questions/19170603/what-is-the-difference-between-labeled-and-unlabeled-data
Happy machine learning 🙂
-Aditya
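As a toy illustration of the regression route, one could fit a straight line and flag the points that sit far from it. This is only a sketch in plain Python rather than Spark: the data is made up, and the cut-off of two standard deviations (k) is an arbitrary assumption.

```python
from statistics import mean, pstdev


def residual_outliers(xs, ys, k=2.0):
    """Fit y = slope * x + intercept by least squares and return the indices
    of points whose residual exceeds k standard deviations of all residuals."""
    x_bar, y_bar = mean(xs), mean(ys)
    sxx = sum((x - x_bar) ** 2 for x in xs)
    sxy = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = y_bar - slope * x_bar
    residuals = [y - (slope * x + intercept) for x, y in zip(xs, ys)]
    spread = pstdev(residuals)
    return [i for i, r in enumerate(residuals) if abs(r) > k * spread]


# Made-up data: y = 2x + 1, except for the fifth point.
xs = [1, 2, 3, 4, 5, 6, 7, 8]
ys = [3, 5, 7, 9, 30, 13, 15, 17]
outliers = residual_outliers(xs, ys)
print(outliers)
```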