Member since
01-19-2017
3651
Posts
623
Kudos Received
364
Solutions
My Accepted Solutions
Views | Posted |
---|---|
172 | 12-22-2024 07:33 AM |
109 | 12-18-2024 12:21 PM |
428 | 12-17-2024 07:48 AM |
298 | 08-02-2024 08:15 AM |
3578 | 04-06-2023 12:49 PM |
12-20-2024
07:52 AM
2 Kudos
@nifier This post is three weeks old, but I hope this still helps you with your reporting task.

NiFi has a built-in Data Provenance feature that tracks the lineage of data as it moves through the flow. To capture file transfer details:

1. Enable Provenance Reporting in NiFi
Provenance Events: NiFi records events such as SEND, RECEIVE, DROP, and ROUTE. For SFTP file transfers, look for SEND events.
Steps to enable provenance reporting:
Log in to the NiFi UI.
Go to the Provenance tab (accessible from the top menu).
Configure the Provenance Repository in nifi.properties to store sufficient history:
    nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository
    nifi.provenance.repository.max.storage.time=30 days
    nifi.provenance.repository.max.storage.size=1 GB
Ensure that max.storage.time and max.storage.size are large enough to retain events for the whole reporting period, since events are purged once either limit is reached.

2. Query Provenance Data
You can query and filter provenance events to generate the report:
Go to the Provenance tab in the NiFi UI.
Filter the events using these criteria:
    Component Name: Filter for the SFTP processor (PutSFTP).
    Event Type: Select SEND.
    Date Range: Specify the desired time frame.
Download the filtered results as a CSV file.

3. Automate Reporting with NiFi Reporting Tasks
To generate periodic reports automatically, use the SiteToSiteProvenanceReportingTask:
In the NiFi canvas, navigate to Controller Settings.
Add a new Reporting Task and select SiteToSiteProvenanceReportingTask.
Configure the Reporting Task to:
    Specify the target location for the report.
    Filter for SEND events related to your file transfer processors.
Schedule the Reporting Task to run periodically (e.g., daily or weekly).

4. Include File Name and Transfer Status
NiFi provenance events include metadata such as the file name, size, and transfer status:
File Name: Captured in the filename attribute.
Transfer Status:
    Success: The event is logged with SEND.
    Failure: A failed transfer does not produce a SEND event, so look for errors in the processor logs (or route the failure relationship to a LogMessage processor to capture failure events in the flow).

5. Alternative: Push Logs to External Tools
You can push the provenance data to an external system for detailed analysis and reporting:
Elasticsearch/Kibana: Use the PutElasticsearch processor to send provenance events to Elasticsearch and visualize them in Kibana.
Custom Script: Use the ExecuteScript processor to write a Python or Groovy script to extract, filter, and format the provenance data into a report.

6. Sample Workflow for Reporting
Receive the provenance events for the desired period in a flow, for example through an input port fed by the SiteToSiteProvenanceReportingTask from step 3 (as far as I know, stock NiFi has no processor named QueryProvenance).
Filter for SEND events from the SFTP processor.
Route successful and failed events to different processors (PutFile for saving logs).
Format the report (CSV/JSON) using processors like UpdateAttribute and ConvertRecord.

By combining these steps, you can generate a report of all file transfers in the given period, including file names and transfer statuses.
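If you export the events as JSON (for example from the reporting task), the filtering in step 6 can also be done outside NiFi. The following is only a minimal sketch: the sample records and the field names (eventType, componentType, timestampMillis, updatedAttributes) are assumptions for illustration, so adjust them to whatever your export actually contains.

```python
import csv
import io

# Hypothetical provenance records; field names are assumed, not taken from your flow.
events = [
    {"eventType": "SEND", "componentType": "PutSFTP", "timestampMillis": 1734680000000,
     "updatedAttributes": {"filename": "report_a.csv"}},
    {"eventType": "DROP", "componentType": "PutSFTP", "timestampMillis": 1734680001000,
     "updatedAttributes": {"filename": "report_a.csv"}},
    {"eventType": "SEND", "componentType": "PutFile", "timestampMillis": 1734680002000,
     "updatedAttributes": {"filename": "local_copy.csv"}},
    {"eventType": "SEND", "componentType": "PutSFTP", "timestampMillis": 1734680003000,
     "updatedAttributes": {"filename": "report_b.csv"}},
]


def sftp_send_report(records, component_type="PutSFTP"):
    """Return CSV text listing the SEND events emitted by the given component type."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["timestampMillis", "filename", "status"])
    for rec in records:
        if rec.get("eventType") == "SEND" and rec.get("componentType") == component_type:
            filename = rec.get("updatedAttributes", {}).get("filename", "")
            # A SEND event is only recorded for a completed transfer.
            writer.writerow([rec.get("timestampMillis"), filename, "SUCCESS"])
    return out.getvalue()


report = sftp_send_report(events)
print(report)
```

Failed transfers never appear in this output because they produce no SEND event; they have to come from the failure relationship or the logs as described in step 4.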
12-19-2024
11:07 PM
1 Kudo
@enam I see you have used the InvokeHTTP processor, which is the right choice for sending API requests and handling responses, including error codes such as 400 and 500.

Configuration for InvokeHTTP
Set the required properties:
    HTTP Method: POST
    Remote URL: http://192.168.200.162:2031/nostroliquidity........
    Return Code: By default, only 2xx responses are treated as success.
To also get the error responses, add the following settings:
    "Always Output Response": Set to true.
    Output Response Attributes: Include attributes such as status.code and status.message to capture response metadata (InvokeHTTP writes them as invokehttp.status.code and invokehttp.status.message).
This ensures that the processor outputs responses regardless of status code.

Route Responses Based on Status Code
Use the RouteOnAttribute processor to differentiate between success and error responses by adding two conditions:
    Success Route: Check for status.code >= 200 AND status.code < 300.
    Error Route: Check for status.code >= 400.
Connect the InvokeHTTP processor's Response relationship to the RouteOnAttribute processor.

Write Responses to Appropriate Locations
Use a PutFile or PutHDFS processor for both success and error routes:
    Success Route: Write successful responses to a specific directory.
    Error Route: Write error responses (400, 500) to a separate directory. Include response details for debugging.

GenerateFlowFile --> InvokeHTTP --> RouteOnAttribute --> [Success] PutFile
                                                     --> [Error] PutFile

Important Configuration Notes
Configure failure handling in InvokeHTTP: Connect the InvokeHTTP processor’s Original relationship to a LogMessage processor or another flow to avoid losing the original flowfile.
Customize filenames or attributes: Use UpdateAttribute to set filenames or directories dynamically based on attributes like status.code or status.message.
Capture full API responses: Ensure that the response body from the API is written as the content of the output file.

Can you run the above flow and report back? Happy hadooping
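To make the two routing conditions concrete, here is a small sketch of the same decision in plain Python. It is not NiFi code; the function name and the "unmatched" bucket are my own additions for illustration, and the thresholds are simply the ones from the RouteOnAttribute conditions above.

```python
def route_for_status(status_code: int) -> str:
    """Mirror the two RouteOnAttribute conditions: 2xx is success, 400 and above is error."""
    if 200 <= status_code < 300:
        return "success"
    if status_code >= 400:
        return "error"
    # 1xx and 3xx match neither condition; in NiFi they would go to 'unmatched'.
    return "unmatched"


for code in (200, 204, 302, 400, 500):
    print(code, route_for_status(code))
```

Note that redirects (3xx) match neither condition, so decide explicitly what should happen with the unmatched relationship instead of auto-terminating it by accident.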
12-19-2024
09:08 AM
@Velankanni If you are still having the problem, can you try this JOLT spec? Remember that the spoiler tag distorts the JSON.

JOLT Spec
[ { "operation": "shift", "spec": { "data": { "getItemListing": { "edges": { "*": { "node": { "identifier": "[&1].identifier", "parentItems": { "*": { "parentIdentifier": "[&3].[&1].parentIdentifier" } }, "treatmentClusterIds": { "*": { "metadata": { "*": { "treatmentClusterIDs": "[&4].[&2].[&1].TreatmentId" } }, "element": { "name": "[&4].[&2].[&1].Element", "labelingClusters": { "*": { "labelingCluster": "[&5].[&3].[&2].LabelingCluster" } } } } } } } } } } } }, { "operation": "shift", "spec": { "*": { "*": { "*": { "*": { "*": { "*": { "$": "[#6].identifier", "parentIdentifier": "[#6].parentIdentifier", "Element": "[#6].Element", "TreatmentId": "[#6].TreatmentId", "LabelingCluster": "[#6].LabelingCluster" } } } } } } } } ]

Place the input JSON in a file named input.json, use a JOLT processor to apply the spec above, and verify that the output matches the expected Output JSON format. Happy hadooping
12-19-2024
08:44 AM
@Abhijith_Nayak To have Impala queries run automatically in the right resource pool based on the user, you can configure Impala Admission Control to handle the routing. Here's how you can implement it.

1. Enable Resource Pools in Cloudera Manager
Ensure that Dynamic Resource Pools are enabled in Cloudera Manager:
Go to Cloudera Manager > Impala > Configuration > Admission Control.
Enable Admission Control if it isn’t already.

2. Create Resource Pools
You already have two resource pools (resource_pool_1 and resource_pool_2). Ensure these pools are properly configured:
    resource_pool_1: For users A, B, and C.
    resource_pool_2: For users D and E.
Verify the following settings for each pool:
    Memory and CPU resources are allocated appropriately for the expected workloads.
    Query concurrency limits are set based on your cluster’s capacity.

3. Configure Submission Access Control
Map the users to their respective resource pools:
Navigate to Cloudera Manager > Impala > Configuration > Admission Control > Resource Pools.
For resource_pool_1, under Submission Access Control, add: A,B,C
For resource_pool_2, under Submission Access Control, add: D,E
This ensures that only the specified users can submit queries to their designated pools.

4. Configure Default Resource Pool Selection
Use the pool_mapping configuration to automatically route queries based on the user. This eliminates the need for users to specify the pool explicitly when submitting queries.
Steps:
Navigate to Cloudera Manager > Impala > Configuration > Admission Control > Pool Mapping Rules.
Add rules to map users to their pools:
    user:A pool:resource_pool_1
    user:B pool:resource_pool_1
    user:C pool:resource_pool_1
    user:D pool:resource_pool_2
    user:E pool:resource_pool_2

5. Validate the Configuration
Restart the Impala services to apply the changes.
Run test queries as each user to confirm the routing works as expected:
Log in as user A, B, C, D, and E and execute queries without specifying the resource pool.
Run the following command in the Impala shell right after a query to see which resource pool it was assigned to:
    PROFILE;
Look for the Request Pool and Admission result entries to verify the query ran in the correct pool.

Expected Outcome
Users A, B, and C will have their queries automatically routed to resource_pool_1.
Users D and E will have their queries automatically routed to resource_pool_2.
No manual pool specification will be required during query submission.

This configuration keeps the two workloads isolated from each other and makes resource usage predictable. Let me know if further clarification is needed. Happy hadooping !!!
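When validating step 5 for several users, it can help to compare what you observed against the intended mapping in one go. This is just a sketch of that check: the mapping is the one from this thread, while the observed (user, pool) pairs are hypothetical values you would collect yourself from each query profile.

```python
# Intended user-to-pool mapping from the thread.
EXPECTED_POOL = {
    "A": "resource_pool_1",
    "B": "resource_pool_1",
    "C": "resource_pool_1",
    "D": "resource_pool_2",
    "E": "resource_pool_2",
}


def misrouted(observed):
    """Return (user, observed_pool, expected_pool) for every pair that does not match."""
    return [
        (user, pool, EXPECTED_POOL.get(user))
        for user, pool in observed
        if EXPECTED_POOL.get(user) != pool
    ]


# Hypothetical observations gathered from the profiles of five test queries.
observed = [
    ("A", "resource_pool_1"),
    ("B", "resource_pool_1"),
    ("C", "resource_pool_2"),  # deliberately wrong to show the report
    ("D", "resource_pool_2"),
    ("E", "resource_pool_2"),
]
print(misrouted(observed))
```

An empty list means every test query landed in the pool you intended.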
12-18-2024
12:21 PM
1 Kudo
@allen_chu Maybe I didn't understand the question well, but here are the differences and an explanation to help you understand and configure the two options correctly.

1. Difference Between spark_shuffle and spark2_shuffle

spark_shuffle
Used for Apache Spark 1.x versions.
Refers to the shuffle service for older Spark releases that rely on the original shuffle mechanism.
Declared in the YARN NodeManager configuration (yarn-site.xml):
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>

spark2_shuffle
Introduced for Apache Spark 2.x and later versions.
Handles shuffle operations for newer Spark versions, which have an updated shuffle mechanism with better performance and scalability.
Declared similarly in yarn-site.xml:
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark2_shuffle</value>
</property>

2. Why Two Shuffle Services?
Backward Compatibility: spark_shuffle is retained so that Spark 1.x jobs continue running without modifications.
Separate Service: spark2_shuffle ensures that jobs running on Spark 2.x+ use an optimized and compatible shuffle service without interfering with Spark 1.x jobs.
Upgrade Path: In clusters supporting multiple Spark versions, both shuffle services may coexist to support jobs submitted using Spark 1.x and Spark 2.x simultaneously.

3. Configuration in YARN
To enable the shuffle service for both versions, configure the NodeManager to start both services:
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle,spark2_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark2_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

4. Key Points
Use spark_shuffle for jobs running with Spark 1.x.
Use spark2_shuffle for jobs running with Spark 2.x or later.
In modern setups, spark2_shuffle is the primary shuffle service since Spark 1.x is largely deprecated.

Happy hadooping
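A quick sanity check you can run against your own yarn-site.xml is to list the declared aux-services and see which ones have no matching .class property. The sketch below assumes the file has the usual configuration/property/name/value layout; the embedded XML is just the fragment from section 3 wrapped in a root element, so replace it with the contents of your real file.

```python
import xml.etree.ElementTree as ET

# The fragment from section 3, wrapped in <configuration> for parsing.
YARN_SITE = """<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle,spark2_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark2_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
</configuration>"""


def load_properties(xml_text):
    """Parse a Hadoop-style configuration document into a name -> value dict."""
    root = ET.fromstring(xml_text)
    return {p.findtext("name"): p.findtext("value") for p in root.iter("property")}


def missing_aux_classes(props):
    """Return the aux-services that have no '<service>.class' property defined."""
    raw = props.get("yarn.nodemanager.aux-services", "")
    services = [s.strip() for s in raw.split(",") if s.strip()]
    return [
        s for s in services
        if f"yarn.nodemanager.aux-services.{s}.class" not in props
    ]


props = load_properties(YARN_SITE)
print(missing_aux_classes(props))
```

With the fragment above, only mapreduce_shuffle is reported, simply because its class property is not part of the snippet; in a complete yarn-site.xml it would normally be defined as well.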
12-18-2024
08:50 AM
@drewski7 I have just picked up your ticket, and I hope I can help you resolve this issue if it's still unresolved. There are a couple of configuration changes and implementation steps that have to be done.

1. Overview
OAuth allows Kafka clients to obtain access tokens from an external authentication provider and use them to authenticate with the Kafka broker. This process involves configuring the Kafka broker, the OAuth provider, and the Kafka clients.

2. Prerequisites
Kafka cluster with SASL/OAUTHBEARER enabled.
An OAuth provider set up to issue access tokens.
Kafka clients that support SASL/OAUTHBEARER.
Required libraries for OAuth integration (e.g. kafka-clients, oauth2-client, or keycloak adapters).

3. Procedure

Step 1: Configure the OAuth Provider
Set up an OAuth provider (e.g., Keycloak, Okta, etc.) to act as the identity provider (IdP).
Register a new client application for Kafka in the OAuth provider:
    Set up a client ID and client secret for Kafka clients.
    Configure scopes, roles, or claims required for authorization.
    Enable grant types like Client Credentials or Password (depending on your use case).
Note down the following details:
    Authorization Server URL (e.g. https://authlogin.northwind.com/token).
    Client ID and Client Secret.

Step 2: Configure the Kafka Broker
Enable SASL/OAUTHBEARER authentication by editing the Kafka broker configuration (/config/server.properties):
    sasl.enabled.mechanisms=OAUTHBEARER
    listener.name.<listener-name>.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
        oauth.token.endpoint.uri="https://auth.example.com/token" \
        oauth.client.id="kafka-broker-client-id" \
        oauth.client.secret="kafka-broker-client-secret" \
        oauth.scope="kafka-scope";
Replace <listener-name> with the name of your listener (for example SASL_PLAINTEXT or SASL_SSL) as appropriate.
Note that the oauth.* option names above are the ones used by the Strimzi Kafka OAuth library, which must be on the classpath with its login callback handler configured; Kafka's built-in OAuth support (3.1 and later) is configured through sasl.oauthbearer.token.endpoint.url instead.
Configure ACLs (optional): If using authorization, configure ACLs to grant specific permissions to authenticated users.
Restart the Kafka broker to apply the changes:
    sudo systemctl restart kafka

Step 3: Configure the Kafka Client
1. Add the required dependencies to your Kafka client application. For Java applications, add the Kafka and OAuth dependencies to your pom.xml or build.gradle. pom.xml example:
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.nimbusds</groupId>
        <artifactId>oauth2-oidc-sdk</artifactId>
        <version>9.4</version>
    </dependency>
2. Configure OAuth in the Kafka client. Specify the SASL mechanism and the OAuth token endpoint in the client configuration:
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092");
    props.put("security.protocol", "SASL_SSL");
    props.put("sasl.mechanism", "OAUTHBEARER");
    props.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
        + "oauth.token.endpoint.uri=\"https://auth.example.com/token\" "
        + "oauth.client.id=\"kafka-client-id\" "
        + "oauth.client.secret=\"kafka-client-secret\";");
3. Implement token retrieval (optional). Use an external tool or library to retrieve and manage tokens if you need a custom implementation:
    curl -X POST -d "grant_type=client_credentials&client_id=kafka-client-id&client_secret=kafka-client-secret" \
        https://auth.example.com/token
4. Create the Kafka producer/consumer. Use the above configuration to initialize a Kafka producer or consumer:
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Step 4: Test the Authentication
Produce and consume messages to verify OAuth-based authentication:
    kafka-console-producer.sh --broker-list <broker-address> --topic <topic-name> --producer.config <client-config>
    kafka-console-consumer.sh --bootstrap-server <broker-address> --topic <topic-name> --consumer.config <client-config>
Ensure the logs indicate successful authentication using SASL/OAUTHBEARER.

Step 5: Monitor and Debug
Check the Kafka broker logs for errors related to OAuth authentication.
Verify the token expiration and renewal mechanisms.
Ensure the OAuth provider is reachable from the Kafka brokers and clients.

Happy Hadooping. I hope the above steps help in the diagnosis and resolution of your Kafka OAuth issue.
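If you want to debug the token retrieval on its own, the curl call from Step 3 can be reproduced with the Python standard library. This is a minimal sketch under the same placeholder values as above (the example.com URL, client ID and secret are not real); it only builds the request, and the commented-out line shows where you would actually send it.

```python
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret):
    """Build a client-credentials token request equivalent to the curl example."""
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode("ascii")
    return urllib.request.Request(
        token_url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )


# Placeholder endpoint and credentials taken from the examples in this post.
req = build_token_request("https://auth.example.com/token", "kafka-client-id", "kafka-client-secret")
print(req.get_method(), req.full_url)
print(req.data.decode("ascii"))

# To really call the provider (needs network access and valid credentials):
# with urllib.request.urlopen(req, timeout=10) as resp:
#     print(resp.read().decode("utf-8"))
```

If this request fails from the broker host but works from your workstation, the problem is reachability of the OAuth provider rather than the Kafka configuration.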
12-17-2024
12:41 PM
1 Kudo
@JSSSS The error is this:
"java.io.IOException: File /user/JS/input/DIC.txt._COPYING_ could only be written to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) are excluded in this operation."

According to the log, all 3 DataNodes are excluded: excludeNodes=[192.168.1.81:9866, 192.168.1.125:9866, 192.168.1.8>
The write has to reach at least the minimum replication count (1 here) to succeed, but because every running DataNode was excluded for this operation, the NameNode had no node left to place the block on, so the write failed. HDFS cannot use these nodes, possibly due to:
    Disk space issues.
    Write errors or disk failures.
    Network connectivity problems between the NameNode and DataNodes.

1. Verify that the DataNodes are live and connected to the NameNode:
    hdfs dfsadmin -report
Look for the "Live nodes" and "Dead nodes" sections. If all 3 DataNodes are excluded, they might show up as dead or decommissioned.

2. Ensure the DataNodes have sufficient disk space for the write operation:
    df -h
Look at the HDFS data directories (/hadoop/hdfs/data). If the disks are full, clear unnecessary files or increase disk capacity:
    hdfs dfs -rm -r /path/to/old/unused/files

3. View the list of excluded nodes (the file referenced by dfs.hosts.exclude):
    cat $HADOOP_HOME/etc/hadoop/datanodes.exclude
If nodes are wrongly excluded, remove their entries from datanodes.exclude and refresh the NameNode to apply the changes:
    hdfs dfsadmin -refreshNodes

4. Block Placement Policy: If the cluster has DataNodes with specific restrictions (e.g., rack awareness), verify the block placement policy:
    grep dfs.block.replicator.classname $HADOOP_HOME/etc/hadoop/hdfs-site.xml
Default: org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault

Happy hadooping
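If you see this exception often and want to triage it from logs, the numbers in the message tell you directly whether every running DataNode was excluded. Below is a small sketch that pulls them out with a regular expression; the pattern is written against the exact message quoted in this thread, so treat it as an assumption that other Hadoop versions word it the same way.

```python
import re

# Pattern written against the message quoted in this thread.
PATTERN = re.compile(
    r"could only be written to (\d+) of the (\d+) minReplication nodes\. "
    r"There are (\d+) datanode\(s\) running and (\d+) node\(s\) are excluded"
)


def parse_write_failure(message):
    """Extract the replica and node counts from the HDFS write failure message."""
    match = PATTERN.search(message)
    if match is None:
        return None
    written, min_replication, running, excluded = map(int, match.groups())
    return {
        "written": written,
        "min_replication": min_replication,
        "running": running,
        "excluded": excluded,
        # True when no running DataNode was left as a write target.
        "all_excluded": running > 0 and excluded >= running,
    }


message = (
    "java.io.IOException: File /user/JS/input/DIC.txt._COPYING_ could only be written "
    "to 0 of the 1 minReplication nodes. There are 3 datanode(s) running and 3 node(s) "
    "are excluded in this operation."
)
print(parse_write_failure(message))
```

When all_excluded is true, adding replication or retrying will not help; the DataNodes themselves need attention (disk, network, or exclude list), as in the steps above.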
12-17-2024
11:53 AM
@Sid17 Can you try this JOLT spec?
[ { "operation": "shift", "spec": { "data": { "getCarListing": { "edges": { "*": { "node": { "carClusterIds": { "*": { "element": { "name": "[].element" }, "businessRelations": { "*": { "countries": { "*": { "countryCode": "[].businessRelations[].countryCode" } } } } } } } } } } } } }, { "operation": "cardinality", "spec": { "[]": "ONE" } } ]
Hope it works.
12-17-2024
10:31 AM
@denysobukhov If this issue hasn't been resolved, I suspect the HS2 idle timeout and the thread pool size. Can you please do the below and share the outcome?

1. Address Server-Side Resource or Timeout Issues

Increase HiveServer2 Idle Timeout
HiveServer2 may close idle connections after a certain period. Increase this timeout by updating the HiveServer2 configuration:
    hive.server2.idle.session.timeout: Check the value currently in effect (defaults vary by Hive version and distribution) and set it to a larger value, e.g., 3600000 (1 hour).
    hive.server2.idle.operation.timeout: Increase it to match your app's use case.
    SET hive.server2.idle.session.timeout=3600000;
    SET hive.server2.idle.operation.timeout=3600000;
If the SET commands are rejected at session level, set the same properties in the HiveServer2 configuration (hive-site.xml or Cloudera Manager) instead.

Adjust Thread Pool Size
If HiveServer2 runs out of threads to handle requests, it can drop connections:
    Increase hive.server2.thrift.max.worker.threads to a higher value in the HiveServer2 configuration.
    Restart HiveServer2 after the changes.
First check how many threads HiveServer2 is currently running and compare that with the configured hive.server2.thrift.max.worker.threads:
    jstack -l <HiveServer2_ProcessId> | grep ".Thread.Stat" | wc -l

Happy hadooping
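The grep above only gives a total. If you save the jstack output to a file, a few lines of Python give you the breakdown per thread state, which helps tell a busy pool from one that is mostly parked. This is a sketch: the embedded dump is a made-up three-thread sample, and it assumes the usual "java.lang.Thread.State: STATE" lines that jstack prints.

```python
from collections import Counter


def count_thread_states(dump_text):
    """Count threads per state from the 'java.lang.Thread.State:' lines of a jstack dump."""
    counts = Counter()
    for line in dump_text.splitlines():
        line = line.strip()
        if line.startswith("java.lang.Thread.State:"):
            # Keep only the state keyword, dropping details such as '(parking)'.
            state = line.split(":", 1)[1].split()[0]
            counts[state] += 1
    return counts


# Made-up sample in the shape of jstack output; thread names are hypothetical.
SAMPLE_DUMP = """
"HiveServer2-Handler-Pool: Thread-41" #41 prio=5
   java.lang.Thread.State: WAITING (parking)
"HiveServer2-Handler-Pool: Thread-42" #42 prio=5
   java.lang.Thread.State: RUNNABLE
"HiveServer2-Handler-Pool: Thread-43" #43 prio=5
   java.lang.Thread.State: WAITING (parking)
"""

states = count_thread_states(SAMPLE_DUMP)
print(dict(states), "total:", sum(states.values()))
```

A total close to the configured worker thread limit with most threads RUNNABLE or BLOCKED would support the thread pool theory.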
12-17-2024
10:14 AM
@Viki_Nodejs If you haven't resolved this issue, could you try the steps below and report back?

1. Install the Required NPM Packages
Use the hive-driver package for Node.js, which supports HiveServer2 over HTTP/HTTPS.
    npm install hive-driver

2. Prerequisites
Ensure you have:
    HiveServer2 URL: Includes the hostname and port.
    SSL Configuration: Paths to your .jks trust store and its password.
    Hive httppath: Set to cliservice.
    Authentication details (if required): Username/password or Kerberos configuration.

3. Configure the Connection
Here's an example of how to set up the connection using hive-driver. The call signatures follow the hive-driver README as I remember it, so check them against the version you install:

    const fs = require('fs');
    const hive = require('hive-driver');
    const { TCLIService, TCLIService_types } = hive.thrift;

    async function connectToHive() {
      const client = new hive.HiveClient(TCLIService, TCLIService_types);
      const utils = new hive.HiveUtils(TCLIService_types);
      let session;
      try {
        // Configure the Hive connection (HTTP transport over SSL)
        await client.connect(
          {
            host: '<HIVE_SERVER_HOSTNAME>', // e.g., hive.example.com
            port: 10001, // HiveServer2 port, typically 10001 for HTTPS
            options: {
              path: '/cliservice', // HTTP path to HiveServer2
              https: true, // Enable SSL
              // Pass the PEM contents, not the path; convert your JKS truststore to PEM first
              ca: fs.readFileSync('<path/to/truststore.pem>')
            }
          },
          new hive.connections.HttpConnection(),
          // Authentication
          new hive.auth.PlainHttpAuthentication({
            username: '<YOUR_USERNAME>',
            password: '<YOUR_PASSWORD>'
          })
        );
        // Open the session
        session = await client.openSession({
          client_protocol: TCLIService_types.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10
        });
        console.log('Connected to Hive');
        // Example query
        const operation = await session.executeStatement('SELECT * FROM your_table LIMIT 10', { runAsync: true });
        await utils.waitUntilReady(operation, false, () => {});
        await utils.fetchAll(operation);
        await operation.close();
        console.log(utils.getResult(operation).getValue());
      } catch (error) {
        console.error('Error connecting to Hive:', error);
      } finally {
        // Ensure the session is closed
        if (session) await session.close();
      }
    }

    connectToHive();

4. Key Point to Note: SSL Truststore [Very Important]
Hive uses .jks files for its truststore, but hive-driver requires a .pem file for SSL. Convert your .jks file to .pem using the following commands:
    keytool -importkeystore -srckeystore truststore.jks -destkeystore truststore.p12 -deststoretype PKCS12
    openssl pkcs12 -in truststore.p12 -out truststore.pem -nokeys

I also saw an EAI_FAIL error in the screenshot. That one is a DNS resolution failure, so make sure the HiveServer2 hostname resolves from the machine running Node.js.

Hope this helps
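After the conversion it is worth confirming that truststore.pem really contains certificates before pointing the driver at it, since an empty or key-only file produces confusing SSL errors. Here is a minimal sketch of that check; the sample text uses placeholder bodies (AAAA, BBBB) rather than real certificates, and in practice you would read your own truststore.pem instead.

```python
import re

# Matches one PEM certificate block, header through footer.
PEM_CERT = re.compile(
    r"-----BEGIN CERTIFICATE-----\s+(.+?)\s+-----END CERTIFICATE-----",
    re.DOTALL,
)


def count_certificates(pem_text):
    """Return the number of certificate blocks found in PEM text."""
    return len(PEM_CERT.findall(pem_text))


# Placeholder content in the shape openssl produces; not real certificates.
SAMPLE_PEM = """Bag Attributes
    friendlyName: root-ca
-----BEGIN CERTIFICATE-----
AAAA
-----END CERTIFICATE-----
Bag Attributes
    friendlyName: intermediate-ca
-----BEGIN CERTIFICATE-----
BBBB
-----END CERTIFICATE-----
"""

print(count_certificates(SAMPLE_PEM))

# For a real file:
# with open("truststore.pem", encoding="ascii") as handle:
#     print(count_certificates(handle.read()))
```

A count of zero means the conversion did not export any trusted certificates, so re-check the keytool and openssl steps before debugging the Node.js side.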