Member since: 03-06-2017
Posts: 32
Kudos Received: 1
Solutions: 1

My Accepted Solutions

Title | Views | Posted
---|---|---
 | 2833 | 10-09-2018 01:24 PM
01-06-2020
10:53 PM
Hi, I have this issue with other processors, but some processors, for example UpdateAttribute, do not have it. If you add that processor, it works.
10-27-2019
12:27 AM
Thank you, Matt. I will try it and update the question.
10-17-2019
06:35 AM
Hi all, If I enable the Round Robin load-balancing strategy on a connection (the queue between two processors), flow files get stuck there for minutes. Some investigation showed that flow files get stuck only on the primary node. There are no warnings or errors in the logs. I am ready to provide any additional information. Please, any help is appreciated. Thanks
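[Editor's sketch, not part of the original post] One way to confirm where the backlog actually sits is to ask the NiFi REST API for the connection status per node. The sketch below assumes the NiFi 1.x endpoint GET /nifi-api/flow/connections/{id}/status?nodewise=true, an unsecured instance, and a placeholder host and connection UUID — adjust all three to your cluster.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
public class ConnectionStatusPerNode {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection UUID: take it from the connection's configuration dialog.
        String connectionId = args.length > 0 ? args[0] : "00000000-0000-0000-0000-000000000000";
        // Hypothetical host/port; assumes no TLS and no authentication headers.
        URL url = new URL("http://nifi-node-1:8080/nifi-api/flow/connections/"
                + connectionId + "/status?nodewise=true");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            // The JSON should contain a nodeSnapshots array with queued counts per node,
            // which shows whether the backlog really sits only on the primary node.
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}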
Tags:
- NiFi
- nifi-processor
Labels:
- Apache NiFi
10-09-2018
01:24 PM
In the end, I created a new cluster with HDF 3.2.0. NiFi Registry made the migration very fast. Thanks to everyone who tried to help.
08-10-2018
05:11 AM
After each downgrade I changed it to 102, but this did not help.
08-09-2018
09:28 AM
Very strange issue. After I downgrade, the clusters table shows desired_stack_id=51, but it needs to be 102. How does Ambari decide which stack to downgrade to? The upgrade history all looks good.
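[Editor's sketch, not part of the original post] As a hedged illustration of checking which stack rows ids 51 and 102 actually point at, the snippet below joins the clusters and stack tables in the Ambari database. The JDBC URL and credentials are placeholders, and the table/column names are recalled from the Ambari 2.6 schema — verify them against your own database before relying on the output.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class CheckDesiredStack {
    public static void main(String[] args) throws Exception {
        // Placeholder JDBC URL/credentials; needs the PostgreSQL JDBC driver on the classpath
        // (adjust the URL and driver if your Ambari database is MySQL, Oracle, etc.).
        try (Connection con = DriverManager.getConnection(
                "jdbc:postgresql://ambari-host:5432/ambari", "ambari", "ambari-password");
             Statement st = con.createStatement();
             // Table/column names recalled from the Ambari 2.6 schema; verify before use.
             ResultSet rs = st.executeQuery(
                     "SELECT c.cluster_name, c.desired_stack_id, s.stack_name, s.stack_version "
                     + "FROM clusters c JOIN stack s ON s.stack_id = c.desired_stack_id")) {
            while (rs.next()) {
                System.out.printf("%s -> desired_stack_id=%d (%s-%s)%n",
                        rs.getString("cluster_name"), rs.getLong("desired_stack_id"),
                        rs.getString("stack_name"), rs.getString("stack_version"));
            }
        }
    }
}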
08-09-2018
09:14 AM
1) 2) 3) -4) 5) empty Any ideas?
08-09-2018
05:58 AM
This is the last upgrade_history
08-09-2018
05:27 AM
I performed the downgrade and ran a service check on ZooKeeper. Nothing helped. Which configs do I need to change in ZooKeeper?
stack: repo_version: cluster:
08-08-2018
08:44 PM
In this table all values were set correctly, no nulls. Maybe this table works as a log?
08-08-2018
08:11 PM
Maybe I need to use the ambari-server set-current CLI command instead of updating the database?
08-08-2018
07:50 PM
My Ambari is 2.6.2.2. I tried to downgrade and start the upgrade again; the same issue occurs every time. Before this I had this issue: https://community.hortonworks.com/questions/211829/upgrade-hdf-310-to-hdf-312.html, and I made manual changes in the DB (only the stack_id update). Maybe I need to change more things? Thanks
08-08-2018
07:28 PM
Hi all, I am trying to run a rolling upgrade between the versions in the subject, and it fails with a very strange message. I attached an image with the current state, and in ambari-server.log I see these lines:
WARN [Server Action Executor Worker 2015] ServerActionExecutor:471 - Task #2015 failed to complete execution due to thrown exception: java.lang.NullPointerException:null
java.lang.NullPointerException
at org.apache.ambari.server.serveraction.upgrades.ConfigureAction.execute(ConfigureAction.java:200)
at org.apache.ambari.server.serveraction.ServerActionExecutor$Worker.execute(ServerActionExecutor.java:550)
at org.apache.ambari.server.serveraction.ServerActionExecutor$Worker.run(ServerActionExecutor.java:466)
at java.lang.Thread.run(Thread.java:745)
Please help. Thanks, Ilya
Labels:
- Apache Ambari
- Cloudera DataFlow (CDF)
08-08-2018
09:12 AM
In incognito mode I see the same result. These are the errors in the console:
08-08-2018
08:54 AM
Hi all, I tried upgrading from HDF 3.1.1.0 to HDF 3.1.2. I also upgraded Ambari from 2.6.0 to 2.6.2, and now Ambari does not work as needed. The top menu is missing; menu items such as "Dashboard", "Services", "Hosts", "Alerts", and "Admin" are not there. If I click a specific service and press the "Service Actions" button, only Start and Stop are shown, plus a spinner showing a loading process. I cannot continue the upgrade process because Ambari asks me to run a service check. If I install HDF 3.1.2 fresh (not as an upgrade), everything works as needed. Please help. In the Ambari logs I cannot find anything strange. Update: I found some strange rows in ambari-server.log. Maybe something in the scheduler? ClusterImpl:1953 - No service found for config types '[cluster-env]', service config version not created
08 Aug 2018 16:38:02,103 INFO [main] QuartzScheduler:305 - Scheduler meta-data: Quartz Scheduler (v2.2.1) 'ExecutionScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 5 threads.
Using job-store 'org.quartz.impl.jdbcjobstore.JobStoreTX' - which supports persistence. and is not clustered.
08 Aug 2018 16:38:01,400 WARN [main] Errors:173 - The following warnings have been detected with resource and/or provider classes:
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.ActionService.getActionDefinitions(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.ActionService.getActionDefinition(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getServices(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getService(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getServiceComponents(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getServiceComponent(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getRootHosts(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getRootHost(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getRootHostComponents(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getRootHostComponent(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String,java.lang.String,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RootServiceService.getRootHostComponent(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RequestService.getRequests(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.RequestService.getRequest(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.BlueprintService.getBlueprints(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.BlueprintService.getBlueprint(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.StacksService.getStacks(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.StacksService.getStack(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String), should not consume any entity.
WARNING: A HTTP GET method, public javax.ws.rs.core.Response org.apache.ambari.server.api.services.StacksService.getStackLevelConfigurations(java.lang.String,javax.ws.rs.core.HttpHeaders,javax.ws.rs.core.UriInfo,java.lang.String,java.lang.String), should not consume any entity.
Labels:
- Apache Ambari
- Cloudera DataFlow (CDF)
07-29-2018
03:05 PM
@hmatta Thanks for your answer. We use the Ranger plugin. If we use HDF 3.1.1.0, what should we do to resolve this? Is it enough to update to 3.1.2.0?
07-29-2018
01:02 PM
Hi all, We enabled jmxremote and saw that NiFi has a memory leak. GC cannot reclaim the allocated memory, and after 2-3 days NiFi starts to behave strangely (it keeps trying to switch the primary node, and the monitoring API does not answer). We updated HDF from 3.0 to 3.1. Our environment is production, therefore we are limited in what actions we can take. In our flow we use only standard processors plus one ExecuteScript (ECMAScript) on Java 8. If you know how to find the source of the leak, or have any idea how to search for it, we would be grateful. Thanks, Ilya
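[Editor's sketch, not part of the original post] Since jmxremote is already enabled, one way to start hunting the leak is to pull a heap dump over JMX and compare dumps taken a day or two apart in a tool such as Eclipse MAT. This is a minimal sketch: the host, port, and dump path are placeholders, and it assumes the remote connector is reachable without credentials.
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
public class NiFiHeapDump {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port: whatever com.sun.management.jmxremote.port is set to on the
        // NiFi node. Add credentials to the connect() call if the connector is secured.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://nifi-node-1:9010/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            HotSpotDiagnosticMXBean diag = ManagementFactory.newPlatformMXBeanProxy(
                    mbsc, "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class);
            // The .hprof path is resolved on the NiFi host, not locally; "true" keeps only
            // live (reachable) objects, which is what you want to diff between two dumps.
            diag.dumpHeap("/tmp/nifi-heap-" + System.currentTimeMillis() + ".hprof", true);
        }
    }
}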
Labels:
- Apache NiFi
07-19-2018
06:49 AM
Hello all, I asked previously about PutSQL behavior on node or network failure (here). But now I have a more complicated issue: how can I guarantee an "exactly once" policy with NiFi and the PutSQL processor? Which flow do I need to build? Thanks, Ilya
Labels:
- Apache NiFi
07-11-2018
03:17 PM
Hi all, I have a theoretical question. What happens if PutSQL sends a request to the SQL server and at exactly that moment the node fails (dies / power shutdown)? PutSQL did not receive an answer from the SQL server saying whether the rows were inserted or not. What happens in this case? Will these rows be sent one more time, or will they be routed to the failure connection? Or will something else happen? Thanks
Labels:
- Apache NiFi
07-10-2017
01:42 PM
Thanks, Koji, this was very helpful. Maybe you know of any ZooKeeper client app which uses ZookeeperStateProvider's serialize and deserialize methods?
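[Editor's sketch, not part of the original post] For reference, a rough sketch of decoding the znode bytes the way ZooKeeperStateProvider appears to write them, which would explain why the value looks like it has invisible characters in a plain ZooKeeper client: it is a small binary record written with DataOutputStream rather than plain text. The layout assumed below (version byte, entry count, then per-entry booleans and UTF strings) is recalled from the NiFi source, not verified — check the serialize/deserialize methods for your NiFi version before writing anything back.
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
public class StateZnodeReader {
    // Assumed layout (recalled, not verified): one version byte, an int entry count, then for
    // each entry a "has key" boolean + UTF key and a "has value" boolean + UTF value.
    public static Map<String, String> decode(byte[] znodeData) throws Exception {
        Map<String, String> state = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(znodeData))) {
            int version = in.readByte();
            int entries = in.readInt();
            for (int i = 0; i < entries; i++) {
                String key = in.readBoolean() ? in.readUTF() : null;
                String value = in.readBoolean() ? in.readUTF() : null;
                state.put(key, value);
            }
            System.out.println("encoding version " + version + ", " + entries + " entries");
        }
        return state;
    }
    public static void main(String[] args) throws Exception {
        // Usage: pass a file containing the raw bytes copied out of the znode.
        decode(Files.readAllBytes(Paths.get(args[0])))
                .forEach((k, v) -> System.out.println(k + " = " + v));
    }
}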
07-02-2017
03:06 PM
Hi all, I use two standard processors, GenerateTableFetch and QueryDatabaseTable. For some reason I need to set the state of the GenerateTableFetch processor from QueryDatabaseTable. I checked the previous value in ZooKeeper and it looks very strange, with invisible characters. I copy-pasted it from one to the other, without success. Any ideas how I can change the state of these processors manually? Thanks
Labels:
- Apache NiFi
07-02-2017
02:37 PM
@Bryan Bende Thank you very much. This type works. My next step will be refactoring.
06-29-2017
09:14 AM
Hi all, I have a strange issue with NoClassDefFoundError.
I took the standard GenerateTableFetch processor and changed it for my needs, but I always get this error: java.lang.NoClassDefFoundError: org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:138)
at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:113)
at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:699)
at org.apache.nifi.NiFi.<init>(NiFi.java:160)
at org.apache.nifi.NiFi.main(NiFi.java:267)
Caused by: java.lang.ClassNotFoundException: org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 21 common frames omitted
I read all the relevant answers here and on Google, but without success. Files:
nifi-teg-bundle/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.2.0</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-teg-bundle</artifactId>
<version>1.0</version>
<packaging>pom</packaging>
<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
<version>1.2.0</version>
<extensions>true</extensions>
</plugin>
</plugins>
</build>
<modules>
<module>nifi-teg-processors</module>
<module>nifi-teg-nar</module>
</modules>
</project>
nifi-teg-bundle/nifi-teg-processors/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-teg-bundle</artifactId>
<version>1.0</version>
</parent>
<artifactId>nifi-teg-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-processors</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-utils</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
nifi-teg-bundle/nifi-teg-nar/pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-teg-bundle</artifactId>
<version>1.0</version>
</parent>
<artifactId>nifi-teg-nar</artifactId>
<version>1.0</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-processors</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-teg-processors</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</project>
nifi-teg-bundle/nifi-teg-processors/src/main/java/org/apache/nifi/processors/teg/MyProcessor.java
package org.apache.nifi.processors.teg;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.db.DatabaseAdapter;
import org.apache.nifi.processors.standard.AbstractDatabaseFetchProcessor;
import org.apache.nifi.processors.standard.QueryDatabaseTable;
import org.apache.nifi.processors.standard.ExecuteSQL;
import org.apache.nifi.processors.standard.ListDatabaseTables;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@TriggerSerially
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"})
@SeeAlso({QueryDatabaseTable.class, ExecuteSQL.class, ListDatabaseTables.class})
@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, "
+ "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, "
+ "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums. This "
+ "processor is intended to be run on the Primary Node only.\n\n"
+ "This processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:\n"
+ " - If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many "
+ "fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.\n"
+ " - If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.\n"
+ " - If incoming connection(s) are specified and a flow file is available to a processor task, the flow file's attributes may be used in Expression Language for such fields "
+ "as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.")
@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+ "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+ "to fetch only those records that have max values greater than the retained values. This can be used for "
+ "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+ "per the State Management documentation")
@WritesAttributes({
@WritesAttribute(attribute = "generatetablefetch.sql.error", description = "If the processor has incoming connections, and processing an incoming flow file causes "
+ "a SQL Exception, the flow file is routed to failure and this attribute is set to the exception message."),
@WritesAttribute(attribute = "generatetablefetch.tableName", description = "The name of the database table to be queried."),
@WritesAttribute(attribute = "generatetablefetch.columnNames", description = "The comma-separated list of column names used in the query."),
@WritesAttribute(attribute = "generatetablefetch.whereClause", description = "Where clause used in the query to get the expected rows."),
@WritesAttribute(attribute = "generatetablefetch.maxColumnNames", description = "The comma-separated list of column names used to keep track of data "
+ "that has been returned since the processor started running."),
@WritesAttribute(attribute = "generatetablefetch.limit", description = "The number of result rows to be fetched by the SQL statement."),
@WritesAttribute(attribute = "generatetablefetch.offset", description = "Offset to be used to retrieve the corresponding partition.")
})
public class MyProcessor extends AbstractDatabaseFetchProcessor {
public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder()
.name("gen-table-fetch-partition-size")
.displayName("Partition Size")
.description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in "
+ "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. A "
+ "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows "
+ "in the table.")
.defaultValue("10000")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor MSSQL_ISOLATION_LEVEL = new PropertyDescriptor.Builder()
.name("gen-table-fetch-isolation-level")
.displayName("Isolation Level")
.description("MSSQL Isolation level for transactions")
.required(false)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor RIGHT_BOUND_WHERE = new PropertyDescriptor.Builder()
.name("gen-table-fetch-right-bounded")
.displayName("Right Bounded")
.description("Whether to include the new max value(s) as a right hand boundary in the where statement. "
+ "If this is set to false duplicate data may be returned by consecutive executions of the "
+ "processor if there are deletions or if the table is high volume.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("This relationship is only used when SQL query execution (using an incoming FlowFile) failed. The incoming FlowFile will be penalized and routed to this relationship. "
+ "If no incoming connection(s) are specified, this relationship is unused.")
.build();
public MyProcessor() {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
r.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(DBCP_SERVICE);
pds.add(DB_TYPE);
pds.add(TABLE_NAME);
pds.add(COLUMN_NAMES);
pds.add(RIGHT_BOUND_WHERE);
pds.add(MAX_VALUE_COLUMN_NAMES);
pds.add(QUERY_TIMEOUT);
pds.add(PARTITION_SIZE);
pds.add(MSSQL_ISOLATION_LEVEL);
propDescriptors = Collections.unmodifiableList(pds);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propDescriptors;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return super.customValidate(validationContext);
}
@Override
@OnScheduled
public void setup(final ProcessContext context) {
// Pre-fetch the column types if using a static table name and max-value columns
if (!isDynamicTableName && !isDynamicMaxValues) {
super.setup(context);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
ProcessSession session = sessionFactory.createSession();
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
if (fileToProcess == null) {
// Incoming connection with no flow file available, do no work (see capability description)
return;
}
}
final ComponentLog logger = getLogger();
final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(fileToProcess).getValue();
final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions(fileToProcess).getValue();
final int partitionSize = context.getProperty(PARTITION_SIZE).evaluateAttributeExpressions(fileToProcess).asInteger();
final String isolationLevel = context.getProperty(MSSQL_ISOLATION_LEVEL).evaluateAttributeExpressions(fileToProcess).getValue();
final Boolean rightBounded = context.getProperty(RIGHT_BOUND_WHERE).asBoolean();
final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
} catch (final IOException ioe) {
logger.error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
+ "query until this is accomplished.", ioe);
context.yield();
return;
}
try {
// Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
// set as the current state map (after the session has been committed)
final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
// Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The
// executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the
// specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the
// latest observed maximum values.
String whereClause = null;
List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
? new ArrayList<>(0)
: Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size());
String columnsClause = null;
List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1);
maxValueSelectColumns.add("COUNT(*)");
// For each maximum-value column, get a WHERE filter and a MAX(column) alias
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
// final String fullyQualifiedStateKey = getStateKey(tableName, colName);
// String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
// if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
// // If the table name is static and the fully-qualified key was not found, try just the column name
// maxValue = statePropertyMap.get(getStateKey(null, colName));
// }
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
// Integer type = columnTypeMap.get(fullyQualifiedStateKey);
// if (type == null && !isDynamicTableName) {
// // If the table name is static and the fully-qualified key was not found, try just the column name
// type = columnTypeMap.get(getStateKey(null, colName));
// }
// if (type == null) {
// // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
// throw new IllegalArgumentException("No column type found for: " + colName);
// }
Integer type = getColumnType(tableName, colName);
// Add a condition for the WHERE clause
maxValueClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
});
whereClause = StringUtils.join(maxValueClauses, " AND ");
columnsClause = StringUtils.join(maxValueSelectColumns, ", ");
// Build a SELECT query with maximum-value columns (if present)
final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null);
long rowCount = 0;
String isolationLevelQuery = "";
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.SECONDS).intValue();
st.setQueryTimeout(queryTimeout); // timeout in seconds
if(isolationLevel!= null){
isolationLevelQuery = String.format("SET TRANSACTION ISOLATION LEVEL %s;", isolationLevel);
}
logger.debug("Executing {}", new Object[]{selectQuery});
ResultSet resultSet;
resultSet = st.executeQuery(isolationLevelQuery.concat(selectQuery));
if (resultSet.next()) {
// Total row count is in the first column
rowCount = resultSet.getLong(1);
// Update the state map with the newly-observed maximum values
ResultSetMetaData rsmd = resultSet.getMetaData();
for (int i = 2; i <= rsmd.getColumnCount(); i++) {
//Some JDBC drivers consider the columns name and label to be very different things.
// Since this column has been aliased lets check the label first,
// if there is no label we'll use the column name.
String resultColumnName = (StringUtils.isNotEmpty(rsmd.getColumnLabel(i))?rsmd.getColumnLabel(i):rsmd.getColumnName(i)).toLowerCase();
String fullyQualifiedStateKey = getStateKey(tableName, resultColumnName);
String resultColumnCurrentMax = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(resultColumnCurrentMax) && !isDynamicTableName) {
// If we can't find the value at the fully-qualified key name and the table name is static, it is possible (under a previous scheme)
// the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
// maximum value is observed, it will be stored under the fully-qualified key from then on.
resultColumnCurrentMax = statePropertyMap.get(resultColumnName);
}
int type = rsmd.getColumnType(i);
if (isDynamicTableName) {
// We haven't pre-populated the column type map if the table name is dynamic, so do it here
columnTypeMap.put(fullyQualifiedStateKey, type);
}
try {
String newMaxValue = getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, dbAdapter.getName());
if (newMaxValue != null) {
statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
}
} catch (ParseException | IOException pie) {
// Fail the whole thing here before we start creating flow files and such
throw new ProcessException(pie);
}
}
} else {
// Something is very wrong here, one row (even if count is zero) should be returned
throw new SQLException("No rows returned from metadata query: " + selectQuery);
}
// If Right Bounded, for each maximum-value column get a right bounding WHERE condition
if(rightBounded){
IntStream.range(0, maxValueColumnNameList.size()).forEach((index) -> {
String colName = maxValueColumnNameList.get(index);
maxValueSelectColumns.add("MAX(" + colName + ") " + colName);
String maxValue = getColumnStateMaxValue(tableName, statePropertyMap, colName);
if (!StringUtils.isEmpty(maxValue)) {
Integer type = getColumnType(tableName, colName);
// Add a condition for the WHERE clause
maxValueClauses.add(colName + " <= " + getLiteralByType(type, maxValue, dbAdapter.getName()));
}
});
//Update WHERE list to include new right hand boundaries
whereClause = StringUtils.join(maxValueClauses, " AND ");
}
final long numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
// Generate SQL statements to read "pages" of data
for (long i = 0; i < numberOfFetches; i++) {
Long limit = partitionSize == 0 ? null : (long) partitionSize;
Long offset = partitionSize == 0 ? null : i * partitionSize;
final String maxColumnNames = StringUtils.join(maxValueColumnNameList, ", ");
final String query = dbAdapter.getSelectStatement(tableName, columnNames, whereClause, maxColumnNames, limit, offset);
FlowFile sqlFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
sqlFlowFile = session.write(sqlFlowFile, out -> out.write(query.getBytes()));
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.tableName", tableName);
if (columnNames != null) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.columnNames", columnNames);
}
if (StringUtils.isNotBlank(whereClause)) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.whereClause", whereClause);
}
if (StringUtils.isNotBlank(maxColumnNames)) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.maxColumnNames", maxColumnNames);
}
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.limit", String.valueOf(limit));
if (partitionSize != 0) {
sqlFlowFile = session.putAttribute(sqlFlowFile, "generatetablefetch.offset", String.valueOf(offset));
}
session.transfer(sqlFlowFile, REL_SUCCESS);
}
if (fileToProcess != null) {
session.remove(fileToProcess);
}
} catch (SQLException e) {
if (fileToProcess != null) {
logger.error("Unable to execute SQL select query {} due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
fileToProcess = session.putAttribute(fileToProcess, "generatetablefetch.sql.error", e.getMessage());
session.transfer(fileToProcess, REL_FAILURE);
} else {
logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
throw new ProcessException(e);
}
}
session.commit();
try {
// Update the state
stateManager.setState(statePropertyMap, Scope.CLUSTER);
} catch (IOException ioe) {
logger.error("{} failed to update State Manager, observed maximum values will not be recorded. "
+ "Also, any generated SQL statements may be duplicated.",
new Object[]{this, ioe});
}
} catch (final ProcessException pe) {
// Log the cause of the ProcessException if it is available
Throwable t = (pe.getCause() == null ? pe : pe.getCause());
logger.error("Error during processing: {}", new Object[]{t.getMessage()}, t);
session.rollback();
context.yield();
}
}
private String getColumnStateMaxValue(String tableName, Map<String, String> statePropertyMap, String colName) {
final String fullyQualifiedStateKey = getStateKey(tableName, colName);
String maxValue = statePropertyMap.get(fullyQualifiedStateKey);
if (StringUtils.isEmpty(maxValue) && !isDynamicTableName) {
// If the table name is static and the fully-qualified key was not found, try just the column name
maxValue = statePropertyMap.get(getStateKey(null, colName));
}
return maxValue;
}
private Integer getColumnType(String tableName, String colName) {
final String fullyQualifiedStateKey = getStateKey(tableName, colName);
Integer type = columnTypeMap.get(fullyQualifiedStateKey);
if (type == null && !isDynamicTableName) {
// If the table name is static and the fully-qualified key was not found, try just the column name
type = columnTypeMap.get(getStateKey(null, colName));
}
if (type == null) {
// This shouldn't happen as we are populating columnTypeMap when the processor is scheduled or when the first maximum is observed
throw new IllegalArgumentException("No column type found for: " + colName);
}
return type;
}
}
I thought I had added all possible dependencies. Can anybody please help me? Thanks
Labels:
- Apache NiFi
05-23-2017
09:01 AM
I used GenerateTableFetch to generate queries split by id. All queries go to an ExecuteScript processor, where I modify the SQL (in your case, add the join). After that, use ExecuteSQL.
04-26-2017
08:11 AM
1 Kudo
Hello all, We are planning to build a data warehouse on Hive, with data streamed into it. I am looking for best practices or any ideas. I found only one article about this, but it is very old. Any experience sharing will be much appreciated. Thanks
Labels:
- Apache Hive
03-06-2017
02:29 PM
It does not make sense: QueryDatabaseTable has state in cluster scope. I think this is what makes it possible to get data in parallel. Or does the state exist for something else?