Dynamically add partitions to HCatalogDatasetRepository

Hi,

I'm trying to dynamically add partitions to an HCatalogDatasetRepository once data has been written. I built my solution on top of https://github.com/cloudera/cdk-examples/tree/master/dataset-staging.

Here is my modified CreateStagedDataset class:
...
// create an Avro dataset to temporarily hold data
fileRepo.create("logs-staging", new DatasetDescriptor.Builder()
    .format(Formats.AVRO)
    .schemaUri("resource:simple-log.avsc")
    .partitionStrategy(new PartitionStrategy.Builder()
        .day("timestamp", "day")
        .get())
    .get());

// Construct an HCatalog dataset repository using managed Hive tables
DatasetRepository hCatalogRepo = new HCatalogDatasetRepository.Builder().get();

// Create a dataset of log records with the Avro schema in the repository,
// partitioned by year/month/day derived from the "timestamp" field
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
    .format(Formats.AVRO)
    .schemaUri("resource:simple-log.avsc")
    .partitionStrategy(new PartitionStrategy.Builder()
        .year("timestamp", "year")
        .month("timestamp", "month")
        .day("timestamp", "day")
        .get())
    .get();

hCatalogRepo.create("logs", descriptor);
...
That works fine. I also generated sample data with GenerateSimpleLogs, which eventually lands in /tmp/data/logs-staging. Now I'm trying to merge all of yesterday's Avro files into a single Avro file and load it as a new partition of the HCatalogDatasetRepository. Here is my modified StagingToPersistentSerial code:

...
public int run(String[] args) throws Exception {
    // open the repository
    final DatasetRepository fileRepo = new FileSystemDatasetRepository.Builder()
        .rootDirectory(new Path("/tmp/data")).get();

    final Calendar now = Calendar.getInstance();
    final long yesterdayTimestamp = now.getTimeInMillis() - DAY_IN_MILLIS;

    // the destination dataset
    DatasetRepository hCatalogRepo = new HCatalogDatasetRepository.Builder().get();
    final Dataset persistent = hCatalogRepo.load("logs");
    final DatasetWriter<GenericRecord> writer = persistent.newWriter();
    writer.open();

    // the source dataset: yesterday's partition in the staging area
    final Dataset staging = fileRepo.load("logs-staging");
    final PartitionKey yesterday = getPartitionKey(staging, yesterdayTimestamp);

    final DatasetReader<GenericRecord> reader = staging.getPartition(yesterday, false).newReader();

    final GenericRecordBuilder builder = new GenericRecordBuilder(persistent.getDescriptor().getSchema());

    try {
        reader.open();
        // yep, it's that easy.
        for (GenericRecord record : reader) {
            builder.set("timestamp", record.get("timestamp"));
            builder.set("component", record.get("component"));
            builder.set("level", record.get("level"));
            builder.set("message", record.get("message"));
            writer.write(builder.build());
        }
    } finally {
        reader.close();
        writer.flush();
    }
    ...
}

After executing this, I have the following structure on HDFS:

/user/hive/warehouse/logs/year=2013/month=10/day=28/1383041224811-12.avro
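The directory names above follow from the year/month/day partition strategy on the "timestamp" field. The sketch below maps a timestamp to that relative path, which is handy for computing the LOCATION of the partition you just wrote. PartitionPaths and partitionPath are hypothetical names, not CDK API; the sketch assumes the partitioners work in UTC and that directory values are unpadded integers (e.g. month=1 rather than month=01), so compare its output against your actual warehouse layout before relying on it.

```java
import java.util.Calendar;
import java.util.TimeZone;

// Hypothetical helper mirroring a year/month/day partition strategy on "timestamp".
// Assumptions: UTC calendar fields and unpadded integer directory values.
final class PartitionPaths {
    private PartitionPaths() {}

    static String partitionPath(long timestampMillis) {
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        cal.setTimeInMillis(timestampMillis);
        int year = cal.get(Calendar.YEAR);
        int month = cal.get(Calendar.MONTH) + 1; // Calendar.MONTH is 0-based
        int day = cal.get(Calendar.DAY_OF_MONTH);
        return "year=" + year + "/month=" + month + "/day=" + day;
    }
}
```

For a timestamp of 2013-10-28T12:00:00Z (1382961600000L), this returns "year=2013/month=10/day=28", matching the directory above.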

The data is written correctly, but Hive does not recognize the newly created partition. I have to add the partition manually, e.g.:

ALTER TABLE logs ADD PARTITION (year = '2013', month = '10', day = '28') location '/user/hive/warehouse/logs/year=2013/month=10/day=28';
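Until there is a CDK-level answer, the manual step can at least be automated: once the writer has finished, build the statement from the partition values and hand it to Hive (for example via the Hive CLI or a Hive JDBC connection). AddPartitionStatement is a hypothetical helper, not part of the CDK. It uses IF NOT EXISTS so rerunning a day's job does not fail, and it does no quote escaping, so it assumes partition values and paths contain no single quotes. Hive's metastore client also exposes an appendPartition(dbName, tableName, partitionValues) call that may let you skip HiveQL entirely; check it against your Hive version.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that renders an ALTER TABLE ... ADD PARTITION statement.
// Pass a LinkedHashMap so columns keep the table's partition order.
// No escaping: assumes values and location contain no single quotes.
final class AddPartitionStatement {
    private AddPartitionStatement() {}

    static String build(String table, Map<String, String> partitionValues, String location) {
        StringJoiner spec = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, String> e : partitionValues.entrySet()) {
            spec.add(e.getKey() + " = '" + e.getValue() + "'");
        }
        return "ALTER TABLE " + table + " ADD IF NOT EXISTS PARTITION " + spec
            + " LOCATION '" + location + "'";
    }
}
```

With year=2013, month=10, day=28 and the location above, this produces the same statement as the manual one, plus IF NOT EXISTS.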

Is there a way to update the metastore directly from within the CDK once the merged data has been written?

Thanks, jp

Re: Dynamically add partitions to HCatalogDatasetRepository

I just uploaded my code to GitHub:

https://github.com/jkoenig/cdk-examples/tree/dataset-hcatalog/dataset-hcatalog

Cheers, jp