Member since: 08-08-2024
Posts: 49
Kudos Received: 5
Solutions: 2
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 474 | 09-08-2025 01:12 PM |
| | 545 | 08-22-2025 03:01 PM |
10-06-2025
01:18 PM
Hello @Brenda99, The question is very broad; many things can help improve performance. Some basic recommendations are documented here: https://docs.cloudera.com/cdp-private-cloud-base/7.3.1/tuning-spark/topics/spark-admin_spark_tuning.html Take a look at the documentation; it should help. It would also be worth talking with the team in charge of your account to get a deeper performance tuning analysis.
09-18-2025
10:09 AM
Yes, you're right. It looks like Java Kerberos means the applications do not always have an application name we can use here. I was reading about another option that lets the processes fall back from one enctype to another, but that requires "allow_weak_crypto = true", and as you mentioned, that is not possible in your scenario. I'm not sure whether what you need is achievable.
09-15-2025
09:56 PM
Hello @asand3r, Glad to see you on the community. In NiFi you cannot specify those encryption types directly per processor. What comes to mind is configuring them per realm user in krb5.conf, which should work. There you can set them specifically for each realm user, something like this:

[appdefaults]
  hdfs = {
    default_tgs_enctypes = arcfour-hmac-md5 aes256-cts-hmac-sha1-96 aes128-cts-hmac-sha1-96
    permitted_enctypes = arcfour-hmac-md5 aes256-cts-hmac-sha1-96 aes128-cts-hmac-sha1-96
  }

This will target any application using a principal with 'hdfs' in its name. You may need to be more specific in some cases, for example by using the full principal name. In your NiFi HDFS processors, you'll need to set the Kerberos Principal property to a value that matches the [appdefaults] section.
09-15-2025
09:44 PM
Hello @Jack_sparrow, That should be possible. You don't need to manually specify partitions or HDFS paths; Spark handles this automatically when you use a DataFrameReader. First, read the source table with "spark.read.table()". Since the table is a Hive partitioned table, Spark will automatically discover and read all 100 partitions in parallel, as long as you have enough executors and cores available, and it builds a logical plan to read the data. Next, repartition the data: to get exactly 10 output partitions and to control the parallelism of the write, use the "repartition(10)" method. This shuffles the data into 10 new partitions, which will be processed by 10 different tasks. Finally, write the table with "write.saveAsTable()", specifying the format with ".format("parquet")". A sketch of these steps is below.
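A minimal PySpark sketch of those three steps, assuming hypothetical database and table names (source_db.partitioned_table, target_db.target_table):

```python
from pyspark.sql import SparkSession

# Hive support is needed so spark.read.table() can see the partitioned Hive table.
spark = (SparkSession.builder
         .appName("repartition-and-write")
         .enableHiveSupport()
         .getOrCreate())

# Spark discovers and reads all partitions of the source table in parallel.
df = spark.read.table("source_db.partitioned_table")

# Shuffle into exactly 10 partitions, then write them out as Parquet.
(df.repartition(10)
   .write
   .format("parquet")
   .mode("overwrite")
   .saveAsTable("target_db.target_table"))
```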
09-11-2025
04:33 PM
Hello @ShellyIsGolden, Glad to see you in our community. Welcome! ChatGPT was not that far off (😆); in fact, that approach makes sense. The PostgreSQL JDBC documentation shows that method is possible: String url = "jdbc:postgresql://localhost:5432/postgres?options=-c%20search_path=test,public,pg_catalog%20-c%20statement_timeout=90000"; https://jdbc.postgresql.org/documentation/use/#connection-parameters Have you tested the JDBC connection outside of NiFi? Maybe with a psql command like this: psql -d "postgresql://myurl:5432/mydatabase?options=-c%20search_path=myschema,public&stringtype=unspecified" Also, check with your PostgreSQL team to see whether that connection string is possible, and do more testing on that side.
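If you want another quick check outside of NiFi and psql, a short Python sketch using psycopg2 exercises the same server-side options mechanism; psycopg2 itself, the host, database, schema, and credentials below are assumptions for illustration, not values from the thread:

```python
import psycopg2

# Connect with libpq-style "options", mirroring the JDBC options parameter above.
conn = psycopg2.connect(
    host="myurl",
    port=5432,
    dbname="mydatabase",
    user="myuser",
    password="mypassword",
    options="-c search_path=myschema,public -c statement_timeout=90000",
)
with conn.cursor() as cur:
    # Confirm the session actually picked up the search_path override.
    cur.execute("SHOW search_path")
    print(cur.fetchone())
conn.close()
```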
09-11-2025
02:37 PM
Hello @ariajesus, Welcome to our community. Glad to see you here. How did you create the resource, as a File Resource or as a Python Environment? Here are the steps for creating it: https://docs.cloudera.com/data-engineering/1.5.4/use-resources/topics/cde-create-python-virtual-env.html
09-11-2025
11:04 AM
Hello @Jack_sparrow, Spark should do this automatically; you can control it with these settings (see the sketch below):
- Input splits are controlled by spark.sql.files.maxPartitionBytes (default 128 MB); a smaller value produces more splits and therefore more parallel tasks.
- spark.sql.files.openCostInBytes (default 4 MB) influences how Spark coalesces small files.
- Shuffle parallelism is controlled by spark.sql.shuffle.partitions (default 200); configure it to around 2-3 times the total executor cores.
Also, make sure df.write.parquet() doesn't collapse everything into only a few files; you can use .repartition(n) to increase the parallelism before writing.
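A minimal sketch of where those settings go, with placeholder paths and a placeholder repartition count:

```python
from pyspark.sql import SparkSession

# Tune input split size, small-file coalescing, and shuffle parallelism up front.
spark = (SparkSession.builder
         .appName("parallelism-tuning")
         .config("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))  # input split size
         .config("spark.sql.files.openCostInBytes", str(4 * 1024 * 1024))      # small-file coalescing cost
         .config("spark.sql.shuffle.partitions", "200")                        # ~2-3x total executor cores
         .getOrCreate())

df = spark.read.parquet("/data/in")

# Increase write parallelism explicitly if the plan ends up with too few partitions.
df.repartition(48).write.mode("overwrite").parquet("/data/out")
```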
09-09-2025
09:26 AM
Hello @Jack_sparrow, Glad to see you again in the forums. 1. Resource allocation is complicated to prescribe, because it depends on many factors: keep in mind how big your data is and how much memory you have on the cluster, and don't forget the overhead, among other things. There is very useful information here: https://docs.cloudera.com/cdp-private-cloud-base/7.3.1/tuning-spark/topics/spark-admin-tuning-resource-allocation.html Under that parent section there are more tuning suggestions for each topic. 2. From the second option I understand that you want to read the data separately for each format. That should be possible with something like this:

if input_path.endswith(".parquet"):
    df = spark.read.parquet(input_path)
elif input_path.endswith(".orc"):
    df = spark.read.orc(input_path)
elif input_path.endswith(".txt") or input_path.endswith(".csv"):
    df = spark.read.text(input_path)  # or .csv with options
else:
    raise Exception("Unsupported file format")

Then you can handle each dataset separately. 3. Data movement should avoid going through the driver, to prevent issues and extra work, so collect() or .toPandas() are not the best options. If you want to move data without transformations, distcp is a good option. To write, you can use df.write.mode("overwrite").parquet("ofs://ozone/path/out"). Other suggestions are tuning the partitions with "spark.sql.files.maxPartitionBytes" and changing the compression to snappy using "spark.sql.parquet.compression.codec"; a sketch of that write is below.
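A minimal sketch of point 3, assuming placeholder source and Ozone paths (the service, volume, and bucket names are not from the thread):

```python
from pyspark.sql import SparkSession

# Set the input split size and snappy compression before reading/writing.
spark = (SparkSession.builder
         .appName("copy-to-ozone")
         .config("spark.sql.files.maxPartitionBytes", str(256 * 1024 * 1024))  # larger input splits
         .config("spark.sql.parquet.compression.codec", "snappy")              # snappy output compression
         .getOrCreate())

# The data flows between the executors and storage; nothing is collected to the driver.
df = spark.read.parquet("hdfs:///data/source")
df.write.mode("overwrite").parquet("ofs://ozone-service/volume/bucket/path/out")
```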
09-08-2025
01:12 PM
1 Kudo
Hello @Jack_sparrow, Glad to see you on the Community. As far as I know, df.write cannot be used inside rdd.foreach or rdd.foreachPartition. The reason is that df.write is a driver-side action: it triggers a Spark job, while rdd.foreach and rdd.foreachPartition run on the executors, and executors cannot trigger jobs. Check these references: https://stackoverflow.com/questions/46964250/nullpointerexception-creating-dataset-dataframe-inside-foreachpartition-foreach https://sparkbyexamples.com/spark/spark-foreachpartition-vs-foreach-explained The option that looks like it will work for you is df.write.partitionBy, something like this: df.write.partitionBy("someColumn").parquet("/path/out") A sketch is below.
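A minimal sketch of the partitionBy approach, with a placeholder input path and column name:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioned-write").getOrCreate()
df = spark.read.parquet("/data/in")

# A single driver-side write: Spark splits the output by the partition column,
# so there is no need to call df.write inside foreach/foreachPartition.
df.write.mode("overwrite").partitionBy("someColumn").parquet("/path/out")
```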
08-27-2025
01:26 PM
Hi @MattWho, I think you tagged the wrong person. @yoonli, take a look at @MattWho's update.