Member since
07-21-2021
542
Posts
13
Kudos Received
17
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
| | 2555 | 05-06-2022 11:10 AM |
| | 3508 | 04-12-2022 11:59 PM |
| | 3117 | 03-17-2022 09:57 AM |
| | 1765 | 03-17-2022 09:54 AM |
| | 2422 | 03-14-2022 08:49 AM |
01-20-2023
09:38 PM
You can create a custom NAR file, put it into the lib folder of the $NIFI_HOME directory, and restart your NiFi server. Add the dependencies below to the processor module, write the Java code, then build and package your NAR file. <dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>4.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>4.1.2</version>
</dependency>
<dependency>
<groupId>com.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>5.1</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.anoop.converter;
import com.opencsv.CSVReader;
import com.opencsv.exceptions.CsvValidationException;
import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@Tags({"csvToExcel"})
@CapabilityDescription("This processor converts CSV flow files into Excel flow files")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class CsvToExcel extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("original")
.description("The original file")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
descriptors = Collections.emptyList();
relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
// session.write() and session.putAttribute() return updated FlowFile references;
// always use the latest reference, otherwise the transfer fails with a stale-FlowFile error.
flowFile = session.write(flowFile, new Converter());
flowFile = session.putAttribute(flowFile, "convertedIntoExcel", "true");
session.transfer(flowFile, REL_SUCCESS);
}
}
class Converter implements StreamCallback {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
try {
streamConversion(in,out);
} catch (CsvValidationException e) {
throw new RuntimeException(e);
}
}
private void streamConversion(InputStream in, OutputStream out) throws IOException, CsvValidationException {
CSVReader csvReader = new CSVReader(new InputStreamReader(in));
XSSFWorkbook workbook = new XSSFWorkbook();
XSSFSheet sheet = workbook.createSheet("Sheet1");
String[] rowData = null;
int rowNum = 0;
while ((rowData = csvReader.readNext()) != null) {
Row row = sheet.createRow(rowNum++);
int colNum = 0;
for (String cellData : rowData) {
Cell cell = row.createCell(colNum++);
cell.setCellValue(cellData);
}
}
workbook.write(out);
workbook.close();
}
}
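For reference, the NAR itself is normally produced by a separate Maven module that uses the nifi-nar-maven-plugin and wraps the processor module. Below is a minimal sketch of such a pom.xml, assuming a hypothetical csv-to-excel-processors module that holds the CsvToExcel class, the nifi-api dependency, and the POI/OpenCSV dependencies listed above (group/artifact IDs and versions are illustrative):
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.anoop.converter</groupId>
  <artifactId>csv-to-excel-nar</artifactId>
  <version>1.0.0</version>
  <!-- the nifi-nar-maven-plugin turns this module into a .nar bundle -->
  <packaging>nar</packaging>
  <dependencies>
    <!-- the processor module containing CsvToExcel and its dependencies -->
    <dependency>
      <groupId>com.anoop.converter</groupId>
      <artifactId>csv-to-excel-processors</artifactId>
      <version>1.0.0</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-maven-plugin</artifactId>
        <version>1.3.3</version>
        <extensions>true</extensions>
      </plugin>
    </plugins>
  </build>
</project>
The processor module also needs a META-INF/services/org.apache.nifi.processor.Processor file under src/main/resources containing the line com.anoop.converter.CsvToExcel so NiFi can discover the processor. After mvn clean install, copy the resulting .nar from the NAR module's target directory into $NIFI_HOME/lib and restart NiFi.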
10-28-2022
01:06 PM
@D5ha Not all processors write to the content repository, and the content of a FlowFile is never modified after it is created. Once a FlowFile is created in NiFi, it exists as-is until it is terminated. A NiFi FlowFile consists of two parts: the FlowFile attributes (metadata about the FlowFile, including details about where the FlowFile's content lives in the content_repository) and the FlowFile content itself. When a downstream processor modifies the content of a FlowFile, what really happens is that new content is written to a new content claim in the content_repository; the original content remains unchanged.
From what you shared, you appear to have just one content_repository. Within that single content_repository, NiFi creates a number of sub-directories; it does this for better indexing and seeking, because of the massive number of content claims a user's dataflow(s) may hold. It is also very important to understand that a content claim in the content_repository can hold the content for one or more FlowFiles; it is not always one content claim per FlowFile's content. It is also possible to have multiple queued FlowFiles pointing to the exact same content claim and offset (the exact same content). This happens when your dataflow clones a FlowFile (for example, routing the same outbound relationship from a processor multiple times). So you should never manually delete claims from any content repository, as you may delete the content of multiple FlowFiles.
That being said, you can use data provenance to locate the content_repository (Container), the sub-directory (Section), the content claim filename (Identifier), the byte at which the content begins in that claim (Offset), and the number of bytes from that offset to the end of the content in the claim (Size).
Right-click on a processor and select "View data provenance" from the displayed context menu. This lists all FlowFiles processed by this processor for which provenance still holds index data. Click the Show Lineage icon (it looks like three connected circles) to the far right of a FlowFile. You can right-click on "clone" and "join" events to find/expand any parent FlowFiles in the lineage (the event dot created for the processor on which you selected data provenance will be colored red in the lineage graph). Each white circle is a different FlowFile; clicking on a white circle highlights the dataflow path for that FlowFile. Right-clicking on an event like "create" and selecting "View details" shows everything that is known about that FlowFile, including a tab about the content:
Container corresponds to the following property in the nifi.properties file: nifi.content.repository.directory.default=
Section corresponds to the sub-directory within the above content repository path.
Identifier is the content claim filename.
Offset is the byte at which this FlowFile's content begins within that Identifier.
Size is the number of bytes from the Offset to the end of that FlowFile's content in the Identifier.
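Putting those values together, the claim holding a FlowFile's content can be located on disk as <Container directory>/<Section>/<Identifier>. For example (all values below are made up for illustration):
nifi.content.repository.directory.default=./content_repository
Provenance content details: Container=default, Section=527, Identifier=1527563534320-12, Offset=0, Size=4096
Claim file on disk: ./content_repository/527/1527563534320-12 (this FlowFile's content is the 4096 bytes starting at byte 0 of that file)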
I also created an article on how to index the Content Identifier. Indexing that field allows you to locate a content claim and then search for it in your data provenance to find all FlowFile(s) that pointed at it. You can then view the details of all those FlowFile(s) to see the full content claim details as above: https://community.cloudera.com/t5/Community-Articles/How-to-determine-which-FlowFiles-are-associated-to-the-same/ta-p/249185 If you found that the provided solution(s) assisted you with your query, please take a moment to log in and click Accept as Solution below each response that helped. Thank you, Matt
10-26-2022
11:19 PM
Thanks for your response. I tried calling sales to purchase an individual license, but they do not sell individual licenses. Is there any way to install an open-source Azure quickstart? https://docs.cloudera.com/cdp-public-cloud/cloud/azure-quickstart/topics/mc-azure-quickstart.html#mc-azure-quickstart I finally installed a Docker version of Cloudera QuickStart; however, file browsing for HDFS is missing.
05-17-2022
01:15 AM
The linked thread is a walkthrough on how to secure a NiFi Registry instance locally. I’m looking for instructions on how to connect to a secure NiFi Registry deployed on CDP Data Hub. I’m running on AWS infrastructure. The Data Hub is deployed using default settings and resides in a private subnet.
04-13-2022
12:05 AM
Hello, please refer to https://community.cloudera.com/t5/Community-Articles/Using-RStudio-as-an-Editor-with-ML-Runtimes/ta-p/325166 Was your question answered on the Cloudera community portal? Make sure to mark the answer as the accepted solution. If you find a reply useful, say thanks by clicking on the thumbs-up button.
03-17-2022
11:54 AM
Hello @Koffi The HDFS Balancer will do the job for you; please refer to the official docs below before configuring it: 1. Overview of the HDFS Balancer 2. Configuring the Balancer (a minimal run example follows this reply). Was your question answered? Make sure to mark the answer as the accepted solution. If you find a reply useful, say thanks by clicking on the thumbs-up button.
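For reference, once configured the balancer is typically started from the command line; the threshold (allowed percentage deviation in disk usage between DataNodes) below is only an example value:
hdfs balancer -threshold 10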
03-17-2022
09:54 AM
Hello @Soa
Hive partitioning divides a table into a number of partitions, and these partitions can be further subdivided into more manageable parts known as buckets (or clusters). The bucketing concept is based on a hash function, which depends on the type of the bucketing column; records with the same value in the bucketing column will always be saved in the same bucket. The CLUSTERED BY clause is used to divide the table into buckets. Each partition is created as a directory, whereas each bucket is created as a file. Bucketing can also be done without partitioning a Hive table.
Bucketed tables allow much more efficient sampling than non-bucketed tables, enabling queries on a section of the data for testing and debugging purposes when the original data sets are very large. The user can fix the number of buckets according to need, and bucketing also provides the flexibility to keep the records in each bucket sorted by one or more columns. Since the data files are equal-sized parts, map-side joins are faster on bucketed tables. A small example DDL is shown below.
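For illustration, a minimal HiveQL sketch of a partitioned and bucketed table (the table and column names are made up):
CREATE TABLE employee (
  id INT,
  name STRING,
  salary DOUBLE
)
PARTITIONED BY (department STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS
STORED AS ORC;
-- On Hive versions before 2.0 you may also need: SET hive.enforce.bucketing = true;
Here each department partition is a directory, and within it the rows are hashed on id into 4 bucket files, each kept sorted by id.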
Was your question answered? Make sure to mark the answer as the accepted solution. If you find a reply useful, say thanks by clicking on the thumbs up button.
03-15-2022
08:41 AM
Hello @Azhar_Shaikh Thanks for the reply; as it turns out, it wasn't a service account problem. We found that ListS3's output included a 'key' field, and this is what was required in the FetchS3Object processor for 'Object Key'. So the fix I applied was to split the JSON into individual records (SplitJson), then pull the keys out as attributes (EvaluateJsonPath), then pass ${key} into the FetchS3Object processor's Object Key property. Worked a treat.
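For anyone wiring up the same fix, a rough sketch of the relevant processor settings (property names are from the standard NiFi processors; the JSON path assumes the 'key' field sits at the top level of each split record, so adjust it to your listing's actual structure):
SplitJson
  JsonPath Expression : $.*   (one listing record per FlowFile)
EvaluateJsonPath
  Destination : flowfile-attribute
  key (dynamic property) : $.key   (copies the record's key field into a 'key' attribute)
FetchS3Object
  Object Key : ${key}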
03-14-2022
08:49 AM
@RajeshReddy for tag based policies you can refer to https://docs.cloudera.com/runtime/7.2.10/security-ranger-authorization/topics/security-ranger-tag-based-policies.html Was your question answered? Make sure to mark the answer as the accepted solution. If you find a reply useful, say thanks by clicking on the thumbs up button.