Falcon Email Processing Tutorial in HDP 2.5

I'm trying to implement the Falcon Email Processing tutorial in HDP 2.5.3 (self-installed on a single node rather than on the sandbox). Everything is submitted and running, and the rawEmailIngestProcess is creating data, but no instances of the rawEmailFeed are scheduled, so the cleanseEmailProcess is stuck waiting for input.

How do I troubleshoot this? Does anyone have any ideas? I have a feeling it's something similar to this HCC post, which @Sowmya Ramesh helped with, but I can't get my head around the logic of the validities!

Here's my code:

rawEmailFeed

<feed xmlns='uri:falcon:feed:0.1' name='rawEmailFeed' description='Raw customer email feed'>
  <tags>externalSystem=USWestEmailServers</tags>
  <groups>churnAnalysisDataPipeline</groups>
  <availabilityFlag>_success</availabilityFlag>
  <frequency>hours(1)</frequency>
  <timezone>UTC</timezone>
  <late-arrival cut-off='hours(1)'/>
  <clusters>
    <cluster name='primaryCluster' type='source'>
      <validity start='2017-02-24T17:57Z' end='2099-06-05T11:59Z'/>
      <retention limit='days(90)' action='delete'/>
      <locations>
        <location type='data' path='/user/ambari-qa/falcon/demo/primary/input/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
        </location>
        <location type='stats' path='/'>
        </location>
      </locations>
    </cluster>
  </clusters>
  <locations>
    <location type='data' path='/user/ambari-qa/falcon/demo/primary/input/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
    </location>
    <location type='stats' path='/'>
    </location>
  </locations>
  <ACL owner='ambari-qa' group='users' permission='0755'/>
  <schema location='/none' provider='/none'/>
  <properties>
    <property name='queueName' value='default'>
    </property>
    <property name='jobPriority' value='NORMAL'>
    </property>
  </properties>
</feed>

rawEmailIngestProcess

<process xmlns='uri:falcon:process:0.1' name='rawEmailIngestProcess'>
  <tags>email=testemail</tags>
  <clusters>
    <cluster name='primaryCluster'>
      <validity start='2017-02-24T17:59Z' end='2099-06-05T18:00Z'/>
    </cluster>
  </clusters>
  <parallel>1</parallel>
  <order>FIFO</order>
  <frequency>hours(1)</frequency>
  <timezone>UTC</timezone>
  <outputs>
    <output name='output' feed='rawEmailFeed' instance='now(0,0)'>
    </output>
  </outputs>
  <workflow name='emailIngestWorkflow' version='4.0.1' engine='oozie' path='/user/ambari-qa/falcon/demo/apps/ingest/fs'/>
  <retry policy='exp-backoff' delay='minutes(3)' attempts='3'/>
  <ACL owner='ambari-qa' group='users' permission='0755'/>
</process>

cleansedEmailFeed

<feed xmlns='uri:falcon:feed:0.1' name='cleansedEmailFeed' description='Cleansed customer emails'>
  <tags>cleanse=cleaned</tags>
  <groups>churnAnalysisDataPipeline</groups>
  <availabilityFlag>_success</availabilityFlag>
  <frequency>hours(1)</frequency>
  <timezone>UTC</timezone>
  <late-arrival cut-off='hours(4)'/>
  <clusters>
    <cluster name='primaryCluster' type='source'>
      <validity start='2017-02-24T17:58Z' end='2099-06-05T18:00Z'/>
      <retention limit='hours(90)' action='delete'/>
      <locations>
        <location type='data' path='/user/ambari-qa/falcon/demo/primary/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
        </location>
        <location type='stats' path='/tmp/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
        </location>
        <location type='meta' path='/tmp/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
        </location>
      </locations>
    </cluster>
    <cluster name='backupCluster' type='target'>
      <validity start='2017-02-24T17:58Z' end='2099-06-05T18:00Z'/>
      <retention limit='hours(90)' action='delete'/>
      <locations>
        <location type='data' path='/falcon/demo/bcp/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
        </location>
        <location type='stats' path='/tmp/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
        </location>
        <location type='meta' path='/tmp/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
        </location>
      </locations>
    </cluster>
  </clusters>
  <locations>
    <location type='data' path='/user/ambari-qa/falcon/demo/primary/processed/enron/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
    </location>
    <location type='stats' path='/tmp/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
    </location>
    <location type='meta' path='/tmp/${YEAR}-${MONTH}-${DAY}-${HOUR}'>
    </location>
  </locations>
  <ACL owner='ambari-qa' group='users' permission='0755'/>
  <schema location='/none' provider='/none'/>
  <properties>
    <property name='queueName' value='default'>
    </property>
    <property name='jobPriority' value='NORMAL'>
    </property>
  </properties>
</feed>

cleanseEmailProcess

<process xmlns='uri:falcon:process:0.1' name='cleanseEmailProcess'>
  <tags>cleanse=yes</tags>
  <clusters>
    <cluster name='primaryCluster'>
      <validity start='2017-02-24T17:59Z' end='2099-06-05T18:00Z'/>
    </cluster>
  </clusters>
  <parallel>1</parallel>
  <order>FIFO</order>
  <frequency>hours(1)</frequency>
  <timezone>UTC</timezone>
  <inputs>
    <input name='input' feed='rawEmailFeed' start='now(0,0)' end='now(0,0)'>
    </input>
  </inputs>
  <outputs>
    <output name='output' feed='cleansedEmailFeed' instance='now(0,0)'>
    </output>
  </outputs>
  <workflow name='emailCleanseWorkflow' version='pig-0.13.0' engine='pig' path='/user/ambari-qa/falcon/demo/apps/pig/id.pig'/>
  <retry policy='exp-backoff' delay='minutes(3)' attempts='3'/>
  <ACL owner='ambari-qa' group='users' permission='0755'/>
</process>

Logs output

[root@anafalcon0 ~]# falcon instance -type feed -name rawEmailFeed -logs
Consolidated Status: SUCCEEDED


Instances:
Instance		Cluster		SourceCluster		Status		RunID			Log
-----------------------------------------------------------------------------------------------


Additional Information:
Response: default/STATUS
Request Id: default/1477825851@qtp-1435229983-77 - f7fb75d8-4221-4481-b333-d4b25d105c02


[root@anafalcon0 ~]# falcon instance -type feed -name cleansedEmailFeed -logs
Consolidated Status: SUCCEEDED


Instances:
Instance		Cluster		SourceCluster		Status		RunID			Log
-----------------------------------------------------------------------------------------------
2017-02-24T21:59Z	backupCluster	primaryCluster	WAITING	latest	-


Additional Information:
Response: default/STATUS
Request Id: default/1477825851@qtp-1435229983-77 - 2c22df87-3e68-4af6-b483-3dba6ae85bb3

[root@anafalcon0 ~]# falcon instance -type process -name cleanseEmailProcess -logs
Consolidated Status: SUCCEEDED


Instances:
Instance		Cluster		SourceCluster		Status		RunID			Log
-----------------------------------------------------------------------------------------------
2017-02-24T21:59Z	primaryCluster	-	WAITING	latest	-
2017-02-24T21:59Z	primaryCluster	-	WAITING	latest	-
2017-02-24T20:59Z	primaryCluster	-	WAITING	latest	-
2017-02-24T19:59Z	primaryCluster	-	WAITING	latest	-
2017-02-24T18:59Z	primaryCluster	-	WAITING	latest	-
2017-02-24T17:59Z	primaryCluster	-	WAITING	latest	-


Additional Information:
Response: default/STATUS
Request Id: default/1477825851@qtp-1435229983-77 - ecdd07db-ca6c-42a6-b2cd-c0ac02faa116


[root@anafalcon0 ~]# falcon instance -type process -name rawEmailIngestProcess -logs
Consolidated Status: SUCCEEDED


Instances:
Instance		Cluster		SourceCluster		Status		RunID			Log
-----------------------------------------------------------------------------------------------
2017-02-24T21:59Z	primaryCluster	-	SUCCEEDED	latest	http://anafalcon0.test.com:50070/data/apps/falcon/primaryCluster/staging/falcon/workflows/process/ra...
2017-02-24T20:59Z	primaryCluster	-	SUCCEEDED	latest	http://anafalcon0.test.com:50070/data/apps/falcon/primaryCluster/staging/falcon/workflows/process/ra...
2017-02-24T19:59Z	primaryCluster	-	SUCCEEDED	latest	http://anafalcon0.test.com:50070/data/apps/falcon/primaryCluster/staging/falcon/workflows/process/ra...
2017-02-24T18:59Z	primaryCluster	-	SUCCEEDED	latest	http://anafalcon0.test.com:50070/data/apps/falcon/primaryCluster/staging/falcon/workflows/process/ra...
2017-02-24T17:59Z	primaryCluster	-	SUCCEEDED	latest	http://anafalcon0.test.com:50070/data/apps/falcon/primaryCluster/staging/falcon/workflows/process/ra...


Additional Information:
Response: default/STATUS
Request Id: default/1477825851@qtp-1435229983-77 - 29845518-e2ac-4363-a9ab-9a0fea5f60e2

Appreciate any help! Thanks in advance!

1 ACCEPTED SOLUTION

OK, it turns out the "availability flag" property is now mandatory, and the old ingest script didn't generate the "_success" file needed to trigger the feed.

I modified ingest.sh to generate the flag:

curl -sS http://bailando.sims.berkeley.edu/enron/enron_with_categories.tar.gz | tar xz && hadoop fs -mkdir -p "$1" && hadoop fs -chmod 777 "$1" && hadoop fs -put enron_with_categories/*/*.txt "$1" && hadoop fs -touchz "$1/_success"
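
To confirm the flag is actually landing where the feed expects it, something like the following may help. It is only a sketch: `feed_instance_path` and `flag_exists` are hypothetical helper names (not part of Falcon), the base path is copied from the rawEmailFeed definition above, and it assumes the feed instance directory falls in the same UTC hour as the instance time you pass in.

```shell
#!/bin/sh
# Sketch: map an instance time to the rawEmailFeed data directory and
# check whether the "_success" availability flag exists there.

# Path prefix copied from the rawEmailFeed definition in the question.
FEED_BASE='/user/ambari-qa/falcon/demo/primary/input/enron'

# Hypothetical helper: turns an instance time such as 2017-02-24T17:59Z
# into <base>/YEAR-MONTH-DAY-HOUR, mirroring the feed's path template.
# The body runs in a subshell so its variables do not leak.
feed_instance_path() (
  ts="$1"
  year=$(printf '%s' "$ts" | cut -c1-4)
  month=$(printf '%s' "$ts" | cut -c6-7)
  day=$(printf '%s' "$ts" | cut -c9-10)
  hour=$(printf '%s' "$ts" | cut -c12-13)
  printf '%s/%s-%s-%s-%s\n' "$FEED_BASE" "$year" "$month" "$day" "$hour"
)

# Hypothetical helper: uses `hadoop fs -test -e`, which exits 0 when the
# path exists, to look for the flag file in that instance directory.
flag_exists() {
  hadoop fs -test -e "$(feed_instance_path "$1")/_success"
}
```

For example, `flag_exists 2017-02-24T17:59Z && echo present || echo missing` should report whether the first ingest run left a flag behind. If it prints "missing" for runs that SUCCEEDED, the ingest script is probably still the old version.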
