Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
|  | 11198 | 04-15-2020 05:01 PM |
|  | 7097 | 10-15-2019 08:12 PM |
|  | 3089 | 10-12-2019 08:29 PM |
|  | 11425 | 09-21-2019 10:04 AM |
|  | 4318 | 09-19-2019 07:11 AM |
03-20-2019
06:45 AM
@Shu Will this work even if some state is already stored within the processor? For example, I have a timestamp (2019-03-17 02:00:00:0) stored in the processor's state, and now I want the processor to start fetching data only after 2019-03-20. Will this property help in that scenario?
11-16-2017
12:24 AM
@dhieru singh
Failure-relationship messages won't have any write attribute (a "failure" attribute) added to the flow files when they are transferred to the failure queue. But you can use any of the methods below to check the length or size of the flow files and route them to the SplitText processor:

1. Check the size of the flow file and route to the SplitText processor.
2. Check the length of the message and route to the SplitText processor.
3. Use the RouteOnContent processor.

1. Checking the size of the flow file and routing to the SplitText processor:
If you know the size of the flow files whose message is too long, you can use a RouteOnAttribute processor and add a new property, e.g. "size more than 1 byte", as
${fileSize:gt(1)} //checks whether the flow file size is greater than 1 byte
This way you can filter out flow files based on their size, but first you need to know the size of the flow files that are routed to failure.
Flow: Failure relation --> RouteOnAttribute (check fileSize) --> SplitText processor

2. Checking the length of the message and routing to the SplitText processor:
In this method we feed the same failure relationship into an ExtractText processor. Add a new property to ExtractText that extracts the whole content of the flow file into an attribute named content:
content = (.*)
Because the (.*) regex captures everything, you need to adjust the following properties to match your flow file size:
1. Max Capture Group Length: the maximum number of characters a given capture group value can have; any characters beyond the maximum are truncated.
2. Maximum Buffer Size: the maximum amount of data to buffer (per file) in order to apply the regular expressions; files larger than the specified maximum are not fully evaluated.
Once this step is done, the whole content of the flow file is available in the content attribute. Then use a RouteOnAttribute processor and check the length of the content attribute with the NiFi Expression Language, e.g. a property "length more than 1" as
${content:length():gt(1)} //takes the content attribute written by ExtractText and checks whether its length is greater than 1
Example: if your content attribute has the value "hi hello", its length is 8.
Flow: Failure relation --> ExtractText (capture all content; adjust buffer size and capture-group length to your flow file size) --> RouteOnAttribute (check the attribute length and route) --> SplitText processor

3. Using the RouteOnContent processor:
You can use a RouteOnContent processor to check the content of the flow file directly by changing these properties:
1. Match Requirement: set to "content must contain match".
2. Content Buffer Size: depends on your flow file size.
3. Add a property, e.g. "more than 1 length", as
[a-zA-Z0-9\s+]{1} //matches if the content contains at least 1 character, including spaces
If you want to route messages longer than 1000 characters, change the regex to
[a-zA-Z0-9\s+]{1000} //matches flow files whose content contains a run of at least 1000 characters, including spaces
Flow: Failure relation --> RouteOnContent --> SplitText

These are the ways you can filter the failure-relationship messages; choose whichever method best fits your case.
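As a minimal sketch, method 2's properties could look roughly like the following. The attribute name content and the route name "length more than 1" are just the examples used above, and the numeric limits are placeholders you would size to your own flow files:

```
# ExtractText (sketch; limits are placeholders, not values from this thread)
content                  = (.*)
Max Capture Group Length = 4096      # raise to cover your largest message
Maximum Buffer Size      = 1 MB      # raise to cover your largest flow file

# RouteOnAttribute (sketch)
Routing Strategy         = Route to Property name
length more than 1       = ${content:length():gt(1)}
```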
11-14-2017
09:48 PM
@Shu this worked. Thanks a lot, I appreciate it.
11-15-2017
03:22 PM
1 Kudo
Thank you very much! It worked very well for me!
11-14-2017
02:18 PM
Thanks a lot, it really worked like a charm and the import completes successfully. But I am getting an error (log attached below): Error: java.lang.RuntimeException: java.lang.RuntimeException: java.sql.SQLRecoverableException: No more data to read from socket. Can you shed some light on the cause?

17/11/14 18:41:54 INFO mapreduce.Job: Running job: job_1510681534788_0007
17/11/14 18:42:02 INFO mapreduce.Job: Job job_1510681534788_0007 running in uber mode : false
17/11/14 18:42:02 INFO mapreduce.Job: map 0% reduce 0%
17/11/14 18:42:30 INFO mapreduce.Job: map 20% reduce 0%
17/11/14 18:43:01 INFO mapreduce.Job: map 40% reduce 0%
17/11/14 18:43:02 INFO mapreduce.Job: map 60% reduce 0%
17/11/14 18:43:14 INFO mapreduce.Job: map 80% reduce 0%
17/11/14 18:43:35 INFO mapreduce.Job: Task Id : attempt_1510681534788_0007_m_000003_0, Status : FAILED
Error: java.lang.RuntimeException: java.lang.RuntimeException: java.sql.SQLRecoverableException: No more data to read from socket
at org.apache.sqoop.mapreduce.db.DBInputFormat.setConf(DBInputFormat.java:167)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:749)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.RuntimeException: java.sql.SQLRecoverableException: No more data to read from socket
at org.apache.sqoop.mapreduce.db.DBInputFormat.getConnection(DBInputFormat.java:220)
at org.apache.sqoop.mapreduce.db.DBInputFormat.setConf(DBInputFormat.java:165)
... 9 more
Caused by: java.sql.SQLRecoverableException: No more data to read from socket
at oracle.jdbc.driver.SQLStateMapping.newSQLException(SQLStateMapping.java:281)
at oracle.jdbc.driver.DatabaseError.newSQLException(DatabaseError.java:118)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:224)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:296)
at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:539)
at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1091)
at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1(T4CMAREngine.java:1040)
at oracle.jdbc.driver.T4CTTIoauthenticate.receiveOauth(T4CTTIoauthenticate.java:814)
at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:428)
at oracle.jdbc.driver.PhysicalConnection.<init>(PhysicalConnection.java:494)
... 10 more
17/11/14 18:43:55 INFO mapreduce.Job: map 100% reduce 0%
17/11/14 18:43:55 INFO mapreduce.Job: Job job_1510681534788_0007 completed successfully
17/11/14 18:43:56 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=750200
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=579
HDFS: Number of bytes written=136
HDFS: Number of read operations=20
HDFS: Number of large read operations=0
HDFS: Number of write operations=10
Job Counters
Failed map tasks=1
Launched map tasks=6
Other local map tasks=6
Total time spent by all maps in occupied slots (ms)=310816
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=310816
Total vcore-seconds taken by all map tasks=310816
Total megabyte-seconds taken by all map tasks=77704000
Map-Reduce Framework
Map input records=4
Map output records=4
Input split bytes=579
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=6122
CPU time spent (ms)=8900
Physical memory (bytes) snapshot=773640192
Virtual memory (bytes) snapshot=4145668096
Total committed heap usage (bytes)=665845760
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=136
17/11/14 18:43:56 INFO mapreduce.ImportJobBase: Transferred 136 bytes in 125.5593 seconds (1.0832 bytes/sec)
17/11/14 18:43:56 INFO mapreduce.ImportJobBase: Retrieved 4 records.
17/11/14 18:43:56 INFO util.AppendUtils: Appending to directory orders
17/11/14 18:43:56 INFO util.AppendUtils: Using found partition 10
17/11/14 18:43:56 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
17/11/14 18:43:56 INFO tool.ImportTool: --incremental lastmodified
17/11/14 18:43:56 INFO tool.ImportTool: --check-column ORDER_TIMESTAMP
17/11/14 18:43:56 INFO tool.ImportTool: --last-value 2017-11-14 19:29:52.0
17/11/14 18:43:56 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
I will reach out whenever I run into an issue. You are really a champ; once again, thank you!
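For reference, a sketch of how the next incremental run could pass the arguments Sqoop printed above. Only the --incremental, --check-column, --last-value, and target-directory values come from the log; the connection string, credentials, and table name are placeholders:

```
# Sketch only: connection string, credentials, and table name are placeholders;
# the incremental arguments are the ones printed by the previous run.
sqoop import \
  --connect jdbc:oracle:thin:@//dbhost:1521/ORCL \
  --username SCOTT \
  -P \
  --table ORDERS \
  --target-dir orders \
  --append \
  --incremental lastmodified \
  --check-column ORDER_TIMESTAMP \
  --last-value "2017-11-14 19:29:52.0"
```

Alternatively, saving this as a saved job with sqoop job --create lets Sqoop track --last-value automatically between runs, which is what the final hint in the log refers to.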
11-16-2017
05:32 PM
Thanks @Shu, that was really helpful. I am just wondering: when I set the number of threads for the whole instance to 1 and I have two processors connected to each other, they still manage to run concurrently somehow. The first processor takes on average 2.5 seconds per input and the second processor takes on average 4.5 seconds. I gave it 100 inputs and expected it to finish in around 700 seconds (i.e., sequential execution), but it finishes in about 480 seconds, which suggests that each processor is using a separate thread and they do not wait on each other. Am I missing something here?
11-09-2017
07:40 PM
1 Kudo
@Shu I checked again: the regex \n+\s+ does not remove the first line if it is blank, but other than that it replaces all the blank lines, even empty lines at the end. I am now trying your regex for removing the first blank line. Thanks again.
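As a minimal sketch, one way to also catch a blank first line is to run ReplaceText in Regex Replace mode over the entire text; the pattern below is an assumption for illustration, not the exact regex from this thread:

```
# ReplaceText sketch (settings are assumptions, not the thread's exact config)
Replacement Strategy : Regex Replace
Evaluation Mode      : Entire text
Search Value         : (?m)^[ \t]*\r?\n      # any blank/whitespace-only line, including the first
Replacement Value    :                       # empty string, so matched blank lines are removed
```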
11-10-2017
07:04 PM
1 Kudo
@sally sally Yeah, you can do that with a ReplaceText processor. Set the Search Value property to
<details>\s*([\s\S]+.*)\n+\s+<\/details> //captures everything enclosed in the details tag as capture group 1
and the Replacement Value to
<details>
${filename}
$1
</details>
You can customize the replacement value as per your needs. A configuration sketch follows the input/output example below.

Input:
<?xml version="1.0" encoding="UTF-8"?>
<service>
<Person>
<details>
<start>2017-10-22</start>
<id>*******</id>
<makeVersion>1</makeVersion>
<patch>patch</patch>
<parameter>1</parameter>
</details>
</Person>
</service>

Output:
<?xml version="1.0" encoding="UTF-8"?>
<service>
<Person>
<details>
1497701925152409
<start>2017-10-22</start>
<id>*******</id>
<makeVersion>1</makeVersion>
<patch>patch</patch>
<parameter>1</parameter>
</details>
</Person>
</service>
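For completeness, a rough sketch of the ReplaceText configuration behind this example; the buffer size shown is just the processor default, not a value taken from the original screenshots:

```
Replacement Strategy : Regex Replace
Evaluation Mode      : Entire text
Maximum Buffer Size  : 1 MB
Search Value         : <details>\s*([\s\S]+.*)\n+\s+<\/details>
Replacement Value    : <details>
                       ${filename}
                       $1
                       </details>
```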