Created 09-19-2017 07:52 AM
I want to replace "," with "" in every column. What should I do?
For example, I want to turn this:
+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|  ,|12,34|1234,5|
+----+---+-----+------+
↓↓↓
+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|   | 1234| 12345|
+----+---+-----+------+
Created 09-19-2017 08:34 AM
import re
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())
new_df = df.select(*[udf(column).alias(column) for column in df.columns])
new_df.collect()
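If you'd rather avoid a Python UDF, Spark's built-in regexp_replace should also work (untested sketch; the cast is there because some columns may not be strings):

from pyspark.sql.functions import regexp_replace, col
# cast every column to string, then strip the commas
new_df = df.select(*[regexp_replace(col(c).cast("string"), ',', '').alias(c) for c in df.columns])
new_df.collect()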
Created 09-19-2017 11:37 AM
Thank you for your reply.
But an error occurred.
Please tell me how to resolve it.
>>> test = spark.read.csv("test/test.csv")
17/09/19 20:28:45 WARN DataSource: Error while looking for metadata directory.
>>> test.show()
+----+---+-----+------+
| _c0|_c1|  _c2|   _c3|
+----+---+-----+------+
|null|  ,|12,34|1234,5|
+----+---+-----+------+
>>> import re
>>>
>>> from pyspark.sql.functions import UserDefinedFunction
>>>
>>> from pyspark.sql.types import *
>>> udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())
>>> new_df = test.select(*[udf(column).alias(column) for column in test.columns])
>>> new_df.collect()
17/09/19 20:29:38 WARN TaskSetManager: Lost task 0.0 in stage 12.0 (TID 30, lxjtddsap048.lixil.lan, executor 2): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 174, in main
    process()
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/usr/hdp/current/spark2-client/python/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/hdp/current/spark2-client/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 1, in <lambda>
  File "/usr/lib64/python3.4/re.py", line 179, in sub
    return _compile(pattern, flags).sub(repl, string, count)
TypeError: expected string or buffer
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitions$1$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitions$1$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/09/19 20:29:38 ERROR TaskSetManager: Task 0 in stage 12.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 391, in collect
    port = self._jdf.collectToPython()
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o529.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 33, lxjtddsap048.lixil.lan, executor 2): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  ...
TypeError: expected string or buffer
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
    ...
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
    at org.apache.spark.rdd.RDD$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
    at org.apache.spark.sql.Dataset$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2760)
    at org.apache.spark.sql.Dataset$anonfun$collectToPython$1.apply(Dataset.scala:2757)
    at org.apache.spark.sql.Dataset$anonfun$collectToPython$1.apply(Dataset.scala:2757)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2780)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2757)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  ...
TypeError: expected string or buffer
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRDD.scala:193)
    ...
    ... 1 more
Created 09-19-2017 02:22 PM
Hi,
Please try changing
udf = UserDefinedFunction(lambda x: re.sub(',','',x), StringType())
to
udf = UserDefinedFunction(lambda x: re.sub(',','',str(x)), StringType())
Some fields may not be strings, so it throws an exception.
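Note that str(x) will also turn null values into the literal string 'None'. If you want to keep nulls as null, a possible variant is:

# only apply re.sub when the value is not null; returning None keeps the column null
udf = UserDefinedFunction(lambda x: re.sub(',', '', str(x)) if x is not None else None, StringType())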
Created 09-20-2017 02:01 AM
Oh! I got it to work!
>>> new_df.collect()
[Row(_c0='None', _c1='', _c2='1234', _c3='12345')]
Thank you very much, and sorry for my poor English.
I have to learn UDFs and English!