Created on 09-13-2017 01:48 PM - edited on 05-26-2020 08:39 AM by VidyaSargur
Web scraping is a technique often used to extract data from websites for further processing. Many use cases involve data extraction from websites as a prerequisite; the following are some examples:
Web scraping presents many challenges, and the complexity of the extraction process depends on the complexity of the website itself (i.e. its structure, client-side frameworks, etc.).
I tend to think of web scraping as a three-stage process:
There are many powerful, production-ready open source web scrapers out there, and all of them excel at stages #1 and #2. However, most of them lack facilities for “outcome dispatching”, and they are not meant to be part of enterprise-class architectures where security and governance are a must. This is where Apache NiFi comes to the rescue, complementing open source web scrapers to deliver a solid, open source, enterprise-class web scraping solution.
Scrapy claims to be an “open source and collaborative framework for extracting the data you need from websites in a fast, simple, yet extensible way”. It is a Python framework that is easy to install via pip, and it comes with a set of command-line tools that are useful for development and debugging.
Don’t worry if you’ve never used Scrapy: its documentation is very well written, and its 101 tutorial will get you up to speed quickly.
In this tutorial I’ll show you how to extract air quality data from the “Air Quality in Europe” website. Unfortunately, there is no REST API available to pull that data out easily, so it has to be extracted directly from the “Current situation” page or the RSS feeds:
Both resources are valid for extracting the data we need, though I’ll use the first one, as it gives us the information grouped by country, which will make processing easier afterwards. The “Current situation” page will be parsed and processed once an hour and, as a result, there will be one JSON document per city with the following schema:
{
  country: <country_name>,
  city: <city_name>,
  roadside_idx: <value>,
  background_idx: <value>,
  date: <yyyyMMdd>,
  hour: <0-23>
}
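For example, a document for a given city might look like the following (all values are illustrative, and the indices come out as strings because they are extracted as page text):

{
  "country": "spain",
  "city": "madrid",
  "roadside_idx": "58",
  "background_idx": "42",
  "date": "20170913",
  "hour": "13"
}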
I’ll assume you already have Scrapy installed; otherwise, check the installation guide and proceed accordingly. Once installed, the scrapy command-line tool is available to facilitate the development and testing of spiders, the scripts that deal with the pages and produce the expected results.
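For instance, the scrapy shell lets you try XPath expressions against the live page before wiring them into a spider. A quick sketch, using the same selectors that appear in the spider below:

$ scrapy shell 'https://www.airqualitynow.eu/comparing_home.php'
>>> tablerows = response.xpath("//*[@id='results']/tr")
>>> len(tablerows)                                # number of rows in the results table
>>> tablerows[0].xpath('@class').extract_first()  # class attribute, used to spot country header rows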
The following five steps accomplish our goal:
import scrapy
import re


class AQSpider(scrapy.Spider):

    name = "airqualitydata"
    start_urls = ['https://www.airqualitynow.eu/comparing_home.php']

    def build_datadoc(self, country, row, str_date, str_hour):
        return {
            'country': country,
            'city': row.xpath("./td[@class='city_bkg1' or @class='city_bkg2']/a/text()").extract_first(),
            'roadside_idx': row.xpath("./td[5]/text()").extract_first(),
            'background_idx': row.xpath("./td[9]/text()").extract_first(),
            'date': str_date,
            'hour': str_hour
        }

    def parse(self, response):
        country = None

        # 1. Extracting relevant sections from the page
        currentdate = response.xpath("/html/body/div/table[3]/tr/td/table[2]/tr/td[3]/table/tr[2]/td[4]/span/strong/text()").extract_first()
        tablerows = response.xpath("//*[@id='results']/tr")

        # 2. Format date according to our requirements
        parsed_currentdate = re.search('([0-9]{2})/([0-9]{2})/([0-9]{4}) ([0-9]{2}):([0-9]{2})', currentdate)
        str_date = "%s%s%s" % (parsed_currentdate.group(3),
                               parsed_currentdate.group(1),
                               parsed_currentdate.group(2))
        str_hour = parsed_currentdate.group(4)

        # 3. Row by row processing; only relevant data yielded
        for row in tablerows:
            class_attr = row.xpath('@class').extract_first()

            # 3.1 Only processing useful data (first rows skipped)
            if country is not None or class_attr is not None:
                if class_attr is not None:
                    # A new country starts
                    country = row.xpath("./td/text()").extract_first()
                    self.log("Start processing a new country: %s" % country)
                else:
                    # Extract city info
                    yield self.build_datadoc(country, row, str_date, str_hour)
FEED_FORMAT = 'jsonlines'
FEED_URI = 'stdout:'
LOG_LEVEL = 'ERROR'
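Assuming these settings live in the project's settings.py, the spider can be run and its output checked locally; alternatively, the same settings can be passed with the -s flag, as in this sketch (the file name airqualitydata_spider.py is just an assumption):

scrapy runspider airqualitydata_spider.py \
    -s FEED_FORMAT=jsonlines \
    -s FEED_URI=stdout: \
    -s LOG_LEVEL=ERROR

Each extracted city should show up on stdout as one JSON document per line, which is exactly the output the NiFi side will consume later.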
In the introduction I mentioned that NiFi is very handy for dispatching the extracted data. As an enterprise-class automation system for managing the flow of data between systems, it provides the following key features:
This section outlines how to create a NiFi Process Group that works with existing Scrapy spiders deployed alongside NiFi instances. Process Groups are abstractions that encapsulate the details of a data flow, and they can nest additional Process Groups if needed. By having multiple Process Groups, it’s easier to build, debug and understand complex data flows without affecting performance.
Let’s build a very simple Process Group named “Air Quality Data Extractor” that calls our airqualitydata spider, and let’s connect it to a Kafka topic named airquality for further processing. This will involve only five steps:
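At the core of those steps sit two processors. As a rough sketch, they could be configured along these lines; the property names are those exposed by the stock ExecuteProcess and PublishKafka processors (labels may vary slightly by NiFi/Kafka version), while the values, the broker address and the script location are assumptions:

ExecuteProcess (Run Schedule set to 1 hour, matching the hourly extraction):
    Command: scrapy
    Command Arguments: runspider /opt/spiders/airqualitydata_spider.py
    Working Directory: /opt/spiders
    Redirect Error Stream: false

PublishKafka:
    Kafka Brokers: broker1:9092
    Topic Name: airquality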
Note that this is a very simple Process Group; in a real-world scenario we could be interested in registering a schema in the Schema Registry and using it within the Process Group.
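For instance, a record schema matching the JSON documents produced by the spider could look roughly like the following Avro sketch (the record name and the choice of string types are assumptions):

{
  "type": "record",
  "name": "AirQualityMeasurement",
  "fields": [
    {"name": "country", "type": "string"},
    {"name": "city", "type": "string"},
    {"name": "roadside_idx", "type": ["null", "string"]},
    {"name": "background_idx", "type": ["null", "string"]},
    {"name": "date", "type": "string"},
    {"name": "hour", "type": "string"}
  ]
}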
This example showcased how Apache NiFi can easily integrate with any external process through the ExecuteProcess processor. Even though the example focused on data extraction from websites, the same approach can be applied to many other situations.
It’s very common to have many custom scripts or processes for very specific tasks, such as custom ETL scripts for data consolidation. You might think that, in order to use NiFi to improve the automation and performance of those custom scripts, you need to migrate them all before starting. In reality, you have the option of following the approach presented in this example first, and then migrating those custom scripts to NiFi little by little.
The following resources were used to prepare this article:
Created on 03-07-2018 11:19 PM
First off, thanks a ton for this tutorial. I'm currently building a system which includes various spiders, and this was a good starting point. When running the processors I noticed that ExecuteProcess seems to release its output only once the process (the spider) is done running. This leads to a single large FlowFile, as opposed to what I expected: a single FlowFile for each extracted Scrapy Item (datadoc in your case). Do you know of a way to change this behavior? I could split the produced FlowFile up into multiple FlowFiles (one JSON line each), but I feel it would be a lot cleaner if this was the output of the spider processor in the first place. Any advice is appreciated.
Created on 03-11-2018 07:45 PM
Thanks for your kind words, I'm glad that you like my article.
Actually, ExecuteProcess is meant to generate FlowFiles containing (size- or time-based) batches, as you'll realise when checking its source code (this is the beauty of open source :)). That said, if your goal is to feed FlowFiles with "single events" (a JSON document containing a parsed HTML page in my case), then you need to ensure that each batch contains only one event: either by building events that are exactly 4 KB in size, or by emitting one event every "Batch Duration" nanoseconds (synchronising batch generation and batch ingestion will be challenging). To me this is tricky and I wouldn't recommend going down that road; I'd rather think about building my own custom processor implementing the desired logic if this is what you really want to achieve.
In fact, what you are considering as a workaround (splitting up FlowFiles after ingestion) is the way I would recommend. Maybe it doesn't make much sense in your scenario because the ExecuteProcess processor (doing the web scraping) and the NiFi instance are on the same host, but if you take your initiative to the limit and start running hundreds of spiders, you will end up running the ExecuteProcess processor and the NiFi instance on different hosts. Under this new scenario, the ExecuteProcess processor (via MiNiFi) needs to send FlowFiles over the network, and if every FlowFile contains only one event it will kill the performance of the ingestion process (round trips to the server are expensive, and the more events you include in one FlowFile the better => throughput optimisation vs latency optimisation).
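As a quick sketch of that splitting approach, assuming the spider emits one JSON document per line (the jsonlines feed from the article), a SplitText processor placed right after ExecuteProcess would do it; these are stock SplitText properties:

SplitText:
    Line Split Count: 1
    Header Line Count: 0
    Remove Trailing Newlines: true

Each resulting FlowFile then carries a single JSON document, ready for per-record processing downstream.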
Hope this helps 🙂
Created on 03-13-2018 10:55 PM
Hi Raul,
It definitely helped. I'm still very much a beginner when it comes to NiFi (I come from a data science background).
I'm surprised that I didn't notice the Batch Duration property earlier; I'll play around with it. With regards to the "event size", where would this be set? I can't seem to find it within the processor.
If you could, would you mind explaining why it's tricky to "synchronise batch generation and batch ingestion"? Is this under the assumption that I have multiple NiFi instances running for my system?
The reason I asked is that I was under the impression that NiFi is optimized for small chunks of data (thus single Scrapy Items as FlowFiles sounded better than having the entire output of the spider as one FlowFile), but I suppose this depends on the situation.
I'll most likely split it after ingestion as you recommended. As a sanity check: I am routing the output from my spider to an ExecuteStreamCommand processor (a Python script) which analyzes the data and adds a few extra properties to the JSON. These JSON lines are then fed into a Postgres DB. Would it make sense to do this directly, or would you advise using Kafka in this system (I haven't yet, as I haven't studied it yet)?
Thanks a ton. And as always, any advice is appreciated!
Created on 03-19-2018 07:58 AM
Hey Vincent,
Your script would be responsible for building those 4 KB events. You could build a 4 KB buffer and place your event inside it. This is definitely far from optimal because: (1) if the event is small you're wasting space, and (2) you can't have events bigger than 4 KB without more complex logic (splitting before sending and joining afterwards).
I said that synchronising batch generation (managed by your Python script) and batch ingestion (handled by the ExecuteProcess processor in NiFi) is tricky because each belongs to a different process with its own internal timer triggering those time-based batches; synchronising processes requires additional artefacts that would make things even more complicated. Definitely, don't go this way. I just mention it as an argument for applying your "workaround" which, to me, is the way to go.
Regarding the size of the data, NiFi is optimised to handle any kind of data (from small to large pieces) as outlined in the documentation. I highly recommend going over the documentation to understand how data is managed internally; you'll see there are smart design decisions when it comes to dealing with content and the metadata linked to it 🙂
Kafka makes sense when you want to avoid overwhelming a destination system that doesn't process data as fast as it is generated, for example. If you think Postgres won't cope with the ingestion pace (you've probably already realised that NiFi is very performant), then add Kafka to the equation. Anyway, keep in mind that the queuing and back pressure capabilities in NiFi could be enough for your project.
Best 😉