Pyspark dataframe: How to replace

New Contributor

I want to replace "," with "" (an empty string) in every column of a DataFrame.

For example, given the DataFrame below, how should I do this?

+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|  ,|12,34|1234,5|
+----+---+-----+------+

↓↓↓

+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|   | 1234| 12345|
+----+---+-----+------+

Accepted Solution

Re: Pyspark dataframe: How to replace

Expert Contributor

Hi,

Please try changing

udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())

to

udf = UserDefinedFunction(lambda x: re.sub(',','',str(x)), StringType())

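Some fields may not be strings, so re.sub throws an exception. In the sample data, the null in _c0 reaches the lambda as None. Note that str(None) is 'None', so with this change null values come back as the string 'None' rather than as null.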
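To illustrate the point about non-string fields, here is a minimal plain-Python sketch that needs no Spark. It assumes that the null in _c0 reaches the lambda as None. The names strip_commas and row are hypothetical and only stand in for the UDF body and the sample row from the question.

```python
import re


def strip_commas(value):
    """Remove every ',' from value; pass None (a Spark null) through unchanged."""
    if value is None:
        return None
    return re.sub(",", "", value)


# The sample row from the question, with None standing in for the null in _c0.
row = (None, ",", "12,34", "1234,5")

# Original lambda body: re.sub rejects None with a TypeError.
try:
    re.sub(",", "", row[0])
    original_error = None
except TypeError as exc:
    original_error = exc

# str() workaround: no exception, but the null becomes the text 'None'.
with_str = tuple(re.sub(",", "", str(v)) for v in row)

# Null-safe helper: commas are removed and the null is preserved.
null_safe = tuple(strip_commas(v) for v in row)

print(type(original_error).__name__)
print(with_str)
print(null_safe)
```

If keeping nulls matters, a helper like this could probably be passed to UserDefinedFunction in place of the lambda, for example UserDefinedFunction(strip_commas, StringType()). That part is untested here.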

4 REPLIES

Re: Pyspark dataframe: How to replace

Expert Contributor

@Funamizu Koshi

import re
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

# UDF that removes every comma from a string value
udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())
# Apply the UDF to every column, keeping the original column names
new_df = df.select(*[udf(column).alias(column) for column in df.columns])
new_df.collect()
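As an alternative to a Python UDF, the same per-column replacement can be sketched with Spark's built-in regexp_replace function. This is a sketch under assumptions, not the method used in this thread. It assumes every column is a string, as is the case when spark.read.csv is called without a schema. The function name strip_commas_from_all_columns is hypothetical.

```python
def strip_commas_from_all_columns(df):
    """Return a new DataFrame with every ',' removed from every column.

    Assumes all columns are strings. Null values stay null, because
    regexp_replace returns null when its input is null.
    """
    # Imported inside the function so this sketch can be loaded on a machine
    # without Spark; in real code the import would normally sit at the top.
    from pyspark.sql.functions import col, regexp_replace

    return df.select(
        *[regexp_replace(col(name), ",", "").alias(name) for name in df.columns]
    )
```

Usage would look like new_df = strip_commas_from_all_columns(df) followed by new_df.show(). Because the work stays inside Spark's SQL engine, no Python function is called per value, so a None value never reaches re.sub.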

Re: Pyspark dataframe: How to replace

New Contributor

Thank you for your reply.

However, an error occurred.

Please tell me how to resolve it.

>>> test = spark.read.csv("test/test.csv")
17/09/19 20:28:45 WARN DataSource: Error while looking for metadata directory.
>>> test.show()
+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|  ,|12,34|1234,5|
+----+---+-----+------+
>>> import re
>>> from pyspark.sql.functions import UserDefinedFunction
>>> from pyspark.sql.types import *
>>> udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())
>>> new_df = test.select(*[udf(column).alias(column) for column in test.columns])
>>> new_df.collect()

17/09/19 20:29:38 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 30, lxjtddsap048.lixil.lan, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 174, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 1, in <lambda>
  File "/usr/lib64/python3.4/re.py", line 179, in sub
    return _compile(pattern, flags).sub(repl, string, count)
TypeError: expected string or buffer
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitions$1$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitions$1$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/09/19 20:29:38 ERROR TaskSetManager: Task 0 in stage 12.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 391, in collect
    port = self._jdf.collectToPython()
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o529.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 33, lxjtddsap048.lixil.lan, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
[Python traceback and executor stack trace identical to the first occurrence above]
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
    at org.apache.spark.rdd.RDD$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
    at org.apache.spark.sql.Dataset$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2760)
    at org.apache.spark.sql.Dataset$anonfun$collectToPython$1.apply(Dataset.scala:2757)
    at org.apache.spark.sql.Dataset$anonfun$collectToPython$1.apply(Dataset.scala:2757)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2780)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2757)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
[Python traceback and executor stack trace identical to the first occurrence above]
    ... 1 more

Attachment (screenshot): スクリーンショット-2017-09-19-午後202942-午後.png

Re: Pyspark dataframe: How to replace

New Contributor

Oh, it worked!

>>> new_df.collect()

[Row(_c0='None', _c1='', _c2='1234', _c3='12345')]

Thank you very much, and sorry for my poor English.

I have to learn about UDFs and English!