Member since 06-09-2018
9 Posts
2 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
4393 | 06-11-2018 11:56 PM |
07-12-2018 10:22 PM
Hey Bryan, thanks so much for taking the time! I think I'm almost there. The hint about the unicode issue helped me get past the first slew of errors. I now seem to be running into a length error, however:
@pandas_udf("array<string>")
def stringClassifier(lookupstring, first, last):
    lookupstring = lookupstring.to_string().encode("utf-8")
    first = first.to_string().encode("utf-8")
    last = last.to_string().encode("utf-8")
    # this part takes the 3 strings above and reaches out to another library to do a string match
    result = process.extract(lookupstring, lookup_list, limit=4000)
    match_list = [item for item in result if item[0].startswith(first) and item[0].endswith(last)]
    result2 = process.extractOne(lookupstring, match_list)
    if result2 is not None and result2[0][1] > 75:
        answer = pd.Series(list(result2[0]))
        return answer
    else:
        fail = ["N/A","0"]
        return pd.Series(fail)
RuntimeError: Result vector from pandas_udf was not the required length: expected 1, got 2
I'm initially passing three strings as variables to the function, which then get passed to another library. The result is a tuple, which I convert to a list and then to a pandas Series object. I'm curious how I can make a 2-item array object have a length of 1? I'm obviously missing some basics here. @Bryan C
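For anyone hitting the same length error: as far as I understand it, a scalar pandas UDF receives one pandas Series per argument holding a whole batch of rows, and it has to return a Series with exactly one element per input row. Calling to_string() on a Series renders the entire batch as a single string, and pd.Series(["N/A", "0"]) is a 2-element Series rather than one 2-item array. Below is a minimal sketch of the shape contract in plain pandas. The function name classify_batch, the sample data, and the stand-in matching rule are all hypothetical; the real function would call the string-matching library per row and would still need the pandas_udf decorator.

```python
import pandas as pd

def classify_batch(lookup, first, last):
    """Hypothetical stand-in for the UDF body: one [match, score] list per row."""
    out = []
    # Iterate row by row instead of converting the whole Series to one string.
    for s, f, l in zip(lookup, first, last):
        if s.startswith(f) and s.endswith(l):
            out.append([s, "100"])    # placeholder score, not a real fuzzy match
        else:
            out.append(["N/A", "0"])
    # A Series built from a list of lists has one list-valued element per row.
    return pd.Series(out)

# Made-up batch of two rows, standing in for what Spark would pass in.
batch = pd.DataFrame({
    "lookup": ["museum-queens park", "st. george-museum"],
    "first": ["museum", "laird"],
    "last": ["park", "museum"],
})
result = classify_batch(batch["lookup"], batch["first"], batch["last"])
```

The point is only that len(result) equals the batch size and each element is itself a 2-item list, which is what an array<string> return type expects.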
07-11-2018 02:33 PM
Hi! I have a UDF that returns a tuple object:
stringSchema = StructType([
    StructField("fixedRoute", StringType(), False),
    StructField("accuracy", IntegerType(), False)])
def stringClassifier(x,y,z):
    ... do some code
    return (value1,value2)
stringClassifier_udf = udf(stringClassifier, stringSchema)
I use it in a dataframe like this:
df = df.select(['route', 'routestring', stringClassifier_udf(x,y,z).alias('newcol')])
This works fine. I later split that tuple into two distinct columns. The UDF, however, does some string matching and is somewhat slow, as it collects to the driver and then filters through a 10k-item list to match a string (it does this for every row). I've been reading about pandas_udf and Apache Arrow and was curious whether running this same function would be possible with pandas_udf, and whether it would help improve the performance. I think my hangup is that the return value of the UDF is a tuple. Here is my attempt:
from pyspark.sql.functions import pandas_udf, PandasUDFType
stringSchema = StructType([
    StructField("fixedRoute", StringType(), False),
    StructField("accuracy", IntegerType(), False)])
@pandas_udf(stringSchema)
def stringClassifier(x,y,z):
    ... do some code
    return (value1,value2)
Of course this gives me errors, and I've also tried decorating the function with:
@pandas_udf('list', PandasUDFType.SCALAR)
My error looks like this:
NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(fixedRoute,StringType,false),StructField(accuracy,IntegerType,false))) is not supported
Any idea if there is a way to make this work? Thanks!
06-17-2018 04:02 AM
Hi, I've installed Anaconda on my Hortonworks sandbox VM to try to use Python 3. When I run a script with spark-submit I'm met with this error:
[root@sandbox ~]# spark-submit script.py
SPARK_MAJOR_VERSION is set to 2, using Spark2
  File "/usr/bin/hdp-select", line 205
    print "ERROR: Invalid package - " + name
    ^
SyntaxError: Missing parentheses in call to 'print'. Did you mean print("ERROR: Invalid package - " + name)?
ls: cannot access /usr/hdp//hadoop/lib: No such file or directory
Exception in thread "main" java.lang.IllegalStateException: hdp.version is not set while running Spark under HDP, please set through HDP_VERSION in spark-env.sh or add a java-opts file in conf with -Dhdp.version=xxx
I've set these environment variables:
export SPARK_MAJOR_VERSION=2
export PYSPARK_PYTHON=/root/miniconda3/bin/python
I suspect from some other posts that this error may have something to do with me trying to use Python 3 with spark-submit. If this is the problem, is there a trick to make this work?
[root@sandbox ~]# which python
/root/miniconda3/bin/python
[root@sandbox ~]# python -V
Python 3.6.5 :: Anaconda, Inc.
[root@sandbox ~]#
Thanks!
Labels: Apache Spark
06-15-2018 03:08 AM
Hi, are there any tricks for reading a CSV into a dataframe and defining one of the columns as an array? Here is my CSV file:
1|agakhanpark,science centre,sunnybrookpark,laird,leaside,mountpleasant,avenue
2|agakhanpark,wynford,sloane,oconnor,pharmacy,hakimilebovic,goldenmile,birchmount
All I want to do is transform this into a dataframe which would look something like:
Col1 Col2
1 [agakhanpark,science centre,sunnybrookpark,laird,leaside,mountpleasant,avenue]
2 [agakhanpark,wynford,sloane,oconnor,pharmacy,hakimilebovic,goldenmile,birchmount]
I'm able to define the column as an array in the schema, but when I call show() I get a big long error. Here's the PySpark code:
data_schema = [StructField('id', IntegerType(), False),StructField('route', ArrayType(StringType()),False)]
final_struc = StructType(fields=data_schema)
spark = SparkSession.builder.appName('Alex').getOrCreate()
df = spark.read.option("delimiter", "|").csv('output2.csv',schema=final_struc)
df.show()
Traceback (most recent call last):
  File "/Users/awitte/Documents/GitHub/cmx-hadoop-pipe/sparkProcess.py", line 20, in <module>
    df.show()
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/sql/dataframe.py", line 336, in show
    print(self._jdf.showString(n, 20))
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/Cellar/apache-spark/2.2.0/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o35.showString.
: java.lang.UnsupportedOperationException: CSV data source does not support array<string> data type.
Any thoughts?
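Since the error says the CSV source cannot produce array<string> directly, one workaround to consider is reading the second column as a plain string and splitting it on commas afterwards. Here is a rough sketch of that two-step idea in plain Python using the two rows above; in PySpark the split step would presumably be pyspark.sql.functions.split applied to a string column. The names raw and parse_routes are hypothetical and only for illustration.

```python
import csv
import io

# The two sample rows from the post, inlined so the sketch is self-contained.
raw = (
    "1|agakhanpark,science centre,sunnybrookpark,laird,leaside,mountpleasant,avenue\n"
    "2|agakhanpark,wynford,sloane,oconnor,pharmacy,hakimilebovic,goldenmile,birchmount\n"
)

def parse_routes(text):
    """Read '|'-delimited rows, then split the route field on commas."""
    rows = []
    for route_id, route in csv.reader(io.StringIO(text), delimiter="|"):
        # Step 1: the route arrives as one plain string.
        # Step 2: split it into a list only after reading.
        rows.append((int(route_id), route.split(",")))
    return rows

rows = parse_routes(raw)
```

Each entry in rows is then an (id, list of stations) pair, which matches the Col1/Col2 layout described in the question.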
Labels: Apache Hadoop, Apache Spark
06-11-2018 11:56 PM
Figured this out: my approach above worked, whereby I can order the list and then just grab the last value in the window. Like this:
select *,
  last_value(station_arr) over (partition by journey order by point RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as route
from (
  select point, id, station, journey,
    collect_set(station) over (partition by journey order by point) as station_arr
  from source_table
) t;
This takes the ordered list and populates it on every row in the partition, as requested above.
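To make the intent of that query concrete, here is a small plain-Python sketch of the result it is meant to produce: the stations of each journey, deduplicated in order of point, attached to every row of that journey. It only illustrates the target output and is not how Hive evaluates the window; the function name attach_routes and the trimmed sample rows are hypothetical.

```python
from collections import defaultdict

# (point, station, journey) rows taken from the excerpt in the question.
rows = [
    (1510764333, "St. George", 1),
    (1510764420, "Museum", 1),
    (1510764509, "Queens Park", 1),
    (1510764779, "St. Andrew", 1),
    (1510704290, "Museum", 2),
    (1510704508, "St. George", 2),
]

def attach_routes(rows):
    """Attach each journey's ordered, de-duplicated station list to all its rows."""
    by_journey = defaultdict(list)
    # Walk each journey in point order, keeping the first occurrence of a station.
    for point, station, journey in sorted(rows, key=lambda r: (r[2], r[0])):
        if station not in by_journey[journey]:
            by_journey[journey].append(station)
    # Every row gets the full route of its journey, like last_value over the whole partition.
    return [(p, s, j, by_journey[j]) for p, s, j in rows]

result = attach_routes(rows)
```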
06-10-2018 03:37 PM
Hi Sunile, thanks for the post. I've been experimenting with ideas in the link you sent and still cannot get these silly arrays to be ordered correctly. My latest thought is something like:
select point, id, station, journey, collect_set(station) over (partition by journey order by point) as array from source_table;
Which gives me an output like this:
point id station journey array
1510764333 "000003825fc3200b8d541e8ee3844522" "St. George" 1 ["St. George"]
1510764342 "000003825fc3200b8d541e8ee3844522" "St. George" 1 ["St. George"]
1510764420 "000003825fc3200b8d541e8ee3844522" "Museum" 1 ["St. George","Museum"]
1510764448 "000003825fc3200b8d541e8ee3844522" "Museum" 1 ["St. George","Museum"]
1510764509 "000003825fc3200b8d541e8ee3844522" "Queens Park" 1 ["St. George","Museum","Queens Park"]
1510764779 "000003825fc3200b8d541e8ee3844522" "St. Andrew" 1 ["St. George","Museum","Queens Park","St. Andrew"]
1510704290 "0000650b988dabe0cc4d5923e9572548" "Museum" 2 ["Museum"]
1510704508 "0000650b988dabe0cc4d5923e9572548" "St. George" 2 ["Museum","St. George"]
Since the order of the last array per journey IS correct, I'm trying to get last_value(array) over (partition by journey), in the hope of taking just the last value and adding it to each row. I can't seem to get it to work yet, though. It's kind of a hacky solution; surely there is a simpler way of doing this that I'm missing...
select *, last_value(station_arr) over (partition by id,journey) from (select point, id, station, journey, collect_set(station) over (partition by journey order by point)
from source_table);
06-10-2018 03:07 AM
Hi! Here is a small excerpt of my table:
select * from source_table;
point id station journey
1510764333 "000003825fc3200b8d541e8ee3844522" "St. George" 1
1510764342 "000003825fc3200b8d541e8ee3844522" "St. George" 1
1510764420 "000003825fc3200b8d541e8ee3844522" "Museum" 1
1510764448 "000003825fc3200b8d541e8ee3844522" "Museum" 1
1510764509 "000003825fc3200b8d541e8ee3844522" "Queens Park" 1
1510764779 "000003825fc3200b8d541e8ee3844522" "St. Andrew" 1
1510764781 "000003825fc3200b8d541e8ee3844522" "St. Andrew" 1
1510704290 "0000650b988dabe0cc4d5923e9572548" "Museum" 2
1510704508 "0000650b988dabe0cc4d5923e9572548" "St. George" 2
I would like all the stations per journey grouped in an array ordered from first to last. The result should look like this:
point id station journey array
1510764333 "000003825fc3200b8d541e8ee3844522" "St. George" 1 ["St. George","Museum","Queens Park","St. Andrew"]
1510764342 "000003825fc3200b8d541e8ee3844522" "St. George" 1 ["St. George","Museum","Queens Park","St. Andrew"]
1510764420 "000003825fc3200b8d541e8ee3844522" "Museum" 1 ["St. George","Museum","Queens Park","St. Andrew"]
1510764448 "000003825fc3200b8d541e8ee3844522" "Museum" 1 ["St. George","Museum","Queens Park","St. Andrew"]
1510764509 "000003825fc3200b8d541e8ee3844522" "Queens Park" 1 ["St. George","Museum","Queens Park","St. Andrew"]
1510764779 "000003825fc3200b8d541e8ee3844522" "St. Andrew" 1 ["St. George","Museum","Queens Park","St. Andrew"]
1510704290 "0000650b988dabe0cc4d5923e9572548" "Museum" 2 ["Museum","St. George"]
1510704508 "0000650b988dabe0cc4d5923e9572548" "St. George" 2 ["Museum","St. George"]
Unfortunately I cannot get them into the right order. This code gives me the array, but in the incorrect order (see below):
select point, ID, station, journey,
collect_set(station) over (partition by journey) station_arr
from source_table order by journey, point;
point id station journey array
1510764333 "000003825fc3200b8d541e8ee3844522" "St. George" 1 ["Museum","St. George","St. Andrew","Queens Park"]
1510764342 "000003825fc3200b8d541e8ee3844522" "St. George" 1 ["Museum","St. George","St. Andrew","Queens Park"]
1510764420 "000003825fc3200b8d541e8ee3844522" "Museum" 1 ["Museum","St. George","St. Andrew","Queens Park"]
1510764448 "000003825fc3200b8d541e8ee3844522" "Museum" 1 ["Museum","St. George","St. Andrew","Queens Park"]
1510764509 "000003825fc3200b8d541e8ee3844522" "Queens Park" 1 ["Museum","St. George","St. Andrew","Queens Park"]
1510764779 "000003825fc3200b8d541e8ee3844522" "St. Andrew" 1 ["Museum","St. George","St. Andrew","Queens Park"]
1510704290 "0000650b988dabe0cc4d5923e9572548" "Museum" 2 ["Museum","St. George"]
1510704508 "0000650b988dabe0cc4d5923e9572548" "St. George" 2 ["Museum","St. George"]
Is there a way to create this array and keep the order consistent with the source table? I don't remember having these issues on my sandbox VM and wonder if it has something to do with multiple reducers being involved. Thanks
Labels: Apache Hive, Apache Tez
06-10-2018 02:07 AM
1 Kudo
Shu, thanks so much, this did it! I appreciate your detailed answer!
06-09-2018 09:54 PM
1 Kudo
Hi, here's a sample of my table (source_table1), all nicely ordered:
select * from source_table1 limit 10;
1510764333 "000003825fc3200b8d541e8ee3844522" "St. George" "Upper Platform"
1510764342 "000003825fc3200b8d541e8ee3844522" "St. George" "Upper Platform"
1510764344 "000003825fc3200b8d541e8ee3844522" "St. George" "Upper Platform"
1510764360 "000003825fc3200b8d541e8ee3844522" "St. George" "Upper Platform"
1510764420 "000003825fc3200b8d541e8ee3844522" "Museum" "Museum Platform"
1510764448 "000003825fc3200b8d541e8ee3844522" "Museum" "Museum Platform"
1510764509 "000003825fc3200b8d541e8ee3844522" "Queens Park" "Platform"
1510764511 "000003825fc3200b8d541e8ee3844522" "Queens Park" "Platform"
1510764541 "000003825fc3200b8d541e8ee3844522" "Queens Park" "Platform"
1510764779 "000003825fc3200b8d541e8ee3844522" "St. Andrew" "St. Andrew Platform"
All I want to do is add a row number column to the entire table (about 3 million rows), but it seems to completely mess up the table's ordering, see below:
select row_number() over () as rowid,* from source_table1;
1 1510753659 "c647a9116db03135d0b419d4a6b8a141" "Osgoode" "Osgoode Platform"
2 1510768565 "bbab071f0f1a16e40b02a88fb99fc9f8" "Don Mills" "Don Mills Platfo
3 1510753683 "c647a9116db03135d0b419d4a6b8a141" "St. Andrew" "St. Andrew Platform"
4 1510768570 "bbab071f0f1a16e40b02a88fb99fc9f8" "Don Mills" "Don Mills Platform"
5 1510753685 "c647a9116db03135d0b419d4a6b8a141" "St. Andrew" "St. Andrew Platform"
6 1510768574 "bbab071f0f1a16e40b02a88fb99fc9f8" "Don Mills" "Don Mills Platform"
7 1510753687 "c647a9116db03135d0b419d4a6b8a141" "St. Andrew" "St. Andrew Platform"
8 1510768576 "bbab071f0f1a16e40b02a88fb99fc9f8" "Don Mills" "Don Mills Platform"
9 1510753691 "c647a9116db03135d0b419d4a6b8a141" "St. Andrew" "St. Andrew Platform"
10 1510768578 "bbab071f0f1a16e40b02a88fb99fc9f8" "Don Mills" "Don Mills Platform"
I've also tried things like:
select row_number() over () as rowid,* from source_table1 cluster by baic_id, point;
select row_number() over () as rowid,* from source_table1 sort by baic_id,point;
etc., although none of it makes a difference. Is there any way to take an existing ordered table and simply add a row number column for the whole thing? This job is running in Tez, which is using multiple reducers for this one task. Thanks!
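One thing that may be worth trying is putting the ordering inside the window itself, along the lines of row_number() over (order by baic_id, point), since an empty over () gives the engine no order to number by. The plain-Python sketch below shows the result that would be expected from numbering in a stated order: sort by id and then point, mirroring the baic_id, point ordering tried above, and count from 1. The function name number_rows is hypothetical and the sample rows are a few taken from the output above.

```python
# (point, id, station) rows in the shuffled order shown in the output above.
rows = [
    (1510753659, "c647a9116db03135d0b419d4a6b8a141", "Osgoode"),
    (1510768565, "bbab071f0f1a16e40b02a88fb99fc9f8", "Don Mills"),
    (1510753683, "c647a9116db03135d0b419d4a6b8a141", "St. Andrew"),
    (1510768570, "bbab071f0f1a16e40b02a88fb99fc9f8", "Don Mills"),
]

def number_rows(rows):
    """Number rows in an explicit (id, point) order instead of arrival order."""
    ordered = sorted(rows, key=lambda r: (r[1], r[0]))
    # Prepend a 1-based row id to each row, following the sorted order.
    return [(i,) + r for i, r in enumerate(ordered, start=1)]

numbered = number_rows(rows)
```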
Labels: Apache Hive, Apache Tez