Member since: 06-05-2015
Posts: 9
Kudos Received: 0
Solutions: 0
10-21-2019 03:32 PM
Spark version: 2.4
Spark Streaming Kafka: spark-streaming-kafka-0-10_2.11
enable.auto.commit: false

Observe the throw new Exception(); in the while loop. Even though the batch fails because of the exception, I see the offsets committed. I am expecting some lag here since the processing failed. What is wrong?

JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

kafkaStream.foreachRDD(kafkaStreamRDD -> {
    // fetch Kafka offsets for manually committing them later
    OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();
    // filter unwanted data
    kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
        @Override
        public Boolean call(ConsumerRecord<String, String> record) {
            // filter logic here
            return true;
        }
    }).foreachPartition(kafkaRecords -> {
        // initializing DB connections
        while (kafkaRecords.hasNext()) {
            // doing some work here
            // -----> EXCEPTION
            throw new Exception();
        }
    });
    // commit offsets after processing
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
        if (exception != null) {
            System.out.println("------------- Unable to commit offsets, something went wrong, trace ------------- " + exception.getCause());
            exception.printStackTrace(); // need this for the driver
        } else {
            System.out.println("Successfully committed offsets"); // need this for the driver
            for (OffsetRange offsetRange : offsetRanges) {
                System.out.println("Offset info: partition " + offsetRange.partition()
                        + ", fromOffset " + offsetRange.fromOffset()
                        + ", untilOffset " + offsetRange.untilOffset());
            }
        }
    });
});
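For reference, one pattern that should give the lag I am expecting is to treat foreachPartition as a blocking action and only enqueue the commit when it returns without error, so a failed batch (after task retries are exhausted) never reaches commitAsync. A minimal sketch, where process() is a hypothetical helper for the per-record work and is not from the code above:

kafkaStream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    try {
        rdd.foreachPartition(records -> {
            while (records.hasNext()) {
                process(records.next()); // hypothetical per-record work
            }
        });
        // reached only if every task, including retries, succeeded
        ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges);
    } catch (Exception e) {
        // batch failed: skip the commit so these offsets get reprocessed
        e.printStackTrace();
    }
});

Note also that commitAsync only queues the offset ranges; the actual commit happens on a later consumer poll, so even a successful commit shows up with some delay.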
08-23-2017 02:07 PM
Sorry for the delayed reply. Unfortunately, there is not enough info in that log snippet to determine why the agent installation failed. Do you know whether the agent installation failed on the CM instance or one of the cluster instances? If the former, look earlier in the Director log to see if there are any additional details about the specific failure. If the latter, also look in the CM log.
08-07-2017 01:45 PM
I am currently using CDH 5.8 in my environment. I have to test the compatibility of my project jars before moving to CDH 5.9. To check compatibility, I wanted to download the 5.9 QuickStart VM and get my projects working there. It would be great if someone could share an archived link to the 5.9 QuickStart VM.
07-17-2017 07:32 AM
Reading directories recursively isn't supported by Impala, only Hive. There is a closed JIRA asking for this to be added. https://issues.apache.org/jira/browse/IMPALA-1944#
06-07-2017 03:39 PM
I have Flume sinks that access a database and filesystems. The connection parameters of these systems change as I move from one environment to another.

Current implementation: provide a configuration file at Flume start-up via the Java configuration options for the Flume agent:

-Dapp.config.file=hdfs://.../config.json

But I don't think pulling this from HDFS is a good approach. Is there a way to provide custom environment configuration in the Flume source/channel/sink configuration itself? I know we can provide the above config at the sink level, but is there a way to provide it at the agent level and read the data in MycustomSink.java? Please let me know if a better approach is available.
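For the sink-level route, the standard hook is Flume's Configurable interface: whatever keys sit under the sink's name in the agent properties file are handed to the sink's configure() callback. A minimal sketch, where the MyCustomSink class name and the dbUrl/dbUser keys are illustrative assumptions, not from the post above:

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MyCustomSink extends AbstractSink implements Configurable {

    private String dbUrl;
    private String dbUser;

    @Override
    public void configure(Context context) {
        // These keys come straight from the agent properties file, e.g.:
        //   agent.sinks.mySink.type   = com.example.MyCustomSink
        //   agent.sinks.mySink.dbUrl  = jdbc:...   (hypothetical key)
        //   agent.sinks.mySink.dbUser = etl        (hypothetical key)
        dbUrl = context.getString("dbUrl");
        dbUser = context.getString("dbUser", "defaultUser");
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event == null) {
                txn.commit();
                return Status.BACKOFF;
            }
            // write event.getBody() to the database using dbUrl / dbUser
            txn.commit();
            return Status.READY;
        } catch (Exception e) {
            txn.rollback();
            throw new EventDeliveryException(e);
        } finally {
            txn.close();
        }
    }
}

As far as I know, Flume hands each component only the sub-context under its own name, so there is no agent-wide custom property bag visible to a sink; per-sink keys like the above, kept in one properties file per environment, or a -D system property read inside the sink, are the usual options.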
04-26-2017 11:13 AM
Thanks for the response.
06-08-2015 02:41 PM
Thank you for your help 🙂 I even tried copying phoenix-4.3.0-server.jar to /usr/lib/hbase/lib/ and restarted the HBase cluster. But now my region server is not coming up at all.