How to merge partial chunks from different instances?


#1

Hi,
I’m writing a UDO to conduct user-defined aggregations. I know that every instance executes the execute() function locally and generates local results, for example a partially filled chunk. The problem is how to merge partial chunks from different instances into complete chunks after the aggregation.
I found in Aggregator.h that redistributeToRandomAccess(input, defaultPartitioning(), ArrayResPtr(), query, _aggs) can merge partial intermediate aggregate results into complete chunks. However, the aggregation function in our UDO is not one of the existing SciDB aggregation functions.
I also tried to write a chunkMerger class based on the AggregateChunkMerger class in Aggregate.cpp, but it turns out that some partial chunk data on some instances never gets written to the result array.
Any better suggestions?


#2

Hi - there are several ways to do it. If you are OK with sending everything to a single instance to merge it, and you want to build something quickly - I would use streaming:

If your partial chunks could be large and you want to build things at the C++ level - I would read these two operators:


Faster_redimension is, well, faster. But it is also more experimental in the way it uses SG and we are still testing some edge cases related to it.


#3

Thanks a lot for your quick reply!
Actually, I am writing the UDO in C++, so I read the two operators you recommended.
It seems that neither of the two operators solves my problem very well, so maybe I should explain my problem in more detail.
My operator aggregates chunk by chunk in a bottom-up style. For instance, if the input is inputArray<val:double> [x=1:16,4,0, y=1:16,4,0], then the parent array is parentArray<val:double> [x=1:4,4,0, y=1:4,4,0], where the chunk size is 4*4.
Moreover, the final output array contains only 1 root cell, i.e., outputArray<val:double> [x=1:1,4,0, y=1:1,4,0]. If there are 16 instances, every instance holds 1 chunk of the inputArray. After the first aggregation, every instance generates one cell of the parentArray, while the remaining cells are null. What I want to do is merge those partial data from each instance into complete chunks and then redistribute them to random access to conduct further aggregations.
However, the grouped_aggregate operator generates an output array whose dimensions are instance_id and value_no rather than the dimensions of the input array, while I expect the schema of the output array to be the same as the input array's except for the dimension lengths.
The faster_redimension operator scans the input array into tuples and then transfers them to the output array. It seems a little complicated to implement my operator this way.
Is there an efficient way for me to do this?


#4

Ah I see.

So it looks essentially like a double regrid. The first step is similar to this:

$ iquery -anq "store(build(<val:double>[x=1:16,4,0,y=1:16,4,0], (x-1)*16+(y-1)), foo)"
Query was executed successfully

$ iquery -anq "store(regrid(foo, 4, 4, sum(val), count(val), 4, 4), bar)"
Query was executed successfully

$ iquery -aq "show(bar)"
{i} schema
{0} 'bar<val_sum:double,val_count:uint64> [x=1:4:0:4; y=1:4:0:4]'

$ iquery -aq "scan(bar)"
{x,y} val_sum,val_count
{1,1} 408,16
{1,2} 472,16
{1,3} 536,16
{1,4} 600,16
{2,1} 1432,16
{2,2} 1496,16
{2,3} 1560,16
{2,4} 1624,16
{3,1} 2456,16
{3,2} 2520,16
{3,3} 2584,16
{3,4} 2648,16
{4,1} 3480,16
{4,2} 3544,16
{4,3} 3608,16
{4,4} 3672,16

And then here’s the second step:

$ iquery -aq "regrid(bar, 4,4, avg(val_sum), 1,1)"
{x,y} val_sum_avg
{1,1} 2040

Did I get that right?

Have you considered just using a UDA, instead of a UDO? Something like this:


If you do that, you would be able to just say

regrid(foo, 4, 4, my_aggregate(val))
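
In case it helps to see why the UDA route takes care of the cross-instance merging for you: the aggregate framework redistributes and merges partial states itself, and your aggregate only supplies the local pieces. Here is a purely illustrative sketch of that contract (not the literal SciDB Aggregate interface; the real virtual methods live in the same Aggregate.cpp / Aggregator.h framework you mentioned). It just shows the shape of what you have to provide:

#include <cstdint>

// Illustrative only: the initialize/accumulate/merge/finalize contract a
// user-defined aggregate has to satisfy. Names and signatures are made up
// for this sketch; the real ones come from SciDB's Aggregate base class.
struct MyAggregateSketch
{
    struct State { double sum = 0.0; uint64_t count = 0; };

    // Start an empty per-group state.
    void initializeState(State& s) const { s = State(); }

    // Runs on the instance that owns a chunk, once per input value.
    void accumulate(State& s, double v) const { s.sum += v; ++s.count; }

    // Runs when partial states from different instances meet after
    // redistribution; this is the step that merges partial results.
    void merge(State& dst, const State& src) const
    {
        dst.sum += src.sum;
        dst.count += src.count;
    }

    // Produces the output attribute value for the group.
    double finalResult(const State& s) const
    {
        return s.count ? s.sum / s.count : 0.0;
    }
};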

Grouped aggregate can still be used; in fact, it’s more general. You just convert the dimensions to attributes, so you lose some dimensionality, but that doesn’t make a big difference if the intermediate result is only 4x4. I don’t know how big your final result will be:

$ iquery -aq "grouped_aggregate(apply(foo, new_x, (x-1)/4+1, new_y, (y-1)/4+1), avg(val), new_x, new_y)"
{instance_id,value_no} new_x,new_y,val_avg
{0,0} 1,1,25.5
{3,0} 2,2,93.5
{8,0} 2,1,89.5
{14,0} 2,4,101.5
{19,0} 4,3,225.5
{21,0} 1,3,33.5
{21,1} 1,4,37.5
{23,0} 3,2,157.5
{25,0} 4,4,229.5
{33,0} 3,4,165.5
{35,0} 4,2,221.5
{40,0} 2,3,97.5
{42,0} 3,3,161.5
{44,0} 1,2,29.5
{46,0} 4,1,217.5
{61,0} 3,1,153.5

Regrid is probably the more efficient approach, however.
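
Side note: if all you need is that final root cell, the two regrid steps above compose into a single statement, so the intermediate bar array never has to be stored. This should return the same {1,1} 2040 as the two-step version:

$ iquery -aq "regrid(regrid(foo, 4, 4, sum(val)), 4, 4, avg(val_sum))"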

At any rate, I would try one of the following strategies:

  1. use a UDA if possible (simplest)
  2. if (1) doesn’t work try to use grouped_aggregate (less simple)
  3. if (2) doesn’t work try to go through the code and mimic what regrid does.

Hope that helps.


#6

Thanks very much for giving me these solutions.
Yes, the UDO I’m developing is very similar to regrid, but the output type is a more complex type that I define myself, such as a list of the struct below:

struct grid
{
    int64_t x;
    int64_t y;
    double val;
};

Actually, I tried the 3rd solution at the very beginning and wrote the UDO in C++ based on regrid and Aggregator.h.
However, that brings me back to the initial problem I asked about, i.e., the redistributeToRandomAccess() step, since the aggregate method in my UDO is not one of the existing SciDB aggregate functions.
I rewrote the redistributeToRandomAccess(input, defaultPartitioning(), ArrayResPtr(), query, _aggs) call to merge partial chunks into complete chunks, but the result lost some data. Maybe some part of my code went wrong, but I haven’t figured out where.

Currently, my plan is to write a UDA as you suggested. That way it should be possible to use the redistributeToRandomAccess() function without rewriting it, shouldn’t it?

Also, is a UDA implementable if the output data type is a struct?
I plan to convert the struct data to strings, but the drawback is that the strings may be very large. It seems I could write a User-Defined Type (UDT) instead. I found some examples in the source code, but I’m not sure how to implement it. Is there any manual like the readme file in example_udos?
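
For reference, this is roughly what the conversion would look like if the list of grid structs were packed into a flat byte buffer instead of a text string; it stays compact even when the list is long. This is only a plain C++ sketch with made-up helper names, independent of how the buffer ultimately gets stored (a string/binary attribute or a UDT):

#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical helpers for illustration: pack a list of grid entries into a
// flat byte buffer and unpack it again. grid is the plain struct above, so a
// straight memcpy of its bytes is enough.
std::vector<uint8_t> packGrids(const std::vector<grid>& grids)
{
    std::vector<uint8_t> buf(grids.size() * sizeof(grid));
    if (!buf.empty())
        std::memcpy(buf.data(), grids.data(), buf.size());
    return buf;
}

std::vector<grid> unpackGrids(const uint8_t* data, std::size_t numBytes)
{
    std::vector<grid> grids(numBytes / sizeof(grid));
    if (!grids.empty())
        std::memcpy(grids.data(), data, grids.size() * sizeof(grid));
    return grids;
}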

Look forward to your reply.


#7

Hi!
Problem solved by using a UDA as you suggested. Thanks a lot!
Just one more question: since I have to run the UDA multiple times to generate the result array, I wrote a Python script to do this, but it generates some intermediate arrays that do not need to be stored.
Would it be more efficient to implement all of those phases in one UDO? Are arrays generated inside a UDO materialized or not? If they are not materialized, will they disappear during the execution of the UDO?

Look forward to your reply.
Jenny