Member since: 11-27-2019
Posts: 4
Kudos received: 1
Solutions: 1
01-31-2021
09:27 AM
My data arrives as records in CSV format. Sample data:
a,b,c,d,e
1,2,3,4,5
1,2,3,4,5
1,2,3,4,5
I want to read each record, add 3-6 columns based on some conditions, process the data, and then export it in CSV format:
a,a1,b,b1,c,c1,d,d1,e,e2
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55
1,11,2,22,3,33,4,44,5,55
I can use the existing processors, but I need to build a new processor. I don't know how to read the records and process them. I tried to write some code, but it does not work:
public class readcsv extends AbstractProcessor {
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("Record Reader")
            .description("Specifies the Controller Service to use for reading incoming data")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();
    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("SUCCESS")
            .build();
    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        this.descriptors = Collections.unmodifiableList(descriptors);
        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        final Map<String, String> originalAttributes = flowFile.getAttributes();
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream in) throws IOException {
                try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, flowFile.getSize(), getLogger())) {
                    final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());
                    final RecordSet recordSet = reader.createRecordSet();
                    final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
                } catch (final SchemaNotFoundException | MalformedRecordException e) {
                    throw new ProcessException("Failed to parse incoming data", e);
                }
            }
        });
    }
}
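The per-record transformation described in this post can be sketched in plain Java, independent of the NiFi API. This is only a minimal illustration under stated assumptions: the class name CsvColumnAdder and its methods are hypothetical, the rule "derived value = original value * 11" is inferred from the sample output and stands in for the real conditions, and every derived column is named with a "1" suffix (the sample header ends in e2, which this sketch does not reproduce). Fields are split on plain commas, so quoted fields are not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class CsvColumnAdder {

    // Hypothetical rule inferred from the sample data (1 -> 11, 2 -> 22).
    // Replace with the real condition-based logic.
    static String derive(String value) {
        return String.valueOf(Long.parseLong(value.trim()) * 11);
    }

    // Header: follow every column name with a derived name, e.g. a -> a,a1.
    static String expandHeader(String headerLine) {
        StringJoiner out = new StringJoiner(",");
        for (String name : headerLine.split(",")) {
            out.add(name.trim());
            out.add(name.trim() + "1");
        }
        return out.toString();
    }

    // Data row: follow every value with its derived value, e.g. 1 -> 1,11.
    static String expandRow(String line) {
        StringJoiner out = new StringJoiner(",");
        for (String value : line.split(",")) {
            out.add(value.trim());
            out.add(derive(value));
        }
        return out.toString();
    }

    // Treats the first line as the header and all other lines as data rows.
    static List<String> transform(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            result.add(i == 0 ? expandHeader(lines.get(i)) : expandRow(lines.get(i)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a,b,c,d,e", "1,2,3,4,5", "1,2,3,4,5");
        for (String line : transform(input)) {
            System.out.println(line);
        }
    }
}
```

Inside a record-based processor, the same logic would presumably operate on each record's fields rather than on raw lines, with the configured Record Writer producing the CSV output.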
Labels: Apache NiFi
01-13-2021
06:53 AM
Hi, I am trying to connect to a ClickHouse database from NiFi 1.12.1, and I can't fix this error:
StandardControllerServiceNode[service=DBCPConnectionPool[id=f73ad0c3-0176-1000-0546-94e4c6c6fe9b], versionedComponentId=null, processGroup=StandardProcessGroup[identifier=f73973ae-0176-1000-a421-ff32b5b5cb70,name=NiFi Flow], active=true] Failed to invoke @OnEnabled method due to java.lang.NoClassDefFoundError: Could not initialize class ru.yandex.clickhouse.ClickHouseDriver: Could not initialize class ru.yandex.clickhouse.ClickHouseDriver
Labels: Apache NiFi
11-28-2019
04:29 AM
1 Kudo
I fixed the problem: I replaced ojdbc8.jar with ojdbc7.jar. Thanks.
11-27-2019
04:38 AM
Hello,
I need to connect NiFi to an Oracle DB, but I ran into a problem with the connection.
I copied Ojdbc.jar to the lib directory.
I chose the QueryDatabaseTable processor (I am not sure if that is correct) and configured the connection as follows:
- Database Connection URL: jdbc:oracle:thin:@//HostName:XXXX/XXXXXXX
- Database Driver Class Name: oracle.jdbc.driver.OracleDriver
- Database Driver Location: C:\Users\d1156\Desktop\nifi-1.10.0\lib\ojdbc8.jar
After I start the processor, this error appears:
unable to execute SQL select query - cannot create JDBC driver of class ' Oracle.jdbc.driver.oracleDriver'
Thank you.
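One way to narrow down an error like this outside NiFi is to check whether the driver class can be loaded by the same Java runtime with the JAR on the classpath. The following is a minimal diagnostic sketch, not a NiFi feature: the class name DriverCheck and the method isLoadable are hypothetical. Java class names are case-sensitive and must not contain stray spaces, so a name such as the one quoted in the error message would not load even with the JAR present.

```java
public class DriverCheck {

    // Returns true if the named class can be found and initialized
    // on the current classpath, false otherwise.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: no such class on the classpath.
            // LinkageError: class found but could not be linked or
            // initialized (for example, built for a newer Java version).
            return false;
        }
    }

    public static void main(String[] args) {
        // Default taken from the post; pass another class name as an argument.
        String name = args.length > 0 ? args[0] : "oracle.jdbc.driver.OracleDriver";
        System.out.println("'" + name + "' loadable: " + isLoadable(name));
    }
}
```

To try it, compile the class and run it with the driver JAR on the classpath, for example `java -cp "ojdbc8.jar;." DriverCheck` on Windows (use `:` instead of `;` on Linux or macOS).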
Labels: Apache NiFi