Support Questions
Find answers, ask questions, and share your expertise
Using Map in PySpark to parse and assign column names

New Contributor

Here is what I am trying to do.

The input data looks like this (tab-separated):

12/01/2018 user1 123.123.222.111 23.3s
12/01/2018 user2 123.123.222.116 21.1s

The data is coming in through Kafka and is being parsed with the following code.

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kafkaStream.map(lambda x: x[1])
parsed_log = lines.flatMap(lambda line: line.split(" "))
                  .map(lambda item: ('key', {
                      'date': item['date'],
                      'user': item['user'],
                      'ip': item['ip'],
                      'duration': item['duration'],}))

The parsed logs should be in the following format:

('key', {'date': 12/01/2018, 'user': user1, 'ip': 123.123.222.111, 'duration': 23.3s})
('key', {'date': 12/01/2018, 'user': user2, 'ip': 123.123.222.116, 'duration': 21.1s})

In my code, the lines defining "lines" and "parsed_log" are not doing the job. Could you please let me know how to go about this?

2 Replies

Re: Using Map in PySpark to parse and assign column names

@Steven Suting

Please try this and let me know if it works for you:

vals = [
     "12/01/2018 user1 123.123.222.111 23.3s",
     "12/01/2018 user2 123.123.222.111 23.3s"
]
def map_lines(line):
    l = line.split(" ")
    return ('key',{'date':l[0],'user':l[1],'ip':l[2],'duration':l[3]})
lines = spark.sparkContext.parallelize(vals)
lines.map(lambda line: map_lines(line)).collect()
Result is:
[('key', {'date': '12/01/2018', 'ip': '123.123.222.111', 'duration': '23.3s', 'user': 'user1'}), ('key', {'date': '12/01/2018', 'ip': '123.123.222.111', 'duration': '23.3s', 'user': 'user2'})]
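One thing to watch: the question says the input is tab-separated, while the code above splits on a space. A sketch of the same mapper splitting on "\t" instead (field names taken from the question; the sample records here are assumptions mirroring the question's data):

```python
def map_lines(line):
    # Split each record on tabs, since the input is tab-separated
    fields = line.split("\t")
    return ('key', {'date': fields[0], 'user': fields[1],
                    'ip': fields[2], 'duration': fields[3]})

# Sample records, mirroring the tab-separated input from the question
vals = [
    "12/01/2018\tuser1\t123.123.222.111\t23.3s",
    "12/01/2018\tuser2\t123.123.222.116\t21.1s",
]

parsed = [map_lines(v) for v in vals]
```

On the streaming side this would be lines.map(map_lines) rather than the original flatMap: flatMap(lambda line: line.split(" ")) emits each token as a separate element, so the downstream item is a plain string and item['date'] fails; map keeps one whole line per record.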

HTH

*** If you found that this answer addressed your question, please take a moment to log in and click the "accept" link on the answer.

Re: Using Map in PySpark to parse and assign column names

Hi @Steven Suting, did the above help? If so, please take a moment to log in and click the "accept" link on the answer.
