Created on 01-15-2017 05:42 PM - edited 09-16-2022 01:38 AM
Raspberry PIs and other small devices often have cameras or can have camera's attached. Raspberry Pi's have cheap camera add-ons that can ingest still images and videos (https://www.raspberrypi.org/products/camera-module/). Using a simple Python script we can ingest images and then ingest them into our central Hadoop Data Lake. This is a nice simple use case for Connected Data Platforms with both Data in Motion and Data at Rest. This data can be processed in-line with Deep Learning Libraries like TensorFlow for image recognition and assessment. Using OpenCV and other tools we can process in-motion and look for issues like security breaches, leaks and other events.
The most difficult part is the Python code which reads from camera, adds a watermark, converts to bytes, sends to MQTT and then ftps to an FTP server. I do both since networking is always tricky. You could also add if it fails to connect to either, store to a directory on a mapped USB drive. Once network returns send it out, it would be easy to do that with MiniFi which could read that directory.
Once the file lands into the MQTT broker or FTP server, NIFI pulls it and bring it into the flow. I first store to HDFS for our Data @ Rest permanent storage for future deep learning processing. I also run three processors to extra image metadata and then call jp2a to convert the image into an ASCII picture.
ExecuteStreamCommand for Running jp2a
The Output Ascii
HDFS Directory of Uploaded Files
Metadata extracted from the image
An Example Imported Image
Other Meta Data
Meta Data Extracted
A Converted JPG to ASCII
Running JP2A on Images Stored in HDFS via WebHDFS REST API
/opt/demo/jp2a-master/src/jp2a "http://hdfsnode:50070/webhdfs/v1/images/$@?op=OPEN"
Python on RPI
#!/usr/bin/python import os import datetime import ftplib import traceback import math import random, string import base64 import json import paho.mqtt.client as mqtt import picamera from time import sleep from time import gmtime, strftime packet_size=3000 def randomword(length): return ''.join(random.choice(string.lowercase) for i in range(length)) # Create unique image name img_name = 'pi_image_{0}_{1}.jpg'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime())) # Capture Image from Pi Camera try: camera = picamera.PiCamera() camera.annotate_text = " Stored with Apache NiFi " camera.capture(img_name, resize=(500,281)) pass finally: camera.close() # MQTT client = mqtt.Client() client.username_pw_set("CloudMqttUserName","!MakeSureYouHaveAV@5&L0N6Pa55W0$4!") client.connect("cloudmqttiothoster", 14162, 60) f=open(img_name) fileContent = f.read() byteArr = bytearray(fileContent) f.close() message = '"image": {"bytearray":"' + byteArr + '"} } ' print client.publish("image",payload=message,qos=1,retain=False) client.disconnect() # FTP ftp = ftplib.FTP() ftp.connect("ftpserver", "21") try: ftp.login("reallyLongUserName", "FTP PASSWORDS SHOULD BE HARD") ftp.storbinary('STOR ' + img_name, open(img_name, 'rb')) finally: ftp.quit() # clean up sent file os.remove(img_name)
References: