03-25-2022 03:04 AM
I have a PySpark application that has local modules next to the application Python script, something like this:

```
.
├── foobar
│   ├── config.py
│   ├── foobar.py
│   └── __init__.py
├── application.DEV.ini
├── application.PROD.ini
├── application.py
├── requirements.txt
└── submit-application.sh
```

I am trying to use an Oozie workflow to package all script and local module files, but apparently they are always delivered flattened, dumped into the root directory of the container, regardless of any configuration I use. Because the files from the sub-directories are placed in the root of the container, the Python script cannot load the local modules and fails with ModuleNotFoundError: No module named 'foobar'. It seems that the # notation does not work with sub-directories.

This is my Oozie workflow.xml file:

```xml
<workflow-app name="Data-Extraction-WF" xmlns="uri:oozie:workflow:0.5">
    <global>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
    </global>
    <start to="Data-Extraction"/>
    <action name="Data-Extraction">
        <shell xmlns="uri:oozie:shell-action:1.0">
            <exec>submit-application.sh</exec>
            <file>app/__init__.py#app/__init__.py</file>
            <file>app/config.py#app/config.py</file>
            <file>app/foobar.py#app/foobar.py</file>
            <file>application.DEV.ini#application.DEV.ini</file>
            <file>application.PROD.ini#application.PROD.ini</file>
            <file>application.py#application.py</file>
            <file>submit-application.sh#submit-application.sh</file>
            <capture-output/>
        </shell>
        <ok to="success"/>
        <error to="failure"/>
    </action>
    <kill name="failure">
        <message>Workflow failed, error message: [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="success"/>
</workflow-app>
```

Is there any way to tell Oozie to place file artifacts in a sub-directory? How could I upload all my Python project files in their original structure so that this works with Oozie? I am on CDH 6, and I cannot find any documentation on this.
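One direction I am considering (sketched below, not yet verified on CDH 6) is to side-step the flattening entirely: zip the foobar package into a single flat artifact (a hypothetical foobar.zip), ship it with a plain <file> entry, and let Python's zipimport machinery load the package straight from the archive:

```python
# application.py -- sketch of a possible workaround, not yet verified on CDH 6.
# Assumes a hypothetical foobar.zip shipped as a flat artifact, e.g.
#   <file>foobar.zip#foobar.zip</file>
# containing foobar/__init__.py, foobar/config.py, foobar/foobar.py at its top level.
import os
import sys

# Python's zipimport can load packages directly from a zip on sys.path,
# so no sub-directory needs to survive the Oozie file distribution.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "foobar.zip"))

from foobar import config  # resolved from inside foobar.zip
```

For the executor side, spark-submit should be able to distribute the same archive via --py-files foobar.zip; only the driver side is sketched here.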
Labels:
- Apache Hadoop
- Apache Oozie
09-30-2021 08:36 AM
We are experiencing a strange issue where Avro files, created by Flume from input JSON records received via Kafka, appear to be corrupted when landed to HDFS via the HDFS Sink. We are on the following platform:

Version: Cloudera Enterprise 6.3.4 (#6763501 built by jenkins on 20201028-1951 git: d5d1c716a1d40105495ed484bd6e49813617be03)
Java VM Name: Java HotSpot(TM) 64-Bit Server VM
Java Version: 1.8.0_181

I stripped down our data model to a bare minimum that consistently exposes the same behavior. This is our sample JSON message:

```json
{"idEnrichedEvent":"ogtpcdwkgk"}
```

This is our Avro schema; we expect the one field idEnrichedEvent to be present for the sample:

```json
{
  "type": "record",
  "name": "foobar",
  "fields": [
    {
      "name": "idEnrichedEvent",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}
```

This is what our Flume configuration looks like:

```properties
foobar.sources = kafka-source
foobar.channels = hdfs-channel
foobar.sinks = hdfs-sink
foobar.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
foobar.sources.kafka-source.kafka.bootstrap.servers = XXX, XXX
foobar.sources.kafka-source.consumer.group.id = XXX
foobar.sources.kafka-source.topic = XXX
foobar.sources.kafka-source.batchSize = 100
foobar.sources.kafka-source.channels = hdfs-channel
foobar.sources.kafka-source.kafka.consumer.security.protocol = SSL
foobar.sources.kafka-source.kafka.consumer.ssl.truststore.location=XXX
foobar.sources.kafka-source.kafka.consumer.ssl.truststore.password=changeit
foobar.sources.kafka-source.kafka.consumer.ssl.keystore.location=XXX
foobar.sources.kafka-source.kafka.consumer.ssl.keystore.password=XXX
foobar.sources.kafka-source.kafka.consumer.ssl.key.password=XXX
foobar.sources.kafka-source.interceptors = hostNameInterceptor
foobar.sources.kafka-source.interceptors.hostNameInterceptor.type = host
foobar.sources.kafka-source.interceptors.hostNameInterceptor.preserveExisting = false
foobar.sources.kafka-source.interceptors.hostNameInterceptor.hostHeader = hostname
foobar.sinks.hdfs-sink.channel = hdfs-channel
foobar.sinks.hdfs-sink.type = hdfs
foobar.sinks.hdfs-sink.hdfs.fileType = DataStream
foobar.sinks.hdfs-sink.hdfs.fileSuffix = .avro
foobar.sinks.hdfs-sink.hdfs.writeFormat = Text
foobar.sinks.hdfs-sink.hdfs.path = hdfs://nameservice1/XXX/foobar/t_period_cd=%Y-%m-%d
foobar.sinks.hdfs-sink.hdfs.filePrefix = topic_data-%{hostname}
foobar.sinks.hdfs-sink.hdfs.inUsePrefix = ._
foobar.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
foobar.sinks.hdfs-sink.hdfs.kerberosPrincipal = XXX@XXX
foobar.sinks.hdfs-sink.hdfs.kerberosKeytab = /var/lib/flume-ng/XXX.keytab
foobar.sinks.hdfs-sink.hdfs.idleTimeout=600
foobar.sinks.hdfs-sink.hdfs.rollSize = 128000000
foobar.sinks.hdfs-sink.hdfs.rollCount = 1000
foobar.sinks.hdfs-sink.hdfs.rollInterval = 3600
foobar.sinks.hdfs-sink.hdfs.minBlockReplicas = 1
foobar.sinks.hdfs-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
foobar.sinks.hdfs-sink.serializer.compressionCodec=snappy
foobar.sinks.hdfs-sink.serializer.schemaURL = hdfs://nameservice1/XXX/avro/foobar.avsc
foobar.channels.hdfs-channel.capacity = 1000
foobar.channels.hdfs-channel.transactionCapacity = 1000
foobar.channels.hdfs-channel.type = memory
```

While avro-tools correctly shows the desired schema:

```
yarn jar /opt/cloudera/parcels/CDH/jars/avro-tools-1.8.2-cdh6.3.4.jar getmeta XXXX/foobar/t_period_cd=2021-09-30/topic_data-XX.XX.XXX.XX.XXXXXXXXXXXXX.avro
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
avro.schema {"type":"record","name":"foobar","fields":[{"name":"idEnrichedEvent","type":["null","string"]}]}
avro.codec snappy
```

it fails with the following error when trying to print the data:

```
yarn jar /opt/cloudera/parcels/CDH/jars/avro-tools-1.8.2-cdh6.3.4.jar tojson XXXX/foobar/t_period_cd=2021-09-30/topic_data-XX.XX.XXX.XX.XXXXXXXXXXXXX.avro
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -62
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:436)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:220)
at org.apache.avro.tool.DataFileReadTool.run(DataFileReadTool.java:76)
at org.apache.avro.tool.Main.run(Main.java:87)
at org.apache.avro.tool.Main.main(Main.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:313)
at org.apache.hadoop.util.RunJar.main(RunJar.java:227)
```

Can someone point me in the right direction? Am I missing something in the configuration, or is this a bug?
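One detail that may be relevant: my reading of org.apache.flume.sink.hdfs.AvroEventSerializer is that it appends each event body verbatim as an already binary-encoded Avro datum, so feeding it raw JSON text would produce files exactly like this one, with valid metadata but undecodable records. Notably, zigzag-decoding the first JSON byte '{' (0x7b) yields -62, the very index in the exception above. A minimal sketch in Python with fastavro (assumed available) illustrating the difference:

```python
# Sketch: what an Avro-binary event body should look like for our schema,
# versus the raw JSON bytes we are actually feeding in. fastavro assumed available.
import io

from fastavro import parse_schema, schemaless_reader, schemaless_writer

schema = parse_schema({
    "type": "record",
    "name": "foobar",
    "fields": [{"name": "idEnrichedEvent", "type": ["null", "string"]}],
})

# Correctly encoded datum: union branch 1 ("string"), then the length-prefixed text.
buf = io.BytesIO()
schemaless_writer(buf, schema, {"idEnrichedEvent": "ogtpcdwkgk"})
print(buf.getvalue())  # b'\x02\x14ogtpcdwkgk'

# The raw JSON text misread as an Avro datum: the decoder zigzag-decodes the
# leading '{' (0x7b) into union index -62, as in the stack trace above.
try:
    schemaless_reader(io.BytesIO(b'{"idEnrichedEvent":"ogtpcdwkgk"}'), schema)
except Exception as exc:
    print(exc)  # union-index failure analogous to the -62 above
```

If that reading is correct, the records would need to be Avro-encoded before they reach the sink, but I would like confirmation that this is expected behavior rather than a bug.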
Labels:
- Apache Flume