Member since: 08-09-2017
Posts: 5
Kudos Received: 0
Solutions: 0
10-06-2017 11:17 PM
First of all, there are discrepancies between the docs and real life: the property names are not the same. I have the following configuration, built according to the docs:

# Describe the sink
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.flumeBatchSize = 20
#a1.sinks.sink1.kafka.producer.acks = 1
#a1.sinks.sink1.kafka.producer.linger.ms = 1
#a1.sinks.sink1.kafka.producer.compression.type = snappy
a1.sinks.sink1.kafka.bootstrap.servers = ip-172-30-4-132.ec2.internal:6667
a1.sinks.sink1.kafka.topic = test
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
#a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
#a1.sinks.sink1.brokerList = ip-172-30-4-132.ec2.internal:6667
#a1.sinks.sink1.topic = test
#a1.sinks.sink1.producer.security.protocol = SASL_PLAINTEXT
#a1.sinks.sink1.producer.sasl.mechanism = GSSAPI
#a1.sinks.sink1.producer.sasl.kerberos.service.name = kafka

When starting the agent, I got the following:

17/10/06 19:12:26 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: org.apache.flume.sink.kafka.KafkaSink
17/10/06 19:12:26 WARN kafka.KafkaSink: The Property 'topic' is not set. Using the default topic name: default-flume-topic
17/10/06 19:12:26 INFO kafka.KafkaSinkUtil: context={ parameters:{kafka.producer.security.protocol=SASL_PLAINTEXT, kafka.bootstrap.servers=ip-172-30-4-132.ec2.internal:6667, kafka.producer.sasl.mechanism=GSSAPI, channel=channel1, kafka.topic=test, type=org.apache.flume.sink.kafka.KafkaSink} }
17/10/06 19:12:26 ERROR node.AbstractConfigurationProvider: Sink sink1 has been removed due to an error during configuration
org.apache.flume.conf.ConfigurationException: brokerList must contain at least one Kafka broker
        at org.apache.flume.sink.kafka.KafkaSinkUtil.addDocumentedKafkaProps(KafkaSinkUtil.java:55)

I then changed the config to the older form found in the old Flume docs:

# Describe the sink
a1.sinks.sink1.channel = channel1
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
#a1.sinks.sink1.kafka.flumeBatchSize = 20
#a1.sinks.sink1.kafka.producer.acks = 1
#a1.sinks.sink1.kafka.producer.linger.ms = 1
#a1.sinks.sink1.kafka.producer.compression.type = snappy
#a1.sinks.sink1.kafka.bootstrap.servers = ip-172-30-4-132.ec2.internal:6667
#a1.sinks.sink1.kafka.topic = test
#a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
#a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
#a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
a1.sinks.sink1.brokerList = ip-172-30-4-132.ec2.internal:6667
a1.sinks.sink1.topic = test
a1.sinks.sink1.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.producer.sasl.mechanism = GSSAPI
#a1.sinks.sink1.producer.sasl.kerberos.service.name = kafka

and got this after the change:

17/10/06 19:13:45 INFO utils.VerifiableProperties: Verifying properties
17/10/06 19:13:45 INFO utils.VerifiableProperties: Property key.serializer.class is overridden to kafka.serializer.StringEncoder
17/10/06 19:13:45 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to ip-172-30-4-132.ec2.internal:6667
17/10/06 19:13:45 INFO utils.VerifiableProperties: Property request.required.acks is overridden to 1
17/10/06 19:13:45 INFO utils.VerifiableProperties: Property serializer.class is overridden to kafka.serializer.DefaultEncoder
17/10/06 19:13:45 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: sink1: Successfully registered new MBean.
17/10/06 19:13:45 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: sink1 started
17/10/06 19:13:45 INFO client.ClientUtils$: Fetching metadata from broker BrokerEndPoint(0,ip-172-30-4-132.ec2.internal,6667) with correlation id 0 for 1 topic(s) Set(test)
17/10/06 19:13:45 INFO producer.SyncProducer: Connected to ip-172-30-4-132.ec2.internal:6667 for producing
17/10/06 19:13:45 INFO producer.SyncProducer: Disconnecting from ip-172-30-4-132.ec2.internal:6667
17/10/06 19:13:45 WARN client.ClientUtils$: Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(0,ip-172-30-4-132.ec2.internal,6667)] failed
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
        at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)

This is without even touching Kerberos yet. I need to build the following pipeline:

source -> flume -> kafka -> flume -> hbase

The last part (flume -> hbase) works, but I cannot define the Kafka sink correctly. What did I do wrong? Thank you for suggestions!
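A note on reading the two failures above: the "brokerList must contain at least one Kafka broker" error suggests the agent is running an older KafkaSink that only understands the old-style names (brokerList, topic), while the kafka.* names come from newer Flume releases' docs. The EOFException in the second run would be explained by a plaintext client hitting a SASL_PLAINTEXT listener: the old Scala producer (SyncProducer) predates Kafka's client-side security support, so the broker drops its handshake. For the newer, kafka.*-style sink, Kerberos is usually wired in through a JAAS file passed to the agent JVM. A minimal sketch follows; the paths, principal, and keytab names are assumptions for illustration:

# /etc/flume/conf/flume_kafka_jaas.conf -- hypothetical path and principal
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/flume.service.keytab"
  principal="flume/ip-172-30-4-132.ec2.internal@EXAMPLE.COM";
};

# conf/flume-env.sh -- point the agent JVM at the JAAS file
export JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/etc/flume/conf/flume_kafka_jaas.conf"

This only applies once the sink actually uses the new Kafka client (the kafka.bootstrap.servers-style config); the old producer has no SASL path at all.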
10-04-2017 11:58 PM
I tried to write data to Kerberized HBase with Flume. I set up the agent according to the Flume docs and created a principal and keytab for the user. I can get a ticket from the command line, but Flume keeps giving this error:

Java config name: null
Native config name: /etc/krb5.conf
Loaded from native config
Java config name: null
Native config name: /etc/krb5.conf
Loaded from native config
>>> KdcAccessibility: reset
>>> KdcAccessibility: reset
>>> KeyTabInputStream, readName(): CS.INTERSET.COM
>>> KeyTabInputStream, readName(): interset
>>> KeyTab: load() entry length: 78; type: 18
Looking for keys for: interset@CS.INTERSET.COM
Found unsupported keytype (18) for interset@CS.INTERSET.COM
17/10/04 19:44:46 ERROR lifecycle.LifecycleSupervisor: Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@56563488 counterGroup:{ name:null counters:{} } } - Exception follows.
org.apache.flume.FlumeException: Failed to login to HBase using provided credentials.
        at org.apache.flume.sink.hbase.HBaseSink.start(HBaseSink.java:141)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Login failure for interset@CS.INTERSET.COM from keytab /tmp/interset.keytab: javax.security.auth.login.LoginException: Unable to obtain password from user

Here is my agent configuration:

#cat agent.conf
a1.sources = source1
a1.sinks = sink1
a1.channels = channel1

# Describe/configure the source
a1.sources.source1.type = seq
a1.sources.source1.channel = channel1

# Use a channel which buffers events in memory
a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

# Describe the sink
a1.sinks.sink1.type = hbase
a1.sinks.sink1.table = test
a1.sinks.sink1.columnFamily = field1
a1.sinks.sink1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.sink1.kerberosPrincipal = interset@CS.INTERSET.COM
a1.sinks.sink1.kerberosKeytab = /tmp/interset.keytab
a1.sinks.sink1.zookeeperQuorum = ip-172-30-4-33.ec2.internal:2181
a1.sinks.sink1.znodeParent = /hbase-secure
a1.sinks.sink1.channel = channel1

Getting a ticket manually works fine:

$ kinit -kt /tmp/interset.keytab interset@CS.INTERSET.COM
$ klist -e
Ticket cache: FILE:/tmp/krb5cc_1001
Default principal: interset@CS.INTERSET.COM

Valid starting       Expires              Service principal
10/04/2017 19:56:24  10/05/2017 19:56:24  krbtgt/CS.INTERSET.COM@CS.INTERSET.COM
        Etype (skey, tkt): aes256-cts-hmac-sha1-96, aes256-cts-hmac-sha1-96
$

Any idea how to solve this?
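A note on the key line in that trace: Kerberos encryption type 18 is aes256-cts-hmac-sha1-96, the same etype klist shows for the ticket, and "Found unsupported keytype (18)" typically means the JVM running the agent cannot use AES-256 because the JCE Unlimited Strength policy files are not installed (they are a separate download for Oracle Java 8 before 8u161). kinit succeeds because it uses the OS Kerberos libraries rather than the JVM. A quick way to check what the agent's JVM allows, assuming the JDK's jrunscript is on the PATH:

# Prints 2147483647 when unlimited-strength crypto is enabled;
# 128 means the default policy caps AES, and AES-256 keytab
# entries like this one will be rejected.
jrunscript -e 'print(javax.crypto.Cipher.getMaxAllowedKeyLength("AES"))'

If it prints 128, the usual fix is installing the matching JCE Unlimited Strength jars (local_policy.jar and US_export_policy.jar) into $JAVA_HOME/jre/lib/security of the JVM that runs Flume; the exact path is an assumption that depends on the Java install.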
08-14-2017 01:41 PM
Looking for best practices and prerequisites/requirements for setting up Ranger in AWS, protecting both HDP and EMR.
08-13-2017 09:20 PM
For example, if I have data for US and UK customers, I want agents from both countries to see the whole list, but not see, say, the address of people outside their own country. Say the data looks like this:

id  name  address        country
1   John  123 Main St.   US
2   Jack  456 Front St.  UK

I want a US agent to see this:

id  name  address        country
1   John  123 Main St.   US
2   Jack  XXXXXXXX       UK

And a UK agent to see this:

id  name  address        country
1   John  XXXXXXX        US
2   Jack  456 Front St.  UK

Is there a way to have such a setup in Ranger? BlueTalon can do this, and I see no reason Ranger could not. Thank you! Leonid
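For reference, one way to approximate this kind of per-row conditional masking is a Hive view over the base table, which agents are then granted instead of the table itself via an ordinary Ranger policy. This is a minimal sketch: the customers table and columns come from the example above, while the agent_country mapping table and its name are assumptions, since Ranger's built-in column-masking policies apply per user or group rather than per row:

-- Hypothetical sketch: show address only when the row's country matches
-- the querying agent's country, looked up from an assumed mapping table.
CREATE VIEW customers_masked AS
SELECT c.id,
       c.name,
       CASE WHEN c.country = m.country THEN c.address
            ELSE 'XXXXXXXX' END AS address,
       c.country
FROM customers c
JOIN agent_country m ON m.agent = current_user();

Ranger would then deny agents direct access to customers and allow select on customers_masked.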
08-09-2017 08:01 PM
In such a case, if we want to define livy.spark.yarn.queue, does it have to be the same as spark.yarn.queue, or can it point to a different queue? Thank you! Leonid