Member since: 11-23-2016
Posts: 14
Kudos Received: 7
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
| 13618 | 03-23-2017 07:51 PM
| 3720 | 12-30-2016 10:30 AM
03-23-2017 07:51 PM
We decided to leave it as is because this was for a PoC and, in any case, the files in S3 were created properly.
12-30-2016 10:30 AM
3 Kudos
I found the solution. It looks like a Python Set is not automatically translated to the java.util.Set that NiFi expects; returning an org.python.core.PySet instead works. Here's the solution in case someone else encounters the same issue:

from org.apache.nifi.processor import Processor, Relationship
from org.python.core import PySet

class PythonProcessor(Processor):

    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.REL_FAILURE = Relationship.Builder().name("failure").description("FlowFiles that failed to be processed").build()
        self.REL_UNMATCH = Relationship.Builder().name("unmatch").description("FlowFiles that did not match rules").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        # Wrapping the relationships in PySet is what makes the cast to java.util.Set succeed.
        return PySet([self.REL_SUCCESS, self.REL_FAILURE, self.REL_UNMATCH])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()
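One thing the snippet above doesn't show: the Processor interface also has an onTrigger(context, sessionFactory) method that does the actual work. A minimal sketch of what it could look like inside the class (the pass-through routing here is just an illustration, not part of the accepted answer):

    def onTrigger(self, context, sessionFactory):
        # Create a session, pull one FlowFile and route it to success.
        session = sessionFactory.createSession()
        try:
            flowFile = session.get()
            if flowFile is not None:
                session.transfer(flowFile, self.REL_SUCCESS)
            session.commit()
        except:
            session.rollback(True)
            raise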
12-30-2016 08:31 AM
1 Kudo
I'm trying to port this InvokeScriptedProcessor - Hello World! example into Python. The Groovy example uses generics in some of the methods, for example:

Set<Relationship> getRelationships() {
    return [REL_SUCCESS] as Set
}

@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }

List<PropertyDescriptor> getPropertyDescriptors() { return null }

How can I port this to Python? I tried to return Java's HashSet:

def getRelationships(self):
    set = HashSet()
    set.add(self.REL_SUCCESS)
    return set

and then I am getting:

2016-12-30 12:04:10,829 ERROR [NiFi Web Server-139] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=4eb16182-0159-1000-dc3a-531dc23b13e6] Unable to get relationships from scripted Processor: java.lang.reflect.UndeclaredThrowableException

I also tried Python's Set:

def getRelationships(self):
    return Set([self.REL_SUCCESS,])

In this case I received:

2016-12-30 11:04:50,588 ERROR [Timer-Driven Process Thread-8] o.a.n.p.script.InvokeScriptedProcessor InvokeScriptedProcessor[id=4eb16182-0159-1000-dc3a-531dc23b13e6] Unable to get relationships from scripted Processor: java.lang.ClassCastException: org.python.core.PyObjectDerived cannot be cast to java.util.Set

Here's the complete code so far:

import java.util
from org.apache.nifi.processor import Processor, Relationship
from sets import Set

class PythonProcessor(Processor):

    def __init__(self):
        self.REL_SUCCESS = Relationship.Builder().name("success").description("FlowFiles that were successfully processed").build()
        self.log = None

    def initialize(self, context):
        self.log = context.getLogger()

    def getRelationships(self):
        return Set([self.REL_SUCCESS])

    def validate(self, context):
        return None

    def getPropertyDescriptor(self, name):
        return None

    def getPropertyDescriptors(self):
        return None

    def onPropertyModified(self, descriptor, oldValue, newValue):
        pass

    def getIdentifier(self):
        return None

processor = PythonProcessor()

I would appreciate any help.
Labels:
- Apache NiFi
12-19-2016 12:16 PM
I encounter strange behavior when uploading files to S3 using the PutS3Object processor. It raises the following error:

ERROR [Timer-Driven Process Thread-6] o.a.nifi.processors.aws.s3.PutS3Object PutS3Object[id=12b0e9ff-0159-1000-ffff-ffffeedc377a] Error checking S3 Multipart Upload list for graphdb-poc-rawdata/: Key is not expected for the GET method ?uploads subresource (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: AC706A3B0F7DF2B9)

The strange thing is that despite this error I do see the files actually uploaded to the S3 bucket. Is there anything wrong with what I'm doing? I'd appreciate any suggestion on how to fix this. Thank you!
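To check whether the same InvalidRequest comes back outside of NiFi, the multipart-upload listing can be reproduced with boto3 (a quick sketch; it assumes boto3 is installed, AWS credentials are configured, and the bucket name is the one from the error message):

import boto3

s3 = boto3.client("s3")

# Roughly the check PutS3Object performs for in-progress multipart uploads.
# If this call also fails with InvalidRequest, the issue is on the S3 side
# (bucket name/permissions) rather than in the NiFi flow itself.
response = s3.list_multipart_uploads(Bucket="graphdb-poc-rawdata")
print(response.get("Uploads", []))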
Labels:
- Apache NiFi
12-16-2016 03:34 PM
Thank you! This is exactly what I want to do: run ListFile on the primary node and then load-balance to the same cluster. I read in a few posts here that an RPG is the way to load balance, but my concern is with the URL that I will enter in the RPG. What if that node goes down?
12-16-2016 03:31 PM
Thank you Bryan! But what if the specific node that I entered in the RPG goes down and is not available? How will it find the other nodes if the one configured in the RPG is down? Thank you!
12-16-2016 03:17 PM
2 Kudos
This might be discussed in other threads, but I have some misunderstanding and an issue that maybe you can help me with. I have a 3-node NiFi cluster and I'd like to schedule ListFile on the primary node, then send the flowfiles via a Remote Process Group back to the same cluster and thereby load-balance the processing.

In my Remote Process Group I can set the cluster URL to http://10.10.10.175:8080/nifi, which is my nifi.node1. However, what happens if nifi.node1 goes down? For that purpose I took another machine and installed HAProxy so it would serve as a load balancer for the cluster. Then I configured the Remote Process Group to point to the LB via http://10.10.10.174:8080/nifi, but I am getting the error "Unable to refresh group's peers due to unable to communicate with remote NiFi cluster..." I am able to work with the NiFi Web UI via the load balancer, but the Remote Process Group doesn't work.

Is a load balancer the right approach to avoid a single point of failure in case the node I configured in the Remote Process Group goes down? Any idea what might be the cause of this error? Here's my HAProxy configuration:

global
    log /dev/log local0
    log /dev/log local1 notice
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin
    stats timeout 30s
    user haproxy
    group haproxy
    daemon

defaults
    log global
    mode http
    option httplog
    option dontlognull
    timeout connect 5000
    timeout client 50000
    timeout server 50000

frontend http_front
    bind *:8080
    reqadd X-Forwarded-Proto:\ http
    capture request header origin len 128
    http-response add-header Access-Control-Allow-Origin %[capture.req.hdr(0)] if { capture.req.hdr(0) -m found }
    rspadd Access-Control-Allow-Headers:\ Origin,\ X-Requested-With,\ Content-Type,\ Accept if { capture.req.hdr(0) -m found }
    stats uri /haproxy?stats
    default_backend http_back

backend http_back
    balance roundrobin
    server nifi1 10.10.10.175:8080 check
    server nifi2 10.10.10.176:8080 check
    server nifi3 10.10.10.177:8080 check
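If it helps, the Remote Process Group relies on the site-to-site settings in nifi.properties on every node when it refreshes its peer list; a minimal example of those entries (the values below are placeholders, not my actual configuration) looks like:

# nifi.properties on each node -- placeholder values, not my actual settings
nifi.remote.input.host=node-hostname-or-ip
nifi.remote.input.socket.port=10000
nifi.remote.input.secure=false
nifi.remote.input.http.enabled=true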
Labels:
- Apache NiFi
11-23-2016 08:13 PM
Done! Thank you very much! https://issues.apache.org/jira/browse/NIFI-3096