# How to merge partial chunks from different instances?

#1

Hi,
I’m writing a UDO to perform user-defined aggregations. I know that every instance executes the execute() function locally and generates a local result, for example a partial chunk. My problem is how to merge the partial chunks from different instances into complete chunks after the aggregation.
I found in Aggregator.h that redistributeToRandomAccess(input, defaultPartitioning(), ArrayResPtr(), query, _aggs) can merge partial intermediate aggregate results into complete chunks. However, the aggregation function in our UDO is not one of the existing SciDB aggregate functions.
I also tried to write a chunk-merger class based on the AggregateChunkMerger class in Aggregate.cpp, but it turns out that some partial chunk data on some instances is never written to the result array.
Any better suggestions?

#2

Hi - there are several ways to do it. If you are OK with sending everything to a single instance to merge it, and you want to build something quickly - I would use streaming:

If your partial chunks could be large and you want to build things at the C++ level - I would read these two operators:

Faster_redimension is, well, faster. But it is also more experimental in the way it uses SG and we are still testing some edge cases related to it.
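
With the streaming route, the merge logic lives in an ordinary child program that reads rows on stdin and writes merged rows on stdout. As a minimal sketch of that merge step (the tab-separated "x y val" layout and the sum-merge rule are illustrative assumptions, not the streaming plugin's exact protocol):

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Merge rows of the form "x<TAB>y<TAB>val" coming from several
// instances: rows that share the same (x, y) key are summed.
std::map<std::pair<int64_t, int64_t>, double>
mergeRows(const std::vector<std::string>& rows)
{
    std::map<std::pair<int64_t, int64_t>, double> merged;
    for (const auto& row : rows) {
        std::istringstream ss(row);
        int64_t x = 0, y = 0;
        double val = 0;
        ss >> x >> y >> val;     // whitespace-separated parse
        merged[{x, y}] += val;   // sum-merge duplicate keys
    }
    return merged;
}
```

Since everything arrives at one instance, the child program sees all the partials for a key and can emit exactly one complete row per key.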

#3

Actually, I am writing the UDO in C++, so I read the two operators you recommended.
It seems that neither operator solves my problem very well, so let me explain it in more detail.
My operator aggregates chunk by chunk in a bottom-up style. For instance, if inputArray<val:double> [x=1:16,4,0, y=1:16,4,0], then parentArray<val:double> [x=1:4,4,0, y=1:4,4,0], where the chunk size is 4*4.
Moreover, the final output array contains only 1 root cell, i.e., outputArray<val:double> [x=1:1,4,0, y=1:1,4,0]. If there are 16 instances, every instance holds 1 chunk of inputArray. After the first aggregation, every instance produces one cell of parentArray, with the remaining cells null. What I want to do is merge that partial data from each instance into complete chunks and then redistribute them to random access for further aggregation.
However, grouped_aggregate generates an output array whose dimensions differ from the input's, adding instance_id and value_no dimensions, whereas I expect the output schema to match the input schema except for the dimension lengths.
The faster_redimension operator scans the input array into tuples and then transfers them to the output array; implementing my operator this way seems rather complicated.
Is there any efficient way for me to do this?
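
The merge step described above (each instance holds one real cell of a parent chunk, and all other cells are null) can be sketched outside of SciDB's chunk classes. In this sketch a partial chunk is just a flat vector of optional cells, and merging fills each null slot from whichever instance set it; all names here are illustrative, not SciDB API:

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <vector>

// One partial chunk: a cell is either set (by the local instance)
// or null (owned by some other instance).
using PartialChunk = std::vector<std::optional<double>>;

// Overlay 'src' onto 'dst': null cells in dst are filled from src.
// A cell set in both partials is treated as a conflict, since each
// cell is supposed to be produced by exactly one instance.
void mergePartial(PartialChunk& dst, const PartialChunk& src)
{
    for (std::size_t i = 0; i < dst.size(); ++i) {
        if (src[i].has_value()) {
            if (dst[i].has_value())
                throw std::runtime_error("cell set by two instances");
            dst[i] = src[i];
        }
    }
}
```

Folding all 16 instances' partials into one accumulator this way yields a complete parent chunk that can then be redistributed for the next aggregation level.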

#4

Ah I see.

So it looks essentially like a double regrid. The first step is similar to this:

$ iquery -anq "store(build(<val:double>[x=1:16,4,0,y=1:16,4,0], (x-1)*16+(y-1)), foo)"
Query was executed successfully
$ iquery -anq "store(regrid(foo, 4, 4, sum(val), count(val), 4, 4), bar)"
Query was executed successfully

$ iquery -aq "show(bar)"
{i} schema
{0} 'bar<val_sum:double,val_count:uint64> [x=1:4:0:4; y=1:4:0:4]'
$ iquery -aq "scan(bar)"
{x,y} val_sum,val_count
{1,1} 408,16
{1,2} 472,16
{1,3} 536,16
{1,4} 600,16
{2,1} 1432,16
{2,2} 1496,16
{2,3} 1560,16
{2,4} 1624,16
{3,1} 2456,16
{3,2} 2520,16
{3,3} 2584,16
{3,4} 2648,16
{4,1} 3480,16
{4,2} 3544,16
{4,3} 3608,16
{4,4} 3672,16
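
Those sums can be verified by hand, since foo holds (x-1)*16+(y-1): the first 4x4 block sums to 408, matching cell {1,1} above. A quick arithmetic check:

```cpp
#include <cassert>
#include <cstdint>

// Sum of the 4x4 block of build(..., (x-1)*16+(y-1)) at the
// 1-based block position (bx, by) in bar.
int64_t blockSum(int64_t bx, int64_t by)
{
    int64_t sum = 0;
    for (int64_t x = (bx - 1) * 4 + 1; x <= bx * 4; ++x)
        for (int64_t y = (by - 1) * 4 + 1; y <= by * 4; ++y)
            sum += (x - 1) * 16 + (y - 1);
    return sum;
}
```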


And then here’s the second step:

$ iquery -aq "regrid(bar, 4,4, avg(val_sum), 1,1)"
{x,y} val_sum_avg
{1,1} 2040


Did I get that right? Have you considered just using a UDA instead of a UDO? Something like this: If you do that, you would be able to just say regrid(foo, 4, 4, my_aggregate(val))

Grouped aggregate can still be utilized. In fact, it’s more general. But you just convert dimensions to attributes. You lose some dimensionality. It doesn’t make a big difference if the intermediate result is only 4x4. Don’t know how big the final result will be:

$ iquery -aq "grouped_aggregate(apply(foo, new_x, (x-1)/4+1, new_y, (y-1)/4+1), avg(val), new_x, new_y)"
{instance_id,value_no} new_x,new_y,val_avg
{0,0} 1,1,25.5
{3,0} 2,2,93.5
{8,0} 2,1,89.5
{14,0} 2,4,101.5
{19,0} 4,3,225.5
{21,0} 1,3,33.5
{21,1} 1,4,37.5
{23,0} 3,2,157.5
{25,0} 4,4,229.5
{33,0} 3,4,165.5
{35,0} 4,2,221.5
{40,0} 2,3,97.5
{42,0} 3,3,161.5
{44,0} 1,2,29.5
{46,0} 4,1,217.5
{61,0} 3,1,153.5
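
Both result sets are consistent with each other: each per-block average reported by grouped_aggregate is that block's sum divided by its 16 cells (e.g. 408/16 = 25.5 for block {1,1}), and the final 2040 from the second regrid is the mean of the 16 block sums. A small check using the same build expression:

```cpp
#include <cassert>
#include <cstdint>

// avg(val) over the 4x4 block at (bx, by), as grouped_aggregate
// reports it for foo = build(..., (x-1)*16+(y-1)).
double blockAvg(int64_t bx, int64_t by)
{
    double sum = 0;
    for (int64_t x = (bx - 1) * 4 + 1; x <= bx * 4; ++x)
        for (int64_t y = (by - 1) * 4 + 1; y <= by * 4; ++y)
            sum += (x - 1) * 16 + (y - 1);
    return sum / 16.0;
}

// Final regrid step: avg(val_sum) over the 16 block sums of bar.
double finalAvg()
{
    double total = 0;
    for (int64_t bx = 1; bx <= 4; ++bx)
        for (int64_t by = 1; by <= 4; ++by)
            total += blockAvg(bx, by) * 16.0;  // recover block sum
    return total / 16.0;
}
```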


Regrid probably is a more efficient approach however.

At any rate, I would try one of the following strategies:

1. use a UDA if possible (simplest)
2. if (1) doesn’t work try to use grouped_aggregate (less simple)
3. if (2) doesn’t work try to go through the code and mimic what regrid does.

Hope that helps.

#6

Thanks very much for giving me these solutions.
Yes, the UDO I’m developing is very similar to regrid, but the output type is a more complex type defined by myself, such as a list of the struct as below:

struct grid
{
    int64_t x;
    int64_t y;
    double val;
};

Actually, I tried the 3rd solution at the very beginning and wrote the UDO in C++ based on regrid and Aggregator.h.
However, that brings me back to the initial problem I asked about, i.e., the redistributeToRandomAccess() step,
since the aggregate method in my UDO is not one of the existing SciDB aggregate functions.
I rewrote the redistributeToRandomAccess(input, defaultPartitioning(), ArrayResPtr(), query, _aggs) call to merge partial chunks into complete chunks, but the result lost some data. Some part of my code probably went wrong, but I haven’t figured out where.

Currently, my plan is to write a UDA as you suggested. That way, it should be possible to use the redistributeToRandomAccess() function without rewriting it, shouldn’t it?

Also, is a UDA implementable if the output data type is a struct?
I plan to convert the struct data to strings, but the drawback is that the strings may be very large. It seems I could write a User-Defined Type (UDT) instead. I found some examples in the source code, but I’m not sure how to implement it. Is there a manual like the README file in example_udos?
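
As an alternative to string conversion, a fixed-layout struct like grid can be round-tripped through a compact binary buffer, which is essentially what a UDT's serialized state would hold. A minimal sketch of that packing (plain byte copying for illustration, not SciDB's UDT API; assumes reader and writer share the same struct layout):

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

struct grid
{
    int64_t x;
    int64_t y;
    double val;
};

// Pack a list of grid cells into one flat byte buffer: an element
// count followed by the raw records. memcpy keeps the code
// well-defined regardless of alignment.
std::vector<char> packGrids(const std::vector<grid>& cells)
{
    std::vector<char> buf(sizeof(uint64_t) + cells.size() * sizeof(grid));
    uint64_t n = cells.size();
    std::memcpy(buf.data(), &n, sizeof n);
    std::memcpy(buf.data() + sizeof n, cells.data(), n * sizeof(grid));
    return buf;
}

std::vector<grid> unpackGrids(const std::vector<char>& buf)
{
    uint64_t n = 0;
    std::memcpy(&n, buf.data(), sizeof n);
    std::vector<grid> cells(n);
    std::memcpy(cells.data(), buf.data() + sizeof n, n * sizeof(grid));
    return cells;
}
```

This stays a fixed 24 bytes or so per cell instead of growing like a decimal string, which addresses the "strings may be very large" concern.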