Implementation of Window Aggregation


#1

Hi experts,

I am very curious about how window aggregation is implemented in SciDB. Is it similar to the implementation of grid aggregation, except that more (sliding, overlapping) grids are generated?

Here is an example. Assume that there is a 1D array of size 10, the window size is 2, and the window aggregate is SUM. Does SciDB work as follows?

  1. load the chunk of the 1D array;
  2. perform structural grouping; (group1: array[0], array[1]; group2: array[1], array[2]; …; group9: array[8], array[9])
  3. calculate the sum of each group.
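To make my mental model concrete, here is a toy sketch in plain C++ (this is not SciDB code; the values and names are just made up for illustration) of the computation I am describing:

#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
    // A 1D array of size 10 and a window covering 2 adjacent cells,
    // as in the example above (the values are arbitrary).
    std::vector<double> a = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    const std::size_t window = 2;

    // One "group" per starting position: (a[0], a[1]), (a[1], a[2]), ..., (a[8], a[9]).
    for (std::size_t start = 0; start + window <= a.size(); ++start) {
        double sum = 0.0;
        for (std::size_t k = 0; k < window; ++k) {
            sum += a[start + k];
        }
        std::printf("group%zu: sum = %g\n", start + 1, sum);
    }
    return 0;
}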

Furthermore, if the window aggregation needs to run in parallel, then the straightforward strategy is to evenly distribute the aggregation tasks of all the groups to different processors. Is this the way that SciDB parallelizes the aggregation? If so, I am afraid there might be a load balancing issue.

If the aggregate operator is a user-defined operator, which may involve intensive computation, and a WHERE clause is set to filter the elements in each group at the same time, the computation workload of different groups may vary a lot. Is this correct?


#2

Answers!

  1. Yes. The semantics of the SciDB window(…) are similar to a “sliding regrid”. The goal is to support operations like Gaussian smoothing of images, or timeseries calibration.

  2. The actual algorithm can be found in src/query/ops/aggregate/PhysicalWindow.cpp. Here’s how it works:

i. First, we make a decision about whether or not to materialize each input chunk. The basic idea is that the window(…) operation might be called by something else, like a thin(…) or a bernoulli(…). In those cases, the number of cells in the output chunk is much smaller than the number of cells in the input chunk, so we simply pull the set of cells in each specified window(…), one at a time. (Recall that we’re designed to allow client applications to wander at random over the result array using setPosition(), so where possible we try to have the engine pipeline data through the operators.)
ii. If it makes sense to materialize the input chunk, then that’s what we do. We pull the chunk into an in-memory representation that allows us to quickly probe for the “region” (the set of cells) around the desired position.
iii. As we iterate over the cells, pulling values either from the underlying operators or from the materialized chunk, we execute the aggregate over cell values in the window, and return the aggregate’s result to the invoking operator (which might be the client/server communications).

  3. It’s worth making something clear about our aggregates. In your example, we don’t need to create 10 “groups”, each group containing 5 values (in SciDB, window size = 2 implies “two to the left, the cell you’re on, two to the right”). Rather, we create 10 “aggregates”. All we keep is the state for each aggregate (sum(…) in your example). Our in-memory data structure allows us to apply the aggregate to all members of the “group” without copying them. This is important from a memory management perspective: if we grouped first and then aggregated each group, the memory footprint would be some (large) multiple of the input chunk’s size. We don’t want that. (There’s a small sketch of this idea at the end of this post.)

  4. Note that there are in fact two kinds of window. The algorithm above refers to situations where you want to define your window size by the dimension. But in other cases, you don’t want a window that’s (say) 3x3. Instead, you want a window that’s “the three previous and two following” values along some flux vector through the array. This is a common access pattern in timeseries applications (average the last three trades, whenever they occurred, to get an estimate of current price). The algorithm for that other kind of window is much more complicated.

  5. We’re able to parallelize the operation for two reasons. First, the chunks must have adequate overlap to support the window(…) size. If your overlap is 0, and the window size is +/- 3, then you can’t compute the window(…) in parallel; SciDB would need to repart(…) the input (stitch the edges together to compute the window(…) of cells along the chunk’s perimeter). Second, because of the way we pseudo-randomly distribute chunks over the instances, each instance is able to process its local chunks concurrently with all of the other instances. Even within a single instance, we’re able to use multiple threads to process several chunks simultaneously.

  6. Load balance is a general problem. We address it in part by distributing the chunks over the instances in a manner that’s pretty random. But regardless of what we do, we expect that in some applications lots of concurrent users will be interested in a small section of the data (perhaps one or two chunks), or the data might be “skewed” within the array’s logical boundaries, or cells that pass through a filter(…) might be unevenly distributed as you suggest, etc.

Given the many and diverse ways a load balancing problem can come about, there’s little you can do at a system level. It’s a problem we & our users will need to handle one at a time, whenever it comes up.
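To make points 2 and 3 a little more concrete, here is a rough sketch in ordinary C++. This is not the code in PhysicalWindow.cpp, and SumState is only a stand-in for our real aggregate interface; the point is just that we keep one small running state per output cell and never copy the window’s cells out into a separate “group”:

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

// Stand-in for an aggregate with init / accumulate / final steps.
struct SumState {
    double sum = 0.0;
    void accumulate(double v) { sum += v; }   // fold one cell into the state
    double finalize() const { return sum; }   // produce the aggregate's result
};

// Windowed SUM over a materialized 1D chunk. 'radius' plays the role of
// "N to the left, the cell you're on, N to the right".
std::vector<double> windowSum(const std::vector<double>& chunk, std::ptrdiff_t radius) {
    const std::ptrdiff_t n = static_cast<std::ptrdiff_t>(chunk.size());
    std::vector<double> out(chunk.size());
    for (std::ptrdiff_t pos = 0; pos < n; ++pos) {
        SumState state;                                               // init one state
        const std::ptrdiff_t lo = std::max<std::ptrdiff_t>(0, pos - radius);
        const std::ptrdiff_t hi = std::min<std::ptrdiff_t>(n - 1, pos + radius);
        for (std::ptrdiff_t k = lo; k <= hi; ++k) {
            state.accumulate(chunk[k]);                               // probe the region around 'pos'
        }
        out[pos] = state.finalize();                                  // one result per output cell
    }
    return out;
}

int main() {
    const std::vector<double> chunk = {1, 2, 3, 4, 5};
    for (double v : windowSum(chunk, 1)) std::printf("%g ", v);       // prints: 3 6 9 12 9
    std::printf("\n");
    return 0;
}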


#3

Plumber, thank you very much for the detailed answer. It’s very helpful, and I have three more questions regarding your reply.

  1. Since all the data SciDB processes is stored as chunked arrays, do you assume that the chunk size is set properly by the user, so that a single chunk is smaller than the cache size?

  2. I understand that we should reuse the data in memory and avoid the redundancy of duplicating the same data into different groups, but I am not very clear on how you optimize the memory footprint. Let’s assume that an array of size 3 is already in memory, and all we need to compute are sum1(a1, a2) and sum2(a2, a3). Which of the processes below is the way SciDB works? (“read” means load an element into cache)

I.
read a1 and a2, sum1 = a1 + a2;
read (a2 and) a3, sum2 = a2 + a3;

II.
sum1 = 0, sum2 = 0;
read a1, sum1 += a1;
read a2, sum1 += a2, sum2 += a2;
read a3, sum2 += a3.

  3. You mentioned that “If your overlap is 0, and the window size is +/- 3, then you can’t compute the window(…) in parallel.” Did you also imply that even if I set the chunk overlap to 3, a window size of +/-5 will still require a repart() operation before the parallel computation? If so, I understand that, as a user, I should make an accurate guess of my likely window sizes when loading data into SciDB and setting the chunk overlap size. If I am uncertain about the window sizes I will use in the future, I should conservatively keep the chunk overlap large enough.

Looking forward to your reply.


#4

Answers!

  1. Correct. This is one of the reasons we recommend chunk sizes of between 2M and 64M, with optimal sizes being 8M to 16M. In other words, try to get about 1,000,000 cells per chunk. If your chunk’s too big to fit in cache, we still load just one chunk at a time. If your chunk’s too big to fit into your RAM, we’re no good to you.

  2. Algorithm II is closest, but it’s a bit more complex than that. Suppose you have an array with 4 chunks and 2 instances, with each instance holding 2 chunks. Suppose the chunks are c1, c2, c3, c4, and i1 has { c1, c3 } while i2 has { c2, c4 }:

Instance # 1:                     Instance # 2: 
S1 = init ( Sum_State );          S2 = init ( Sum_State ); 
read c1;                          read c2; 
iter ( S1, c1, plus );            iter ( S2, c2, plus ); 
read c3;                          read c4; 
iter ( S1, c3, plus );            iter ( S2, c4, plus ); 
S2 = read ( Sum_State from i2 );
merge ( S1, S2, plus ); 
result = final ( S1 );

Get the idea? We build partial results for the aggregates on each instance, and only merge them to produce a final when the client asks for the result.

As for optimizing memory usage, what we do is nothing more sophisticated than a good old-fashioned DBMS-style LRU cache. Once the aggregate operator has finished with the data in the c1 chunk, we mark the chunk as “eligible for ejection”. If a query (not necessarily the same query; it could be another query submitted by another user simultaneously with the first) needs space for a chunk, we find the least recently used chunk and reuse its space for the “next” chunk wanted. We’re looking at more complex algorithms (MRU rather than LRU ejection for chunks in a full scan, say), but history’s shown that simple LRU caching gets you 80% of what can be done.
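If it helps, here is a toy sketch of that ejection policy in C++. This is not our buffer manager (the real thing also tracks pins, chunk sizes, dirty state, and so on); it only shows the least-recently-used bookkeeping described above, with strings standing in for chunk addresses:

#include <cstddef>
#include <cstdio>
#include <list>
#include <string>
#include <unordered_map>

// Toy LRU chunk cache: front of the list = most recently used chunk.
class LruChunkCache {
public:
    explicit LruChunkCache(std::size_t capacity) : capacity_(capacity) {}

    // Touch a chunk: if it is cached, mark it most recently used;
    // otherwise "load" it, ejecting the least recently used chunk when full.
    void access(const std::string& chunkId) {
        auto it = index_.find(chunkId);
        if (it != index_.end()) {
            lru_.splice(lru_.begin(), lru_, it->second);   // move to the front
            return;
        }
        if (lru_.size() == capacity_) {
            std::printf("ejecting %s\n", lru_.back().c_str());
            index_.erase(lru_.back());
            lru_.pop_back();                               // reuse its space
        }
        lru_.push_front(chunkId);
        index_[chunkId] = lru_.begin();
    }

private:
    std::size_t capacity_;
    std::list<std::string> lru_;
    std::unordered_map<std::string, std::list<std::string>::iterator> index_;
};

int main() {
    LruChunkCache cache(2);
    cache.access("c1");
    cache.access("c2");
    cache.access("c1");   // c1 becomes the most recently used chunk
    cache.access("c3");   // ejects c2, the least recently used chunk
    return 0;
}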

  3. If you get the overlap size wrong, we’ll re-organize it for you. The query will still run; it will just take longer than if you had decided to provision enough overlap. Some people want to pull a small(ish) segment of data from a big array, and then use window(…) on that. If the amount of data you’re pushing through the window(…) is small, then the overhead of the re-organization (we call it repart(…) for repartition) isn’t large in absolute terms.

Think of overlap specification as a performance tuning thing. If you don’t choose to use it, we’ll “do the right thing”, but we’re obliged to do it more slowly than we could. But we’re not going to be any slower than anyone else. If you get the overlap right, we go very fast.
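As a rule of thumb, the overlap-versus-window check looks like the toy C++ below. This is not the planner’s actual logic, just the rule described above: if the per-side window radius exceeds the chunk overlap, the input has to be repartitioned before the chunk-local computation can run in parallel:

#include <cstdio>

// Toy rule of thumb, not the optimizer's real decision code: a window of
// 'windowRadius' cells per side can be computed chunk-locally only if the
// chunk overlap covers that radius.
bool needsRepart(unsigned overlap, unsigned windowRadius) {
    return overlap < windowRadius;
}

int main() {
    std::printf("overlap 3, window +/-5 -> repart needed? %s\n",
                needsRepart(3, 5) ? "yes" : "no");   // yes
    std::printf("overlap 3, window +/-3 -> repart needed? %s\n",
                needsRepart(3, 3) ? "yes" : "no");   // no
    return 0;
}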


#5

I did some performance evaluation on raw scientific chunked arrays (e.g., HDF5 datasets), and it seems that loading consecutive chunks is much faster than loading non-consecutive chunks. Does the random chunk distribution in SciDB hurt its performance to some extent? I guess it’s a tradeoff between skew-tolerance and performance.

If such a random chunk distribution does not hurt the performance much, does SciDB shuffle the chunks during the data loading? :confused:


#6

SciDB moves chunks to the instance they’ll live on at load time, yes. Our assumption is that you’ll load once, and query many times, so it’s worth spending the time on load to improve the performance / reliability of the subsequent queries. Also worth noting that at load time, we create a number of copies of the input data and distribute these copies over the instances to ensure we will survive instance failure.

What do you mean by “consecutive chunks”? In a 2D array, “consecutive” might mean row-major, column-major or some other serialization. Or are you loading a 1D array?


#7

Since any multi-dimensional array can be serialized into a 1D array, any chunked array can also be serialized into a sequence of chunks. For a 2D array accessed through the C language interface, “consecutive chunks” means row-major order.
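To be concrete about what I mean by row-major, here is a trivial illustration (toy numbers, nothing HDF5- or SciDB-specific) of the chunk order I have in mind:

#include <cstdio>

int main() {
    // Row-major order for a 2D grid of chunks: chunk (r, c) in an
    // R x C grid gets linear position r * C + c.
    const int R = 2, C = 3;   // a 2 x 3 grid of chunks, just for illustration
    for (int r = 0; r < R; ++r) {
        for (int c = 0; c < C; ++c) {
            std::printf("chunk(%d,%d) -> position %d\n", r, c, r * C + c);
        }
    }
    return 0;
}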

BTW, it seems that SciDB does not support chunking the data in different sizes (i.e., irregular chunks). I think irregular chunking is a good approach to handling sparse arrays, so that chunks of different logical sizes can still hold roughly the same amount of data. As far as I know, ArrayStore and Rasdaman have implemented such a feature. Does SciDB intend to do so as well?


#8

If you’re loading chunks in row-major-order then there is a “smoothing” to the distribution. No two consecutive chunks will be located on the same instance. On the other hand, if you’re “randomly” loading, then there’s a good chance a local instance will be handling more than one chunk at a time. Overall though … the effect should be pretty minor … especially as instance counts go up. Can you give me a script to quantify the difference?
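As a toy illustration (this is not SciDB’s actual distribution function, just the general idea), hashing each chunk’s linear position onto an instance spreads a row-major load across the instances:

#include <cstddef>
#include <cstdio>
#include <functional>

int main() {
    // Toy mapping of chunk positions to instances; SciDB's real hash is different.
    const std::size_t instances = 4;
    for (unsigned chunkPos = 0; chunkPos < 8; ++chunkPos) {
        const std::size_t instance = std::hash<unsigned>{}(chunkPos) % instances;
        std::printf("chunk %u -> instance %zu\n", chunkPos, instance);
    }
    return 0;
}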

Did you look at the parallel loading facilities? If you break your load data up (or split it as it arrives) we support parallel loading.

You’re completely right: at the moment, SciDB has a “fixed logical / variable physical” approach to chunk management. This is ideal for dense chunks, and decent so long as you don’t have lots of skew. We’re looking at a couple of approaches to a “variable logical / fixed physical” dynamic chunking model. We have a high level design, but for the short to medium term, we’re kinda resource constrained.

Also … we’re paying very close attention to what ArrayStore can teach us. :wink: Emad, Magda and Daniel are all friends of ours.


#9

Sorry, perhaps I confused you. I didn’t explicitly load consecutive chunks in SciDB, so I don’t have such a script. I only ran some chunk-loading experiments on the HDF5 chunked datasets, before the data was reformatted and loaded into SciDB.

I didn’t try any of the SciDB parallel loading facilities, but I know every SciDB instance can run in parallel in the cluster. I believe parallel loading should scale well, since both the loading and the reformatting are embarrassingly parallel. Currently I have SciDB installed on my machine with only a single instance.

So far I don’t need to process any sparse array, so I am fine with the current version of SciDB. :smiley:

I saw the acknowledgements in the ArrayStore paper, and I know there is also some effort from SciDB team. :laughing: