jp
New Contributor
Posts: 3
Registered: 08-16-2013

Dynamically add partitions to HCatalogDatasetRepository

Hi,

I'm trying to dynamically add partitions to an HCatalogDatasetRepository once the data has been written. I built my solution on top of https://github.com/cloudera/cdk-examples/tree/master/dataset-staging.

Here is my modified CreateStagedDataset class:
...
// create an Avro dataset to temporarily hold data
fileRepo.create("logs-staging", new DatasetDescriptor.Builder()
    .format(Formats.AVRO)
    .schemaUri("resource:simple-log.avsc")
    .partitionStrategy(new PartitionStrategy.Builder()
        .day("timestamp", "day")
        .get())
    .get());

// construct an HCatalog dataset repository using managed Hive tables
DatasetRepository hCatalogRepo = new HCatalogDatasetRepository.Builder().get();

// create the persistent "logs" dataset with the same Avro schema,
// partitioned by year/month/day
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
    .format(Formats.AVRO)
    .schemaUri("resource:simple-log.avsc")
    .partitionStrategy(new PartitionStrategy.Builder()
        .year("timestamp", "year")
        .month("timestamp", "month")
        .day("timestamp", "day")
        .get())
    .get();

hCatalogRepo.create("logs", descriptor);
...
That works fine. I also generated sample data with GenerateSimpleLogs, which eventually lands in /tmp/data/logs-staging. Now I'm trying to merge all Avro files from yesterday into a single Avro file and load it into a new partition of the HCatalogDatasetRepository. Here is my modified code from StagingToPersistentSerial:

...
public int run(String[] args) throws Exception {
  // open the staging repository
  final DatasetRepository fileRepo = new FileSystemDatasetRepository.Builder()
      .rootDirectory(new Path("/tmp/data")).get();

  final Calendar now = Calendar.getInstance();
  final long yesterdayTimestamp = now.getTimeInMillis() - DAY_IN_MILLIS;

  // the destination dataset
  DatasetRepository hCatalogRepo = new HCatalogDatasetRepository.Builder().get();
  final Dataset persistent = hCatalogRepo.load("logs");
  final DatasetWriter<GenericRecord> writer = persistent.newWriter();
  writer.open();

  // the source dataset: yesterday's partition in the staging area
  final Dataset staging = fileRepo.load("logs-staging");
  final PartitionKey yesterday = getPartitionKey(staging, yesterdayTimestamp);

  final DatasetReader<GenericRecord> reader =
      staging.getPartition(yesterday, false).newReader();

  final GenericRecordBuilder builder =
      new GenericRecordBuilder(persistent.getDescriptor().getSchema());

  try {
    reader.open();
    // copy each staged record into the persistent dataset
    for (GenericRecord record : reader) {
      builder.set("timestamp", record.get("timestamp"));
      builder.set("component", record.get("component"));
      builder.set("level", record.get("level"));
      builder.set("message", record.get("message"));
      writer.write(builder.build());
    }
  } finally {
    reader.close();
    writer.flush();
  }
  ...
}
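
For completeness, DAY_IN_MILLIS and getPartitionKey are essentially what the upstream dataset-staging example uses. Roughly the following, where the non-timestamp values are just dummies (and their types an assumption about the schema) so the partition strategy can derive the key from the timestamp:

private static final long DAY_IN_MILLIS = 1000 * 60 * 60 * 24;

private PartitionKey getPartitionKey(Dataset data, long timestamp) {
  // build a throwaway record; the partition strategy only looks at
  // "timestamp" to derive the partition key
  GenericRecord record = new GenericRecordBuilder(data.getDescriptor().getSchema())
      .set("timestamp", timestamp)
      .set("component", "dummy")
      .set("level", "dummy")
      .set("message", "dummy")
      .build();
  return data.getDescriptor().getPartitionStrategy().partitionKeyForEntity(record);
}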

After executing this, I have the following structure on HDFS:

/user/hive/warehouse/logs/year=2013/month=10/day=28/1383041224811-12.avro

That's all fine, but Hive does not recognize the newly created partition. I have to add the partition manually, e.g.

ALTER TABLE logs ADD PARTITION (year = '2013', month = '10', day = '28') location '/user/hive/warehouse/logs/year=2013/month=10/day=28';
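
In the meantime I can at least do this from Java instead of the Hive shell. Here is a rough sketch using the HCatalog client API directly (I'm assuming HCatClient and HCatAddPartitionDesc from HCatalog 0.5 are the right entry points; the partition values are hard-coded to match the example above):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hcatalog.api.HCatAddPartitionDesc;
import org.apache.hcatalog.api.HCatClient;

...
// register yesterday's partition with the Hive metastore
Map<String, String> partitionSpec = new HashMap<String, String>();
partitionSpec.put("year", "2013");
partitionSpec.put("month", "10");
partitionSpec.put("day", "28");

HCatClient client = HCatClient.create(new HiveConf());
try {
  client.addPartition(HCatAddPartitionDesc.create(
      "default", "logs",
      "/user/hive/warehouse/logs/year=2013/month=10/day=28",
      partitionSpec).build());
} finally {
  client.close();
}
...

But that bypasses the CDK entirely.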


Is there a way to update the metastore directly from within the CDK once the merged data has been written?

Thanks, jp

jp
New Contributor
Posts: 3
Registered: 08-16-2013

Re: Dynamically add partitions to HCatalogDatasetRepository

I just uploaded my code to GitHub:

https://github.com/jkoenig/cdk-examples/tree/dataset-hcatalog/dataset-hcatalog

Cheers, jp