Support Questions
Find answers, ask questions, and share your expertise

Using Map in PySpark to parse and assign column names


Here is what I am trying to do.

The input data looks like this (tab-separated):

12/01/2018 user1 123.123.222.111 23.3s
12/01/2018 user2 123.123.222.116 21.1s

The data is coming in through Kafka and is being parsed with the following code.

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kafkaStream.map(lambda x: x[1])
parsed_log = lines.flatMap(lambda line: line.split(" "))
                  .map(lambda item: ('key', {
                  'date': item['date'],
                  'user': item['user'],
                  'ip': item['ip'],
                  'duration': item['duration'],}))

The parsed logs should be in the following format:

('key', {'date': 12/01/2018, 'user': user1, 'ip': 123.123.222.111, 'duration': 23.3s}) 
('key', {'date': 12/01/2018, 'user': user2, 'ip': 123.123.222.116, 'duration': 21.1s})

In my code, the lines for "lines" and "parsed_log" are not doing the job. Could you please let me know how to go about this?

2 REPLIES

Re: Using Map in PySpark to parse and assign column names

@Steven Suting

The problem is the flatMap: it splits each line into individual tokens, so the subsequent map receives one token at a time, and a plain string cannot be indexed with keys like item['date']. Instead, map over whole lines and do the split inside a helper function. Please try this and let me know if it works for you:

vals = [
    "12/01/2018 user1 123.123.222.111 23.3s",
    "12/01/2018 user2 123.123.222.111 23.3s"
]

def map_lines(line):
    # Split the whole record once, then assign each field a name.
    l = line.split(" ")
    return ('key', {'date': l[0], 'user': l[1], 'ip': l[2], 'duration': l[3]})

lines = spark.sparkContext.parallelize(vals)
lines.map(map_lines).collect()
Result is:
[('key', {'date': '12/01/2018', 'ip': '123.123.222.111', 'duration': '23.3s', 'user': 'user1'}), ('key', {'date': '12/01/2018', 'ip': '123.123.222.111', 'duration': '23.3s', 'user': 'user2'})]
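One note: you mentioned the input is tab-separated, so against the real Kafka data you would split on "\t" rather than " ". A minimal sketch of that variant below; `map_lines_tab` is just an illustrative name, and the commented stream lines assume the `kafkaStream` from your question:

```python
def map_lines_tab(line):
    # Tab-separated record: date, user, ip, duration.
    fields = line.split("\t")
    return ('key', {'date': fields[0], 'user': fields[1],
                    'ip': fields[2], 'duration': fields[3]})

# Applied to the stream from the question (not run here):
# lines = kafkaStream.map(lambda x: x[1])
# parsed_log = lines.map(map_lines_tab)

# Quick local check without Spark:
sample = "12/01/2018\tuser1\t123.123.222.111\t23.3s"
print(map_lines_tab(sample))
# → ('key', {'date': '12/01/2018', 'user': 'user1', 'ip': '123.123.222.111', 'duration': '23.3s'})
```

Note the helper is applied with map, not flatMap, so it sees the full line each time.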

HTH

*** If you found this answer addressed your question, please take a moment to log in and click the "accept" link on the answer.

Re: Using Map in PySpark to parse and assign column names

Hi @Steven Suting, did the above help? If it did, please take a moment to log in and click the "accept" link on the answer.