
Airflow HiveOperator get results and execute in for loop


New Contributor

I have Airflow installed and running.

I am facing two issues that I cannot find a solution to.

1) How can I get (not sure if it's possible) the result from the HiveOperator, that is, the result of the SQL query passed to Hive?

2) The HiveOperator here is called in a for loop over a list of SQL commands to be executed. The problem is that only the first command actually takes effect and does what it is supposed to do (e.g. delete some rows in a table). Each SQL command operates on a different table, but always in the same schema. It looks as if, while cmd1 is running, Hive is busy, and the second command is launched in the meantime and ends up being ignored.

How could I solve these issues?

Many thanks

<code># -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow import DAG
from airflow.operators import HiveOperator
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29),
}

commands = ['cmd1','cmd2','...']
dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="15 08 * * *",
    dagrun_timeout=timedelta(minutes=1))

for sqlcmd in commands:
    ret = HiveOperator(
        task_id='id_taskX',
        hiveconf_jinja_translate=True,
        hql=sqlcmd,
        trigger_rule='all_done',
        dag=dag
    )
</code>
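
One thing worth noting about item 2: every iteration of the loop above reuses the same task_id ('id_taskX'), and Airflow requires task ids to be unique within a DAG, so only a single task ends up registered (or a duplicate-id error is raised, depending on the Airflow version). A minimal sketch of the loop with one task per command, using an illustrative id scheme:

<code>for i, sqlcmd in enumerate(commands):
    HiveOperator(
        task_id='hive_cmd_%d' % i,  # unique task_id per command
        hiveconf_jinja_translate=True,
        hql=sqlcmd,
        trigger_rule='all_done',
        dag=dag
    )
</code>

With distinct ids, each command becomes its own task and is scheduled and executed independently.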

Re: Airflow HiveOperator get results and execute in for loop

Rising Star

I just started playing around with Airflow to orchestrate some Hive jobs, so I'm not an expert. However, I would say you should use an "insert overwrite directory 'some/dir' select ..." type of query to land the results in files, and then use another task in the DAG to read them from HDFS, maybe with an "hdfs dfs -get" or a WebHDFS REST call, to bring them wherever you need them or to continue processing them further.
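
To illustrate that suggestion, here is a rough sketch (table name, paths and task ids are made up, and the import paths are the ones used by 1.x-era Airflow, so adjust for your version): one HiveOperator lands the query results as files in an HDFS directory, and a downstream BashOperator pulls them to the local filesystem for further processing.

<code>from airflow.operators.hive_operator import HiveOperator
from airflow.operators.bash_operator import BashOperator

# Land the query results as files in HDFS (directory and table are illustrative).
export_results = HiveOperator(
    task_id='export_results',
    hql="INSERT OVERWRITE DIRECTORY '/tmp/notice_slack_out' "
        "SELECT * FROM my_schema.my_table",
    dag=dag
)

# Pull the exported files to the local filesystem (or use a WebHDFS call instead).
fetch_results = BashOperator(
    task_id='fetch_results',
    bash_command='hdfs dfs -get /tmp/notice_slack_out /tmp/notice_slack_out',
    dag=dag
)

export_results.set_downstream(fetch_results)
</code>

If the result set is small, another option is to call HiveServer2Hook().get_records(hql) from a PythonOperator (the hook lives in airflow.hooks.hive_hooks in 1.x), which returns the rows directly to Python instead of going through files in HDFS.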