Archives of Support Questions (Read Only)

This board is archived and read-only for historical reference; information and links may no longer be available or relevant.

PySpark DataFrame: How to replace "," in all columns

New Member

I want to replace "," with "" in all columns.

For example, starting from the DataFrame below, I want to remove every ",". How should I do this?

+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|  ,|12,34|1234,5|
+----+---+-----+------+

↓↓↓

+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|   | 1234| 12345|
+----+---+-----+------+


4 REPLIES

Super Collaborator

@Funamizu Koshi

import re
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

# UDF that strips commas from a value; applied to every column below
udf = UserDefinedFunction(lambda x: re.sub(',', '', x), StringType())
new_df = df.select(*[udf(column).alias(column) for column in df.columns])
new_df.collect()

New Member

Thank you for your reply, but an error occurred. Could you tell me how to resolve it?

>>> test = spark.read.csv("test/test.csv")
17/09/19 20:28:45 WARN DataSource: Error while looking for metadata directory.

>>> test.show()
+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|  ,|12,34|1234,5|
+----+---+-----+------+

>>> import re
>>> from pyspark.sql.functions import UserDefinedFunction
>>> from pyspark.sql.types import *
>>> udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())
>>> new_df = test.select(*[udf(column).alias(column) for column in test.columns])
>>> new_df.collect()

17/09/19 20:29:38 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 30, lxjtddsap048.lixil.lan, executor 2): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 174, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 1, in <lambda>
  File "/usr/lib64/python3.4/re.py", line 179, in sub
    return _compile(pattern, flags).sub(repl, string, count)
TypeError: expected string or buffer
	at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
	...

17/09/19 20:29:38 ERROR TaskSetManager: Task 0 in stage 12.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 391, in collect
    port = self._jdf.collectToPython()
  ...
py4j.protocol.Py4JJavaError: An error occurred while calling o529.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 33, lxjtddsap048.lixil.lan, executor 2): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  ...
  File "/usr/lib64/python3.4/re.py", line 179, in sub
    return _compile(pattern, flags).sub(repl, string, count)
TypeError: expected string or buffer
	at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
	...


ACCEPTED SOLUTION
Super Collaborator

Hi,

Please try changing

udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())

to

udf = UserDefinedFunction(lambda x: re.sub(',','',str(x)), StringType())

Some fields may not be strings, so re.sub throws a TypeError.
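
For reference, a built-in alternative that avoids the Python UDF entirely is regexp_replace from pyspark.sql.functions; this is just a minimal sketch, assuming your DataFrame is named df:

from pyspark.sql.functions import regexp_replace, col

# Cast each column to string, then remove commas with the built-in
# regexp_replace; this runs in the JVM, so there is no per-row Python overhead.
new_df = df.select(
    *[regexp_replace(col(c).cast("string"), ",", "").alias(c) for c in df.columns]
)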

New Member

Oh, it worked!!!

>>> new_df.collect()

[Row(_c0='None', _c1='', _c2='1234', _c3='12345')]
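
One caveat: str(x) turns null cells into the literal string 'None', which is why _c0 shows 'None' above. If you need nulls preserved, a guarded variant of the same UDF (again, just a sketch) would be:

# Leave None untouched so null cells stay null instead of becoming 'None'
udf = UserDefinedFunction(
    lambda x: re.sub(',', '', str(x)) if x is not None else None,
    StringType(),
)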

Thank you very much, and sorry for my poor English.

I have to learn both UDFs and English!