Member since
08-05-2015
50
Posts
14
Kudos Received
3
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
29356 | 04-05-2017 05:51 PM | |
6641 | 04-01-2017 09:50 AM | |
8112 | 08-22-2015 08:08 PM |
03-26-2017
09:12 PM
1 Kudo
Problem Statement: How do I get access to the Producer Record when I encounter an exception from my asynchronous send method returned within the Callback function used? Other Information I understand that the Callback can return a series of retriable and non-retriable exceptions. I also get that the Callback is operating on another thread. It is the notion that the Callback is on another thread that makes me wonder if I try to reference the ProducerRecord message variable, if I am guaranteed to get the same message that aligns to this exception, or if there is a chance that the main thread has continued on and the message is not another value by the time I attempt to reference it via the Callback(). I don't understand enough about Futures and Callbacks to be 100% confident and it's hard to validate when on my machine with multiple threads as well. Example snippet of the method, stripped down significantly: ProducerRecord<String, byte[]> message = null;
protected KafkaProducer<String, byte[]> aProducer = null;
aProducer = createKafkaProducer();
[...]
message = new ProducerRecord<String, byte[]>(producerConfig.getKafkaTopic(), byteString);
send();
[...]
public void send() {
aProducer.send(message, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// How do I find get the original message so that I can do something with it if needed?
throw new KafkaException("Asynchronous send failure: ", exception);
} else {
//NoOp
}
}
} Is it as simple as referencing the ProducerRecord message variable or do I have to approach it another way to get access to the message so that I can do something with it should I need to? appreciated!
... View more
Labels:
- Labels:
-
Apache Kafka
08-22-2016
05:43 PM
yes, thanks for the reply! I figured out the same thing earlier today as I went back to the Flume User Guide and started copying and pasting the properties in again... When I reviewed my config initiall, i didn't look before the attribute name to even see I was missing "hdfs". Definitely an ID10T and PEBKAC error. 🙂 Thanks for keeping me honest!
... View more
08-19-2016
09:53 PM
Problem: When ingesting avro event data from Kafka, the HDFS Sink keeps rolling files when they are very small (hundreds of bytes), despite my Flume configuration. I have made the proper configuration settings I believe, and I'm at a bit of a loss. Flume Config: a1.channels = ch-1
a1.sources = src-1
a1.sinks = snk-1
a1.sources.src-1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.src-1.channels = ch-1
a1.sources.src-1.zookeeperConnect = <OMITTED>
a1.sources.src-1.topic = aTopic
a1.sources.src-1.groupID = aTopic
#Inject the Schema into the header so the AvroEventSerializer can pick it up
a1.sources.src-1.interceptors=i1
a1.sources.src-1.interceptors.i1.type = static
a1.sources.src-1.interceptors.i1.key=flume.avro.schema.url
a1.sources.src-1.interceptors.i1.value=hdfs://aNameService/data/schema/simpleSchema.avsc
a1.channels.ch-1.type = memory
a1.sinks.snk-1.type = hdfs
a1.sinks.snk-1.channel = ch-1
a1.sinks.snk-1.hdfs.path = /data/table
a1.sinks.snk-1.hdfs.filePrefix = events
a1.sinks.snk-1.hdfs.fileSuffix = .avro
a1.sinks.snk-1.hdfs.rollInterval = 0
#Expecting 100MB files before rolling
a1.sinks.snk-1.hdfs.rollSize = 100000000
a1.sinks.snk-1.rollCount = 0
a1.sinks.snk-1.hdfs.batchSize = 1000
a1.sinks.snk-1.hdfs.fileType = DataStream
a1.sinks.snk-1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
I'll also note that I tried adding other configuration settings that didn't help and I omitted any of them from this config to improve clarity. I also saw that the resolution for some people was to check the replication factor as that is a determining factor in the BucketWriter - I am receiving no errors in the logs relating to under replication. Lastly, I am executing this from the command line and not through Cloudera Manager. Thanks for any help
... View more
Labels:
- Labels:
-
Apache Flume
-
Apache Kafka
-
Cloudera Manager
-
HDFS
04-03-2016
04:53 PM
It's not clear to me what is going on. What I recommend if no others have solutions, is to simplify the scenario and start eliminating variables. Some additional questions I have that may help: I'm assuming this sqoop import works outside of Oozie? Just curious, why do you have the sqoop command in <args> and not <command>? I don't see a free-form query so <args> isn't needed. (just curious) 🙂 Do other action types work fine? Have you tried using Hue to create a workflow with the sqoop action, and see if that works? Hue will generate the workflow and job.properties for you, and that might give you something to compare to in order to focus in on where the problem lies. Have you tried connecting to a different database to see if that has problems as well? (thinking in text here) Because you can reproduce the problem in two environments, I'm thinking it's more than likely a problem with the workflow itself, and less of a problem with the infrastructure management, unless both environments are mirrors of each other in Cloudera Manager or something.
... View more
03-31-2016
05:03 PM
The config-default.xml has "host.domain.com" because you wanted to generalize it, right? I'm assuming you've tried localhost with the proper port in your job_tracker and name_node values?
... View more
03-31-2016
03:20 PM
Just to check, shouldn't the username and password have double-hyphens in the Sqoop args or does it not matter? Just want to eliminate any confounding variables 🙂
... View more
03-31-2016
02:49 PM
2 Kudos
It took me a few moments, but it looks like you are doing something that I also ran into problems with initially, which is to execute a hive command through a shell action. I never found a resolve for this and thought that pursuing that course was going a bit against the purpose of the actions themselves (shell versus hive). I think eventually you could perform some hacks to get it working, it may not be a preferred path. The approach I took to resolve this was to adhere to the practices that Oozie lays out for me, instead of trying to bend it to the way I wanted to do it, which was to create a cool wrapper script that would dynamically execute whatever I passed in. I'm getting a bit off topic on this ,but if you want to do dynamic stuff like that, it might be better to dynamically create the hql out in a file, then dynamically create the workflow content of an already existing statically-called workflow next (again... I'm digressing so I'll stop) Try using a Hive action and create a separate hiveQL file with the SQL that you are wanting to perform. Otherwise, it looks like you were hitting on all of the right things to include, like the hive-site.xml, credentials, etc. <global>
<job-xml>${wfJobSite}</job-xml>
</global>
[...]
<action name="hive-action" cred="hcat">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<script>/directory-path/example.hql</script>
</hive>
<ok to="End"/>
<error to="Kill"/>
</action>
[...] I don't know for sure what is happening behind the scenes when Oozie executes a shell script and you try to run a Hive command, but my hypothesis is that when the script executes out on a data node, it loses the context of the credentials and details you pass in, and unless you expressly leverage those somehow in your script, it results in the script not executing hive actions properly. A friend of mine was able to get a script to execute sqoop but only after he copied JARs/files out to every data node. I don't recommend trying to bend Hadoop to your will like that, but instead try to leverage the tools as they were designed and perhaps commit to the tools to eventually bend them to your will. 🙂 [just my thoughts]
... View more
03-30-2016
04:54 AM
1 Kudo
Good catch! I did notice that in an example I had done, I didn't have the memory configurations even set and things worked fine. I do recall those memory settings being a bit guess and check for me. Something I should probably work on. 😉
... View more
03-29-2016
12:05 PM
1 Kudo
sorry... I blame Internet Explorer for not showing me the entirety of your morphline... What you have looks okay to me. I don't know that you nee the sanitizeUnknownSolrFields like I put in either. Your SOLR collection is created AND activated on zookeeper right?
... View more
03-29-2016
11:56 AM
I'm a bit hazy on the topic, but your Morphline file looks a bit light to me. I believe you should have some additional pieces beyond the SOLR_LOCATOR piece like follows: morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
commands : [
{ %function here, like readCSV or something % }
...
{ sanitizeUnknownSolrFields { solrLocator : ${SOLR_LOCATOR} } }
{ loadSolr {solrLocator : ${SOLR_LOCATOR} } }
]
}
]
... View more
- « Previous
-
- 1
- 2
- Next »