Member since: 06-07-2016
Posts: 923
Kudos Received: 321
Solutions: 115

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2190 | 10-18-2017 10:19 PM |
| | 2370 | 10-18-2017 09:51 PM |
| | 10536 | 09-21-2017 01:35 PM |
| | 667 | 08-04-2017 02:00 PM |
| | 863 | 07-31-2017 03:02 PM |
10-30-2017
06:03 AM
Hi, I am writing the simplest of publishers. The publisher is on my laptop and the cluster is remote. I have tested connectivity with telnet and everything works, except I don't see my messages being published. The program starts by calling readFile() from a test main class. I am copying my code below:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Publisher {

    private static final String FILE_NAME = "/tmp/device-data.txt";
    private static final String TOPIC = "device";
    private static final String BOOTSTRAP_SERVERS = "<my host and port>";
    private static final String PRODUCER_GROUP = "DeviceDataProducer";

    private KafkaProducer<Long, String> producer = null;

    public Publisher() {
        producer = new KafkaProducer<Long, String>(initializeKafkaConnection());
    }

    public void readFile() {
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new FileReader(FILE_NAME));
            bufferedReader.lines().forEach(i -> publishToKafka(i));
        }
        catch (IOException exception) {
            exception.printStackTrace();
        }
        finally {
            if (bufferedReader != null) {
                try {
                    bufferedReader.close();
                }
                catch (IOException exception) {
                    exception.printStackTrace();
                }
            }
            producer.flush();
            producer.close();
        }
    }

    private void publishToKafka(String newEvent) {
        long timestamp = System.currentTimeMillis();
        ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, timestamp, newEvent);
        producer.send(record);
        System.out.println(newEvent);
    }

    private Properties initializeKafkaConnection() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_GROUP);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 0);
        //properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 0);
        return properties;
    }
}
Labels:
- Apache Kafka
10-26-2017
03:36 PM
@Ravikiran Dasari I would use Sqoop incremental import. Have you tried it? https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html#_incremental_imports
10-19-2017
03:21 PM
@Mateusz Grabowski Check the list of installed components; I am sure SmartSense is still installed. Hortonworks will not be collecting data, but SmartSense is still doing its job.
10-19-2017
03:10 PM
1 Kudo
@Xavier Vincelot You have to understand what bucketing really does when you are writing new records. For a high-cardinality column, for example customer_id in your case, you can have thousands of customers and bucket them into a few files (say 256 buckets). When you run a query, your customer_id is hashed and Hive searches only the bucket where your customer_id is expected to exist, instead of doing a complete table scan. That said, it is still reading the one file that contains your customer_id, and that file can be large. When you say bucketing doesn't help, I think you are testing at a small scale, or your data is not big relative to your cluster size (a 4-node cluster with 50 buckets, but data small enough for four nodes, will run the query in parallel over all the data even without bucketing, and you won't see the benefit of bucketing; do more complex operations like joins between bucketed tables and you'll start seeing the benefits). So bucketing definitely helps, but for a simple select that benefit might not be very visible.
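If it helps to see the hash-into-buckets idea in running code, here is a small sketch using Spark's DataFrameWriter.bucketBy. This is Spark's own bucketing rather than Hive's, and the table names are made up, but the mechanics are the same: a high-cardinality column is hashed into a fixed number of files.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BucketingSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Bucketing sketch")
                .enableHiveSupport()
                .getOrCreate();

        // Hypothetical source table with a high-cardinality customer_id column.
        Dataset<Row> customers = spark.table("customers_raw");

        // Hash customer_id into 256 bucket files; work that filters or joins on
        // customer_id no longer has to touch every file in the table.
        customers.write()
                .format("parquet")
                .bucketBy(256, "customer_id")
                .sortBy("customer_id")
                .saveAsTable("customers_bucketed");

        spark.stop();
    }
}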
10-18-2017
10:19 PM
@Mateusz Grabowski Those recommendations are based on SmartSense analysis of your cluster. SmartSense uses machine learning and data analysis from hundreds of clusters, and suggests optimizations based on what it has seen on those clusters in the past. If you are sure about your settings, know your workload, and know what you are doing, then you should go with that. Here is a short article that explains how SmartSense comes up with recommendations for tuning your cluster and optimizing your hardware for best use. https://hortonworks.com/blog/case-study-2x-hadoop-performance-with-hortonworks-smartsense-webinar-recap/
10-18-2017
10:10 PM
@Jason Bowles Partitioning runs after the mapper and before the reducer; there is no getting around that. But I am a little confused about what you are trying to do, so please bear with me. The way I look at it, you can create a custom partitioner where the values for each key, based on month and year, always go to the same reducer, so the reducer output will be such that values for the same month and year end up in the same file, making your access queries scan fewer files (a rough sketch is below). Am I missing something? I am still confused about what new partitioning after the reduce step would achieve. And one quick suggestion: since this seems like a new job, why not write it in Spark? The API is much easier and more flexible to work with, especially with such requirements.
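To make the custom partitioner idea concrete, here is a rough sketch. It assumes (my assumption, not yours) that the map output key starts with an ISO date such as "2017-10-05_deviceId", so the year-month prefix can be used to pick the reducer:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sends every record whose key starts with the same "yyyy-MM" prefix to the same reducer,
// so one reducer (and therefore one output file) holds all values for that month and year.
public class MonthYearPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        // Assumes the key begins with an ISO date, e.g. "2017-10-05_deviceId".
        String yearMonth = key.toString().substring(0, 7); // "2017-10"
        return (yearMonth.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

You would register it on the job with job.setPartitionerClass(MonthYearPartitioner.class) and pick a reducer count large enough for the number of distinct month-year values you expect.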
10-18-2017
09:51 PM
2 Kudos
@Swati Sinha The exception you are getting is:
java.lang.NullPointerException
at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector.get(WritableLongObjectInspector.java:36)
at org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:243)
Somewhere in your data, Hive is expecting a long and is not getting a long value. Looking at your record in the log file, the only things that jump out are the following attributes: ,"_col126":254332105,"_col183":7141271 I think this is malformed JSON and these values should be quoted just like the rest of your JSON record. I could be wrong here, but this is what it looks like right now.
10-18-2017
09:37 PM
1 Kudo
@Bertrand Goubot You need to talk to your Hortonworks account team to ask about the HDF roadmap. As for upgrading only NiFi to the latest version, that is not possible. When Hortonworks releases a new version of HDF, which includes NiFi, Kafka, Streaming Analytics Manager (powered by Apache Storm) and Schema Registry, all of these components go through integration testing to ensure an enterprise-grade software release. If you upgrade just one component, for example NiFi to 1.4, then that combination has not gone through rigorous integration testing and things might break. This is the reason why such an upgrade is not supported and will render your HDF unsupported. So, you really need to engage your account team to find out what your options are, how you "might" be able to upgrade sooner (if at all possible), and whether they can share the product roadmap with you.
10-18-2017
04:29 AM
@Neha G The answer to both your questions is yes (for Active Directory integration and for installing the Kerberos client). You first need to understand how Kerberos works and how it integrates with Hadoop before attempting to create users and connect to the cluster with users authenticated by Kerberos. The following link does a really good job of explaining how to set up Kerberos and integrate it with Active Directory. https://hortonworks.com/blog/enabling-kerberos-hdp-active-directory-integration/
10-16-2017
06:25 PM
@Neha G There are two ways to access services on your mc1 cluster from mc2. The first is to create principals and give them access to the cluster. Even this may not be necessary if you use service accounts and then allow proxy users. Once you decide on a strategy, either creating new principals and giving them access, or using service principals with proxy users, you create keytabs for the principals and use those keytabs to log in with kinit. If you are writing a Java program that runs from mc2, then use the UserGroupInformation class in your Java program.
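As a rough sketch of the UserGroupInformation route from mc2 (the principal, keytab path and the files on the classpath are placeholders; it assumes the cluster's client configs, core-site.xml and hdfs-site.xml, are available to the program):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosClient {
    public static void main(String[] args) throws IOException {
        // Picks up core-site.xml / hdfs-site.xml from the classpath; they must point at the mc1 cluster.
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // Placeholder principal and keytab created for the mc2 application.
        UserGroupInformation.loginUserFromKeytab("appuser@EXAMPLE.COM", "/etc/security/keytabs/appuser.keytab");

        // Any Hadoop client call after the login runs as the authenticated principal.
        FileSystem fs = FileSystem.get(conf);
        System.out.println(fs.exists(new Path("/tmp")));
    }
}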
10-13-2017
05:22 PM
@Bilel Boubakri Maybe you have a wrong regex. Can you share your regex and what's happening?
10-11-2017
01:19 AM
@David Sheard Are you talking about the green button at the bottom right of the screen in your app, shown in my screenshot here? You just need to click it to run your app. Storm status is not shown in the Streaming Analytics app.
10-11-2017
12:55 AM
@Abhijeet Rajput The error you are getting is related to the tez.tez-ui.history-url.base value not being set. In my case it is set to http://<hostname>:8080/#/main/view/TEZ/tez_cluster_instance, where hostname is the same as the Ambari hostname; I am guessing for you it will be localhost, but I could be wrong. Once you set this value under the Tez config, your error should go away.
10-10-2017
03:58 PM
@Ganesh Ganjare Do you want to speed up the get process? This has to be supported by your file system. Ignore NiFi for a moment. Can you create a Java program to read a file from the local file system with multiple threads? The answer is yes. But what would really happen? The file is sitting on one spinning disk; that's your limiting factor. Each thread will send the disk head to random locations, making the read slower than it would be with one thread. That's why you should read one file with one thread, sequentially. Now, if you have a system like HDFS that distributes one file across multiple disks and multiple nodes, then you can use multiple threads in parallel to read parts of the file from each disk, but notice that on a single disk the operation is still single threaded (one mapper per disk). So no, your get operation cannot be multi-threaded.
10-10-2017
03:42 PM
1 Kudo
@sally sally Here is what you want to read. I recommend reading both links - it's the same webpage and it's not too long. https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#flowfile-repository https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html#DeeperView --> the specific answer to your question.
10-10-2017
03:22 PM
@Eric England Until Hortonworks supports NiFi 1.4 and publishes a new management pack for that version, your only option is to create your own management pack. One way Hortonworks provides value to its enterprise customers is by making sure that all parts of a release are tested together - integration testing. If you upgrade one component of, let's say, HDF 3.0, that creates a risk from a support standpoint. Since everything Hortonworks does is open source, you can just create your own management pack and manage the new version of NiFi from Ambari. https://cwiki.apache.org/confluence/display/AMBARI/Management+Packs --> how to create a management pack.
10-09-2017
04:49 PM
@Mrinmoy Choudhury Pig has built-in functions for converting case. https://pig.apache.org/docs/r0.9.1/func.html#lower https://pig.apache.org/docs/r0.9.1/func.html#upper You will have to determine whether a word is upper or lower case using something like "word == LOWER(word)" or "word == UPPER(word)" and then convert it to what you desire.
10-05-2017
04:59 AM
@Leonid Fedotov In your /etc/krb5.conf file, can you please check your supported encryption types under [libdefaults]? Do they include one of the following (from your klist -e): Etype (skey, tkt): aes256-cts-hmac-sha1-96, aes256-cts-hmac-sha1-96
10-03-2017
02:18 PM
@nicole wells It's difficult for anyone here to speak on behalf of Splunk; you should talk to your Splunk account team. That being said, I played with Hunk back in 2014 and the licensing at the time was based on the number of mappers. I can only guess, but I think Splunk Hadoop Connect is likely using MapReduce to push data to Hadoop, and its licensing will be based on the number of mappers. If it's using Spark, then it might be based on the number of executors. Only their account team can tell you. As the other answer says, for moving data out of Splunk into Hadoop it's going to be significantly cheaper and easier to use NiFi. Its open-source nature allows you to install and test it right now, and then you can talk to your Hortonworks account team.
10-02-2017
02:40 PM
@ROHIT AILA I understand. That's why I think you need to call the kafka-consumer-groups.sh script from your Java program to get what you are looking for.
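If you do go the script route, here is a rough sketch of shelling out to it from Java; the script path, broker address and group name below are placeholders for your environment:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ConsumerGroupOffsets {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Placeholder script location, broker and group; adjust for your installation.
        ProcessBuilder pb = new ProcessBuilder(
                "/usr/hdp/current/kafka-broker/bin/kafka-consumer-groups.sh",
                "--bootstrap-server", "broker-host:6667",
                "--group", "my-consumer-group",
                "--describe");
        pb.redirectErrorStream(true);
        Process process = pb.start();

        // Each line of the script output shows topic, partition, current offset and lag.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        process.waitFor();
    }
}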
10-02-2017
02:23 AM
@Triffids G See if you can find this file in your local file system under /usr/hdp/current/spark2/spark2-client. If it is there, then you can copy it from this location to HDFS using the following commands (remove the existing corrupted file first):
hdfs dfs -rm /hdp/apps/2.6.0.3-8/spark2/spark2-hdp-yarn-archive.tar.gz
hdfs dfs -put /usr/hdp/current/spark2/spark2-client/<file name> /hdp/apps/2.6.0.3-8/spark2
10-01-2017
11:55 PM
@Wolfgang nobody Can you give the "yarn" user the right to impersonate in core-site.xml? Try the following:
<property>
  <name>hadoop.proxyuser.yarn.groups</name>
  <value>group your user is part of. Can be a comma-separated list or '*' (without quotes) for all</value>
  <description>Allow the superuser yarn to impersonate any members of the groups mentioned in the group list</description>
</property>
<property>
  <name>hadoop.proxyuser.yarn.hosts</name>
  <value>host1,host2</value>
  <description>The superuser can connect only from host1 and host2 to impersonate a user. Make it '*' so it can connect from any host</description>
</property>
09-30-2017
09:39 PM
@ROHIT AILA
Have you tried using consumer.position(TopicPartition partition)? Or run the following script from your Java program. You can also run the script with --zookeeper instead of --bootstrap-server.
kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --new-consumer --group groupname --describe
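For the consumer.position() route, here is a minimal sketch; the topic, partition and broker address are placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupname");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign (rather than subscribe) so position() can be asked for an explicit partition.
            TopicPartition partition = new TopicPartition("device", 0);
            consumer.assign(Collections.singletonList(partition));
            // position() returns the offset of the next record this group would fetch.
            long nextOffset = consumer.position(partition);
            System.out.println("Next offset for " + partition + " is " + nextOffset);
        }
    }
}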
09-30-2017
08:29 PM
@nitins Nobody here on the Hortonworks Community can speak on behalf of the product team. This is a question that only the product team can answer, and usually they don't like to give target dates.
09-30-2017
06:46 AM
@Chokri Ben Necib Please see my updated answer. Not sure if it will help, but it might work.
09-30-2017
06:11 AM
1 Kudo
@Tamil Selvan K Cloudbreak is used to provision, configure and elastically grow clusters in the cloud. You would use Ambari to get the cluster details you are looking for, which include the cluster name, configuration values and so on. If you need to get these values without using the interface, you can use the Ambari API. So, what you are looking for is available through Ambari, not Cloudbreak.
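If you want those details programmatically, here is a rough sketch of calling the Ambari REST API from Java; the host, port and credentials are placeholders, and /api/v1/clusters is the endpoint that lists the clusters managed by that Ambari server:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Base64;

public class AmbariClusterInfo {
    public static void main(String[] args) throws IOException {
        // Placeholder Ambari host and credentials.
        URL url = new URL("http://ambari-host:8080/api/v1/clusters");
        String auth = Base64.getEncoder().encodeToString("admin:admin".getBytes("UTF-8"));

        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        connection.setRequestProperty("Authorization", "Basic " + auth);
        connection.setRequestProperty("X-Requested-By", "ambari");

        // The response is JSON describing each cluster registered with this Ambari server.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        connection.disconnect();
    }
}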
09-30-2017
06:06 AM
@Prabhu Muthaiyan
Here is how you would do it. First, in your spark-env.sh, set HADOOP_CONF_DIR to where your hdfs-site.xml, core-site.xml and hive-site.xml exist, so that your program can pick up these files when it runs and know how to connect to Hive. Then you basically write code similar to the below:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .enableHiveSupport()
    .getOrCreate();

Dataset<Row> emp1 = spark.sql("SELECT col1, col2, col3 from emp1 where <condition goes here>");
emp1.write().saveAsTable("emp2");
// or use this: emp1.write().mode("append").saveAsTable("emp2");

You can use the following write modes:
- SaveMode.Overwrite : overwrite the existing data.
- SaveMode.Append : append the data.
- SaveMode.Ignore : ignore the operation (i.e. no-op).
- SaveMode.ErrorIfExists : the default option; throw an exception at runtime.
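For completeness, the write can also take the SaveMode enum directly instead of the string form; continuing the snippet above, this is equivalent to mode("append"):

import org.apache.spark.sql.SaveMode;

// Append to emp2 instead of failing if the table already exists.
emp1.write().mode(SaveMode.Append).saveAsTable("emp2");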
09-30-2017
03:34 AM
@sudi ts Run the following:
hadoop fs -ls /user/mzhou1/.snapshot
Then also run:
hadoop fs -du -s /user/mzhou1/.snapshot
09-29-2017
02:36 PM
I just updated my answer. Yes, Distcp can be used.