/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.nifi.processors.standard; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLNonTransientException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; import java.sql.Types; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.xml.bind.DatatypeConverter; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; @SupportsBatching @SeeAlso({}) @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"sql", "put", "rdbms", "database", "update", "insert", "relational"}) @CapabilityDescription("Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command " + "to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes " + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be " + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.") @ReadsAttributes({ @ReadsAttribute(attribute = "fragment.identifier", description = "If the property is true, this attribute is used to determine whether or " + "not two FlowFiles belong to the same transaction."), @ReadsAttribute(attribute = "fragment.count", description = "If the property is true, this attribute is used to determine how many FlowFiles " + "are needed to complete the transaction."), @ReadsAttribute(attribute = "fragment.index", description = "If the property is true, this attribute is used to determine the order that the FlowFiles " + "in a transaction should be evaluated."), @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer " + "that represents the JDBC Type of the parameter."), @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The value of the Parameters are specified as " + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but default options may not always work for your data. " + "Incoming FlowFiles are expected to be parameterized SQL statements. In some cases " + "a format option needs to be specified, currently this is only applicable for binary data types. For binary data types " + "available options are 'ascii', 'base64' and 'hex'. In 'ascii' format each string character in your attribute value represents a single byte, this is the default format " + "and the format provided by Avro Processors. In 'base64' format your string is a Base64 encoded string. In 'hex' format the string is hex encoded with all " + "letters in upper case and no '0x' at the beginning.") }) @WritesAttributes({ @WritesAttribute(attribute = "sql.generated.key", description = "If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, " + "this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.") }) public class PutDeleteInsertSQL extends AbstractProcessor { static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder() .name("JDBC Connection Pool") .description("Specifies the JDBC Connection Pool to use in order to convert the JSON message to a SQL statement. " + "The Connection Pool is necessary in order to determine the appropriate database column types.") .identifiesControllerService(DBCPService.class) .required(true) .build(); static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder() .name("Support Fragmented Transactions") .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. " + "If the fragment.count value is greater than 1, the Processor will not process any FlowFile will that fragment.identifier until all are available; " + "at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. " + "This Provides atomicity of those SQL statements. If this value is false, these attributes will be ignored and the updates will occur independent of one another.") .allowableValues("false") .defaultValue("false") .build(); static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder() .name("Transaction Timeout") .description("If the property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute " + "to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship") .required(false) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") .description("The preferred number of FlowFiles to put to the database in a single transaction") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("100") .build(); static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder() .name("Obtain Generated Keys") .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. " + "This may result in slightly slower performance and is not supported by all databases.") .allowableValues("false") .defaultValue("false") .build(); // 删除字段 static final PropertyDescriptor DELETE_KEYS = new PropertyDescriptor.Builder().name("Obtain delete key") .description("delete key, just one column, assume it is primary key").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); //排序去重字段 static final PropertyDescriptor ORDER_DISTINCT_KEYS = new PropertyDescriptor.Builder().name("DESC sort Distinct key") .description("distinct the record by 'Obtain delete keys',delete the old records by the the parameter").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile is routed to this relationship after the database is successfully updated") .build(); static final Relationship REL_RETRY = new Relationship.Builder() .name("retry") .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed") .build(); static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, " + "such as an invalid query or an integrity constraint violation") .build(); static final Relationship REL_REPEAT = new Relationship.Builder() .name("repeat") .description("A FlowFile is routed to this relationship if there are any delete key repeat flowfile, to keep there are not repeat record by insert to db " ) .build(); private static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); private static final String FRAGMENT_ID_ATTR = "fragment.identifier"; private static final String FRAGMENT_INDEX_ATTR = "fragment.index"; private static final String FRAGMENT_COUNT_ATTR = "fragment.count"; private static final Pattern LONG_PATTERN = Pattern.compile("^\\d{1,19}$"); @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); properties.add(CONNECTION_POOL); properties.add(SUPPORT_TRANSACTIONS); properties.add(TRANSACTION_TIMEOUT); properties.add(BATCH_SIZE); properties.add(OBTAIN_GENERATED_KEYS); properties.add(DELETE_KEYS); properties.add(ORDER_DISTINCT_KEYS); return properties; } @Override public Set getRelationships() { final Set rels = new HashSet<>(); rels.add(REL_SUCCESS); rels.add(REL_RETRY); rels.add(REL_FAILURE); rels.add(REL_REPEAT); return rels; } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final FlowFilePoll poll = pollFlowFiles(context, session); if (poll == null) { return; } final List flowFilesss = poll.getFlowFiles(); if (flowFilesss == null) { return; } final Pair,List> pair= getRepeateFlowFile(flowFilesss, context, session); final List flowFiles = pair.getV(); getLogger().info("there should to handle file is "+flowFiles.size()); final long startNanos = System.nanoTime(); final boolean obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean(); final Map statementMap = new HashMap<>(); // Map SQL to a PreparedStatement and FlowFiles final List sentFlowFiles = new ArrayList<>(); // flowfiles that have been sent final List processedFlowFiles = new ArrayList<>(); // all flowfiles that we have processed final Set enclosuresToExecute = new LinkedHashSet<>(); // the enclosures that we've processed // Because we can have a transaction that is necessary across multiple FlowFiles, things get complicated when // some FlowFiles have been transferred to a relationship and then there is a failure. As a result, we will just // map all FlowFiles to their destination relationship and do the session.transfer at the end. This way, if there // is a failure, we can route all FlowFiles to failure if we need to. final Map destinationRelationships = new HashMap<>(); final List repeateFlowFile = pair.getK(); if(repeateFlowFile !=null && !repeateFlowFile.isEmpty()){ for(FlowFile aff:repeateFlowFile){ destinationRelationships.put(aff, REL_REPEAT); } } getLogger().info("there should to repeat file is "+repeateFlowFile.size()); final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); final String deleteKeys = context.getProperty(DELETE_KEYS).toString(); try (final Connection conn = dbcpService.getConnection()) { final boolean originalAutoCommit = conn.getAutoCommit(); try { conn.setAutoCommit(false); for (final FlowFile flowFile : flowFiles) { processedFlowFiles.add(flowFile); final String sql = getSQL(session, flowFile); // Get the appropriate PreparedStatement to use. final StatementFlowFileEnclosure enclosure; try { enclosure = getEnclosure(sql, conn, statementMap, obtainKeys, poll.isFragmentedTransaction()); } catch (final SQLNonTransientException e) { getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); destinationRelationships.put(flowFile, REL_FAILURE); continue; } final PreparedStatement stmt = enclosure.getStatement(); // set the appropriate parameters on the statement. try { setParameters(stmt, flowFile.getAttributes()); } catch (final SQLException | ProcessException pe) { getLogger().error("Cannot update database for {} due to {}; routing to failure", new Object[] {flowFile, pe.toString()}, pe); destinationRelationships.put(flowFile, REL_FAILURE); continue; } // If we need to obtain keys, we cannot do so in a a Batch Update. So we have to execute the statement and close it. if (obtainKeys) { try { // Execute the actual update. stmt.executeUpdate(); // attempt to determine the key that was generated, if any. This is not supported by all // database vendors, so if we cannot determine the generated key (or if the statement is not an INSERT), // we will just move on without setting the attribute. FlowFile sentFlowFile = flowFile; final String generatedKey = determineGeneratedKey(stmt); if (generatedKey != null) { sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey); } stmt.close(); sentFlowFiles.add(sentFlowFile); } catch (final SQLNonTransientException e) { getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {flowFile, e}); destinationRelationships.put(flowFile, REL_FAILURE); continue; } catch (final SQLException e) { getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {flowFile, e}); destinationRelationships.put(flowFile, REL_RETRY); continue; } } else { // We don't need to obtain keys. Just add the statement to the batch. stmt.addBatch(); enclosure.addFlowFile(flowFile); enclosuresToExecute.add(enclosure); } } // If we are not trying to obtain the generated keys, we will have // PreparedStatement's that have batches added to them. We need to execute each batch and close // the PreparedStatement. for (final StatementFlowFileEnclosure enclosure : enclosuresToExecute) { Statement statement=null; try { String deleteStr = getDeleteStatement(enclosure,session,deleteKeys); getLogger().debug(deleteStr); statement=conn.createStatement(); statement.execute(deleteStr); final PreparedStatement stmt = enclosure.getStatement(); stmt.executeBatch(); sentFlowFiles.addAll(enclosure.getFlowFiles()); } catch (final BatchUpdateException e) { // If we get a BatchUpdateException, then we want to determine which FlowFile caused the failure, // and route that FlowFile to failure while routing those that finished processing to success and those // that have not yet been executed to retry. If the FlowFile was // part of a fragmented transaction, then we must roll back all updates for this connection, because // other statements may have been successful and been part of this transaction. final int[] updateCounts = e.getUpdateCounts(); final int offendingFlowFileIndex = updateCounts.length; final List batchFlowFiles = enclosure.getFlowFiles(); if (poll.isFragmentedTransaction()) { // There are potentially multiple statements for this one transaction. As a result, // we need to roll back the entire transaction and route all of the FlowFiles to failure. conn.rollback(); final FlowFile offendingFlowFile = batchFlowFiles.get(offendingFlowFileIndex); getLogger().error("Failed to update database due to a failed batch update. A total of {} FlowFiles are required for this transaction, so routing all to failure. " + "Offending FlowFile was {}, which caused the following error: {}", new Object[] {flowFiles.size(), offendingFlowFile, e}); session.transfer(flowFiles, REL_FAILURE); return; } // In the presence of a BatchUpdateException, the driver has the option of either stopping when an error // occurs, or continuing. If it continues, then it must account for all statements in the batch and for // those that fail return a Statement.EXECUTE_FAILED for the number of rows updated. // So we will iterate over all of the update counts returned. If any is equal to Statement.EXECUTE_FAILED, // we will route the corresponding FlowFile to failure. Otherwise, the FlowFile will go to success // unless it has not yet been processed (its index in the List > updateCounts.length). int failureCount = 0; int successCount = 0; int retryCount = 0; for (int i = 0; i < updateCounts.length; i++) { final int updateCount = updateCounts[i]; final FlowFile flowFile = batchFlowFiles.get(i); if (updateCount == Statement.EXECUTE_FAILED) { destinationRelationships.put(flowFile, REL_FAILURE); failureCount++; } else { destinationRelationships.put(flowFile, REL_SUCCESS); successCount++; } } if (failureCount == 0) { // if no failures found, the driver decided not to execute the statements after the // failure, so route the last one to failure. final FlowFile failedFlowFile = batchFlowFiles.get(updateCounts.length); destinationRelationships.put(failedFlowFile, REL_FAILURE); failureCount++; } if (updateCounts.length < batchFlowFiles.size()) { final List unexecuted = batchFlowFiles.subList(updateCounts.length + 1, batchFlowFiles.size()); for (final FlowFile flowFile : unexecuted) { destinationRelationships.put(flowFile, REL_RETRY); retryCount++; } } getLogger().error("Failed to update database due to a failed batch update. There were a total of {} FlowFiles that failed, {} that succeeded, " + "and {} that were not execute and will be routed to retry; ", new Object[] {failureCount, successCount, retryCount}); } catch (final SQLNonTransientException e) { getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {enclosure.getFlowFiles(), e}); for (final FlowFile flowFile : enclosure.getFlowFiles()) { destinationRelationships.put(flowFile, REL_FAILURE); } continue; } catch (final SQLException e) { getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {enclosure.getFlowFiles(), e}); for (final FlowFile flowFile : enclosure.getFlowFiles()) { destinationRelationships.put(flowFile, REL_RETRY); } continue; } finally { if(statement !=null){ try{ statement.close(); }catch(final Exception e){ getLogger().error("Failed to close statement {};",e); } } enclosure.getStatement().close(); } } } finally { try { conn.commit(); } finally { // make sure that we try to set the auto commit back to whatever it was. if (originalAutoCommit) { try { conn.setAutoCommit(originalAutoCommit); } catch (final SQLException se) { } } } } // Determine the database URL String url = "jdbc://unknown-host"; try { url = conn.getMetaData().getURL(); } catch (final SQLException sqle) { } // Emit a Provenance SEND event final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); for (final FlowFile flowFile : sentFlowFiles) { session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true); } for (final FlowFile flowFile : sentFlowFiles) { destinationRelationships.put(flowFile, REL_SUCCESS); } } catch (final SQLException e) { // Failed FlowFiles are all of them that we have processed minus those that were successfully sent final List failedFlowFiles = processedFlowFiles; failedFlowFiles.removeAll(sentFlowFiles); // All FlowFiles yet to be processed is all FlowFiles minus those processed final List retry = flowFiles; retry.removeAll(processedFlowFiles); final Relationship rel; if (e instanceof SQLNonTransientException) { getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[] {failedFlowFiles, e}); rel = REL_FAILURE; } else { getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[] {failedFlowFiles, e}); rel = REL_RETRY; } for (final FlowFile flowFile : failedFlowFiles) { destinationRelationships.put(flowFile, rel); } for (final FlowFile flowFile : retry) { destinationRelationships.put(flowFile, Relationship.SELF); } } for (final Map.Entry entry : destinationRelationships.entrySet()) { session.transfer(entry.getKey(), entry.getValue()); } } private Pair,List> getRepeateFlowFile(final List input,final ProcessContext context, final ProcessSession session){ final String desckey = context.getProperty(ORDER_DISTINCT_KEYS).toString().trim().toUpperCase(); final Map map = new TreeMap( new Comparator() { public int compare(String obj1, String obj2) { // sort by return obj2.compareTo(obj1); } }); final String deleteKey = context.getProperty(DELETE_KEYS).toString().trim().toUpperCase(); final Set pkDistinct = new HashSet(); //连接字符串 final String concatKey="||||||||"; for(FlowFile aFlowFile : input){ //SQL final String sql = getSQL(session, aFlowFile); // 得到Delete语句 final String sqlTemp = sql.toUpperCase(); final String coloumTemp = sqlTemp.substring(sqlTemp.indexOf("(") + 1, sqlTemp.indexOf(")")); final String[] allColoumns = coloumTemp.split(","); final String pkValue = getColoumnsValue(deleteKey,aFlowFile,allColoumns); final String desckeyValue = getColoumnsValue(desckey,aFlowFile,allColoumns); final String pkConcatDesc = pkValue+concatKey+desckeyValue; final String pkConcat = pkValue + concatKey; pkDistinct.add(pkConcat); map.put(pkConcatDesc, aFlowFile); } final List repeateFlowFile = new ArrayList(); final List whileHandeFlowFile = new ArrayList(); for(String pkConcatKey: pkDistinct){ int i=0; for(String pkConcatDescKey: map.keySet()){ if(pkConcatDescKey.startsWith(pkConcatKey) && i ==0){ whileHandeFlowFile.add(map.get(pkConcatDescKey)); i++; } } } for(FlowFile afile:input ){ boolean result =false; for(FlowFile awhile:whileHandeFlowFile){ if(awhile.equals(afile)){ result = true; } } if(!result){ repeateFlowFile.add(afile); } } return new Pair,List>(repeateFlowFile, whileHandeFlowFile); } private static class Pair { private K k; private V v; public Pair(K k,V v){ this.k=k; this.v=v; } public K getK() { return k; } public V getV() { return v; } } private String getColoumnsValue(final String coloumnName,final FlowFile aFlowFile,final String[] allColoumns){ int index = 0; int k = 1; for (String column : allColoumns) { String columnTemp = column.trim().toUpperCase(); if (columnTemp.equals(coloumnName)) { index = k; break; } k++; } if (index == 0) { getLogger().error("delete or distinct key is wrong;"); throw new RuntimeException("delete or distinct key is wrong;"); } return aFlowFile.getAttributes().get("sql.args." + index + ".value"); } /** * 产生delete statement * * @param enclosure 一个SQL对应一个enclosure * @param session * @param deleteKeys 需要删除的KEYS * @return */ private String getDeleteStatement(StatementFlowFileEnclosure enclosure, ProcessSession session, String deleteKey) { final List flowFiles = enclosure.getFlowFiles(); final String sql = getSQL(session, flowFiles.get(0)); // 得到Delete语句 String sqlTemp = sql.toUpperCase(); String tableNameTemp = sqlTemp.substring(sqlTemp.indexOf("INTO") + 4, sqlTemp.indexOf("(") - 1); final String coloumTemp = sqlTemp.substring(sqlTemp.indexOf("(") + 1, sqlTemp.indexOf(")")); final String[] allColoumns = coloumTemp.split(","); StringBuffer deleteSqlSB = new StringBuffer(); deleteSqlSB.append("DELETE FROM ").append(tableNameTemp).append(" where ").append(deleteKey).append(" in ("); Integer index = 0; final String keyTemp = deleteKey.trim().toUpperCase(); int k = 1; for (String column : allColoumns) { String columnTemp = column.trim().toUpperCase(); if (columnTemp.equals(keyTemp)) { index = k; break; } k++; } if (index == 0) { getLogger().error("delete key is wrong;"); throw new RuntimeException("delete key is wrong;"); } // 主键的类型,只支持两种 final int jdbcType = Integer.parseInt(flowFiles.get(0).getAttributes().get("sql.args." + index + ".type")); boolean isNumber = true; switch (jdbcType) { case Types.SMALLINT: case Types.INTEGER: case Types.BIGINT: case Types.REAL: case Types.DECIMAL: case Types.NUMERIC: break; default: isNumber = false; } StringBuffer pklist = new StringBuffer(); int i = 1; for (FlowFile ffile : flowFiles) { final String sqlaff = getSQL(session, ffile); // 得到Delete语句 final String sqlTempaff = sqlaff.toUpperCase(); final String coloumTempaff = sqlTempaff.substring(sqlTempaff.indexOf("(") + 1, sqlTempaff.indexOf(")")); final String[] allColoumnsaff = coloumTempaff.split(","); Integer indexaff = 0; int kaff = 1; for (String columnaff : allColoumnsaff) { String columnTempaff = columnaff.trim().toUpperCase(); if (columnTempaff.equals(keyTemp)) { indexaff = kaff; break; } kaff++; } if (indexaff == 0) { getLogger().error("delete key is wrong;"); throw new RuntimeException("delete key is wrong;"); } Map attributes = ffile.getAttributes(); final String valueAttrName = "sql.args." + indexaff + ".value"; final String parameterValue = attributes.get(valueAttrName); if (i == flowFiles.size()) { if (isNumber) { pklist.append(parameterValue).append(")"); } else { pklist.append("'").append(parameterValue).append("')"); } } else { if (isNumber) { pklist.append(parameterValue).append(","); } else { pklist.append("'").append(parameterValue).append("',"); } } i++; } return deleteSqlSB.append(pklist).toString(); } /** * Pulls a batch of FlowFiles from the incoming queues. If no FlowFiles are available, returns null. * Otherwise, a List of FlowFiles will be returned. * * If all FlowFiles pulled are not eligible to be processed, the FlowFiles will be penalized and transferred back * to the input queue and an empty List will be returned. * * Otherwise, if the Support Fragmented Transactions property is true, all FlowFiles that belong to the same * transaction will be sorted in the order that they should be evaluated. * * @param context the process context for determining properties * @param session the process session for pulling flowfiles * @return a FlowFilePoll containing a List of FlowFiles to process, or null if there are no FlowFiles to process */ private FlowFilePoll pollFlowFiles(final ProcessContext context, final ProcessSession session) { // Determine which FlowFile Filter to use in order to obtain FlowFiles. final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean(); boolean fragmentedTransaction = false; final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); List flowFiles; if (useTransactions) { final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter(); flowFiles = session.get(filter); fragmentedTransaction = filter.isFragmentedTransaction(); } else { flowFiles = session.get(batchSize); } if (flowFiles.isEmpty()) { return null; } // If we are supporting fragmented transactions, verify that all FlowFiles are correct if (fragmentedTransaction) { final Relationship relationship = determineRelationship(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)); if (relationship != null) { // if transferring back to self, penalize the FlowFiles. if (relationship == Relationship.SELF) { // penalize all of the FlowFiles that we are going to route to SELF. final ListIterator itr = flowFiles.listIterator(); while (itr.hasNext()) { final FlowFile flowFile = itr.next(); final FlowFile penalized = session.penalize(flowFile); itr.remove(); itr.add(penalized); } } session.transfer(flowFiles, relationship); return null; } // sort by fragment index. Collections.sort(flowFiles, new Comparator() { @Override public int compare(final FlowFile o1, final FlowFile o2) { return Integer.compare(Integer.parseInt(o1.getAttribute(FRAGMENT_INDEX_ATTR)), Integer.parseInt(o2.getAttribute(FRAGMENT_INDEX_ATTR))); } }); } return new FlowFilePoll(flowFiles, fragmentedTransaction); } /** * Returns the key that was generated from the given statement, or null if no key * was generated or it could not be determined. * * @param stmt the statement that generated a key * @return the key that was generated from the given statement, or null if no key * was generated or it could not be determined. */ private String determineGeneratedKey(final PreparedStatement stmt) { try { final ResultSet generatedKeys = stmt.getGeneratedKeys(); if (generatedKeys != null && generatedKeys.next()) { return generatedKeys.getString(1); } } catch (final SQLException sqle) { // This is not supported by all vendors. This is a best-effort approach. } return null; } /** * Returns the StatementFlowFileEnclosure that should be used for executing the given SQL statement * * @param sql the SQL to execute * @param conn the connection from which a PreparedStatement can be created * @param stmtMap the existing map of SQL to PreparedStatements * @param obtainKeys whether or not we need to obtain generated keys for INSERT statements * @param fragmentedTransaction whether or not the SQL pertains to a fragmented transaction * * @return a StatementFlowFileEnclosure to use for executing the given SQL statement * * @throws SQLException if unable to create the appropriate PreparedStatement */ private StatementFlowFileEnclosure getEnclosure(final String sql, final Connection conn, final Map stmtMap, final boolean obtainKeys, final boolean fragmentedTransaction) throws SQLException { StatementFlowFileEnclosure enclosure = stmtMap.get(sql); if (enclosure != null) { return enclosure; } if (obtainKeys) { // Create a new Prepared Statement, requesting that it return the generated keys. PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); if (stmt == null) { // since we are passing Statement.RETURN_GENERATED_KEYS, calls to conn.prepareStatement will // in some cases (at least for DerbyDB) return null. // We will attempt to recompile the statement without the generated keys being returned. stmt = conn.prepareStatement(sql); } // If we need to obtain keys, then we cannot do a Batch Update. In this case, // we don't need to store the PreparedStatement in the Map because we aren't // doing an addBatch/executeBatch. Instead, we will use the statement once // and close it. return new StatementFlowFileEnclosure(stmt); } else if (fragmentedTransaction) { // We cannot use Batch Updates if we have a transaction that spans multiple FlowFiles. // If we did, we could end up processing the statements out of order. It's quite possible // that we could refactor the code some to allow for this, but as it is right now, this // could cause problems. This is because we have a Map. // If we had a transaction that needed to execute Stmt A with some parameters, then Stmt B with // some parameters, then Stmt A with different parameters, this would become problematic because // the executeUpdate would be evaluated first for Stmt A (the 1st and 3rd statements, and then // the second statement would be evaluated). final PreparedStatement stmt = conn.prepareStatement(sql); return new StatementFlowFileEnclosure(stmt); } final PreparedStatement stmt = conn.prepareStatement(sql); enclosure = new StatementFlowFileEnclosure(stmt); stmtMap.put(sql, enclosure); return enclosure; } /** * Determines the SQL statement that should be executed for the given FlowFile * * @param session the session that can be used to access the given FlowFile * @param flowFile the FlowFile whose SQL statement should be executed * * @return the SQL that is associated with the given FlowFile */ private String getSQL(final ProcessSession session, final FlowFile flowFile) { // Read the SQL from the FlowFile's content final byte[] buffer = new byte[(int) flowFile.getSize()]; session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { StreamUtils.fillBuffer(in, buffer); } }); // Create the PreparedStatement to use for this FlowFile. final String sql = new String(buffer, StandardCharsets.UTF_8); return sql; } /** * Sets all of the appropriate parameters on the given PreparedStatement, based on the given FlowFile attributes. * * @param stmt the statement to set the parameters on * @param attributes the attributes from which to derive parameter indices, values, and types * @throws SQLException if the PreparedStatement throws a SQLException when the appropriate setter is called */ private void setParameters(final PreparedStatement stmt, final Map attributes) throws SQLException { for (final Map.Entry entry : attributes.entrySet()) { final String key = entry.getKey(); final Matcher matcher = SQL_TYPE_ATTRIBUTE_PATTERN.matcher(key); if (matcher.matches()) { final int parameterIndex = Integer.parseInt(matcher.group(1)); final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); if (!isNumeric) { throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral type"); } final int jdbcType = Integer.parseInt(entry.getValue()); final String valueAttrName = "sql.args." + parameterIndex + ".value"; final String parameterValue = attributes.get(valueAttrName); final String formatAttrName = "sql.args." + parameterIndex + ".format"; final String parameterFormat = attributes.containsKey(formatAttrName)? attributes.get(formatAttrName):""; try { setParameter(stmt, valueAttrName, parameterIndex, parameterValue, jdbcType, parameterFormat); } catch (final NumberFormatException nfe) { throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type", nfe); } catch (ParseException pe) { throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to a timestamp", pe); } catch (UnsupportedEncodingException uee) { throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted to UTF-8", uee); } } } } /** * Determines which relationship the given FlowFiles should go to, based on a transaction timing out or * transaction information not being present. If the FlowFiles should be processed and not transferred * to any particular relationship yet, will return null * * @param flowFiles the FlowFiles whose relationship is to be determined * @param transactionTimeoutMillis the maximum amount of time (in milliseconds) that we should wait * for all FlowFiles in a transaction to be present before routing to failure * @return the appropriate relationship to route the FlowFiles to, or null if the FlowFiles * should instead be processed */ Relationship determineRelationship(final List flowFiles, final Long transactionTimeoutMillis) { int selectedNumFragments = 0; final BitSet bitSet = new BitSet(); for (final FlowFile flowFile : flowFiles) { final String fragmentCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR); if (fragmentCount == null && flowFiles.size() == 1) { return null; } else if (fragmentCount == null) { getLogger().error("Cannot process {} because there are {} FlowFiles with the same fragment.identifier " + "attribute but not all FlowFiles have a fragment.count attribute; routing all to failure", new Object[] {flowFile, flowFiles.size()}); return REL_FAILURE; } final int numFragments; try { numFragments = Integer.parseInt(fragmentCount); } catch (final NumberFormatException nfe) { getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not an integer; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); return REL_FAILURE; } if (numFragments < 1) { getLogger().error("Cannot process {} because the fragment.count attribute has a value of '{}', which is not a positive integer; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentCount}); return REL_FAILURE; } if (selectedNumFragments == 0) { selectedNumFragments = numFragments; } else if (numFragments != selectedNumFragments) { getLogger().error("Cannot process {} because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); return REL_FAILURE; } final String fragmentIndex = flowFile.getAttribute(FRAGMENT_INDEX_ATTR); if (fragmentIndex == null) { getLogger().error("Cannot process {} because the fragment.index attribute is missing; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); return REL_FAILURE; } final int idx; try { idx = Integer.parseInt(fragmentIndex); } catch (final NumberFormatException nfe) { getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not an integer; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); return REL_FAILURE; } if (idx < 0) { getLogger().error("Cannot process {} because the fragment.index attribute has a value of '{}', which is not a positive integer; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile, fragmentIndex}); return REL_FAILURE; } if (bitSet.get(idx)) { getLogger().error("Cannot process {} because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier; " + "routing all FlowFiles with this fragment.identifier to failure", new Object[] {flowFile}); return REL_FAILURE; } bitSet.set(idx); } if (selectedNumFragments == flowFiles.size()) { return null; // no relationship to route FlowFiles to yet - process the FlowFiles. } long latestQueueTime = 0L; for (final FlowFile flowFile : flowFiles) { if (flowFile.getLastQueueDate() != null && flowFile.getLastQueueDate() > latestQueueTime) { latestQueueTime = flowFile.getLastQueueDate(); } } if (transactionTimeoutMillis != null) { if (latestQueueTime > 0L && System.currentTimeMillis() - latestQueueTime > transactionTimeoutMillis) { getLogger().error("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: {}", new Object[] {flowFiles}); return REL_FAILURE; } } getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue"); return Relationship.SELF; // not enough FlowFiles for this transaction. Return them all to queue. } /** * Determines how to map the given value to the appropriate JDBC data type and sets the parameter on the * provided PreparedStatement * * @param stmt the PreparedStatement to set the parameter on * @param attrName the name of the attribute that the parameter is coming from - for logging purposes * @param parameterIndex the index of the SQL parameter to set * @param parameterValue the value of the SQL parameter to set * @param jdbcType the JDBC Type of the SQL parameter to set * @throws SQLException if the PreparedStatement throws a SQLException when calling the appropriate setter */ private void setParameter(final PreparedStatement stmt, final String attrName, final int parameterIndex, final String parameterValue, final int jdbcType, final String valueFormat) throws SQLException, ParseException, UnsupportedEncodingException { if (parameterValue == null) { stmt.setNull(parameterIndex, jdbcType); } else { switch (jdbcType) { case Types.BIT: case Types.BOOLEAN: stmt.setBoolean(parameterIndex, Boolean.parseBoolean(parameterValue)); break; case Types.TINYINT: stmt.setByte(parameterIndex, Byte.parseByte(parameterValue)); break; case Types.SMALLINT: stmt.setShort(parameterIndex, Short.parseShort(parameterValue)); break; case Types.INTEGER: stmt.setInt(parameterIndex, Integer.parseInt(parameterValue)); break; case Types.BIGINT: stmt.setLong(parameterIndex, Long.parseLong(parameterValue)); break; case Types.REAL: stmt.setFloat(parameterIndex, Float.parseFloat(parameterValue)); break; case Types.FLOAT: case Types.DOUBLE: stmt.setDouble(parameterIndex, Double.parseDouble(parameterValue)); break; case Types.DECIMAL: case Types.NUMERIC: stmt.setBigDecimal(parameterIndex, new BigDecimal(parameterValue)); break; case Types.DATE: stmt.setDate(parameterIndex, new Date(Long.parseLong(parameterValue))); break; case Types.TIME: stmt.setTime(parameterIndex, new Time(Long.parseLong(parameterValue))); break; case Types.TIMESTAMP: long lTimestamp=0L; if(LONG_PATTERN.matcher(parameterValue).matches()){ lTimestamp = Long.parseLong(parameterValue); }else { SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); java.util.Date parsedDate = dateFormat.parse(parameterValue); lTimestamp = parsedDate.getTime(); } stmt.setTimestamp(parameterIndex, new Timestamp(lTimestamp)); break; case Types.BINARY: case Types.VARBINARY: case Types.LONGVARBINARY: byte[] bValue; switch(valueFormat){ case "": case "ascii": bValue = parameterValue.getBytes("ASCII"); break; case "hex": bValue = DatatypeConverter.parseHexBinary(parameterValue); break; case "base64": bValue = DatatypeConverter.parseBase64Binary(parameterValue); break; default: throw new ParseException("Unable to parse binary data using the formatter `" + valueFormat + "`.",0); } stmt.setBinaryStream(parameterIndex, new ByteArrayInputStream(bValue), bValue.length); break; case Types.CHAR: case Types.VARCHAR: case Types.LONGNVARCHAR: case Types.LONGVARCHAR: stmt.setString(parameterIndex, parameterValue); break; default: stmt.setObject(parameterIndex, parameterValue, jdbcType); break; } } } /** * A FlowFileFilter that is responsible for ensuring that the FlowFiles returned either belong * to the same "fragmented transaction" (i.e., 1 transaction whose information is fragmented * across multiple FlowFiles) or that none of the FlowFiles belongs to a fragmented transaction */ static class TransactionalFlowFileFilter implements FlowFileFilter { private String selectedId = null; private int numSelected = 0; private boolean ignoreFragmentIdentifiers = false; public boolean isFragmentedTransaction() { return !ignoreFragmentIdentifiers; } @Override public FlowFileFilterResult filter(final FlowFile flowFile) { final String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR); final String fragCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR); // if first FlowFile selected is not part of a fragmented transaction, then // we accept any FlowFile that is also not part of a fragmented transaction. if (ignoreFragmentIdentifiers) { if (fragmentId == null || "1".equals(fragCount)) { return FlowFileFilterResult.ACCEPT_AND_CONTINUE; } else { return FlowFileFilterResult.REJECT_AND_CONTINUE; } } if (fragmentId == null || "1".equals(fragCount)) { if (selectedId == null) { // Only one FlowFile in the transaction. ignoreFragmentIdentifiers = true; return FlowFileFilterResult.ACCEPT_AND_CONTINUE; } else { // we've already selected 1 FlowFile, and this one doesn't match. return FlowFileFilterResult.REJECT_AND_CONTINUE; } } if (selectedId == null) { // select this fragment id as the chosen one. selectedId = fragmentId; numSelected++; return FlowFileFilterResult.ACCEPT_AND_CONTINUE; } if (selectedId.equals(fragmentId)) { // fragment id's match. Find out if we have all of the necessary fragments or not. final int numFragments; if (NUMBER_PATTERN.matcher(fragCount).matches()) { numFragments = Integer.parseInt(fragCount); } else { numFragments = Integer.MAX_VALUE; } if (numSelected >= numFragments - 1) { // We have all of the fragments we need for this transaction. return FlowFileFilterResult.ACCEPT_AND_TERMINATE; } else { // We still need more fragments for this transaction, so accept this one and continue. numSelected++; return FlowFileFilterResult.ACCEPT_AND_CONTINUE; } } else { return FlowFileFilterResult.REJECT_AND_CONTINUE; } } } /** * A simple, immutable data structure to hold a List of FlowFiles and an indicator as to whether * or not those FlowFiles represent a "fragmented transaction" - that is, a collection of FlowFiles * that all must be executed as a single transaction (we refer to it as a fragment transaction * because the information for that transaction, including SQL and the parameters, is fragmented * across multiple FlowFiles). */ private static class FlowFilePoll { private final List flowFiles; private final boolean fragmentedTransaction; public FlowFilePoll(final List flowFiles, final boolean fragmentedTransaction) { this.flowFiles = flowFiles; this.fragmentedTransaction = fragmentedTransaction; } public List getFlowFiles() { return flowFiles; } public boolean isFragmentedTransaction() { return fragmentedTransaction; } } /** * A simple, immutable data structure to hold a Prepared Statement and a List of FlowFiles * for which that statement should be evaluated. */ private static class StatementFlowFileEnclosure { private final PreparedStatement statement; private final List flowFiles = new ArrayList<>(); public StatementFlowFileEnclosure(final PreparedStatement statement) { this.statement = statement; } public PreparedStatement getStatement() { return statement; } public List getFlowFiles() { return flowFiles; } public void addFlowFile(final FlowFile flowFile) { this.flowFiles.add(flowFile); } @Override public int hashCode() { return statement.hashCode(); } @Override public boolean equals(final Object obj) { if (obj == null) { return false; } if (obj == this) { return false; } if (!(obj instanceof StatementFlowFileEnclosure)) { return false; } final StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj; return statement.equals(other.getStatement()); } } }