Member since
02-13-2019
12
Posts
2
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
4628 | 02-19-2019 05:17 AM |
02-04-2021
03:54 AM
1 Kudo
Also make sure that the HDFS Data Node is running on that server.
... View more
12-06-2020
02:46 PM
I'm using Cloudera QuickStart VM 5.13 and I installed their Kafka version. Listing Kafka Topics: /usr/bin/kafka-topics --list --zookeeper quickstart.cloudera:2181 Creating Kafka Topic: /usr/bin/kafka-topics --create --zookeeper quickstart.cloudera:2181 --replication-factor 1 --partitions 3 --topic myFirstTopic Start a Producer: /usr/bin/kafka-console-producer --broker-list quickstart.cloudera:9092 --topic myFirstTopic Start a Consumer: /usr/bin/kafka-console-consumer --bootstrap-server quickstart.cloudera:9092 --topic myFirstTopic --from-beginning Notes for your issue: Replications need to be less than or equal to the number of brokers. I changed offsets.topic.replication.factor in Kafka configuration from Cloudera Manager and I set it to 1 (Cause I have 1 broker) You can delete brokers from Zookeeper as shown in the link below. Then restart Kafka to recreate these brokers. Zookeeper CLI: Accessing Zookeeper CLI: /usr/bin/zookeeper-client List All: ls / Output: [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, ngdata, controller_epoch, solr, consumers, latest_producer_id_block, config, hbase] List Kafka Brokers: ls /brokers Output: [ids, topics, seqid] List Kafka Topics in Zookeeper: ls /brokers/topics Output: [myFirstTopic, __consumer_offsets] Delete a Path in Zookeeper: rmr /brokers
... View more
10-22-2020
04:59 AM
I did as @ssubhas said, setting the attributes to false. spark.sql("SET hive.enforce.bucketing=false")
spark.sql("SET hive.enforce.sorting=false")
spark.sql("SET spark.hadoop.hive.exec.dynamic.partition = true")
spark.sql("SET spark.hadoop.hive.exec.dynamic.partition.mode = nonstrict")
newPartitionsDF.write.mode(SaveMode.Append).format("hive").insertInto(this.destinationDBdotTableName) Spark can create the bucketed table in Hive with no issues. Spark inserted the data into the table, but it totally ignored the fact that the table is bucketed. So when I open a partition, I see only 1 file. When inserting, we should set hive.enforce.bucketing = true, not false. And you will face the following error in Spark logs. org.apache.spark.sql.AnalysisException: Output Hive table `hive_test_db`.`test_bucketing` is bucketed but Spark currently does NOT populate bucketed output which is compatible with Hive.; This means that Spark doesn't support insertion into bucketed Hive tables. The first answer in this Stackoverflow question, explains that what @ssubhas suggested is a workaround that doesn't guarantee bucketing.
... View more
06-28-2020
07:21 AM
I tried your query, but if the table has no comment, it produces a duplicate record for that table. So I modified it a bit mysql -u hive -p
<ENTER YOUR HIVE PASSWORD>
use metastore;
SELECT * FROM (SELECT DBS.NAME AS OWNER, TBLS.TBL_NAME as OBJECT_NAME, TBL_COMMENTS.TBL_COMMENT as OBJECT_DESCRIPTION, TBLS.TBL_ID as OBJECT_ID, TBLS.TBL_TYPE as OBJECT_TYPE, "VALID" as OBJECT_STATUS,COLUMNS_V2.COLUMN_NAME, COLUMNS_V2.COMMENT as COLUMN_DESCRIPTION, COLUMNS_V2.TYPE_NAME AS DATA_TYPE FROM DBS JOIN TBLS ON DBS.DB_ID = TBLS.DB_ID JOIN SDS ON TBLS.SD_ID = SDS.SD_ID JOIN COLUMNS_V2 ON COLUMNS_V2.CD_ID = SDS.CD_ID JOIN ( SELECT DISTINCT TBL_ID, TBL_COMMENT FROM ( SELECT TBLS.TBL_ID TBL_ID,TABLE_PARAMS.PARAM_KEY,TABLE_PARAMS.PARAM_VALUE, TABLE_PARAMS.PARAM_VALUE as TBL_COMMENT FROM TBLS JOIN TABLE_PARAMS ON TBLS.TBL_ID = TABLE_PARAMS.TBL_ID WHERE TABLE_PARAMS.PARAM_KEY = "comment" UNION ALL SELECT TBLS.TBL_ID TBL_ID,TABLE_PARAMS.PARAM_KEY,TABLE_PARAMS.PARAM_VALUE, "" as TBL_COMMENT FROM TBLS JOIN TABLE_PARAMS ON TBLS.TBL_ID = TABLE_PARAMS.TBL_ID WHERE TABLE_PARAMS.PARAM_KEY <> "comment" AND TBLS.TBL_ID NOT IN (SELECT TBL_ID FROM TABLE_PARAMS WHERE TABLE_PARAMS.PARAM_KEY = "comment") ) TBL_COMMENTS_INTERNAL) TBL_COMMENTS ON TBLS.TBL_ID = TBL_COMMENTS.TBL_ID) as view WHERE OWNER = "database_name_goes_here" AND OBJECT_NAME = "table_name_goes_here";
... View more
02-19-2019
05:17 AM
Back So far it's working fine. I also found the problem with the writing in the file. The garbage data between the interceptor and the message can contain literally anything. It contained \n which is the LF in Linux systems. This was causing the Kafka problem as well. Kafka see the \n and it assumes that the message is 2 messages, not 1, that's why when I changed the delimiter to \r\n it assumed the message to be 1 message. That's a good conclusion I guess. If you want to write in a file or apply a regex on it, then just replace \n and \r with an empty string so you don't bother with those annoying control characters. Thanks to whoever wanted to help me.
... View more
02-14-2019
02:34 AM
1 Kudo
So, I did the following in Kafka Consumer element of Streamsets: And now it works fine! I will keep testing it and come back here in a couple of days to mark the post as a solution. I still notice that the file that I write into has more lines than the number of records that enter it. Even if I marked that Ignore Control Characters in the Kafka element, it still happen.
... View more
02-14-2019
01:47 AM
Here is a screenshot when writing the 2 messages to a UTF8 file. I used Notepad++ to show all symbols: You can see the garbage characters between the Interceptor and the message Another thing I noticed, when I configured Kafka consumer data format to be text, I receive 2 message (Interceptor + message content) but when I configure it to be Binary I only receive 1 message. So maybe the problem is with Kafka Consumer Data Format?
... View more
02-14-2019
01:27 AM
So I just tested it with Streamsets, I sent the message 'hmmmm' which Flume separates the interceptor from the message itself and I did receive 2 records: Then I sent another message 'hi' and receive it as 1 record: So there is a problem for sure that I didn't figure out yet. The logs for Flume in both cases are shown below. I don't see anything weird there, what you guys think? Logs for hmmmm 14 Feb 2019 12:20:57,991 INFO [PollableSourceRunner-TaildirSource-source1] (org.apache.flume.source.taildir.ReliableTaildirEventReader.openFile:283) - Opening file: /usr/software/flumData/flumeStressAndKafkaFailureTest.txt, inode: 1275070426, pos: 21
14 Feb 2019 12:21:19,593 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:227) - Start checkpoint for /usr/software/flumData/checkpoint/checkpoint, elements to sync = 1
14 Feb 2019 12:21:19,596 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:252) - Updating checkpoint metadata: logWriteOrderID: 1550135209575, queueSize: 0, queueHead: 66366
14 Feb 2019 12:21:19,599 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.Log.writeCheckpoint:1052) - Updated checkpoint for file: /usr/software/flumData/data/log-19 position: 1497 logWriteOrderID: 1550135209575
Logs for hi 14 Feb 2019 12:22:49,600 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:227) - Start checkpoint for /usr/software/flumData/checkpoint/checkpoint, elements to sync = 1
14 Feb 2019 12:22:49,607 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:252) - Updating checkpoint metadata: logWriteOrderID: 1550135209580, queueSize: 0, queueHead: 66366
14 Feb 2019 12:22:49,613 INFO [Log-BackgroundWorker-channel1] (org.apache.flume.channel. file.Log.writeCheckpoint:1052) - Updated checkpoint for file: /usr/software/flumData/data/log-19 position: 1701 logWriteOrderID: 1550135209580
... View more
02-14-2019
12:57 AM
Well, in the production case, I'm using Streamsets Pipeline with a Kafka Consumer element as source to stream. And I receive 2 messages, one is the interceptor and the other one is the message content without the interceptor. I will test it again now to make sure of that. Sometimes I also receive the full message with the interceptor appended to it. But I notice that there are garbage characters between the interceptor and the message which I have to parse using a regex expression. About the Flume side, no I'm not sure if flume actually sends 2 messages, can you tell me how can I check something like that? Is there some parameter in the logs configuration to set to show such a thing?
... View more
02-13-2019
05:30 AM
Hello Everyone, The scenario I'm trying to do is as follows: 1- Flume TAILDIR Source reading from a log file and appending a static interceptor to the beginning of the message. The interceptor consists of the host name and the host IP cause it's required with every log message I receive. 2- Flume Kafka Producer Sink that take those messages from the file and put them in a Kafka topic. The Flume configuration is as follows: tier1.sources=source1
tier1.channels=channel1
tier1.sinks =sink1
tier1.sources.source1.interceptors=i1
tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###
tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1
tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir = /usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs = /usr/software/flumData/data
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality >
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type = snappy
So now I'm testing, I ran a Console Kafka Consumer and I started to write in the source file and I do receive the message with the header appended. Example: I write 'test' in the source file and press Enter then save the file Flume detect the file change, then it sends the new line to Kafka producer. My consumer get the following line: ###HostName###000.00.0.000###test The issue now is that sometimes, the interceptor doesn't work as expected. It's like Flume sends 2 messages, one contains the interceptor and the other one the message content. Example: I write 'hi you' in the source file and press Enter then save the file Flume detect the file change, then it sends the new line to Kafka producer. My consumer get the following 2 line: ###HostName###000.00.0.000### hi you And the terminal scrolls to the the new message content. This case always happen when I type 'hi you' in the text file, and since I read from a log file, then it's not predictable when it happens. Help and support will be much appreciated ^^ Thank you
... View more
Labels:
- Labels:
-
Apache Flume
-
Apache Kafka