Posts: 1973
Kudos Received: 1225
Solutions: 124

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 1932 | 04-03-2024 06:39 AM |
|  | 3028 | 01-12-2024 08:19 AM |
|  | 1660 | 12-07-2023 01:49 PM |
|  | 2431 | 08-02-2023 07:30 AM |
|  | 3382 | 03-29-2023 01:22 PM |
12-30-2016 03:23 AM
Can you copy that file elsewhere and then delete it? Can you rebuild the Parquet directory? Did you run fsck on that directory? See https://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html#fsck and this cheat sheet: https://dzone.com/articles/hdfs-cheat-sheet

I wonder if it's because you are trying to read individual files and not the entire directory. Can you read the entire directory /data/tempparquetdata (or copy it to another directory and read that)? See https://issues.apache.org/jira/browse/SPARK-3138
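If it helps, here is a minimal spark-shell sketch of that check, assuming a Spark 1.x SQLContext (sqlContext, as in spark-shell) and the /data/tempparquetdata path from the thread; the rebuilt output path is hypothetical:

// read the whole Parquet directory rather than individual part files
val df = sqlContext.read.parquet("/data/tempparquetdata")
println("row count: " + df.count())

// if that works, write a clean copy to a new directory (hypothetical path)
df.write.parquet("/data/tempparquetdata_rebuilt")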
12-29-2016 08:33 PM
Is HBase running? Did you configure Kylin? The log line

common.KylinConfig:197 : KYLIN_CONF property was not set, will seek KYLIN_HOME env variable

suggests the environment is not set up. See https://github.com/KylinOLAP/Kylin/issues/12

- Export KYLIN_HOME pointing to the extracted Kylin folder.
- Make sure the user has the privilege to run hadoop, hive, and hbase commands in the shell. If you are not sure, run bin/check-env.sh; it will print detailed information if you have environment issues.

See: http://kylin.apache.org/docs16/install/hadoop_env.html
12-29-2016 08:29 PM
You have to map the port for visibility outside of the sandbox. Is there enough RAM on your sandbox for Kylin?
12-29-2016 08:17 PM
See this working example of a Kafka producer (Spark Streaming reads tweets, serializes each one to Avro, and sends it to a Kafka topic):

package com.dataflowdeveloper.kafka

import java.io.ByteArrayOutputStream
import java.util.HashMap

import com.google.gson.Gson
import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by timothyspann on 4/4/16.
 */
object TwitterKafkaProducer {
  private val gson = new Gson()

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    val logger: Logger = Logger.getLogger("com.dataflowdeveloper.kafka.KafkaSimulator")

    // build a Tweet schema
    val schema = SchemaBuilder
      .record("tweet")
      .fields
      .name("tweet").`type`().stringType().noDefault()
      .name("timestamp").`type`().longType().noDefault()
      .endRecord

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = Array("hadoop", "hortonworks", "#hadoop", "#bigdata", "#spark", "#hortonworks", "#HDP")

    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("Spark Streaming Twitter to Avro to Kafka Producer")
    sparkConf.set("spark.cores.max", "24")
    sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
    sparkConf.set("spark.sql.tungsten.enabled", "true")
    sparkConf.set("spark.eventLog.enabled", "true")
    sparkConf.set("spark.app.id", "TwitterKafkaProducer")
    sparkConf.set("spark.io.compression.codec", "snappy")
    sparkConf.set("spark.rdd.compress", "true")
    sparkConf.set("spark.streaming.backpressure.enabled", "true")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // stream of tweets converted to JSON strings
    val stream = TwitterUtils.createStream(ssc, None, filters).map(gson.toJson(_))

    try {
      stream.foreachRDD((rdd, time) => {
        val count = rdd.count()
        if (count > 0) {
          val props = new HashMap[String, Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerip:6667")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.ByteArraySerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, Array[Byte]](props)
          val topList = rdd.collect()
          topList.foreach(a => {
            // wrap each tweet in an Avro record and serialize it to bytes
            val tweets = serializeTwitter(schema, new GenericRecordBuilder(schema)
              .set("tweet", a)
              .set("timestamp", time.milliseconds)
              .build)
            val message = new ProducerRecord[String, Array[Byte]]("meetup", null, tweets)
            producer.send(message)
            println("Sent %s".format(a.substring(0, Math.min(a.length, 512))))
          })
          producer.close()
        }
      })
    } catch {
      case e: Exception =>
        println("Twitter. Writing files after job. Exception:" + e.getMessage)
        e.printStackTrace()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  // convert an Avro record to a byte array
  def serializeTwitter(schema: Schema, tweet: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    try {
      val encoder = EncoderFactory.get.binaryEncoder(out, null)
      val writer = new GenericDatumWriter[GenericRecord](schema)
      writer.write(tweet, encoder)
      encoder.flush()
      out.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    out.toByteArray
  }
}
// scalastyle:on println
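On the consuming side, here is a minimal sketch of decoding one of those Avro payloads back into a GenericRecord, assuming the same tweet schema built above; the deserializeTwitter name and the bytes value are hypothetical:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// decode one Kafka message value produced by serializeTwitter
def deserializeTwitter(schema: Schema, bytes: Array[Byte]): GenericRecord = {
  val decoder = DecoderFactory.get.binaryDecoder(bytes, null)
  val reader = new GenericDatumReader[GenericRecord](schema)
  reader.read(null, decoder)
}

// usage: val record = deserializeTwitter(schema, bytes); println(record.get("tweet"))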
12-29-2016 08:04 PM
Turn off filter pushdown (see https://issues.apache.org/jira/browse/SPARK-11153). Can you read those files with anything? If so, I would write another copy in HDFS as ORC. If the file is too corrupt, it is lost.

From the Parquet documentation on error recovery (https://parquet.apache.org/documentation/latest/):

- If the file metadata is corrupt, the file is lost.
- If the column metadata is corrupt, that column chunk is lost (but column chunks for this column in other row groups are okay).
- If a page header is corrupt, the remaining pages in that chunk are lost.
- If the data within a page is corrupt, that page is lost.
- The file will be more resilient to corruption with smaller row groups.

Potential extension: with smaller row groups, the biggest issue is placing the file metadata at the end. If an error happens while writing the file metadata, all the data written will be unreadable. This can be fixed by writing the file metadata every Nth row group. Each file metadata would be cumulative and include all the row groups written so far. Combining this with the strategy used for rc or avro files using sync markers, a reader could recover partially written files.
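For reference, a minimal sketch of turning pushdown off before the read, assuming a Spark 1.x SQLContext named sqlContext (as in spark-shell); the path is hypothetical:

// disable Parquet filter pushdown for this session
sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")

// then try reading the directory again
val df = sqlContext.read.parquet("/path/to/parquetdata")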
12-29-2016 06:42 PM
See: https://community.hortonworks.com/questions/75269/hive-do-we-have-checksum-in-hive.html#answer-75287
12-29-2016 06:41 PM
You can use md5 on each column. See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF

string md5(string/binary)

Calculates an MD5 128-bit checksum for the string or binary (as of Hive 1.3.0). The value is returned as a string of 32 hex digits, or NULL if the argument was NULL. Example: md5('ABC') = '902fbdd2b1df0c4f70b4a5d23525e932'.
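As a quick illustration, a sketch of applying it from Spark's HiveContext (md5 needs Hive 1.3.0 or later, per the linked docs); the table and column names are hypothetical:

import org.apache.spark.sql.hive.HiveContext

// assumes an existing SparkContext named sc
val hiveContext = new HiveContext(sc)

// checksum a single column
hiveContext.sql("SELECT md5(col1) AS col1_md5 FROM my_table").show()

// or checksum whole rows by concatenating the columns first
hiveContext.sql("SELECT md5(concat_ws('|', col1, col2, col3)) AS row_md5 FROM my_table").show()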
12-28-2016 10:29 PM
Found an ancient install of the HDP client.

yum remove hdp-select-2.5.0.0-1245.el6.noarch

Removed all the old data.
12-28-2016 10:27 PM
[root@tspanndev13 yum.repos.d]# yum install nifi
Loaded plugins: fastestmirror
Repository HDP-UTILS-1.1.0.21 is listed more than once in the configuration
Loading mirror speeds from cached hostfile
* base: centos.mirror.ndchost.com
* epel: mirror.sfo12.us.leaseweb.net
* extras: mirrors.sonic.net
* updates: mirrors.kernel.org
Resolving Dependencies
--> Running transaction check
---> Package nifi.noarch 0:1.1.0.2.1.1.0-2.el6 will be installed
--> Processing Dependency: nifi_2_1_1_0_2 for package: nifi-1.1.0.2.1.1.0-2.el6.noarch
--> Running transaction check
---> Package nifi_2_1_1_0_2.x86_64 0:1.1.0.2.1.1.0-2.el6 will be installed
--> Processing Dependency: hdf-select >= 2.1.1.0-2 for package: nifi_2_1_1_0_2-1.1.0.2.1.1.0-2.el6.x86_64
--> Running transaction check
---> Package hdf-select.noarch 0:2.1.1.0-2.el6 will be installed
--> Finished Dependency Resolution
Dependencies Resolved
============================================================================================================================================================================================================
Package Arch Version Repository Size
============================================================================================================================================================================================================
Installing:
nifi noarch 1.1.0.2.1.1.0-2.el6 HDF-2.1.1.0 2.5 k
Installing for dependencies:
hdf-select noarch 2.1.1.0-2.el6 HDF-2.1.1.0 10 k
nifi_2_1_1_0_2 x86_64 1.1.0.2.1.1.0-2.el6 HDF-2.1.1.0 835 M
Transaction Summary
============================================================================================================================================================================================================
Install 1 Package (+2 Dependent packages)
Total download size: 835 M
Installed size: 841 M
Is this ok [y/d/N]: y
Downloading packages:
(1/3): hdf-select-2.1.1.0-2.el6.noarch.rpm | 10 kB 00:00:00
(2/3): nifi-1.1.0.2.1.1.0-2.el6.noarch.rpm | 2.5 kB 00:00:00
(3/3): nifi_2_1_1_0_2-1.1.0.2.1.1.0-2.el6.x86_64.rpm | 835 MB 00:00:43
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Total 19 MB/s | 835 MB 00:00:43
Running transaction check
Running transaction test
Transaction check error:
file /usr/bin/conf-select from install of hdf-select-2.1.1.0-2.el6.noarch conflicts with file from package hdp-select-2.5.0.0-1245.el6.noarch
Error Summary
-------------
Labels:
- Apache NiFi