Member since
05-30-2018
1322
Posts
715
Kudos Received
148
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 4019 | 08-20-2018 08:26 PM |
| | 1924 | 08-15-2018 01:59 PM |
| | 2356 | 08-13-2018 02:20 PM |
| | 4067 | 07-23-2018 04:37 PM |
| | 4974 | 07-19-2018 12:52 PM |
04-07-2020
08:30 PM
1 Kudo
Kubernetes has made application deployment dramatically faster and more widespread. However, true universal log capture with support for multiple downstream endpoints is still lacking. Apache NiFi Stateless offers a way to bridge the gap between rapid application deployment and InfoSec's need to keep capturing and monitoring application behavior.
What is NiFi Stateless?
NiFi-Fn is a library for running NiFi flows as stateless functions. It provides delivery guarantees similar to NiFi, without the need for an on-disk repository, by waiting to confirm receipt of incoming data until it has been written to the destination (source NIFI-5922).
Try it out
Prerequisites
K8s (local or cluster). In this demonstration, Azure Kubernetes Service is used.
Some familiarity with K8s & NiFi
Assets Used
NiFi on K8s
https://github.com/sunileman/AKS-YAMLS/blob/master/apache-nifi.yaml
Any instance of NiFi will do here. It does not need to run on K8s.
NiFi Registry on K8s
https://github.com/sunileman/AKS-YAMLS/blob/master/nifi-registry.yml
Any instance of NiFi Registry will do here. It does not need to run on K8s.
Laying the groundwork
NiFi Stateless will pull an existing flow from NiFi Registry. The following is a simple NiFi flow designed in NiFi:
TailFile processor will tail the application log file /var/log/app.txt. The application deployed will write log entries to this file:
The flow is checked into NiFi Registry. NiFi Registry URL, Bucket Identifier & Flow Identifier will be used by NiFi Stateless at run time. More about this soon.
Time to deploy
The flow is now registered in NiFi Registry, so the application pod can be deployed. A NiFi Stateless container is deployed in the same pod as the application (as a sidecar) to capture the log data the application generates. The application is deliberately simple: a dummy application that writes a timestamped log entry to a log file (/var/log/app.txt) every 5 seconds. NiFi Stateless tails this file and ships the events. The events can be shipped virtually anywhere thanks to NiFi's inherent universal log forward compatibility (Kafka/Splunk/ElasticSearch/Mongo/Kinesis/EventHub/S3/ADLS/etc.). All NiFi processors are documented at https://nifi.apache.org/docs.html. For this demonstration, the log events are shipped to a NiFi cluster over Site2Site.
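The dummy application described above can be sketched in a few lines of Python. This is only an illustrative stand-in, not the container the linked YAML runs: the function name `write_log_entries`, its parameters, and the log line format are assumptions; only the path /var/log/app.txt and the 5-second interval come from the description.

```python
import time
from datetime import datetime, timezone


def write_log_entries(path, count, interval_seconds=5.0):
    """Append `count` timestamped lines to `path`, pausing between writes.

    Hypothetical helper: the file is reopened for every entry so each line
    is flushed to disk and becomes visible to a process tailing the file.
    """
    for i in range(count):
        with open(path, "a", encoding="utf-8") as log_file:
            stamp = datetime.now(timezone.utc).isoformat()
            log_file.write(f"{stamp} app heartbeat {i}\n")
        if i < count - 1:
            time.sleep(interval_seconds)


# Example call (not executed here): write_log_entries("/var/log/app.txt", count=3)
```

Any process that appends lines to the tailed file would serve the same purpose; the sidecar only depends on the file path matching the one configured in the TailFile processor.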
Here is the K8s YAML to deploy the Pod (application with NiFi Stateless sidecar): https://github.com/sunileman/AKS-YAMLS/blob/master/nifi-stateless-sidecar.yml
In that YAML file, NiFi Registry URL, bucketId, and flowId will need to be updated. These values are from the NiFi registry. NiFi Stateless binds itself at runtime to a specific flow to execute.
args: ["RunFromRegistry", "Continuous", "--json", "{\"registryUrl\":\"http://nifiregistry-service\",\"bucketId\":\"71efc3ea-fe1d-4307-97ce-589f78be05fb\",\"flowId\":\"c9092508-4deb-45d2-b6a4-b6a4da71db47\"}"]
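Hand-escaping the JSON inside the args line is easy to get wrong. A minimal sketch, assuming you prefer to generate the line rather than edit it by hand, is shown below; the helper name `stateless_args` is hypothetical, and the values are the ones from the example above.

```python
import json


def stateless_args(registry_url, bucket_id, flow_id):
    """Return the container args list for a continuous run from NiFi Registry."""
    payload = {
        "registryUrl": registry_url,
        "bucketId": bucket_id,
        "flowId": flow_id,
    }
    # Compact separators keep the embedded JSON free of extra spaces.
    return ["RunFromRegistry", "Continuous", "--json",
            json.dumps(payload, separators=(",", ":"))]


args = stateless_args(
    "http://nifiregistry-service",
    "71efc3ea-fe1d-4307-97ce-589f78be05fb",
    "c9092508-4deb-45d2-b6a4-b6a4da71db47",
)
# A JSON array is also a valid YAML flow sequence, so this line can be
# pasted into the pod spec as-is.
args_line = "args: " + json.dumps(args)
```

Printing `args_line` gives a line with the inner quotes already escaped, which avoids mismatched backslashes when the bucket or flow identifier changes.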
To deploy the Pod, run the following:
kubectl apply -f nifi-stateless-sidecar.yml
Once the pod is up and running, the NiFi Stateless container immediately begins capturing application log events and shipping them downstream.
Wrapping Up
FluentD and similar offerings are a great way to get started with capturing application log data. However, enterprises require much richer connectivity (Universal Log Forward Compatibility) to enable InfoSec to perform its vital role. NiFi Stateless bridges that gap.
04-02-2020
04:46 AM
How do I get the list of all tables in Hive across the entire cluster, not just for a particular table or database?
03-21-2020
07:27 AM
I have a similar issue connecting to Hive from SQuirreL. I use Beeline version 3.1.0.3.0.1.0-187 and connect to the Hortonworks image through a VM. I added the jars below, but the connection is refused with the error "Unexpected Error occurred attempting to open an SQL connection.class java.net.ConnectException: Connection refused: connect".
hive-jdbc-3.1.0.3.0.1.0-187.jar
hive-jdbc-3.1.0.3.0.1.0-187-sources.jar
hive-jdbc-3.1.0.3.0.1.0-187-standalone.jar
JDBC URL: jdbc:hive2://sandbox-hdp.hortonworks.com:2181/default
Any idea how to fix this?
03-02-2020
12:52 AM
Hey! Can you please explain in more detail how to create an Apache NiFi template? Specifically, I need to know whether it is possible to generate an Apache NiFi template using Java.
02-20-2020
03:52 AM
The LEAD() function can also be used with COALESCE(). For example:

coalesce(sepan, lead(sepan, 1) over (partition by sourcedata order by timestamps)) as sepan

This replaces NULL values in the sepan column with the value from the next row in the partition. The timestamps column can be ordered ascending or descending, depending on which value you want to copy.
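To see how the expression behaves, here is a small sketch that runs the same COALESCE/LEAD pattern against an in-memory SQLite database from Python. The table name `readings` and the sample rows are made up for illustration, and SQLite stands in for Hive here (it needs SQLite 3.25 or later for window functions); the column names follow the example above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE readings (sourcedata TEXT, timestamps INTEGER, sepan REAL)"
)
conn.executemany(
    "INSERT INTO readings VALUES (?, ?, ?)",
    [("a", 1, None), ("a", 2, 5.0), ("a", 3, None), ("a", 4, None), ("a", 5, 9.0)],
)

# Same pattern as the Hive expression: fall back to the next row's value
# when the current row's sepan is NULL.
rows = conn.execute(
    """
    SELECT timestamps,
           COALESCE(sepan, LEAD(sepan, 1) OVER (
               PARTITION BY sourcedata ORDER BY timestamps)) AS sepan
    FROM readings
    ORDER BY timestamps
    """
).fetchall()
```

Note that LEAD(sepan, 1) looks exactly one row ahead. In this sample, the row at timestamps 3 stays NULL because the row after it is also NULL, so runs of consecutive NULLs need a different approach.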
01-15-2020
11:57 AM
Over the last few weeks, as customers have started to ramp up their usage of CDP cloud assets (like DataHub, Experiences, etc.), I have observed many of the ways they are leveraging on-prem engineering assets (code) in the cloud. The concept of write-once-deploy-anywhere is fundamental to a well-designed data strategy. It's NOT a sales pitch. It's a reality for enterprises that have invested in a Modern Data Architecture. However, unlike on-prem, where storage and services are tightly coupled, CDP flips that concept on its head. We can now launch services independently and choose only the capabilities we need for the task at hand. For example, streaming use cases typically require NiFi, Kafka, and Spark Streaming. Each of those services would be a separate DataHub cluster and scale independently.

This article focuses on using PySpark to read from a secured Kafka instance. To be clear, this is one way (not the only way) of using PySpark to read from Kafka. Both clusters (DE and SM) are launched via the CDP control plane.

CDP DataHub assets used in this article
Data Engineering Cluster (DE)
Streams Messaging (SM)

Launching DataHub DE and SM clusters is well documented.

PreWork
Secure Kafka and generate certs/truststore: https://docs.cloudera.com/runtime/7.0.2/kafka-securing/topics/kafka-secure-tls.html
In this article I refer to the truststore as kafka.client.truststore.
Get a list of all Kafka brokers and ports. For this article I will call the brokers k1.cloudera.com:9093, k2.cloudera.com:9093, k3.cloudera.com:9093.

Create a kafka.properties file:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/home/sunilemanjee/kafka.client.truststore.jks
ssl.truststore.password=password
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="machine-user" password="password";

Create a Kafka JAAS file called jaas.conf (this can be named whatever you like):

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="machine-user"
  password="password";
};

Create a Kafka topic
The easiest way to create a Kafka topic is via SMM (Streams Messaging Manager), which ships with the Streams Messaging cluster. Click on the SMM URL within DataHub and then click on "Topics" located on the right menu bar. Click on "Add New" to create a new Kafka topic. Enter the topic name "demo", set partitions to 1, and set the clean up policy to "delete". The demo topic should now be available.

Generate Data
For PySpark to consume from a secured instance of Kafka, we need the ability to write to Kafka. Here we will use the Kafka console producer.
SSH into one of the broker nodes.
Update k*.cloudera.com:9093 with your broker list and ports.
Upload kafka.properties (created earlier) onto this node.
Update the location of your kafka.properties file.
After the command below is executed, you can start to write data (messages) to Kafka. We will come back to this in a moment.

kafka-console-producer --broker-list k1.cloudera.com:9093,k2.cloudera.com:9093,k3.cloudera.com:9093 --producer.config /home/sunilemanjee/kafka.properties --topic demo

Read from Kafka using PySpark
SSH into any node within the DE cluster.
Upload jaas.conf and kafka.client.truststore.
Update the location of jaas.conf and kafka.client.truststore in the command below.
Launch the PySpark shell using the following command:

pyspark --files "/home/sunilemanjee/jaas.conf#jaas.conf,/home/sunilemanjee/kafka.client.truststore.jks#kafka.client.truststore.jks" --driver-java-options "-Djava.security.auth.login.config=/home/sunilemanjee/jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/sunilemanjee/jaas.conf"

Once the PySpark shell is up, it may be easier to store the Kafka brokers in a variable like this:

KAFKA_BROKERS = "k1.cloudera.com:9093,k2.cloudera.com:9093,k3.cloudera.com:9093"

Create a structured stream to read from Kafka. Update the following values first:
kafka.ssl.truststore.location
kafka.ssl.truststore.password
username
password

df_kafka = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKERS)
    .option("subscribe", "demo")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.ssl.truststore.location", "./kafka.client.truststore.jks")
    .option("kafka.ssl.truststore.password", "password")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"machine-user\" password=\"password\" serviceName=\"kafka\";")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

To start viewing Kafka messages from the "demo" topic on the console (PySpark shell):

stream = df_kafka.writeStream.format("console").trigger(continuous="1 second").start()
stream.awaitTermination()
# Once you are finished, stop the stream with:
stream.stop()

Go back to your Kafka console producer and start writing messages (anything you like). You will see those messages show up in your PySpark shell console.

That's it. Again, this is one way (not the only way) to use PySpark to consume from a secured Kafka instance. I see this as an emerging pattern in CDP for streaming use cases. Enjoy.
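The long option chain and the escaped JAAS string are the parts most likely to break when adapting this to another cluster. As a rough sketch, the options can be assembled in plain Python first and then handed to the reader. The helper names `jaas_config` and `kafka_reader_options` are hypothetical; the option keys and sample values are the ones used in this article.

```python
def jaas_config(username, password):
    """Build the PlainLoginModule JAAS string used for SASL/PLAIN."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )


def kafka_reader_options(brokers, topic, truststore_path, truststore_password,
                         username, password):
    """Return the option map for a secured Kafka structured-streaming source."""
    return {
        "kafka.bootstrap.servers": ",".join(brokers),  # no spaces after commas
        "subscribe": topic,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.ssl.truststore.location": truststore_path,
        "kafka.ssl.truststore.password": truststore_password,
        "kafka.sasl.jaas.config": jaas_config(username, password),
    }


options = kafka_reader_options(
    ["k1.cloudera.com:9093", "k2.cloudera.com:9093", "k3.cloudera.com:9093"],
    "demo", "./kafka.client.truststore.jks", "password",
    "machine-user", "password",
)
# In the PySpark shell this map could be applied with:
#   spark.readStream.format("kafka").options(**options).load()
```

Building the map separately also makes it straightforward to print and inspect the exact values before starting the stream.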
11-25-2019
12:02 AM
@sunile_manjee Your article is very good and informative. I was searching for an easy way to benchmark Hadoop with TeraGen, TeraSort, and TeraValidate, and this is exactly the article I needed. Thank you for sharing it. It is well written and covers all the points I was looking for, and I am impressed by it. Keep writing and sharing educational articles like this, which help us grow our knowledge.
Regards, Sevenmentor
11-18-2019
08:29 PM
@Kalyan77 Good question. I haven't tried it yet. In the next few weeks I have an engagement that will require me to find out. I will keep you posted.
08-27-2019
09:06 PM
2 Kudos
Part 2 of Autoscaling MiNiFi on K8S focuses on deploying the artifacts on EKS (Amazon Elastic Kubernetes Service). My knee-jerk reaction was that all Kubernetes-as-a-Service offerings would play well, but that is definitely not the case. Hence GCP's Anthos product direction for this space is key. The net-net of my observation is that a k8s app deployment built for any single cloud vendor causes deployment complexities on any other k8s deployment, cloud or on-prem. Vendor lock-in is ALIVE and WELL. In Azure I leveraged ACS for EFM and NiFi Registry; however, the natural evolution was to deploy EFM and NiFi Registry (NR) on K8S. EFM, NR, and MiNiFi are integrated components (refer to part 1 for the architecture). I will leverage several key out-of-the-box k8s components to make this all work together. The good news is, the deployment is super simple!

Prerequisites
Some knowledge of Kubernetes and EKS
EKS cluster
kubectl CLI
eksctl CLI
VPC
2 public subnets within a VPC
NR, EFM, and MiNiFi images uploaded to and available in ECS
Refer to part 1 for image locations

Create an EKS Cluster
eksctl makes this simple. I tried using aws eks and it was painful.

eksctl create cluster \
--name sunman-k8s \
--version 1.13 \
--nodegroup-name standard-workers \
--node-type t3.medium \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--vpc-public-subnets=subnet-067d0ffbc09152382,subnet-037d8c6750c5de236 \
--node-ami auto

Deployment
All the contents of the ymls below can be placed into a single file for deployment. For this demonstration, chunking it into smaller components makes it easier to explain.

NiFi Registry (NR)
Edge Flow Manager has a dependency on NR. Flow versions are stored in NR. Here is nifiregistry.yml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: nifiregistry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nifiregistry
  template:
    metadata:
      labels:
        app: nifiregistry
    spec:
      containers:
      - name: nifiregistry-container
        image: your-image-location/nifiregistry
        ports:
        - containerPort: 18080
          name: http
        - containerPort: 22
          name: ssh
        resources:
          requests:
            cpu: ".5"
            memory: "2Gi"
          limits:
            cpu: "1"
        env:
        - name: VERSION
          value: "11"
---
kind: Service
apiVersion: v1
metadata:
  name: nifiregistry-service
spec:
  selector:
    app: nifiregistry
  ports:
  - protocol: TCP
    targetPort: 18080
    port: 80
    name: http
  - protocol: TCP
    targetPort: 22
    port: 22
    name: ssh
  type: LoadBalancer
  loadBalancerSourceRanges:
  - 0.0.0.0/0

Update the following line in nifiregistry.yml with the location of your NR image:

image: your-image-location/nifiregistry

Also note that the load balancer for NR is open to the world. You may want to lock this down.

Deploy NR on k8s:

kubectl apply -f nifiregistry.yml

Edge Flow Manager (EFM)
Next, deploy EFM on k8s. Here is efm.yml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: efm
spec:
  replicas: 1
  selector:
    matchLabels:
      app: efm
  template:
    metadata:
      labels:
        app: efm
    spec:
      containers:
      - name: efm-container
        image: your-image-location/efm
        ports:
        - containerPort: 10080
          name: http
        - containerPort: 22
          name: ssh
        resources:
          requests:
            cpu: ".5"
            memory: "2Gi"
          limits:
            cpu: "1"
        env:
        - name: VERSION
          value: "11"
        - name: NIFI_REGISTRY_ENABLED
          value: "true"
        - name: NIFI_REGISTRY_BUCKETNAME
          value: "testbucket"
        - name: NIFI_REGISTRY
          value: "http://nifiregistry-service.default.svc.cluster.local"
---
kind: Service
apiVersion: v1
metadata:
  name: efm-service
spec:
  selector:
    app: efm
  ports:
  - protocol: TCP
    targetPort: 10080
    port: 80
    name: http
  - protocol: TCP
    targetPort: 22
    port: 22
    name: ssh
  type: LoadBalancer
  loadBalancerSourceRanges:
  - 0.0.0.0/0

Update the following line in efm.yml with the location of your EFM image:

image: your-image-location/efm

Also note that the load balancer for EFM is open to the world. You may want to lock this down.

Deploy EFM on k8s:

kubectl apply -f efm.yml

MiNiFi
Lastly, deploy MiNiFi on k8s. Here is minifi.yml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: minifi
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minifi
  template:
    metadata:
      labels:
        app: minifi
    spec:
      containers:
      - name: minifi-container
        image: your-image-location/minifi-azure-aws
        ports:
        - containerPort: 10080
          name: http
        - containerPort: 6065
          name: listenhttp
        - containerPort: 22
          name: ssh
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "1"
        env:
        - name: NIFI_C2_ENABLE
          value: "true"
        - name: MINIFI_AGENT_CLASS
          value: "listenSysLog"
        - name: NIFI_C2_REST_URL
          value: "http://efm-service.default.svc.cluster.local/efm/api/c2-protocol/heartbeat"
        - name: NIFI_C2_REST_URL_ACK
          value: "http://efm-service.default.svc.cluster.local/efm/api/c2-protocol/acknowledge"
---
kind: Service
apiVersion: v1
metadata:
  name: minifi-service
spec:
  selector:
    app: minifi
  ports:
  - protocol: TCP
    targetPort: 10080
    port: 10080
    name: http
  - protocol: TCP
    targetPort: 9877
    port: 9877
    name: tcpsyslog
  - protocol: TCP
    targetPort: 9878
    port: 9878
    name: udpsyslog
  - protocol: TCP
    targetPort: 22
    port: 22
    name: ssh
  - protocol: TCP
    targetPort: 6065
    port: 6065
    name: listenhttp
  type: LoadBalancer
  loadBalancerSourceRanges:
  - 0.0.0.0/0

Update the following line in minifi.yml with the location of your MiNiFi image:

image: your-image-location/minifi-azure-aws

Also note that the load balancer for MiNiFi is open to the world. You may want to lock this down.

Deploy MiNiFi on k8s:

kubectl apply -f minifi.yml

That's it! Part 1 of this series demonstrated how to autoscale MiNiFi, and those same k8s commands can be used here to scale this out properly. The next part of this series will add the concept of k8s StatefulSets and their impact on EFM/NR/MiNiFi for a resilient backend persistence layer.
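Each Service above sets loadBalancerSourceRanges to 0.0.0.0/0, which is what leaves the load balancers open to the world. One possible way to lock this down, sketched below under my own assumptions, is to generate the Service manifest from a list of allowed CIDR ranges and reject any range that matches every address. The helper name `load_balancer_service` is hypothetical, and 203.0.113.0/24 is a documentation-only range standing in for a real office or VPN network.

```python
import ipaddress
import json


def load_balancer_service(name, app_label, ports, allowed_cidrs):
    """Build a LoadBalancer Service manifest restricted to allowed_cidrs.

    `ports` is a list of (name, port, targetPort) tuples.
    """
    ranges = []
    for cidr in allowed_cidrs:
        network = ipaddress.ip_network(cidr)  # raises ValueError on bad input
        if network.prefixlen == 0:
            raise ValueError(f"{cidr} would expose {name} to every address")
        ranges.append(str(network))
    return {
        "kind": "Service",
        "apiVersion": "v1",
        "metadata": {"name": name},
        "spec": {
            "selector": {"app": app_label},
            "ports": [
                {"protocol": "TCP", "name": n, "port": p, "targetPort": t}
                for n, p, t in ports
            ],
            "type": "LoadBalancer",
            "loadBalancerSourceRanges": ranges,
        },
    }


# Placeholder range; replace with the networks that should reach EFM.
manifest = load_balancer_service(
    "efm-service", "efm", [("http", 80, 10080)], ["203.0.113.0/24"]
)
# kubectl apply -f accepts JSON manifests as well as YAML.
manifest_json = json.dumps(manifest, indent=2)
```

The same helper could be reused for the NR and MiNiFi Services by passing their names, labels, and port tuples.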
08-18-2019
09:42 PM
Hi @rushi_ns, yours might be a completely different issue. Please create a new question thread describing your issue.