Member since
03-03-2017
74
Posts
9
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 2578 | 06-13-2018 12:02 PM |
 | 4634 | 11-28-2017 10:32 AM |
05-19-2017
08:17 AM
@Matt Clarke Thank you very very much, your answer was very useful for me
05-17-2017
01:12 PM
Hi, I have an ingest flow which initially ingests approx. 50 files of about 300 MB each. After the ingest I want to run some Hive commands to create various Hive tables for display, but I only need to do that once, not 50 times. I have been searching for some kind of trigger to start a PutHiveQL processor once, without a useful hit. How could I accomplish that?
Labels: Apache NiFi
05-17-2017
07:26 AM
Hi, I am nearly there, but I am still missing something to get it completely right. I have built my flow as ListHDFS -> FetchHDFS -> SplitText (on lines) -> ExecuteScript (on each line) -> MergeContent -> PutFile. When I run the flow without ExecuteScript everything looks fine: the file after the merge is the same as it was before the split. But when I run the files through my script, data is somehow stripped off, and I cannot figure out why. The files are standard UTF-8 Unicode text with very long lines, separated by tabs. The full script:
import org.apache.commons.io.IOUtils
import java.nio.charset.*
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if (!flowFile) return
def path = flowFile.getAttribute('path')
def fail = false
flowFile = session.write(flowFile, { inputStream, outputStream ->
    try {
        def recordIn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def cells = recordIn.split(' ')
        def output = ""
        cells.each { it ->
            output = output + it + "\t" // do something
        }
        output = output + path + "\n"
        outputStream.write(output.getBytes(StandardCharsets.UTF_8))
    }
    catch (e) {
        log.error("Error during processing of validate.groovy", e)
        // session.transfer() only accepts a FlowFile, so just flag the failure here
        fail = true
    }
} as StreamCallback)
if (fail) {
    session.transfer(flowFile, REL_FAILURE)
} else {
    session.transfer(flowFile, REL_SUCCESS)
}
It seems like something is cut off in this line: outputStream.write(output.getBytes(StandardCharsets.UTF_8)). I tried changing it to outputStream.write(recordIn.getBytes(StandardCharsets.UTF_8)), with the same result.
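One way to narrow down where the data goes missing is to compare the number of columns on each line before and after the script step. The following is a minimal Python sketch, not NiFi code: the function names are hypothetical, and it assumes the script adds exactly one column (the path) to every line.

```python
def column_counts(text, delimiter="\t"):
    """Return the number of columns on each line of the text."""
    return [len(line.split(delimiter)) for line in text.splitlines()]


def lines_with_lost_columns(before, after, added=1):
    """Return 0-based line numbers where `after` does not have exactly
    `added` more columns than the same line in `before`."""
    pairs = zip(column_counts(before), column_counts(after))
    return [i for i, (b, a) in enumerate(pairs) if a != b + added]


if __name__ == "__main__":
    original = "a\tb\t\nc\td\te\n"          # first line ends with an empty column
    processed = "a\tb\t\t/p\nc\td\t/p\n"    # second line has lost a column
    print(lines_with_lost_columns(original, processed))
```

Running the check on a small sample taken before SplitText and after MergeContent would show whether whole lines or only single columns disappear.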
05-16-2017
11:36 AM
Forgive my incompetence in Groovy, but the job cuts off a lot of data. I have 147 columns in the file, so I want to iterate through the columns and add columns at the end of the line:
def recordIn = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
def cells = recordIn.split(' ')
for (column in cells) {
    recordOut += column + ' ' // do stuff on each column
    // need to detect linefeed to determine newline ?
}
// add new column
recordOut += path
outputStream.write(recordOut.getBytes(StandardCharsets.UTF_8))
recordOut = ''
I am not sure how I should understand the data from the InputStream: does it stream one char at a time, or does it stream lines? Normally I would iterate through the lines and, inside that loop, iterate through the columns.
2017-05-16 11:29:34,245 ERROR [Timer-Driven Process Thread-4] o.a.nifi.processors.script.ExecuteScript
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.controller.repository.StandardProcessSession.transfer() is applicable for argument types: (org.apache.nifi.controller.repository.io.FlowFileAccessInputStream, org.apache.nifi.processor.Relationship) values: [org.apache.nifi.controller.repository.io.FlowFileAccessInputStream@702d53aa, ...]
Possible solutions: transfer(java.util.Collection, org.apache.nifi.processor.Relationship), transfer(org.apache.nifi.flowfile.FlowFile, org.apache.nifi.processor.Relationship), transfer(java.util.Collection), transfer(org.apache.nifi.flowfile.FlowFile)
at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:214) ~[na:na]
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.0.2.1.1.0-2.jar:1.1.0.2.1.1.0-2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
Caused by: javax.script.ScriptException: javax.script.ScriptException: groovy.lang.MissingMethodException: No signature of method: org.apache.nifi.controller.repository.StandardProces
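On the question of how the stream is delivered: an input stream hands over raw bytes, and any notion of lines comes from the reader wrapped around it. The following minimal sketch shows that pattern in plain Python 3 (not the Groovy ExecuteScript API). `process_stream` and its parameters are hypothetical names chosen for illustration.

```python
import io


def process_stream(input_stream, output_stream, extra_column):
    """Read UTF-8 text line by line and append one column to every line,
    keeping each line's original line ending."""
    # newline="" keeps "\n" / "\r\n" untranslated so they can be written back as-is
    reader = io.TextIOWrapper(input_stream, encoding="utf-8", newline="")
    for line in reader:
        body = line.rstrip("\r\n")
        ending = line[len(body):]
        output_stream.write((body + "\t" + extra_column + ending).encode("utf-8"))
    # Detach so the wrapper does not close the underlying stream
    reader.detach()


if __name__ == "__main__":
    source = io.BytesIO("a\tb\nc\td\n".encode("utf-8"))
    target = io.BytesIO()
    process_stream(source, target, "/some/path")
    print(target.getvalue())
```

Reading line by line also avoids holding a whole 300 MB file in one string, which is what reading the complete stream into a single variable does.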
05-15-2017
12:23 PM
Hi, I am running a NiFi flow which ingests tab-separated text files into HDFS. I would like to add columns to each file with flowfile attributes like ${path}.${filename} and so on, and to test every column for chars that I don't want to ingest. Not having experience with Groovy, I wondered if this could be achieved by writing a Groovy script. I have been playing around without any good result so far; it would be nice to get a little help to guide me in the right direction.
import java.nio.charset.StandardCharsets
def flowFile = session.get()
if (!flowFile) return
flowFile = session.write(flowFile, { inputStream, outputStream ->
    inputStream.eachLine { line ->
        a = line.tokenize('\t')
        a = a + ' flowfile' // trying to add column
    }
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
session.commit()
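The two goals in this post, adding attribute columns and checking every column for unwanted characters, can be sketched per line as below. This is plain Python 3 for illustration only, not ExecuteScript code. The function names are hypothetical, and the rule for "unwanted" (control characters other than tab and newline) is an assumption, since the post does not say which characters should be rejected.

```python
import re

# Assumed rule: control characters other than tab and newline are unwanted.
UNWANTED = re.compile(r"[\x00-\x08\x0b-\x1f\x7f]")


def extend_line(line, extra_columns):
    """Append extra columns to one tab-separated line (given without its line ending)."""
    columns = line.split("\t")  # str.split keeps empty columns
    return "\t".join(columns + list(extra_columns))


def bad_column_indexes(line):
    """Return the 0-based indexes of columns that contain an unwanted character."""
    return [i for i, col in enumerate(line.split("\t")) if UNWANTED.search(col)]


if __name__ == "__main__":
    print(extend_line("a\tb", ["/data/in", "file.txt"]))
    print(bad_column_indexes("ok\tb\x01d\tfine"))
```

In the flow itself, the extra values would come from the flowfile attributes, and lines with a non-empty result from `bad_column_indexes` could be routed to a failure relationship.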
Labels: Apache NiFi
05-15-2017
06:24 AM
Thank you very much Matt
05-04-2017
11:58 PM
Hi, I am trying to import data from a Sybase ASE table into Hive. When running the following command:
sqoop import "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" --driver net.sourceforge.jtds.jdbc.Driver --connect jdbc:jtds:sybase://11.9.179.5:4000/BAS_csrp --username user--password xxxxxx --table csrp_total_bas --split-by cpr_nr --hcatalog-database default --hcatalog-table csrp_total_baseline --hcatalog-home /tmp/simon/std/CPRUPD/BASELINE --create-hcatalog-table --hcatalog-storage-stanza "stored as orcfile"
I get this error:
Warning: /usr/hdp/2.5.0.0-1245/accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
17/05/04 17:20:12 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6.2.5.0.0-1245
17/05/04 17:20:12 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
17/05/04 17:20:12 WARN sqoop.ConnFactory: Parameter --driver is set to an explicit driver however appropriate connection manager is not being set (via --connection-manager). Sqoop is going to fall back to org.apache.sqoop.manager.GenericJdbcManager. Please specify explicitly which connection manager should be used next time.
17/05/04 17:20:12 INFO manager.SqlManager: Using default fetchSize of 1000
17/05/04 17:20:12 INFO tool.CodeGenTool: Beginning code generation
17/05/04 17:22:19 ERROR manager.SqlManager: Error executing statement: java.sql.SQLException: Network error IOException: Connection timed out
java.sql.SQLException: Network error IOException: Connection timed out
at net.sourceforge.jtds.jdbc.JtdsConnection.<init>(JtdsConnection.java:436)
at net.sourceforge.jtds.jdbc.Driver.connect(Driver.java:184)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at org.apache.sqoop.manager.SqlManager.makeConnection(SqlManager.java:904)
at org.apache.sqoop.manager.GenericJdbcManager.getConnection(GenericJdbcManager.java:52)
at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:763)
at org.apache.sqoop.manager.SqlManager.execute(SqlManager.java:786)
at org.apache.sqoop.manager.SqlManager.getColumnInfoForRawQuery(SqlManager.java:289)
at org.apache.sqoop.manager.SqlManager.getColumnTypesForRawQuery(SqlManager.java:260)
at org.apache.sqoop.manager.SqlManager.getColumnTypes(SqlManager.java:246)
at org.apache.sqoop.manager.ConnManager.getColumnTypes(ConnManager.java:328)
at org.apache.sqoop.orm.ClassWriter.getColumnTypes(ClassWriter.java:1853)
at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1653)
at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:107)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:488)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:615)
at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:225)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
at org.apache.sqoop.Sqoop.main(Sqoop.java:243)
Caused by: java.net.ConnectException: Connection timed out
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at net.sourceforge.jtds.jdbc.SharedSocket.createSocketForJDBC3(SharedSocket.java:288)
at net.sourceforge.jtds.jdbc.SharedSocket.<init>(SharedSocket.java:251)
at net.sourceforge.jtds.jdbc.JtdsConnection.<init>(JtdsConnection.java:331)
... 22 more
17/05/04 17:22:19 ERROR tool.ImportTool: Encountered IOException running import job: java.io.IOException: No columns to generate for ClassWriter
at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1659)
at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:107)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:488)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:615)
at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:225)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
at org.apache.sqoop.Sqoop.main(Sqoop.java:243)
When running Sqoop without HCatalog I get some warnings, but it runs on exactly the same database and table:
sqoop import "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" --driver net.sourceforge.jtds.jdbc.Driver --connect jdbc:jtds:sybase://10.9.179.5:4000/BAS_csrp --username user --password xxxxxx --table csrp_total_bas --split-by cpr_nr --target-dir /tmp/simon/std/CPRUPD/BASELINE_file/
This is the output of the Sqoop run which works:
Warning: /usr/hdp/2.5.0.0-1245/accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
17/05/04 17:14:47 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6.2.5.0.0-1245
17/05/04 17:14:47 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
17/05/04 17:14:47 WARN sqoop.ConnFactory: Parameter --driver is set to an explicit driver however appropriate connection manager is not being set (via --connection-manager). Sqoop is going to fall back to org.apache.sqoop.manager.GenericJdbcManager. Please specify explicitly which connection manager should be used next time.
17/05/04 17:14:47 INFO manager.SqlManager: Using default fetchSize of 1000
17/05/04 17:14:47 INFO tool.CodeGenTool: Beginning code generation
17/05/04 17:14:47 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM csrp_total_bas AS t WHERE 1=0
17/05/04 17:14:47 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM csrp_total_bas AS t WHERE 1=0
17/05/04 17:14:47 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/hdp/2.5.0.0-1245/hadoop-mapreduce
Note: /tmp/sqoop-w20960/compile/ccd3eec0b22d23dc5103e861bf57e345/csrp_total_bas.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
17/05/04 17:14:49 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-w20960/compile/ccd3eec0b22d23dc5103e861bf57e345/csrp_total_bas.jar
17/05/04 17:14:49 INFO mapreduce.ImportJobBase: Beginning import of csrp_total_bas
17/05/04 17:14:49 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM csrp_total_bas AS t WHERE 1=0
17/05/04 17:14:50 INFO impl.TimelineClientImpl: Timeline service address: http://sktudv01hdp02.ccta.dk:8188/ws/v1/timeline/
17/05/04 17:14:50 INFO client.AHSProxy: Connecting to Application History server at sktudv01hdp02.ccta.dk/172.20.242.53:10200
17/05/04 17:14:51 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 8899 for w20960 on ha-hdfs:hdpudv01
17/05/04 17:14:51 INFO security.TokenCache: Got dt for hdfs://hdpudv01; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdpudv01, Ident: (HDFS_DELEGATION_TOKEN token 8899 for w20960)
17/05/04 17:14:51 WARN token.Token: Cannot find class for token kind kms-dt
17/05/04 17:14:51 INFO security.TokenCache: Got dt for hdfs://hdpudv01; Kind: kms-dt, Service: 172.20.242.53:9292, Ident: 00 06 57 32 30 39 36 30 04 79 61 72 6e 00 8a 01 5b d4 07 2a 3d 8a 01 5b f8 13 ae 3d 3b 09
17/05/04 17:14:51 WARN ipc.Client: Failed to connect to server: sktudv01hdp02.ccta.dk/172.20.242.53:8032: retries get failed due to exceeded maximum allowed retries number: 0
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:650)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:745)
at org.apache.hadoop.ipc.Client$Connection.access$3200(Client.java:397)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1618)
at org.apache.hadoop.ipc.Client.call(Client.java:1449)
at org.apache.hadoop.ipc.Client.call(Client.java:1396)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at com.sun.proxy.$Proxy22.getNewApplication(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getNewApplication(ApplicationClientProtocolPBClientImpl.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176)
at com.sun.proxy.$Proxy23.getNewApplication(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNewApplication(YarnClientImpl.java:225)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createApplication(YarnClientImpl.java:233)
at org.apache.hadoop.mapred.ResourceMgrDelegate.getNewJobID(ResourceMgrDelegate.java:188)
at org.apache.hadoop.mapred.YARNRunner.getNewJobID(YARNRunner.java:231)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:153)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
at org.apache.sqoop.mapreduce.ImportJobBase.doSubmitJob(ImportJobBase.java:200)
at org.apache.sqoop.mapreduce.ImportJobBase.runJob(ImportJobBase.java:173)
at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:270)
at org.apache.sqoop.manager.SqlManager.importTable(SqlManager.java:692)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:507)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:615)
at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:225)
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
at org.apache.sqoop.Sqoop.main(Sqoop.java:243)
17/05/04 17:14:51 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
17/05/04 17:14:51 WARN token.Token: Cannot find class for token kind kms-dt
17/05/04 17:14:51 INFO security.TokenCache: Got dt for hdfs://hdpudv01; Kind: kms-dt, Service: 172.20.242.54:9292, Ident: 00 06 57 32 30 39 36 30 04 79 61 72 6e 00 8a 01 5b d4 07 2a c5 8a 01 5b f8 13 ae c5 8e 04 f8 2b
17/05/04 17:14:52 INFO db.DBInputFormat: Using read commited transaction isolation
17/05/04 17:14:52 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(cpr_nr), MAX(cpr_nr) FROM csrp_total_bas
17/05/04 17:17:30 WARN db.TextSplitter: Generating splits for a textual index column.
17/05/04 17:17:30 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
17/05/04 17:17:30 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
17/05/04 17:17:30 INFO mapreduce.JobSubmitter: number of splits:6
17/05/04 17:17:31 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1491967685506_0645
17/05/04 17:17:31 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hdpudv01, Ident: (HDFS_DELEGATION_TOKEN token 8899 for w20960)
17/05/04 17:17:31 WARN token.Token: Cannot find class for token kind kms-dt
17/05/04 17:17:31 WARN token.Token: Cannot find class for token kind kms-dt
Kind: kms-dt, Service: 172.20.242.53:9292, Ident: 00 06 57 32 30 39 36 30 04 79 61 72 6e 00 8a 01 5b d4 07 2a 3d 8a 01 5b f8 13 ae 3d 3b 09
17/05/04 17:17:31 WARN token.Token: Cannot find class for token kind kms-dt
17/05/04 17:17:31 WARN token.Token: Cannot find class for token kind kms-dt
Kind: kms-dt, Service: 172.20.242.54:9292, Ident: 00 06 57 32 30 39 36 30 04 79 61 72 6e 00 8a 01 5b d4 07 2a c5 8a 01 5b f8 13 ae c5 8e 04 f8 2b
17/05/04 17:17:32 INFO impl.YarnClientImpl: Submitted application application_1491967685506_0645
17/05/04 17:17:32 INFO mapreduce.Job: The url to track the job: http://sktudv01hdp01.ccta.dk:8088/proxy/application_1491967685506_0645/
17/05/04 17:17:32 INFO mapreduce.Job: Running job: job_1491967685506_0645
17/05/04 17:17:39 INFO mapreduce.Job: Job job_1491967685506_0645 running in uber mode : false
17/05/04 17:17:39 INFO mapreduce.Job: map 0% reduce 0%
17/05/04 17:20:47 INFO mapreduce.Job: map 33% reduce 0%
17/05/04 17:20:48 INFO mapreduce.Job: map 83% reduce 0%
17/05/04 17:36:50 INFO mapreduce.Job: map 100% reduce 0%
17/05/04 17:36:50 INFO mapreduce.Job: Job job_1491967685506_0645 completed successfully
17/05/04 17:36:51 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=1037068
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=774
HDFS: Number of bytes written=47578984831
HDFS: Number of read operations=24
HDFS: Number of large read operations=0
HDFS: Number of write operations=12
Job Counters
Launched map tasks=6
Other local map tasks=6
Total time spent by all maps in occupied slots (ms)=2083062
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=2083062
Total vcore-milliseconds taken by all map tasks=2083062
Total megabyte-milliseconds taken by all map tasks=4266110976
Map-Reduce Framework
Map input records=46527615
Map output records=46527615
Input split bytes=774
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=14955
CPU time spent (ms)=1281100
Physical memory (bytes) snapshot=1798893568
Virtual memory (bytes) snapshot=22238547968
Total committed heap usage (bytes)=1817706496
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=47578984831
17/05/04 17:36:51 INFO mapreduce.ImportJobBase: Transferred 44.3114 GB in 1,320.7926 seconds (34.3543 MB/sec)
17/05/04 17:36:51 INFO mapreduce.ImportJobBase: Retrieved 46527615 records.
I really don't understand it. I guess it might be some configuration issue in Hive or in Sqoop, but I am just guessing.
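As posted, the two commands differ in more than the HCatalog options. The host in --connect is 11.9.179.5 in the failing command and 10.9.179.5 in the working one, and the failing command has no space between "user" and "--password". It is not known whether these are typos made while writing the post or were present in the command that was actually run. The failing log does show java.net.ConnectException: Connection timed out while jTDS opens its socket, which is a network-level failure. A token-by-token comparison makes such differences easy to spot. The sketch below is plain Python 3; `differing_tokens` is a hypothetical helper and the commands are shortened copies of the ones above.

```python
import shlex


def differing_tokens(command_a, command_b):
    """Return the tokens that appear in only one of two shell commands."""
    tokens_a = shlex.split(command_a)
    tokens_b = shlex.split(command_b)
    only_a = [t for t in tokens_a if t not in tokens_b]
    only_b = [t for t in tokens_b if t not in tokens_a]
    return only_a, only_b


if __name__ == "__main__":
    failing = ("sqoop import --connect jdbc:jtds:sybase://11.9.179.5:4000/BAS_csrp "
               "--username user--password xxxxxx --table csrp_total_bas")
    working = ("sqoop import --connect jdbc:jtds:sybase://10.9.179.5:4000/BAS_csrp "
               "--username user --password xxxxxx --table csrp_total_bas")
    only_failing, only_working = differing_tokens(failing, working)
    print("only in failing command:", only_failing)
    print("only in working command:", only_working)
```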
Labels: Apache Sqoop
05-03-2017
11:36 AM
Hi, I have this problem: I need to do some cleaning within the columns in my flowfile. The columns are separated by tabs, and sometimes there can be \n within the columns, so I cannot use splitlines to access the data. So I tried to use the csv library to read the input stream, but I get an error:
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]
Caused by: javax.script.ScriptException: java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Cannot create PyString with non-byte value in <script> at line number 21
Here is my script, which I am running in the ExecuteScript body:
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import csv

# Define a subclass of StreamCallback for use in session.write()
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        newText = ''
        Text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        reader = csv.reader(Text, delimiter=' ')
        for row in reader:
            newText += (' '.join(row)).rstrip('\n\r')
        outputStream.write(newText.encode('utf-8'))
# end class

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
    session.transfer(flowFile, REL_SUCCESS)
# implicit return at the end
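One thing to note about the script above: csv.reader expects an iterable of lines, so passing it a single string makes it iterate over individual characters. The sketch below wraps the text in a file-like object instead. It is plain Python 3, not Jython 2.7, so it would need adapting before it runs inside ExecuteScript. It also assumes that fields containing line breaks are quoted, because the csv module cannot otherwise tell an embedded line break from the end of a row. `clean_tsv` is a hypothetical name.

```python
import csv
import io


def clean_tsv(text):
    """Parse tab-separated text and replace line breaks inside fields with spaces."""
    # newline="" leaves line endings untouched so the csv module can handle them
    reader = csv.reader(io.StringIO(text, newline=""), delimiter="\t")
    out = io.StringIO()
    writer = csv.writer(out, delimiter="\t", lineterminator="\n")
    for row in reader:
        writer.writerow([f.replace("\r", " ").replace("\n", " ") for f in row])
    return out.getvalue()


if __name__ == "__main__":
    sample = 'a\t"line1\nline2"\tc\nd\te\tf\n'
    print(clean_tsv(sample))
```

The "Cannot create PyString with non-byte value" error appears to be specific to Jython's handling of non-ASCII text in byte strings, so it is not reproduced by this Python 3 sketch.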
Labels: Apache NiFi
04-27-2017
02:06 PM
Hello, I want to do some processing on my data while ingesting, so I created a little Python script to do that. I am ingesting CSV files and want to remove carriage returns, linefeeds etc. in the columns, and also add a new column with a timestamp. My script looks like this:
import csv
import sys
flowFile = session.get()
with open(flowFile, 'rb') as csvfile:
    spamreader = csv.reader(csvfile, delimiter=' ', quotechar='|')
    for row in spamreader:
        for col in row:
            col.rstrip('\n\r ')
        print ', '.join(row)
Right now I get this error when running it:
2017-04-27 12:40:13,463 ERROR [Timer-Driven Process Thread-8] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=af58777c-015b-1000-ffff-ffff972978f8] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: NameError: name 'flowfile' is not defined in <script> at line number 3: org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: NameError: name 'flowfile' is not defined in <script> at line number 3
What should I add to this script in order to output the manipulated flowfiles to the next processor? Do I need to rewrite them with something like session.write(flowfile)? I am not a Python programmer, so please be kind and supply a small example. Thank you.
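Two details in the posted script are worth noting. Python names are case-sensitive, so `flowfile` and `flowFile` are different names. Also, `col.rstrip(...)` returns a new string rather than changing `col` in place, so its result has to be stored. The sketch below shows only the row cleaning and the added timestamp column, in plain Python 3. The session handling inside ExecuteScript is not shown. `clean_row` is a hypothetical name and the ISO timestamp format is an assumption.

```python
from datetime import datetime, timezone


def clean_row(row, timestamp=None):
    """Remove CR/LF from every column, trim surrounding spaces,
    and append a timestamp as a new last column."""
    if timestamp is None:
        # Assumed format: ISO 8601 in UTC
        timestamp = datetime.now(timezone.utc).isoformat()
    cleaned = [col.replace("\r", "").replace("\n", "").strip() for col in row]
    return cleaned + [timestamp]


if __name__ == "__main__":
    print(clean_row(["a\r\n", " b ", "c\nd"], timestamp="2017-04-27T12:40:13"))
```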
Labels: Apache NiFi
04-27-2017
12:59 PM
Thank you very much