Member since: 12-13-2013
Posts: 39
Kudos Received: 8
Solutions: 1
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3453 | 10-10-2017 08:05 PM |
12-11-2019
03:50 PM
Updating here myself for others' benefit: I tracked it down to the fact that bloom filters are not supported in dynamic filters when joining to a Kudu table, only min/max filters, which are not much help when joining on many different ids/keys at a time. The relevant work to add support for bloom filters is in KUDU-2483 and IMPALA-3741. Some contributors have expressed interest in picking this up, and I've heard Cloudera may have some people working on it soon, but nothing concrete at the moment.
10-01-2019
01:58 PM
Historically we have kept our daily aggregation tables in Parquet and maintained them by doing INSERT INTO hourly, which means they are not true aggregations, as there are up to 24 entries for a given dim combination in a full day. So we 'compact' them every now and then by re-aggregating entire monthly partitions, which is extremely resource intensive and time consuming. We are trying to switch some of these aggregations to Kudu, fully expecting that we could update existing records in place, but sadly it's not working for us and I'm hoping the community has some ideas. This is what we have tried:

1) UPSERTing with something like this:

UPSERT INTO agg_table
SELECT
agged_facts.dimcol1,
agged_facts.dimcol2,
CAST(agged_facts.some_count + IFNULL(agg_table.some_count, 0) AS INT) some_count
FROM (
SELECT
IFNULL(dimcol1, 1) dimcol1,
IFNULL(dimcol2, '') dimcol2,
CAST(SUM(IFNULL(some_count, 0)) AS INT) some_count
FROM fact_table
WHERE ...
GROUP BY 1,2
) agged_facts
LEFT JOIN agg_table ON
agged_facts.dimcol1 = agg_table.dimcol1 AND
agged_facts.dimcol2 = agg_table.dimcol2

Which works fine initially, but as the table grows, the join becomes slower and slower (and uses more memory). We thought dynamic filtering would fix the issue, so we changed the LEFT JOIN to agg_table to:

LEFT JOIN (
SELECT *
FROM agg_table
WHERE
dimcol1 IN (
SELECT DISTINCT dimcol1
FROM fact_table fact
WHERE ...
) AND
dimcol2 IN (
SELECT DISTINCT dimcol2
FROM fact_table fact
WHERE ...
)
) agg_table ON

But that only helped marginally, because sadly dynamic filtering against Kudu scans uses min/max filters, not bloom filters, so it doesn't filter the scan to the specific values of dimcol1 and dimcol2, but rather to anything in the range between the min and max values seen, which in our case is usually pretty much all of them.

2) We tried to avoid joining altogether:

UPSERT INTO agg_table
SELECT
agged_facts.dimcol1,
agged_facts.dimcol2,
CAST(agged_facts.some_count + IFNULL(agg_table.some_count, 0) AS INT) some_count
FROM (
SELECT
IFNULL(dimcol1, 1) dimcol1,
IFNULL(dimcol2, '') dimcol2,
CAST(SUM(IFNULL(some_count, 0)) AS INT) some_count
FROM fact_table
WHERE ...
GROUP BY 1,2
) agged_facts But that's incorrect syntax: can't sum columns from source and target tables 3) Do updates additively, and compact. Yes, like for Parquet aggs, but big difference is we don’t need to rewrite entire partitions/tablets, just the specific records with multiple entries, so much less resource intensive and can be done more often. To enable this we add a 'version' column so we can have multiple rows for same combination of dimension ids, and a 'delete' column to identify those ready for cleanup, so we'd end up with records like: dimcol1 dimcol2 version to_delete some_count 1 X 1 false 50 1 X 999 false 30 And after compaction aggregation would look like: dimcol1 dimcol2 version to_delete some_count 1 X 1 true 0 1 X 999 false 80 And finally we delete zeroed out records: dimcol1 dimcol2 version to_delete some_count 1 X 999 false 80 This works and we may go with it, BUT we can't figure out a way to make it entirely reliable, because a compaction query failure could update only one of the 2 records above, leaving data incorrect. If we adopt this it's only because we think due to ordering of operations, a failure would affect at most one entry per tablet and in our use case that may be OK.
Labels:
- Apache Impala
- Apache Kudu
07-29-2019
03:46 PM
Thanks very much Tim, I can confirm that it works like a charm, even with the group by, so yeah the docs should be updated because that does add a lot of value vs. what was documented. P.S. I didn't get an email when you first replied, only yesterday with the latest ones. Thanks for the quick response.
07-24-2019
04:06 PM
We have a slow query like:

select max(partition_col_1) from some_table where partition_col_2 = 'x'

It's super slow, scanning all records (hundreds of billions) in the filtered partitions, even though it's not actually getting anything out of them: the select only includes a partitioning column, so there should be no need to read any files at all. Any way or hint to get around this?
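The query option Tim suggested (acknowledged in the reply above) isn't named in this excerpt; my assumption is that it was OPTIMIZE_PARTITION_KEY_SCANS, which lets Impala answer min/max/distinct aggregates on partition key columns from table metadata instead of scanning data files. A minimal sketch of that workaround:

-- Assumption: the suggested fix was this query option (it is not named in the
-- excerpt). It answers min/max/ndv over partition key columns from metadata.
SET OPTIMIZE_PARTITION_KEY_SCANS=true;

SELECT MAX(partition_col_1)
FROM some_table
WHERE partition_col_2 = 'x';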
Tags: impala
Labels:
- Impala
04-18-2019
11:24 AM
1 Kudo
I just wanted to add to Todd's suggestion: also, if you have CM, you can create a new chart with this query:

select total_kudu_on_disk_size_across_kudu_replicas where category=KUDU_TABLE

and it will plot all your table sizes, plus the graph detail will list current values for all entries. Probably not easily scriptable, but at least a way to quickly copy all sizes in one go, looking like this:

7.2T impala::<tablename_redacted> (Kudu)
9.8T impala::<tablename_redacted> (Kudu)
6.5T impala::<tablename_redacted> (Kudu)
4.1G impala::<tablename_redacted> (Kudu)
21.5G impala::<tablename_redacted> (Kudu)
15.2G impala::<tablename_redacted> (Kudu)
6.1T impala::<tablename_redacted> (Kudu)
98G impala::<tablename_redacted> (Kudu)
23.2G impala::<tablename_redacted> (Kudu)
10G impala::<tablename_redacted> (Kudu)
9.1G impala::<tablename_redacted> (Kudu)
1.2T impala::<tablename_redacted> (Kudu)
7.5G impala::<tablename_redacted> (Kudu)
2.6T impala::<tablename_redacted> (Kudu)
35.8T impala::<tablename_redacted> (Kudu)
12-13-2018
04:01 PM
For the Impala, HBase, HDFS and YARN services, I can specify memory allocation in the Static Service Pools UI in Cloudera Manager. Not so for Kudu.
So do I manually under-allocate all the others to leave open whatever I want for Kudu, and then configure Kudu's "Tablet Server Hard Memory Limit"?
CDH 5.15.1 / CM 5.15.1
Labels:
- Cloudera Manager
- Kudu
06-12-2018
04:46 PM
Never mind my last comment: I was confused because the DISABLE_CODEGEN_ROWS_THRESHOLD setting @Tim Armstrong recommended was not yet documented, so I tried using the closest thing I found (SCAN_NODE_CODEGEN_THRESHOLD), which wasn't applicable to our query. It turns out that, even though it's not yet documented, DISABLE_CODEGEN_ROWS_THRESHOLD is available in our CDH 5.13 cluster and works as Tim suggested.
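For anyone finding this later, usage is just a session-level SET before the affected query; the threshold value below is only an example I made up, not a recommendation from this thread.

-- Example value only: codegen is skipped for queries estimated to process
-- fewer rows per node than this threshold, avoiding codegen overhead on
-- small queries while keeping it for large ones.
SET DISABLE_CODEGEN_ROWS_THRESHOLD=50000;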
06-07-2018
10:05 AM
FYI @Tim Armstrong: sadly, setting SCAN_NODE_CODEGEN_THRESHOLD to any value had no effect, perhaps because, as I mentioned above, the slow codegen is NOT in a scan node but in a TOP-N towards the end of processing. We are considering setting DISABLE_CODEGEN=true on the url for this connection alone (specific to user reports), though we'd need to watch carefully to make sure it doesn't make other reports slow. We'll probably also open a case with our EDH support to try to get to the bottom of why it's slow to begin with.
06-06-2018
09:32 AM
Thanks @Tim Armstrong. Hmm, I can't find that option in the current docs, is it just undocumented? Or do you mean SCAN_NODE_CODEGEN_THRESHOLD? Because there is at least 1 scan node (from an often-used dimension that applies to most queries) where the row estimate is 2.6 million (though after filtering only a few rows remain). And even if all scans are under 400K or whatever we set it to, will it help here considering the slow codegen is in a TOP-N step towards the end?

Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail
...
03:SCAN HDFS 30 48.332ms 103.898ms 17 2.60M 10.93 MB 192.00 MB irdw_prod.media_dim md
06-05-2018
03:09 PM
Yeah, we definitely wouldn't want to do that globally. We tried doing set DISABLE_CODEGEN=true; right before our sql in the report, but the driver fails with "[Simba][JDBC](11300) A ResultSet was expected but not generated", which is really sad; I had thought we could specify any of these hints right in the sql. Doing so in the jdbc url is not an option because the same connection is shared by all of our thousands of reports, only 10% or so of which are affected by this. @Tim Armstrong I tried to guess your Cloudera email and sent you the profile directly.
06-05-2018
10:04 AM
Thanks, right, I know I can do that, but I'm hoping to figure out the root cause rather than paper over it. Plus it makes me nervous to do so for a whole class of queries/reports... that doc page does say "... Do not otherwise run with this setting turned on, because it results in lower overall performance."
06-04-2018
11:57 AM
We recently enabled hdfs caching for two tables to try and speed up a whole class of queries that are very similar, generally following this pattern:

SELECT x,y,z
FROM (
  SELECT x,y,z FROM table1 WHERE blah
  UNION ALL
  SELECT x,y,z FROM table2 WHERE blah
) x
ORDER BY x DESC, y DESC
LIMIT 20001 OFFSET 0

... but we didn't get much runtime improvement. Digging in, it looks like 80% of the time is spent on CodeGen: 5.25s, of that CompileTime: 1.67s and OptimizationTime: 3.51s (see profile fragment below for this sample run). With set DISABLE_CODEGEN=true the query goes from ~6 seconds to ~1 second, but the docs state this should not be used generally, so I'm hesitant to add that to actual live production reports and would rather understand the root cause. Both tables are parquet, fully hdfs-cached. Both are wide-ish: 253 and 126 cols respectively, but the inner queries project only 20 cols to the outer. CDH 5.13 / Impala 2.10. Happy to send the full profile file by direct mail. Thanks in advance, -mauricio

78:MERGING-EXCHANGE 1 5s307ms 5s307ms 73 101 0 0 UNPARTITIONED
49:TOP-N 30 341.689us 880.634us 73 101 873.00 KB 39.28 KB
00:UNION 30 240.707us 3.190ms 73 1.61K 8.81 MB 0
...
F35:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
| Per-Host Resources: mem-estimate=0B mem-reservation=0B
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B
|
78:MERGING-EXCHANGE [UNPARTITIONED]
| order by: action_date DESC, action_id ASC
| limit: 101
| mem-estimate=0B mem-reservation=0B
| tuple-ids=47 row-size=398B cardinality=101
|
F34:PLAN FRAGMENT [RANDOM] hosts=18 instances=18
Per-Host Resources: mem-estimate=206.48MB mem-reservation=14.44MB
49:TOP-N [LIMIT=101]
| order by: action_date DESC, action_id ASC
| mem-estimate=39.28KB mem-reservation=0B
| tuple-ids=47 row-size=398B cardinality=101
|
00:UNION
...

>>>>> F34 Fragment for a sample node (all very similar):

Hdfs split stats (<volume id>:<# splits>/<split lengths>): 8:1/38.32 MB
Filter 4 arrival: 5s339ms
AverageThreadTokens: 1.00
BloomFilterBytes: 3.0 MiB
InactiveTotalTime: 0ns
PeakMemoryUsage: 27.4 MiB
PeakReservation: 14.4 MiB
PeakUsedReservation: 0 B
PerHostPeakMemUsage: 58.6 MiB
RowsProduced: 1
TotalNetworkReceiveTime: 261.74us
TotalNetworkSendTime: 313.68us
TotalStorageWaitTime: 4.96us
TotalThreadsInvoluntaryContextSwitches: 583
TotalThreadsTotalWallClockTime: 5.37s
TotalThreadsSysTime: 53ms
TotalThreadsUserTime: 5.20s
TotalThreadsVoluntaryContextSwitches: 169
TotalTime: 5.43s
>> Fragment Instance Lifecycle Timings (0ns)
>> DataStreamSender (dst_id=78) (1ms)
>> CodeGen (5.25s)
   CodegenTime: 26ms
   CompileTime: 1.67s <<<<<<<<<<<<< ????
   InactiveTotalTime: 0ns
   LoadTime: 0ns
   ModuleBitcodeSize: 1.9 MiB
   NumFunctions: 729
   NumInstructions: 35,078
   OptimizationTime: 3.51s <<<<<<<<<<<<< ????
   PeakMemoryUsage: 17.1 MiB
   PrepareTime: 66ms
   TotalTime: 5.25s
>> SORT_NODE (id=49) (94ms)
>> UNION_NODE (id=0) (93ms)
>> HASH_JOIN_NODE (id=48) (9ms)
Labels:
- Impala
05-22-2018
11:05 AM
Can anyone explain what RowBatchQueueGetWaitTime is? I'm looking into a slow-ish query that is taking 2 to 3 seconds to do the hdfs scan on most nodes, and I don't see why it should take that long: 3 or so files per node, only a couple K each, cached (and confirmed all read from cache). The only thing that looks odd is this metric. Here's a sample relevant profile fragment (about the same for all executors):

>>> HDFS_SCAN_NODE (id=0) (1.96s)
Hdfs split stats (<volume id>:<# splits>/<split lengths>): 18:1/69.15 KB 20:2/142.83 KB
ExecOption: PARQUET Codegen Enabled, Codegen enabled: 3 out of 3
Runtime filters: Not all filters arrived (arrived: [1], missing [0]), waited for 352ms
Hdfs Read Thread Concurrency Bucket: 0:100% 1:0% 2:0% 3:0% 4:0% 5:0% 6:0% 7:0% 8:0% 9:0% 10:0% 11:0% 12:0% 13:0% 14:0% 15:0%
File Formats: PARQUET/SNAPPY:156
AverageHdfsReadThreadConcurrency: 0.00
AverageScannerThreadConcurrency: 1.00
BytesRead: 228.0 KiB
BytesReadDataNodeCache: 228.0 KiB
BytesReadLocal: 228.0 KiB
BytesReadRemoteUnexpected: 0 B
BytesReadShortCircuit: 228.0 KiB
CachedFileHandlesHitCount: 0
CachedFileHandlesMissCount: 159
DecompressionTime: 188.47us
InactiveTotalTime: 0ns
MaxCompressedTextFileLength: 0 B
NumColumns: 52
NumDictFilteredRowGroups: 0
NumDisksAccessed: 0
NumRowGroups: 3
NumScannerThreadsStarted: 1
NumScannersWithNoReads: 0
NumStatsFilteredRowGroups: 0
PeakMemoryUsage: 499.3 KiB
PerReadThreadRawHdfsThroughput: 0 B/s
RemoteScanRanges: 0
RowBatchQueueGetWaitTime: 1.60s
RowBatchQueuePutWaitTime: 0ns
RowsRead: 426
RowsReturned: 2
RowsReturnedRate: 1 per second
ScanRangesComplete: 3
ScannerThreadsInvoluntaryContextSwitches: 8
ScannerThreadsTotalWallClockTime: 1.89s
MaterializeTupleTime(*): 16ms
ScannerThreadsSysTime: 10ms
ScannerThreadsUserTime: 73ms
ScannerThreadsVoluntaryContextSwitches: 393
TotalRawHdfsReadTime(*): 0ns
TotalReadThroughput: 88.4 KiB/s
TotalTime: 1.96s
>>> Filter 0 (1.00 MB) (0ns)
InactiveTotalTime: 0ns
Rows processed: 0
Rows rejected: 0
Rows total: 426
TotalTime: 0ns
>>> Filter 1 (1.00 MB) (0ns)
InactiveTotalTime: 0ns
Rows processed: 426
Rows rejected: 424
Rows total: 426
TotalTime: 0ns

Thanks in advance! -m
Labels:
- Impala
03-27-2018
01:57 PM
Thanks very much Tim for looking up the JIRA. Yikes, it's been open since 2014. As John pointed out there, the column order info must be in the metastore, since Hive's show create table displays fine, so it seems like this should be a simple change to how Impala reads that info. Upvoted the JIRA.
03-26-2018
07:21 PM
When I create an impala/hive table over an hbase table, the columns in Impala appear in alphabetical order instead of as defined. Not a blocker, but really annoying, and it might become an issue down the road. Anyone know what could be happening? We're on CDH 5.13, thanks.

Defined in hbase with:

create 'irdw_sandbox:date_dim', {NAME => 'mcf', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'true', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'PREFIX_TREE', COMPRESSION => 'SNAPPY', BLOCKCACHE => 'true', BLOCKSIZE => '65536'}

Defined in Hive with:

CREATE EXTERNAL TABLE irdw_sandbox.hbase_date_dim (
key STRING,
id INT,
sqldate TIMESTAMP,
year INT,
quarter_of_year INT,
month_of_year INT,
week_of_year INT,
day_of_year INT,
day_name STRING,
day_of_week INT,
day_of_month INT,
day_type STRING,
month_name STRING,
week_of_year_mtos INT
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" =
":key,mcf:id,mcf:sqldate,mcf:year,mcf:quarter_of_year,mcf:month_of_year,mcf:week_of_year,mcf:day_of_year,mcf:day_name,mcf:day_of_week,mcf:day_of_month,mcf:day_type,mcf:month_name,mcf:week_of_year_mtos"
)
TBLPROPERTIES("hbase.table.name" = "irdw_sandbox:date_dim") and a show create table in hive looks fine: CREATE EXTERNAL TABLE `hbase_date_dim`(
`key` string COMMENT 'from deserializer',
`id` int COMMENT 'from deserializer',
`sqldate` timestamp COMMENT 'from deserializer',
`year` int COMMENT 'from deserializer',
`quarter_of_year` int COMMENT 'from deserializer',
`month_of_year` int COMMENT 'from deserializer',
`week_of_year` int COMMENT 'from deserializer',
`day_of_year` int COMMENT 'from deserializer',
`day_name` string COMMENT 'from deserializer',
`day_of_week` int COMMENT 'from deserializer',
`day_of_month` int COMMENT 'from deserializer',
`day_type` string COMMENT 'from deserializer',
`month_name` string COMMENT 'from deserializer',
`week_of_year_mtos` int COMMENT 'from deserializer')
ROW FORMAT SERDE
'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
'hbase.columns.mapping'=':key,mcf:id,mcf:sqldate,mcf:year,mcf:quarter_of_year,mcf:month_of_year,mcf:week_of_year,mcf:day_of_year,mcf:day_name,mcf:day_of_week,mcf:day_of_month,mcf:day_type,mcf:month_name,mcf:week_of_year_mtos',
'serialization.format'='1')
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='false',
'hbase.table.name'='irdw_sandbox:date_dim',
'numFiles'='0',
'numRows'='-1',
'rawDataSize'='-1',
'totalSize'='0',

but a show create table in impala (after invalidate metadata to recognize the new table) doesn't:

CREATE EXTERNAL TABLE irdw_sandbox.hbase_date_dim (
key STRING,
day_name STRING,
day_of_month INT,
day_of_week INT,
day_of_year INT,
day_type STRING,
id INT,
month_name STRING,
month_of_year INT,
quarter_of_year INT,
sqldate TIMESTAMP,
week_of_year INT,
week_of_year_mtos INT,
year INT
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping'=':key,mcf:id,mcf:sqldate,mcf:year,mcf:quarter_of_year,mcf:month_of_year,mcf:week_of_year,mcf:day_of_year,mcf:day_name,mcf:day_of_week,mcf:day_of_month,mcf:day_type,mcf:month_name,mcf:week_of_year_mtos', 'serialization.format'='1')
TBLPROPERTIES ('COLUMN_STATS_ACCURATE'='false', 'hbase.table.name'='irdw_sandbox:date_dim', 'numFiles'='0', 'numRows'='-1', 'rawDataSize'='-1', 'storage_handler'='org.apache.hadoop.hive.hbase.HBaseStorageHandler', 'totalSize'='0')
Labels:
- HBase
01-01-2018
09:14 PM
So this query hangs the daemon but other queries run fine? Oh wow, that's strange; I would expect it to fail with an out-of-memory error or something. Do you have really low memory on the daemon, like under 16GB? You should review the daemon INFO log under /var/log/impalad and search for the query id to see how it progresses and where it gets stuck (you can compare the entries vs. another query that runs fine). You'll see a lot of info about fragments being set up and distributed.
12-31-2017
05:38 PM
Well, if you can't access the impala UI on that node then you have bigger problems than that query. Perhaps your impalad is hung? Or maybe you have a firewall or network policy that is not allowing you to access that port? Could you first of all try restarting that impalad?
12-31-2017
05:33 PM
2 Kudos
For us it didn't appear to be any particular table having too many files or partitions, but rather the catalog tracking too many of them overall. So definitely compact the most fragmented ones to start with, but the goal is to lower total files. We use impala itself, doing an insert overwrite in place. This does result in a short outage, as queries on that table will fail for a few seconds (if reading the same partitions being overwritten), so we schedule this late at night. For a typical table partitioned by event_date_yearmonth and account_id_mod (i.e. account_id % 10), we typically compact the current month (which has data coming in throughout the day, hence many new small files) with:

insert overwrite sometable partition(event_date_yearmonth, account_id_mod)
select * from sometable where event_date_yearmonth = '201712'

This will result in 1 file in each partition (or more if the partition's data is bigger than the block size), and all account_id_mod partitions for event_date_yearmonth 201712 will be rewritten, while other months will not be touched. Notice I didn't specify partition values in the partition clause, so it's fully dynamic, and therefore the * works (* returns all the schema columns AND the partition columns). Caution though: having 1 file per partition will decrease parallelism and query performance if the common use case is to read a single partition at a time. If so, you can set PARQUET_FILE_SIZE before the insert to create files in each partition smaller than the default 128m.
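For example, if you would rather have each partition split into a few files than a single one, something like this before the insert; the 64m value is just an illustration, not what we actually use.

-- Illustration only: cap parquet output file size so each partition gets
-- several smaller files instead of one, preserving scan parallelism.
SET PARQUET_FILE_SIZE=64m;

INSERT OVERWRITE sometable PARTITION (event_date_yearmonth, account_id_mod)
SELECT * FROM sometable
WHERE event_date_yearmonth = '201712';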
12-30-2017
05:38 PM
How are you submitting your query? If through impala-shell, then you should see something like "Query progress can be monitored at: http://[coordinator_hostname]:25000/query_plan?query_id=984ed18511f4ae82:9ccc11c300000000" and you can go there to see its progress. Or you can start impala-shell with --live_summary and see the progress of each fragment in realtime. If through odbc/jdbc, and you're specifying a node directly (not through haproxy), then you can go directly to http://[coordinator_hostname]:25000/queries for that node and see any queries running there, even if for some reason they're not coming up in CM. https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_live_summary.html#live_summary
12-30-2017
03:48 PM
2 Kudos
We had this issue too (on different versions, most recently with 5.10) and tried many things over a 6 month period:

- Fiddled with catalog server config, such as -num_metadata_loading_threads=32 in the advanced config snippet.
- Increased catalog memory to 24GB I think, but since it wasn't helpful we ended up going back to 16GB.
- Refactored many of our jobs to drastically lower the number of refreshes and invalidate metadatas.
- Went from doing a daily compute stats on our tables to weekly on Saturdays.

All was pretty much to no avail. Then we noticed some tables were not being defragmented by our maintenance jobs and had upwards of 100K files (each!). We fixed that and started compacting others more aggressively, so our tables went from having over 2 million total files to about 1.4 million. That did the trick. No more long catalog metadata operations. Hope that helps.
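If it helps, a quick way to check how fragmented a given table is (the table name is just a placeholder):

-- Placeholder table name: the #Files and Size columns per partition make the
-- most fragmented partitions easy to spot before compacting.
SHOW TABLE STATS sometable;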
11-14-2017
11:26 AM
We often need to restart a node to do some quick maintenance, such as reconfiguring a disk or changing an OS setting that requires a machine restart. Also, we use Impala not only for interactive user queries but also for many of our ETL job queries, and these slightly longer queries of course die if a single node processing their fragments becomes unavailable, killing the corresponding job and exposing us to data corruption. Therefore we are always forced to pause all our jobs. I know we can gracefully decommission a node, but it can take hours to move all the dfs data out and then back in, so it's not worth it when trying to do a quick restart. So is there a way (via CM, shell or API) to tell an impalad to simply stop taking new fragments in preparation for a restart? (We can also easily remove it from haproxy so it doesn't take new queries as coordinator.) Thanks!
Labels:
- Cloudera Manager
- Impala
10-10-2017
08:10 PM
Another option I forgot to mention: if your table is partitioned and your insert query uses dynamic partitioning, it will generate 1 file per partition:

insert into table2 partition(par1, par2)
select col1, col2 .. colN, par1, par2 from table1;

... again up to the max parquet file size currently set, so you can play with that max to achieve 2 files per partition. https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_partitioning.html#partition_static_dynamic
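So for roughly two files per partition you could pair that with a lower file size cap before the insert; the 100m value below is only a guess to start the trial and error from.

-- Guessed value: if a partition currently lands ~200 MB in one file, capping
-- output files at ~100 MB should produce about two files per partition.
SET PARQUET_FILE_SIZE=100m;

INSERT INTO table2 PARTITION (par1, par2)
SELECT col1, col2 /* ...remaining columns... */, par1, par2 FROM table1;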
10-10-2017
08:05 PM
You can do "set NUM_NODES=1" in your session (before your query), which will cause it to be processed in a single node (just in the coordinator). It will produce 1 file, up to the default max size of parquet files. You can do "set PARQUET_FILE_SIZE=XX" to fine-tune that max file size up or down until you get it split exactly into 2 files (it will take some trial and error because this is an upper bound - files are actually quite a bit smaller than the limit in my experience). But beware the docs state NUM_NODES is not for production use, especially on big tables, as it can put a lot of pressure on a single host and crash that impalad. https://www.cloudera.com/documentation/enterprise/latest/topics/impala_query_options.html -m
09-19-2017
05:48 PM
I see no way to specify a compression default in the create table statement, so I tried:

SET COMPRESSION_CODEC=gzip;
insert overwrite <text_table> select .. from <another_table>

and got "Writing to compressed text table is not supported. Use query option ALLOW_UNSUPPORTED_FORMATS to override." But ALLOW_UNSUPPORTED_FORMATS shouldn't be used according to the docs. Is there a trick to having impala write text files compressed? -Mauricio
Labels:
- Impala
04-14-2017
11:01 AM
1 Kudo
We spent almost 2 weeks diagnosing impalad and hbase RS crashes, and finally were able to figure out it was a spark job that had been changed and was failing sometimes with excessive GC issues. The failing executors/containers were using 2,000% cpu instead of 100% or less (the spark job was launched with 1 core per executor). The box cpu usage was through the roof, even though CM didn't pick it up in the per-role graph (which has lines for NodeManager cpu_system_rate and cpu_user_rate, but doesn't pick up containers, I guess?).

Before the container died we were able to capture output which I guess means yarn killed it because of the excessive GC (or the jvm killed it? it was started with -XX:OnOutOfMemoryError=kill). top showed > 2000% cpu on a java process, which we could see was the container for this executor, launched specifically with the --cores 1 argument.

We're on CDH 5.10 and are using both static and dynamic pools. Cgroup I/O Weight for the YARN NodeManager is 35%. Container Virtual CPU Cores is set to 12 (out of 32 cores total). For dynamic pools, all pools are using the DRF scheduler, and the pool this job runs in has 130 cores out of 336 total. So is this total breakdown of the resource allocation model a yarn bug, or a limitation that one must be aware of and watch carefully?
11-15-2016
07:22 PM
I've opened IMPALA-4492. Thanks Tim. -m
11-08-2016
11:04 AM
Thanks Tim, yeah even just having the real-time start timestamp for each fragment/instance would be very helpful to isolate consistently slow worker nodes. Should I open a JIRA for that? Quick follow-up question (last one, promise): here's another weird instance of that same query, which happens often: a relatively small (by comparison) hdfs scan of 1MB and 86K rows, all local, all from cache, still took 8 seconds vs. the normal ~400ms on all other nodes. Since disk is not a factor here, what else could be holding up the read from cache? The cluster is barely loaded at 20% cpu.
11-04-2016
08:31 PM
I'm not sure where I go from here with this insight though... there are no actual timestamps in the fragments, only individual timings, so I have no idea how to find out which ones on which nodes started late. Any ideas on that, anyone? Thanks, -m
11-04-2016
08:10 PM
Thanks so much Tim. I compared summaries out of profiles from a fast and a slow run of (basically) the same query. They are pretty much the same! However, I did notice in the timeline that the slow query took 6 seconds between 'ready to start fragments' and 'all fragments started'. So I guess there were some straggler nodes, but outside of fragment processing, because none of the 'Max Time' values look high:

FAST:
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail
----------------------------------------------------------------------------------------------------------------------------------
20:MERGING-EXCHANGE 1 99.260us 99.260us 7 10 0 -1.00 B UNPARTITIONED
10:SORT 22 677.665us 828.653us 7 10 24.02 MB 16.00 MB
19:AGGREGATE 22 201.337ms 244.372ms 7 10 2.28 MB 10.00 MB FINALIZE
18:EXCHANGE 22 20.432us 140.472us 28 10 0 0 HASH(CASE WHEN cld.platform...
09:AGGREGATE 22 219.170ms 287.685ms 28 10 1.98 MB 10.00 MB STREAMING
08:HASH JOIN 22 7.359ms 71.569ms 153.93K 851 2.04 MB 761.00 B INNER JOIN, PARTITIONED
|--17:EXCHANGE 22 7.094us 20.544us 7 761 0 0 HASH(dd.id)
| 04:SCAN HDFS 1 8.870ms 8.870ms 7 761 175.15 KB 32.00 MB irdw_prod.date_dim dd
16:EXCHANGE 22 1.589ms 8.964ms 183.28K 851 0 0 HASH(event_date_local_dim_id)
07:HASH JOIN 22 200.085ms 267.669ms 183.28K 851 12.04 MB 1.71 MB INNER JOIN, PARTITIONED
|--15:EXCHANGE 22 1.728ms 2.233ms 1.43M 1.43M 0 0 HASH(cld.id)
| 03:SCAN HDFS 2 11.642ms 12.940ms 1.43M 1.43M 11.59 MB 64.00 MB irdw_prod.client_dim cld
14:EXCHANGE 22 1.203ms 2.218ms 183.28K 851 0 0 HASH(fact.client_dim_id)
06:HASH JOIN 22 204.626ms 256.269ms 183.28K 851 6.03 MB 428.48 KB INNER JOIN, PARTITIONED
|--13:EXCHANGE 22 2.719ms 11.037ms 2.19M 2.19M 0 0 HASH(md.id)
| 02:SCAN HDFS 8 4.838ms 6.003ms 2.19M 2.19M 6.11 MB 40.00 MB irdw_prod.media_dim md
12:EXCHANGE 22 825.834us 4.673ms 183.28K 851 0 0 HASH(fact.media_dim_id)
05:HASH JOIN 22 199.360ms 253.233ms 183.28K 851 2.02 MB 18.00 B INNER JOIN, BROADCAST
|--11:EXCHANGE 22 8.630us 10.408us 3 2 0 0 BROADCAST
| 01:SCAN HDFS 1 23.969ms 23.969ms 3 2 181.51 KB 32.00 MB irdw_prod.campaign_dim cd
00:SCAN HDFS 22 814.857ms 1s106ms 183.28K 2.86M 3.18 MB 320.00 MB irdw_prod.agg_daily_activit...
Ready to start 122 remote fragments: 24,632,776
All 122 remote fragments started: 40,539,024
First dynamic filter received: 523,467,712
Rows available: 1,742,258,728
SLOW:
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail
---------------------------------------------------------------------------------------------------------------------------------
20:MERGING-EXCHANGE 1 115.304us 115.304us 4 10 0 -1.00 B UNPARTITIONED
10:SORT 22 711.942us 971.640us 4 10 24.02 MB 16.00 MB
19:AGGREGATE 22 224.158ms 356.244ms 4 10 2.28 MB 10.00 MB FINALIZE
18:EXCHANGE 22 17.424us 116.992us 24 10 0 0 HASH(CASE WHEN cld.platform...
09:AGGREGATE 22 239.932ms 376.739ms 24 10 1.98 MB 10.00 MB STREAMING
08:HASH JOIN 22 9.258ms 126.508ms 7.00K 1.27K 2.04 MB 761.00 B INNER JOIN, PARTITIONED
|--17:EXCHANGE 22 6.449us 14.136us 7 761 0 0 HASH(dd.id)
| 04:SCAN HDFS 1 31.094ms 31.094ms 7 761 175.15 KB 32.00 MB irdw_prod.date_dim dd
16:EXCHANGE 22 313.646us 762.564us 24.74K 1.27K 0 0 HASH(event_date_local_dim_id)
07:HASH JOIN 22 222.134ms 336.441ms 24.74K 1.27K 12.04 MB 1.71 MB INNER JOIN, PARTITIONED
|--15:EXCHANGE 22 2.364ms 3.331ms 1.43M 1.43M 0 0 HASH(cld.id)
| 03:SCAN HDFS 2 17.363ms 21.651ms 1.43M 1.43M 11.66 MB 64.00 MB irdw_prod.client_dim cld
14:EXCHANGE 22 319.401us 541.207us 24.74K 1.27K 0 0 HASH(fact.client_dim_id)
06:HASH JOIN 22 238.946ms 399.160ms 24.74K 1.27K 6.03 MB 428.04 KB INNER JOIN, PARTITIONED
|--13:EXCHANGE 22 2.509ms 3.938ms 2.19M 2.19M 0 0 HASH(md.id)
| 02:SCAN HDFS 7 14.627ms 28.996ms 2.19M 2.19M 3.27 MB 48.00 MB irdw_prod.media_dim md
12:EXCHANGE 22 265.672us 600.188us 24.74K 1.27K 0 0 HASH(fact.media_dim_id)
05:HASH JOIN 22 220.025ms 363.591ms 24.74K 1.27K 2.02 MB 18.00 B INNER JOIN, BROADCAST
|--11:EXCHANGE 22 12.656us 17.408us 2 2 0 0 BROADCAST
| 01:SCAN HDFS 1 10.060ms 10.060ms 2 2 181.48 KB 32.00 MB irdw_prod.campaign_dim cd
00:SCAN HDFS 22 551.595ms 1s062ms 24.74K 4.26M 2.59 MB 320.00 MB irdw_prod.agg_daily_activit...
Ready to start 121 remote fragments: 36,909,268
All 121 remote fragments started: 6,567,144,968
First dynamic filter received: 6,567,170,788
Rows available: 8,395,137,540