Member since
05-27-2019
2
Posts
0
Kudos Received
0
Solutions
05-27-2019
08:47 AM
The row key can also be renamed in a mapper job. Basically, it follows the same pattern as column family (CF) renaming:
public class RowKeyRenameImporter extends TableMapper<ImmutableBytesWritable, Mutation> {
// Class-wide logger.
private static final Log LOG = LogFactory.getLog(RowKeyRenameImporter.class);
// Configuration key names. NOTE(review): where these keys are read is not visible in this
// excerpt; presumably a setup method uses them to populate the fields below -- confirm.
public final static String WAL_DURABILITY = "import.wal.durability";
public final static String ROWKEY_RENAME_IMPL = "row.key.rename";
// Cluster ids stamped onto every emitted Put via Put#setClusterIds.
private List<UUID> clusterIds;
// Durability applied to every emitted Put; left at the Put's default when null.
private Durability durability;
// Strategy that maps an original row key to its renamed row key.
private RowKeyRename rowkeyRenameImpl;
/**
 * Processes one input row by delegating to {@code writeResult}.
 *
 * <p>This override does not declare {@link InterruptedException}, so an interruption
 * raised while writing is logged and the thread's interrupt status is restored, letting
 * the surrounding framework observe it instead of it being silently lost.
 *
 * @param row The current table row key.
 * @param value The columns.
 * @param context The current context.
 * @throws IOException When something is broken with the data.
 */
@Override
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException {
    try {
        writeResult(row, value, context);
    } catch (InterruptedException e) {
        // Report through the class logger rather than stderr, and re-assert the interrupt
        // flag that was cleared when the exception was thrown.
        LOG.error("Interrupted while writing the renamed row", e);
        Thread.currentThread().interrupt();
    }
}
/**
 * Trace-logs the incoming row key and forwards the row to {@code processKV} without a
 * pre-existing Put.
 *
 * @param key the row key of the row being processed
 * @param result the cells of the row
 * @param context the mapper context handed through to {@code processKV}
 * @throws IOException propagated from {@code processKV}
 * @throws InterruptedException propagated from {@code processKV}
 */
private void writeResult(ImmutableBytesWritable key, Result result, Context context)
        throws IOException, InterruptedException {
    if (LOG.isTraceEnabled()) {
        String rowText = Bytes.toString(key.get(), key.getOffset(), key.getLength());
        LOG.trace("Considering the row." + rowText);
    }
    // No Put exists yet; processKV is handed null and creates one itself when needed.
    Put initialPut = null;
    processKV(key, result, context, initialPut);
}
/**
 * Copies every cell of {@code result} under the renamed row key and emits them as a
 * single Put.
 *
 * <p>The Put is written once per row, after all cells have been added, and it is written
 * under the renamed row key so that the output key agrees with the row of the emitted
 * cells.
 *
 * @param key the original row key; handed to the configured {@code RowKeyRename} to
 *        obtain the new row key
 * @param result the row whose raw cells are copied; an empty result emits nothing
 * @param context the mapper context the Put is written to
 * @param put an existing Put to append to, or {@code null} to have one created here
 * @throws IOException if adding a cell to the Put or writing the Put fails
 * @throws InterruptedException if writing to the context is interrupted
 */
protected void processKV(ImmutableBytesWritable key, Result result, Context context, Put put)
        throws IOException, InterruptedException {
    if (result.isEmpty()) {
        // No cells to copy, so there is nothing to rename or emit.
        return;
    }
    LOG.info("Renaming the row " + key.toString());
    ImmutableBytesWritable renameRowKey = rowkeyRenameImpl.rowKeyRename(key);
    if (put == null) {
        // Honour offset/length: the writable's backing array may be larger than the row
        // key, and convertKv builds its cells from exactly this slice.
        put = new Put(renameRowKey.get(), renameRowKey.getOffset(), renameRowKey.getLength());
    }
    for (Cell kv : result.rawCells()) {
        Cell renamedKV = convertKv(kv, renameRowKey);
        addPutToKv(put, renamedKV);
    }
    // addPutToKv is overridable and may add nothing; do not emit a Put without cells.
    if (!put.isEmpty()) {
        if (durability != null) {
            put.setDurability(durability);
        }
        put.setClusterIds(clusterIds);
        context.write(renameRowKey, put);
    }
}
/**
 * Helper: builds a new KeyValue that carries the given cell's family, qualifier,
 * timestamp, type and value under the renamed row key.
 *
 * @param kv the cell to copy from
 * @param renameRowKey the renamed row key; its offset and length select the row bytes
 * @return the new KeyValue for the renamed row
 */
private static Cell convertKv(Cell kv, ImmutableBytesWritable renameRowKey) {
    // The family is cloned into its own array, so it starts at offset 0.
    final byte[] familyCopy = CellUtil.cloneFamily(kv);
    return new KeyValue(
            renameRowKey.get(), renameRowKey.getOffset(), renameRowKey.getLength(), // row
            familyCopy, 0, kv.getFamilyLength(), // column family
            kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), // qualifier
            kv.getTimestamp(), // timestamp
            KeyValue.Type.codeToType(kv.getTypeByte()), // KV type
            kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); // value
}
/**
 * Adds a single cell to the given Put.
 *
 * @param put the Put receiving the cell
 * @param kv the cell to add
 * @throws IOException if the Put rejects the cell
 */
protected void addPutToKv(Put put, Cell kv) throws IOException {
    put.add(kv);
}
... View more