Accelerate skewed joins

How to “re-parallelize” skewed joins

Case description

Assume that we have 1M customers and 4M transactions, and that our top customer produces about 2.5% of all transactions. The other customers produce the remaining 97.5% approximately evenly.
Scroll down to the bottom of the post for the sample tables and the data generator SQL.

Our task is to join the “Customer” and “Transact” tables on Customer_id.

The join

SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
GROUP BY 1;


Execution is pretty slow.
In Viewpoint we see that only one AMP is working, while the others are idle.

What is the problem?
There are two separate subsets of the Transact table from a “joinability” point of view:

  • “Peak” part (records of the top customer(s))
    Very few customers have very many Transact records. A product join would be cost effective.
  • “Even” part (records of the other customers)
    Many customers have many Transact records in total, but evenly few each. A merge join would be ideal.

Unfortunately the Optimizer has to decide: only one operation type can be chosen for the join. It will choose the merge join, which consumes far less CPU time.

Execution plan looks like this:

 This query is optimized using type 2 profile T2_Linux64, profileid 21.
  1) First, we lock a distinct D_DB_TMP.”pseudo table” for read on a
     RowHash to prevent global deadlock for D_DB_TMP.t.
  2) Next, we lock a distinct D_DB_TMP.”pseudo table” for read on a
     RowHash to prevent global deadlock for D_DB_TMP.c.
  3) We lock D_DB_TMP.t for read, and we lock D_DB_TMP.c for read.
  4) We do an all-AMPs RETRIEVE step from D_DB_TMP.t by way of an
     all-rows scan with a condition of (“NOT (D_DB_TMP.t.Customer_ID IS
     NULL)”) into Spool 4 (all_amps), which is redistributed by the
     hash code of (D_DB_TMP.t.Customer_ID) to all AMPs.  Then we do a
     SORT to order Spool 4 by row hash.  The size of Spool 4 is
     estimated with low confidence to be 125 rows (2,125 bytes).  The
     estimated time for this step is 0.01 seconds.
  5) We do an all-AMPs JOIN step from Spool 4 (Last Use) by way of a
     RowHash match scan, which is joined to D_DB_TMP.c by way of a
     RowHash match scan.  Spool 4 and D_DB_TMP.c are joined using a
     merge join, with a join condition of (“D_DB_TMP.c.Customer_ID =
     Customer_ID”).  The result goes into Spool 3 (all_amps), which is
     built locally on the AMPs.  The size of Spool 3 is estimated with
     index join confidence to be 125 rows (10,375 bytes).  The
     estimated time for this step is 0.02 seconds.
  6) We do an all-AMPs SUM step to aggregate from Spool 3 (Last Use) by
     way of an all-rows scan , grouping by field1 (
     D_DB_TMP.c.Customer_name).  Aggregate Intermediate Results are
     computed globally, then placed in Spool 5.  The size of Spool 5 is
     estimated with no confidence to be 94 rows (14,758 bytes).  The
     estimated time for this step is 0.02 seconds.
  7) We do an all-AMPs RETRIEVE step from Spool 5 (Last Use) by way of
     an all-rows scan into Spool 1 (all_amps), which is built locally
     on the AMPs.  The size of Spool 1 is estimated with no confidence
     to be 94 rows (8,742 bytes).  The estimated time for this step is
     0.02 seconds.
  8) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 1 are sent back to the user as the result of
     statement 1.  The total estimated time is 0.07 seconds.

How to identify

If you experience extremely asymmetric AMP load, you can suspect this case.
Find highly skewed JOIN steps in the DBQL (set all logging options on):

select top 50
a.MaxAMPCPUTime * (hashamp()+1) / nullifzero(a.CPUTime) Skw,a.CPUTime,a.MaxAMPCPUTime * (hashamp()+1) CoveringCPUTime,
b.*
from dbc.dbqlsteptbl a
join dbc.dbqlogtbl b on a.procid=b.procid and a.queryid=b.queryid
where
StepName='JIN'
and CPUtime > 100
and Skw > 2
order by CoveringCPUTime desc;




(Note: Covering CPU time is <No-of-AMPs> * <Max AMP’s CPU time>. In effect, this much CPU is consumed because of the asymmetric load of the system.)
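
To make the two metrics of the query above concrete, here is a minimal Python sketch that computes the skew factor and the covering CPU time from per-AMP CPU seconds. The function name and the sample figures (124 AMPs at 1 second, one AMP at 4 seconds) are hypothetical and only illustrate the formula; they are not DBQL output.

```python
def skew_metrics(amp_cpu_seconds):
    """Return (skew factor, covering CPU time) for one step's per-AMP CPU times."""
    n_amps = len(amp_cpu_seconds)
    total_cpu = sum(amp_cpu_seconds)          # corresponds to CPUTime
    max_amp_cpu = max(amp_cpu_seconds)        # corresponds to MaxAMPCPUTime
    covering_cpu = max_amp_cpu * n_amps       # <No-of-AMPs> * <Max AMP's CPU time>
    skew = covering_cpu / total_cpu if total_cpu else None
    return skew, covering_cpu


# Hypothetical step on 125 AMPs: 124 AMPs use 1 CPU second each, one AMP uses 4.
amp_cpu = [1.0] * 124 + [4.0]
skew, covering = skew_metrics(amp_cpu)
print(f"skew factor = {skew:.2f}, covering CPU = {covering:.0f} s")
```

A perfectly even step has a skew factor of 1; the further above 1, the more of the system waits for the busiest AMP.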

Or if you suspect a specific query, check the demography of the join field(s) in the “big” table:

SELECT TOP 100 <Join_field>, count(*) Nbr
FROM <Big_table> GROUP BY 1 ORDER BY 2 DESC;


If the top occurrences are dramatically larger than the others (or than the average), this case likely applies.
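
The same demography check can be sketched in Python on a small in-memory sample. The `factor` threshold (a key counts as “peak” if it occurs more than 100 times the average) is an assumption chosen for illustration, not a rule from this post; adapt it to your data.

```python
from collections import Counter


def peak_keys(join_keys, factor=100.0):
    """List (key, count) pairs whose count exceeds factor * average count."""
    counts = Counter(join_keys)
    average = sum(counts.values()) / len(counts)
    return [(key, n) for key, n in counts.most_common() if n > factor * average]


# Hypothetical sample: 1,000 customers with 4 rows each, plus 1,000 extra rows for 345.
keys = [cid for cid in range(1000) for _ in range(4)] + [345] * 1000
peaks = peak_keys(keys)
print(peaks)
```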

Solution

Break the query into two parts: join the top customer(s) separately, then all the others, and finally union the results. (Sometimes additional modification is also required, if the embedding operation(s), the GROUP BY here, is/are not decomposable along the same parameter.)
First we have to identify the top customer(s):

SELECT TOP 5 Customer_id, count(*) Nbr
FROM Transact GROUP BY 1 ORDER BY 2 DESC;

Customer_id          Nbr
-----------  -----------
        345       100004
     499873            4
     677423            4
     187236            4
      23482            4
     
Replace the original query with this one:

SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
where t.Customer_id in (345)  

/*
   ID of the top Customer(s). 
   If more customers are salient, list them, but max ~5
*/
GROUP BY 1
UNION ALL
SELECT Customer_name, count(*)
FROM Customer c
JOIN Transact t ON c.Customer_id = t.Customer_id
where t.Customer_id not in (345)  -- Same customer(s)
GROUP BY 1
;

Make sure that Customer.Customer_id, Transact.Transaction_id and Transact.Customer_id have statistics!
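
Why the split is safe can be checked on a toy data set. The following Python sketch mimics the join plus GROUP BY on in-memory data and verifies that the “peak” part and the “even” part together return the same rows as the original query. The customer names and the tiny tables are made up for the illustration.

```python
from collections import Counter

# Hypothetical miniature tables.
customers = {1: "Alice", 2: "Bob", 345: "MegaCorp"}           # Customer_id -> Customer_name
transactions = [(1, 1), (2, 1), (3, 2), (4, 345), (5, 345), (6, 345)]  # (Transaction_id, Customer_id)


def join_and_count(rows, keep):
    """Inner join on Customer_id, filter with keep(), count rows per Customer_name."""
    return Counter(customers[cid] for _, cid in rows if cid in customers and keep(cid))


peak_ids = {345}
original = join_and_count(transactions, lambda cid: True)
peak = join_and_count(transactions, lambda cid: cid in peak_ids)
even = join_and_count(transactions, lambda cid: cid not in peak_ids)

# UNION ALL of the two partial results.
union_all = sorted(list(peak.items()) + list(even.items()))
print(union_all)
```

The results match because the two branches are disjoint on Customer_id and each name belongs to one branch only. If a peak customer and another customer shared the same Customer_name, the UNION ALL would return two rows for that name, which is the decomposability caveat mentioned above.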

This query is more complex, has more steps and scans the Transact table twice, but it runs much faster; you can check it.
But why? And how to determine which “top” customers are worth handling separately?
Read on.

Explanation

Calculation

Let’s do some maths:
Assume that we are on a 125 AMP system.
Customer table contains 1M records with unique ID.
We have ~4.1M records in the Transact table: 100k for the top customer (ID=345) and 4 for each of the other customers. This matches the 2.5% we assumed above.

If the Transact table is redistributed on hash(Customer_id), we will get ~33k records on each AMP, except on AMP(hash(345)), which will get ~133k (33k + 100k).
That means that this AMP will process ~4x more data than the others, and therefore runs ~4x longer.
In other words, for 75% of this JOIN step’s time, 124 AMPs will DO NOTHING for the query.
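
The arithmetic above can be reproduced with a few lines of Python. This is only a back-of-the-envelope sketch: it assumes a perfectly even hash distribution of the non-peak rows and uses the rounder figure of 4M “even” rows (1M customers with 4 transactions each), so the results differ slightly from the ~33k and ~133k quoted above.

```python
n_amps = 125
even_rows = 4_000_000      # 1M customers * 4 transactions each
peak_rows = 100_000        # transactions of the top customer (ID=345)

rows_per_amp = even_rows / n_amps            # rows on an ordinary AMP after redistribution
hot_amp_rows = rows_per_amp + peak_rows      # the AMP that also receives customer 345
ratio = hot_amp_rows / rows_per_amp          # how much more work the hot AMP has
idle_fraction = 1 - 1 / ratio                # share of the step's time the other AMPs wait

print(f"ordinary AMP: {rows_per_amp:,.0f} rows, hot AMP: {hot_amp_rows:,.0f} rows")
print(f"hot AMP works {ratio:.2f}x longer; the others idle {idle_fraction:.0%} of the time")
```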

Moreover, the preparation and the subsequent steps are problematic as well: the JOIN is prepared by a redistribution that produces a strongly skewed spool, and the JOIN’s result stays local on the AMPs, so it is skewed too.

Optimized version

This query will consume moderately more CPU, but the load is distributed evenly across the AMPs, utilizing Teradata’s full parallel capability.
It also contains a product join, but that is no problem: it joins 1 record to the selected 100k records of Transact, which will be lightning fast.


Look at the execution plan of the broken-up query:

 This query is optimized using type 2 profile T2_Linux64, profileid 21.
  1) First, we lock a distinct D_DB_TMP.”pseudo table” for read on a
     RowHash to prevent global deadlock for D_DB_TMP.t.
  2) Next, we lock a distinct D_DB_TMP.”pseudo table” for read on a
     RowHash to prevent global deadlock for D_DB_TMP.c.
  3) We lock D_DB_TMP.t for read, and we lock D_DB_TMP.c for read.
  4) We do a single-AMP RETRIEVE step from D_DB_TMP.c by way of the
     unique primary index “D_DB_TMP.c.Customer_ID = 345” with no
     residual conditions into Spool 4 (all_amps), which is duplicated
     on all AMPs.  The size of Spool 4 is estimated with high
     confidence to be 125 rows (10,625 bytes).  The estimated time for
     this step is 0.01 seconds.
  5) We do an all-AMPs JOIN step from Spool 4 (Last Use) by way of an
     all-rows scan, which is joined to D_DB_TMP.t by way of an all-rows
     scan with a condition of (“D_DB_TMP.t.Customer_ID = 345”).  Spool
     4 and D_DB_TMP.t are joined using a product join, with a join
     condition of (“Customer_ID = D_DB_TMP.t.Customer_ID”).  The result
     goes into Spool 3 (all_amps), which is built locally on the AMPs.
     The size of Spool 3 is estimated with low confidence to be 99,670
     rows (8,272,610 bytes).  The estimated time for this step is 0.09
     seconds.
  6) We do an all-AMPs SUM step to aggregate from Spool 3 (Last Use) by
     way of an all-rows scan , grouping by field1 (
     D_DB_TMP.c.Customer_name).  Aggregate Intermediate Results are
     computed globally, then placed in Spool 5.  The size of Spool 5 is
     estimated with no confidence to be 74,753 rows (11,736,221 bytes).
     The estimated time for this step is 0.20 seconds.
  7) We execute the following steps in parallel.
       1) We do an all-AMPs RETRIEVE step from Spool 5 (Last Use) by
          way of an all-rows scan into Spool 1 (all_amps), which is
          built locally on the AMPs.  The size of Spool 1 is estimated
          with no confidence to be 74,753 rows (22,052,135 bytes).  The
          estimated time for this step is 0.02 seconds.
       2) We do an all-AMPs RETRIEVE step from D_DB_TMP.t by way of an
          all-rows scan with a condition of (“D_DB_TMP.t.Customer_ID <>
          3454”) into Spool 9 (all_amps), which is redistributed by the
          hash code of (D_DB_TMP.t.Customer_ID) to all AMPs.  The size
          of Spool 9 is estimated with high confidence to be 4,294,230
          rows (73,001,910 bytes).  The estimated time for this step is
          1.80 seconds.
  8) We do an all-AMPs JOIN step from D_DB_TMP.c by way of an all-rows
     scan with a condition of (“D_DB_TMP.c.Customer_ID <> 3454”), which
     is joined to Spool 9 (Last Use) by way of an all-rows scan.
     D_DB_TMP.c and Spool 9 are joined using a single partition hash
     join, with a join condition of (“D_DB_TMP.c.Customer_ID =
     Customer_ID”).  The result goes into Spool 8 (all_amps), which is
     built locally on the AMPs.  The size of Spool 8 is estimated with
     low confidence to be 4,294,230 rows (356,421,090 bytes).  The
     estimated time for this step is 0.72 seconds.
  9) We do an all-AMPs SUM step to aggregate from Spool 8 (Last Use) by
     way of an all-rows scan , grouping by field1 (
     D_DB_TMP.c.Customer_name).  Aggregate Intermediate Results are
     computed globally, then placed in Spool 10.  The size of Spool 10
     is estimated with no confidence to be 3,220,673 rows (505,645,661
     bytes).  The estimated time for this step is 8.46 seconds.
 10) We do an all-AMPs RETRIEVE step from Spool 10 (Last Use) by way of
     an all-rows scan into Spool 1 (all_amps), which is built locally
     on the AMPs.  The size of Spool 1 is estimated with no confidence
     to be 3,295,426 rows (972,150,670 bytes).  The estimated time for
     this step is 0.32 seconds.
 11) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 1 are sent back to the user as the result of
     statement 1.  The total estimated time is 11.60 seconds.

Sample structures

The table structures (simplified for the example):

CREATE TABLE Customer
(
  Customer_ID   INTEGER
, Customer_name VARCHAR(200)
)
UNIQUE PRIMARY INDEX (Customer_id)
;

insert into Customer values (1,'Cust-1');
Run 20x: 

insert into Customer select mx + sum(1) over (order by Customer_id rows unbounded preceding) id, 'Cust-' || trim(id) from Customer cross join (select max(Customer_id) mx from Customer) x;

collect statistics using sample on customer column (Customer_id);

CREATE TABLE Transact
(
  Transaction_ID   INTEGER
, Customer_ID      INTEGER
)
UNIQUE PRIMARY INDEX (Transaction_id)
;

insert into Transact values (1,1);
Run 22x: 

insert into Transact select mx + sum(1) over (order by Transaction_id rows unbounded preceding) id, id mod 1000000 from Transact cross join (select max(Transaction_id) mx from Transact) x;

insert into Transact select mx + sum(1) over (order by Transaction_id rows unbounded preceding) id, 345 from Transact t cross join (select max(Transaction_id) mx from Transact) x where t.Transaction_id < 100000;

collect statistics using sample on Transact column (Customer_id);
collect statistics using sample on Transact column (Transaction_id) ;

Teradata Compress optimization

Techniques and effects

What is Multi Value Compression (MVC)?

Teradata RDBMS supports a nice feature: multi value compression (aka Teradata Compress). It reduces the storage space allocated by the tables in the database, while (this is the incredible part) processing compressed data usually requires fewer resources (CPU and I/O) than processing uncompressed data.
The feature needs no additional licence or hardware components.

How does Teradata Compress work?

I give a short summary here; if you are interested in the details, please refer to the Teradata documentation.

MVC can be defined in the CREATE TABLE DDL, or added/modified later by ALTER TABLE statements. The user must define a list of 1..255 values for each compressible column. Those values will be stored compressed, while all others will be stored uncompressed.
If a column is compressed, each row has an additional area of 1..8 bits allocated (if N values are listed, ceil(log2(N+1)) bits will be allocated). One of the bit combinations means that the value is uncompressed (and allocates its corresponding space within the row layout); all the others mean a compressed value, which will not allocate the value’s place in the row.
The compress bits are allocated in every row, regardless of whether the actual value is compressed or not.
Compress bits are “compacted”, e.g. 3 + 8 + 4 = 15 compress bits will allocate 2 bytes with only 1 wasted bit, instead of 3 byte-aligned values.
The values belonging to the bit combinations are stored in the table header.
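
The bit arithmetic can be illustrated with a short Python sketch. It assumes that N listed values need one extra bit combination for “not compressed”, as described above, and it models only the compress bits, not the full row layout. The value-list sizes 7, 255 and 15 are hypothetical choices that yield the 3 + 8 + 4 bits of the example.

```python
def compress_bits(n_values):
    """Compress bits per row for a column with n_values listed values (1..255)."""
    if not 1 <= n_values <= 255:
        raise ValueError("a value list must contain 1..255 values")
    # n_values + 1 combinations are needed (one means "not compressed");
    # for positive integers, ceil(log2(n + 1)) equals n.bit_length().
    return n_values.bit_length()


def packed_bytes(value_list_sizes):
    """Bytes used per row when the compress bits of all columns are packed together."""
    total_bits = sum(compress_bits(n) for n in value_list_sizes)
    return (total_bits + 7) // 8


sizes = [7, 255, 15]                                  # three compressed columns
bits = [compress_bits(n) for n in sizes]              # compress bits per column
print(bits, sum(bits), "bits ->", packed_bytes(sizes), "bytes per row")
```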

Multi Value Compression is:

  • Column level
    It has to be defined separately on each applicable column of a table
  • “Manual”
    You have to calculate which values are worth compressing; Teradata offers no automation
  • Static
    Once you have defined the values, it will not adapt to changing conditions by itself

Obviously, the currently optimal compression settings depend on the data demography and the applied data types. The optimal settings may be different later, when the data demography has changed.

Summary of most important properties of MVC once again:

  • Can be defined in the CREATE TABLE statement
  • Can be applied or modified later in an ALTER TABLE statement
  • Must be set on COLUMN level
  • Optimal value list must be calculated by you
  • Optimal settings may change in time. Optimize regularly.

Storage effects

Tables using MVC will allocate less PERM space, as can be easily calculated.
What about the growth?
Table sizes usually grow over time as more and more data is generated. The change in growth speed depends on the volatility of the data demography. If it is stable, the growth speed will drop by the compression rate. If the typical values change over time, the growth will not drop, or may even speed up in extreme cases. However, these are the cases where regular optimization is necessary.

The growth looks like this in stable demography cases:

Performance effects

The key question is what has to be paid for less storage.

Obviously, compression requires resources during both the compress and the decompress phase. However, there are processing gains as well, which usually dominate the costs. How?

A compressed table resides in proportionally fewer data blocks, therefore fetching the data requires fewer I/O operations. In addition, moving data in memory (during processing) requires fewer CPU cycles.
When SELECTing table data, usually only a small fragment of the row is used, and unused columns will not be decompressed.
Caching is also a CPU-intensive operation, which is more effective if fewer data blocks are processed.
Compression helps tables to be treated as “small enough to cache 100% into memory”, which results in more effective execution plans.

Summary:

  • INSERT into a compressed table usually consumes 10..50% more CPU (only in the final step!)
  • SELECT usually costs no more CPU, or even less, than on uncompressed tables
  • SELECT and INSERT usually cost less I/O, in proportion to the compression ratio
  • System level CPU and I/O usage usually drops by 5..10% (!) when the medium and big tables of the system are compressed (caused by more effective caching)

How to set MVC?

Setting up MVC on a single table should consist of the following 4 steps:

  1. Analyze the data demography of each compressible column of the table *1.
  2. Calculate the optimal compress settings for the columns. Notice that
    •   The optimum should be calculated not on separate columns, but on table level, since compress bits are packed into whole bytes.
    •   The more values are listed as compressed, the more overhead the compression has. A proper mathematical formula is to be used for calculating the optimum. *2
    •   Take care of the exceptions: PI / FK / etc. columns and some data types are not compressible (this varies between Teradata versions).
  3. Assemble the corresponding scripts
    CREATE TABLE DDL + INSERT SELECT + RENAME / ALTER TABLE DDL
  4. Implement the compression by running the script
    Take good care of data protection: backups, locking, documenting.

*1 Simplified sample, to be run for each column:
     select top 256 <columnX> , count(*), avg(<length(columnX)>) from <table> group by 1 order by 2 desc;
*2 About the algorithm: it is a maximum-seeking function of n, based on the expression of the gains when the TOP {(2^n)-1} most frequent values are compressed. The expression is too complex to discuss here because of the different data types, exceptions and internal storage structures.

One time or regular?

The optimal MVC setting is valid for a specific point in time, since your data changes along with your business. The daily change is usually negligible, but it accumulates.
Practice shows that it is worth reviewing compress settings every 3..6 months, and continually optimizing new tables a couple of weeks after they go into production.

Estimate how much space and processing capacity is lost if compress optimization is neglected!

Solution in practice

There are “magic Excels” on the net that can calculate the optimal settings if you load the data demography, but they require lots of additional manual work (running the calculations, assembling the DDL, writing the transformation script, testing, etc.).

If you want a really simple solution, try PRISE Compress Wizard, which supplies a comprehensive solution:

  • Assists to collect good candidate tables to compress
  • Automatically analyses the tables, and gives feedback:
    • How much space can be saved by compress
    • What is the current compress ratio (if there is compress already applied)
    • How much resources were used for analysis
    • What is the optimal structure
  • Generates transforming script (+ checks, lock, logging) along with
    • Backup (arcmain)
    • Revert process (for safety and documentation)
    • Reverse engineering (for E/R documentation update)
  • Log implementation
    •  Reflect achieved space saving: success measurement
    •  Report used CPU and I/O resources for transformation

Curing slow INSERTs and CREATE TABLEs I.

Eliminating hash collisions

Case description

We have an INSERT or CREATE TABLE operation that runs unreasonably long compared to the number of affected rows, even though the table is not skewed.

What is hash collision?

Hash collision is when two or more records in a table have the same hash value.

SET tables ensure that no two records within a table have exactly the same content. How does Teradata do it?

Teradata stores the records in a hash filesystem, where each record has a hash value calculated from its Primary Index (PI) value. If several records have the same PI value, they will surely have the same hash value as well.

When INSERTing a record, Teradata has to compare the new record only to those records of the table that have the same hash value as the new record, since all records with a different hash value will surely differ at least in the PI columns.
If we have to INSERT N records with the same hash value into an empty table, Teradata has to do N*(N-1)/2 full record comparisons, which are very CPU demanding.
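
The N*(N-1)/2 figure can be verified with a toy simulation. The Python sketch below is not Teradata’s implementation: it models a SET table as a dictionary of buckets keyed directly by the PI value (standing in for the row hash) and counts the full record comparisons made while inserting.

```python
def set_insert_comparisons(rows):
    """Insert (pi_value, payload) rows into a SET-like table; count record comparisons."""
    buckets = {}                      # PI value -> stored payloads with that "hash"
    comparisons = 0
    for pi_value, payload in rows:
        bucket = buckets.setdefault(pi_value, [])
        duplicate = False
        for stored in bucket:         # compare only within the same hash value
            comparisons += 1
            if stored == payload:
                duplicate = True      # SET table: an identical record is not stored again
                break
        if not duplicate:
            bucket.append(payload)
    return comparisons


n = 1000
same_pi = [(42, i) for i in range(n)]        # N different records, all with the same PI value
unique_pi = [(i, i) for i in range(n)]       # N records, each with its own PI value

print(set_insert_comparisons(same_pi), n * (n - 1) // 2)
print(set_insert_comparisons(unique_pi))
```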

How to identify

Hash collisions can be easily found using the PRISE Tuning Assistant tool, or by following this method:

DBQL filtering for qualifying queries:
The Merge (MRG) phase of the INSERT/CREATE TABLE operation consumes a lot of CPU.
Look for high CPU consuming ‘MRG’ steps in dbc.DBQLStepTbl:

sel a.cputime,a.MaxAmpCPUTime * (hashamp() +1) CoveringCPUTIme,  a.stepname,a.RowCount,b.* from
     dbc.DBQLStepTbl a
join dbc.DBQLogTbl   b on a.ProcId=b.ProcId and a.QueryId=b.QueryId
where
    a.StepName in ('MRG' /*, 'MRU' for UPDATEs also*/)
and a.CPUTime > 100 /* Performance boost: eliminates most of the records (small cpu seconds) at low processing cost. Adapt number to your site */
qualify sum(1) over (order by a.cputime desc rows unbounded preceding) <= 100;

 

For a specific SQL statement (INSERT or CREATE TABLE), check the PI of the target table for the level of hash collisions (the number of records that share the same hash value).

How to make sure that hash collision is the reason? Let the target table be TableA, with the primary index ColA,ColB,ColC (it can be any number of columns in practice):

select top 100 hashrow(ColA,ColB,ColC), count(*) from TableA group by 1 order by 2 desc;


The top row(s) will show the most frequent hash values. Count values >>1 mean significant hash collisions: each high-frequency hash value with N records forms a hash-collision group that causes comparisons in the order of N*N.
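
From such a frequency list, the total comparison effort of two candidate PIs can be roughly compared. The Python sketch below sums n*(n-1)/2 over the groups; it groups by the PI value itself rather than by the real row hash, and the sample rows (10 regions, 10,000 orders) are hypothetical.

```python
from collections import Counter


def estimated_comparisons(pi_values):
    """Estimate SET duplicate-check comparisons as the sum of n*(n-1)/2 per PI value."""
    groups = Counter(pi_values)
    return sum(n * (n - 1) // 2 for n in groups.values())


# Hypothetical rows: (region, order_id), 10 regions with 1,000 orders each.
rows = [(order_id % 10, order_id) for order_id in range(10_000)]

by_region = estimated_comparisons(region for region, _ in rows)       # strongly non-unique PI
by_order = estimated_comparisons(order_id for _, order_id in rows)    # unique PI

print(by_region, by_order)
```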

If the table does not exist yet, embed the producing “SELECT” statement into the script above, and count the field values that would get into the PI columns.

Explanation

If we use a “SET” type of table (this is the default setting), Teradata ensures that there will be no perfectly identical records in the table. This is ensured by comparing the inserted/updated record with the existing ones.
Teradata’s “hash filesystem” offers a very effective trick: only those records must be compared whose RowID (hash) is equal, otherwise at least the PI fields must differ.
If we have chosen a UNIQUE Primary Index, or a non-UNIQUE one on field(s) that are almost unique, then the “SET comparison” is restricted to zero or one record in most cases.

Solution

For a good solution, unfortunately, we have to modify the table structure.

  • Option 1: Change the table type to MULTISET. This will eliminate the duplication checks, which is also its disadvantage: duplicates are no longer filtered out. If the process relies on the de-duplication of the SET table, you have to replace it with programmed de-duplication (group by, left join…).
  • Option 2: Change the table’s PI to a unique or nearly unique column set. Be prudent and consider the workload as well (joins, where conditions, group by expressions, etc.).

Summary

Tables with a strongly non-unique PI are highly exposed to INSERTs/CREATE TABLEs (or UPDATEs) slowed by hash collisions, even if they are not skewed. Use a “more unique” PI, or a MULTISET table.

Typical mistake: a CREATE TABLE … as SELECT… that lacks the PRIMARY INDEX() section. In this case Teradata chooses the first column as PI, which often causes terrible performance.

What’s next

The next post will discuss Multi Value Compression (MVC) optimization.