Consume() and chunk shuffling


#1

Hello,

Does the consume operator shuffle the output chunks the way the store operator does (via the hash function),
or does consume just trigger computation?

My question arose from this post: "Does store() redistributes chunks among cluster nodes?"

Does regrid behave like xgrid (no chunk shuffling) if I reduce an array's resolution by a factor of two (in this case the number of chunks will not change, right)?

Also, after applying xgrid or regrid, does SciDB choose the chunk size for the new array automatically (e.g. equal to the new chunk size after a 2x increase/decrease)?
Or does it rechunk the new array to make its chunk size equal to the chunk size of the old array?

Is it possible to see the hash function that is used by SciDB for chunk placement?

Thank you!


#2

Hey Raro,

I’ll give you a more general answer that should help you answer many such questions. The best source for this kind of information is the query plan. You can get the query plan from the scidb.log file or by running _explain_physical. The _explain_physical operator will return a string for the query plan without running the query. Example:

iquery -aq "create array foo <val:double>[i=1:10:0:5,j=1:10:0:5]"
iquery -anq "store(build(foo, random()), foo)"

$ iquery -aq "_explain_physical('store(regrid(foo, 2,3,sum(val)),bar)', 'afl')"
{No} physical_plan
{0} '[pPlan]:
>[pNode] physicalStore ddl 0 tile 0 children 1
  schema public.bar<val_sum:double> [i=1:5 (1:5):0:5; j=1:4 (1:4):0:5] ArrayId: 0 UnversionedArrayId: 0 Version: 0 Flags: 0 Distro: dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3] <val_sum:double,EmptyTag:indicator NOT NULL>
  output full chunks: yes
  changes dstribution: no
  props sgm 1 sgo 1
  diout dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3]
  bound start {1, 1} end {5, 4} density 1 cells 20 chunks 1 est_bytes 982
>>[pNode] physical_regrid ddl 0 tile 0 children 1
   schema public.foo@1<val_sum:double> [ipublic.foo=1:5 (1:5):0:5; jpublic.foo=1:4 (1:4):0:5] ArrayId: 0 UnversionedArrayId: 0 Version: 0 Flags: 0 Distro: dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3] <val_sum:double,EmptyTag:indicator NOT NULL>
   output full chunks: yes
   changes dstribution: yes
   props sgm 1 sgo 1
   diout dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3]
   bound start {1, 1} end {5, 4} density 1 cells 20 chunks 1 est_bytes 982
>>>[pNode] physicalScan ddl 0 tile 1 children 0
    schema public.foo@1<val:double> [ipublic.foo=1:10 (1:10):0:5; jpublic.foo=1:10 (1:10):0:5] ArrayId: 19743 UnversionedArrayId: 19742 Version: 1 Flags: 0 Distro: dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3] <val:double,EmptyTag:indicator NOT NULL>
    output full chunks: yes
    changes dstribution: no
    props sgm 1 sgo 1
    diout dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3]
    bound start {1, 1} end {10, 10} density 1 cells 100 chunks 4 est_bytes 4908
'

You are essentially looking at a tree (in this case a stack) of operators, one feeding into another. By looking at the physical_regrid node you can see the schema it outputs actually still has chunks that are 5x5. You can also see that regrid is said to “change distribution” and the output distribution is hash. So, regrid moves data around as part of its implementation but the output is guaranteed to be distributed according to the hash function. No extra data movement is needed between regrid and store.
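
To double-check what a plan promises, you can actually run the store (rather than just _explain_physical) and inspect the result. A minimal sketch using show(), which prints the stored schema:

iquery -anq "store(regrid(foo, 2,3, sum(val)), bar)"
iquery -aq "show(bar)"

The chunk intervals reported by show(bar) should match the ones printed for the physicalStore node above (interval 5 in both dimensions in this example).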

Here’s another example with subarray:

iquery -aq "_explain_physical('store(subarray(foo, 0,2,4,7),bar)', 'afl')"
{No} physical_plan
{0} '[pPlan]:
>[pNode] impl_sg ddl 0 tile 0 children 1
  schema public.bar<val:double> [i=0:3 (0:3):0:5; j=0:5 (0:5):0:5] ArrayId: 0 UnversionedArrayId: 0 Version: 0 Flags: 0 Distro: dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3] <val:double,EmptyTag:indicator NOT NULL>
  output full chunks: yes
  changes dstribution: no
  props sgm 1 sgo 1
  diout dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3]
  bound start {0, 0} end {3, 5} density 1 cells 24 chunks 2 est_bytes 1180
>>[pNode] physicalSubArray ddl 0 tile 0 children 1
   schema public.foo@1<val:double> [ipublic.foo=0:3 (0:3):0:5; jpublic.foo=0:5 (0:5):0:5] ArrayId: 0 UnversionedArrayId: 0 Version: 0 Flags: 0 Distro: dist: hash ps: 1 ctx:  redun: 0 off: {0, 1} shift: 0 res: [0, 1, 2, 3] <val:double,EmptyTag:indicator NOT NULL>
   output full chunks: no
   changes dstribution: yes
   props sgm 1 sgo 1
   diout dist: hash ps: 1 ctx:  redun: 0 off: {0, 1} shift: 0 res: [0, 1, 2, 3]
   bound start {0, 0} end {3, 5} density 1 cells 24 chunks 2 est_bytes 1180
>>>[pNode] physicalScan ddl 0 tile 1 children 0
    schema public.foo@1<val:double> [ipublic.foo=1:10 (1:10):0:5; jpublic.foo=1:10 (1:10):0:5] ArrayId: 19743 UnversionedArrayId: 19742 Version: 1 Flags: 0 Distro: dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3] <val:double,EmptyTag:indicator NOT NULL>
    output full chunks: yes
    changes dstribution: no
    props sgm 1 sgo 1
    diout dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 2, 3]
    bound start {1, 1} end {10, 10} density 1 cells 100 chunks 4 est_bytes 4908
'

In this case we see the subarray node crucially does not output full chunks - which means that pieces of the same chunk could be located on different instances after subarray finishes. That must be corrected before the data is stored. So we see there is no longer a store node but an impl_sg node at the top of the plan. sg stands for “scatter gather” and we call this case a storing sg - it moves data around and saves it to a persistent array all in one shot.

Another factoid: you can override regrid's output chunk sizes. Try this:

iquery -aq "_explain_physical('store(regrid(foo, 5,5, sum(val), 1,1), bar)', 'afl')"

If you want to look at the hash function, search the source for getPrimaryChunkLocation or getInstanceForChunk and go from there.


#3

Hello,

Thank you very much, this is a good answer.
However, I still have questions regarding the query plan.
I would like to thank you in advance for your time and patience!

How do I know which instances receive what chunks during data redistribution?
Is it possible to turn off chunk shuffling?

How can I learn the meaning of the _explain_physical output?
Is it somehow documented?

In particular:

see that regrid is said to “change distribution” and the output distribution is hash.

What line tells that the distribution is hash? Is it diout dist: hash ps: 1 ctx:? What do the other symbols mean here?

ipublic.foo=1:5 (1:5):0:5;

I presume 1:5 is the index range, but what does (1:5) mean?

That must be corrected before the data is stored

What do you mean by corrected?
What is wrong there?

If I override regrid's output chunk sizes, does this mean I can tune regrid so that the output from an input chunk goes exactly into a single output chunk?

res: [0, 1, 2, 3]

what is res?

Thank you!


#4

Hey, sure.

How do I know which instances receive what chunks during data redistribution?

Hard to say in all cases. You can definitely find the resting distribution of stored data by doing things like summarize() or list('chunk map'). But if you then run operators that change distribution (aggregate, regrid, sort, redimension, …), things will change and you have to, in a way, walk up the plan tree to figure out what data will end up where.
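
For example (a sketch; summarize() comes from the Paradigm4 summarize plugin, and the exact spelling of its per-instance flag can vary between plugin versions):

iquery -aq "summarize(foo, by_instance:true)"
iquery -aq "list('chunk map')"

The first call reports per-instance chunk counts and sizes for foo; the second dumps the raw chunk map, one entry per stored chunk along with the instance holding it (it covers every array on the system, so expect a lot of output).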

what line tells that the distribution is hash?

Yes, it is dist: hash. Some of these fields are debugging information; others are mathematical facts about the distribution that future optimizations may take advantage of. Part of the reason this isn't documented is that it may change in the future (but of course everyone can read our code).

what does (1:5) mean?

That happens to be the extent of populated data in the array. Since arrays can be sparse or ragged, it’s possible to have i=0:5 but have no data at i>3. Again this is something that the optimizer can use to plan queries.
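
A quick sketch of that situation (sparse_demo is a hypothetical array name, and exactly how the tighter extent shows up can vary by version):

iquery -aq "create array sparse_demo <v:double>[i=0:5:0:6]"
iquery -anq "store(filter(build(sparse_demo, i), i <= 3), sparse_demo)"
iquery -aq "_explain_physical('scan(sparse_demo)', 'afl')"

The array is declared over i=0:5 but only i=0..3 is populated (filter() leaves the other cells empty), so the scan node should report something like i=0:5 (0:3):0:6.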

What do you mean by corrected?

I mean that, at the moment, all stored arrays must be stored in the hash distribution. In the future this may be relaxed and other schemes may become possible but, for now, some code paths expect that to be the case. So before we store anything into an array, we have to make sure the chunks are distributed according to the hash.

If I override regrid's output chunk sizes, does this mean I can tune regrid so that the output from an input chunk goes exactly into a single output chunk?

You have full control over the size of the output chunk(s). If you set the output chunk size to be large, then the entire result can be returned as one chunk (make sure you have enough RAM!). Or you can make it so that for every input chunk there is exactly one output chunk, as long as your grid size divides the input chunk evenly.
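
A sketch using the toy foo array from post #2 (10x10 with 5x5 chunks): a 5x5 grid collapses each input chunk to a single output cell, and the trailing chunk-size arguments then decide how those cells are chunked.

iquery -aq "regrid(foo, 5,5, sum(val), 1,1)"
iquery -aq "regrid(foo, 5,5, sum(val), 2,2)"

The first form uses a 1x1 chunk interval, so each 5x5 input chunk maps to exactly one output chunk; the second packs the whole 2x2 result into a single chunk.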

what is res?

That's the array residency: the list of instances on which the array is present. Residencies are used in the Enterprise Edition, where we can add instances on the fly or survive instance failures. More notes here: https://github.com/paradigm4/elastic_resource_groups


#5

Hello,

I am exploring the physical plan (below) for the consume operator.
If consume does not save data to disk, but the previous operator (transpose in this case) reports that it changes the chunk distribution, does the redistribution actually happen (transpose, shuffle, then consume), or does consume just trigger the computation (transposing with no network exchange) without moving data?

{No} physical_plan
{0} '[pPlan]:
>[pNode] PhysicalConsume ddl 0 tile 0 children 1
  schema public.r2_u10m_100x20x16@1<value:float> [lonpublic.r2_u10m_100x20x16=0:191 (0:191):0:16; latpublic.r2_u10m_100x20x16=0:93 (0:93):0:20; timepublic.r2_u10m_100x20x16=0:* (0:46799):0:100] ArrayId: 0 UnversionedArrayId: 0 Version: 0 Flags: 0 Distro: dist: undefined ps: 5 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 4294967304, 4294967309, 8589934598, 8589934599, 12884901892, 12884901893, 17179869194, 17179869198, 21474836491, 21474836495, 25769803778, 25769803779, 30064771081, 30064771084] <value:float,EmptyTag:indicator NOT NULL>
  output full chunks: yes
  changes dstribution: no
  props sgm 1 sgo 1
  diout dist: undefined ps: 5 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 4294967304, 4294967309, 8589934598, 8589934599, 12884901892, 12884901893, 17179869194, 17179869198, 21474836491, 21474836495, 25769803778, 25769803779, 30064771081, 30064771084]
  bound start {0, 0, 0} end {191, 93, 4611686018427387903} density 1 cells 9223372036854775807 chunks 9223372036854775807 est_bytes 5.81072e+20
>>[pNode] physicalTranspose ddl 0 tile 0 children 1
   schema public.r2_u10m_100x20x16@1<value:float> [lonpublic.r2_u10m_100x20x16=0:191 (0:191):0:16; latpublic.r2_u10m_100x20x16=0:93 (0:93):0:20; timepublic.r2_u10m_100x20x16=0:* (0:46799):0:100] ArrayId: 0 UnversionedArrayId: 0 Version: 0 Flags: 0 Distro: dist: undefined ps: 5 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 4294967304, 4294967309, 8589934598, 8589934599, 12884901892, 12884901893, 17179869194, 17179869198, 21474836491, 21474836495, 25769803778, 25769803779, 30064771081, 30064771084] <value:float,EmptyTag:indicator NOT NULL>
   output full chunks: yes
   changes dstribution: yes
   props sgm 1 sgo 1
   diout dist: undefined ps: 5 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 4294967304, 4294967309, 8589934598, 8589934599, 12884901892, 12884901893, 17179869194, 17179869198, 21474836491, 21474836495, 25769803778, 25769803779, 30064771081, 30064771084]
   bound start {0, 0, 0} end {191, 93, 46799} density 1 cells 844646400 chunks 28080 est_bytes 5.15235e+10
>>>[pNode] physicalScan ddl 0 tile 1 children 0
    schema public.r2_u10m_100x20x16@1<value:float> [timepublic.r2_u10m_100x20x16=0:* (0:46799):0:100; latpublic.r2_u10m_100x20x16=0:93 (0:93):0:20; lonpublic.r2_u10m_100x20x16=0:191 (0:191):0:16] ArrayId: 3 UnversionedArrayId: 2 Version: 1 Flags: 0 Distro: dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 4294967304, 4294967309, 8589934598, 8589934599, 12884901892, 12884901893, 17179869194, 17179869198, 21474836491, 21474836495, 25769803778, 25769803779, 30064771081, 30064771084] <value:float,EmptyTag:indicator NOT NULL>
    output full chunks: yes
    changes dstribution: no
    props sgm 1 sgo 1
    diout dist: hash ps: 1 ctx:  redun: 0 off: {} shift: 0 res: [0, 1, 4294967304, 4294967309, 8589934598, 8589934599, 12884901892, 12884901893, 17179869194, 17179869198, 21474836491, 21474836495, 25769803778, 25769803779, 30064771081, 30064771084]
    bound start {0, 0, 0} end {46799, 93, 191} density 1 cells 844646400 chunks 28080 est_bytes 5.15235e+10
'

#6

I am asking because, in the case of consume, it is not necessary to store chunks according to the hash distribution. This means it should be sufficient just to compute the transpose without chunk shuffling when consume is called. Am I right?