Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Create custom processor to convert csv to excel

avatar
Explorer

Hello Everyone 

 

Can I Create custom processor in apache nifi to convert CSVfile to excel file ??

 

is that possible and if yes how ? 

Thank you 

 

2 ACCEPTED SOLUTIONS

avatar
Master Collaborator

hello @sa 

 

Yes you can create a custom processor in Nifi.  

 

You can refer to https://stackoverflow.com/questions/68937735/how-to-convert-csv-to-excel-using-python-with-pandas-in...

 

Thanks,

Azhar

Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

View solution in original post

avatar

Hi @Saraali Thank you for asking a great question! Allow me to expand a bit on the answer posted earlier by @Azhar_Shaikh.

He's correct that you could write a Python script leveraging the Pandas API to programmatically create an MS Excel file, and then call that script in NiFi using ExecuteStreamCommand, although perhaps using ExecuteScript might be a better candidate, depending on how your overall flow is designed and what external software you feel like installing or configuring.

There's a reasonably well-documented set of classes/methods in the Pandas API that would allow you to, once you have the data from your .csv file read in, convert the data to a Pandas DataFrame and then write the DataFrame to an Excel file. If your software development skills are limited to Python, that would be a workable approach.

 

My reading of your question, however, was that you were asking about writing a custom processor, not invoking a script. If you are not limited to Python like the original poster in the above-referenced Stack Overflow thread, you should consider writing a full-on NiFi processor in Java and leverage libraries such as the Apache POI library or The JExcel library. You can use either library to programmatically read, write and modify the content of an Excel spreadsheet from a Java program, but the later library only provides support for processing Excel files in the .xls (1997-2003) format. This approach requires some significant software development skills, because it doesn't involve just Java programming but a certain amount of familiarity with the associated tools, principally Maven. Telling you how to do that would involve a substantial, article-length tutorial. I still recommend Andy LoPresto's conference session from the 2019 DataWorks Summit Conference, Custom Processor Development with Apache NiFi to folks new to NiFi processor development that want to get an overview of what's involved.

 

If you don't have those software development skills or the time to obtain them, I would suggest you engage Professional Services to develop the processor you need. If you're a Cloudera Subscription Support customer, we can connect you with your Account team to discuss your potential project. Let me know if you are interested in this path by using the community's private message functionality to transmit your contact information.

 

This thread will remain open so other community members with greater expertise with custom NiFi processor development can contribute, if they so desire.

 

 

Bill Brooks, Community Moderator
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

View solution in original post

4 REPLIES 4

avatar
Master Collaborator

hello @sa 

 

Yes you can create a custom processor in Nifi.  

 

You can refer to https://stackoverflow.com/questions/68937735/how-to-convert-csv-to-excel-using-python-with-pandas-in...

 

Thanks,

Azhar

Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

avatar

Hi @Saraali Thank you for asking a great question! Allow me to expand a bit on the answer posted earlier by @Azhar_Shaikh.

He's correct that you could write a Python script leveraging the Pandas API to programmatically create an MS Excel file, and then call that script in NiFi using ExecuteStreamCommand, although perhaps using ExecuteScript might be a better candidate, depending on how your overall flow is designed and what external software you feel like installing or configuring.

There's a reasonably well-documented set of classes/methods in the Pandas API that would allow you to, once you have the data from your .csv file read in, convert the data to a Pandas DataFrame and then write the DataFrame to an Excel file. If your software development skills are limited to Python, that would be a workable approach.

 

My reading of your question, however, was that you were asking about writing a custom processor, not invoking a script. If you are not limited to Python like the original poster in the above-referenced Stack Overflow thread, you should consider writing a full-on NiFi processor in Java and leverage libraries such as the Apache POI library or The JExcel library. You can use either library to programmatically read, write and modify the content of an Excel spreadsheet from a Java program, but the later library only provides support for processing Excel files in the .xls (1997-2003) format. This approach requires some significant software development skills, because it doesn't involve just Java programming but a certain amount of familiarity with the associated tools, principally Maven. Telling you how to do that would involve a substantial, article-length tutorial. I still recommend Andy LoPresto's conference session from the 2019 DataWorks Summit Conference, Custom Processor Development with Apache NiFi to folks new to NiFi processor development that want to get an overview of what's involved.

 

If you don't have those software development skills or the time to obtain them, I would suggest you engage Professional Services to develop the processor you need. If you're a Cloudera Subscription Support customer, we can connect you with your Account team to discuss your potential project. Let me know if you are interested in this path by using the community's private message functionality to transmit your contact information.

 

This thread will remain open so other community members with greater expertise with custom NiFi processor development can contribute, if they so desire.

 

 

Bill Brooks, Community Moderator
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

avatar
Community Manager

@Saraali, Has any of the replies helped resolve your issue? If so, can you please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future?  



Regards,

Vidya Sargur,
Community Manager


Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.
Learn more about the Cloudera Community:

avatar
New Contributor

you can create custom nar file and then put into lib folder of $NIFI_HOME directory and restart your nifi server.

 

Add dependecy in processor module & then write a java code then build and create your nar file.

<dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>4.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>4.1.2</version>
        </dependency>
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.anoop.converter;

import com.opencsv.CSVReader;
import com.opencsv.exceptions.CsvValidationException;
import org.apache.nifi.annotation.behavior.*;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Tags({"csvToExcel"})
@CapabilityDescription("This processor can convert CSV flow files into Excel flow file")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class CsvToExcel extends AbstractProcessor {


    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("original")
            .description("The original file")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        descriptors = Collections.emptyList();

        relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {}

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }

        session.write(flowFile, new Converter());
        session.putAttribute(flowFile,"convertedIntoExcel","true");
        session.transfer(flowFile,REL_SUCCESS);
    }
}

class Converter implements StreamCallback {

    @Override
    public void process(InputStream in, OutputStream out) throws IOException {
        try {
            streamConversion(in,out);
        } catch (CsvValidationException e) {
            throw new RuntimeException(e);
        }
    }

    private void streamConversion(InputStream in, OutputStream out) throws IOException, CsvValidationException {
        CSVReader csvReader = new CSVReader(new InputStreamReader(in));

        XSSFWorkbook workbook = new XSSFWorkbook();
        XSSFSheet sheet = workbook.createSheet("Sheet1");

        String[] rowData = null;
        int rowNum = 0;
        while ((rowData = csvReader.readNext()) != null) {
            Row row = sheet.createRow(rowNum++);
            int colNum = 0;
            for (String cellData : rowData) {
                Cell cell = row.createCell(colNum++);
                cell.setCellValue(cellData);
            }
        }

        workbook.write(out);
        workbook.close();
    }
}