Parallel loading of binary files


#1

Hi,

I’m trying to load binary input files in parallel with the load() operator on SciDB 14.8. My understanding of the docs was that if you use instance_id = -1 in the load operator, the coordinator will split the file into chunks and distribute those to the instances and then all instances load in parallel, resulting in a speed-up of approx. # of instances.

(My number of instances is 4.)

However, when I tried it out, “the opposite” happened: The same data was loaded four times and it took twice as long.

Here’s the output of my test script:

load instance_id: -2
analyze result:
{attribute_number} attribute_name,min,max,distinct_count,non_null_count
{0} 'gpi','0','3264390',3963546,3264391
[…]
elapsed time: 6.220s

load instance_id: -1
analyze result:
{attribute_number} attribute_name,min,max,distinct_count,non_null_count
{0} 'gpi','0','3264390',3963546,13057564
[…]
elapsed time: 15.937s

Is this a misunderstanding on my part, a bug in the operator, or a configuration problem in my SciDB?


#2

Hi Thea,

The problem is that the load() operator does not actually do any splitting. The operator’s parallel loading mode (instance_id == -1) is intended for use with the loadcsv.py script, which splits CSV (or TSV) text input and sends it to FIFO files on each instance, and then invokes load(…, -1, …) to load from the local FIFO at each instance. In 14.8 we do not have a way to split a single binary input stream, so either the coordinator or some other particular instance must be used.
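
To make the two modes concrete, here is a minimal sketch in AFL (the array name, file names, and format string are placeholders):

load ( My_Array, '/tmp/my_data.bin', 0, '(double, double)' );

loads a single file sitting on one designated instance (0 here; -2 means the coordinator), whereas

load ( My_Array, 'my_data.bin', -1, '(double, double)' );

expects a file named my_data.bin to already exist in every instance's data directory; loadcsv.py arranges that for text input (via the FIFOs), and for binary input you currently have to arrange it yourself.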

The docs fail to make clear that the splitting implied by parallel load is not actually done by the load operator. :frowning:

If your binary data is all fixed-length fields, you may be able to use the GNU split(1) utility to break it up into per-instance pieces, and then parallel loading with a format string might do the trick.
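
For example, assuming (purely for illustration) a fixed record size of 32 bytes, something along these lines chops the file into pieces of 2,500 records each without tearing any record across two files (paths are placeholders):

split -b 80000 /path/to/my_data.bin /tmp/piece.
ls -1s /tmp/piece.*

The only real constraint is that the -b value be a multiple of the record size.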

We’re aware that data loading workflows need to be improved and simplified, and (though I’m too low on the totem pole to make capital-P-Promises) the next few releases should show steady improvement in this area.

Cheers,
Mike Leibensperger
Paradigm4


#3

Hi Mike,

[quote=“mjl”]
The problem is that the load() operator does not actually do any splitting. The operator’s parallel loading mode (instance_id == -1) is intended for use with the loadcsv.py script, which splits CSV (or TSV) text input and sends it to FIFO files on each instance, and then invokes load(…, -1, …) to load from the local FIFO at each instance. [/quote]
So I could/should write my own script that splits the binary file and sends each batch to a fifo on a different instance?
I assume the fifo always has to have the same name, so instance == machine?

[quote]
The docs fail to make clear that the splitting implied by parallel load is not actually done by the load operator. :frowning:[/quote]
Yeah, a hint in that direction would be nice :wink:

[quote]
We’re aware that data loading workflows need to be improved and simplified, and (though I’m too low on the totem pole to make capital-P-Promises) the next few releases should show steady improvement in this area.[/quote]
That sounds promising and would be very welcome :smiley:


#4

Some more notes that might be helpful (I’ll post this also in the “How To, Demos and Toys” forum).

The basic methodology goes like this …

  1. You start with a single, large file. In order for SciDB (or any DBMS) to load that file’s contents in parallel (that is, to load a different portion of the input on each instance) someone needs to chop the input up. There are lots of tools and utilities you can use to accomplish this: Linux ‘split’, or a perl/python/PHP script that uses some kind of “unpack” facility.

  2. Having chopped your input file into pieces, your next task is to place each piece somewhere that’s local to its instance. Why use relative addressing to a well-known location in each instance’s data directories? Because it simplifies the syntax of the load(…) command.

  3. Then you can use the SciDB input(…) (or load) tools to load the data.

Here’s a longer worked example that should make things a bit clearer.

Let’s start with a simple example array:

DROP ARRAY Load_Test;
CREATE ARRAY Load_Test
< a1 : double, a2 : double >
[ I=0:99,10,0, J=0:99,10,0 ];

And let’s populate it:

SET NO FETCH;
INSERT INTO Load_Test
 SELECT *
   FROM build ( < a1 : double > [ I=0:99,10,0, J=0:99,10,0 ],
                double(J*100+I)
              ),
        build ( < a2 : double > [ I=0:99,10,0, J=0:99,10,0 ],
                double ( random()%1000 ) / 100.0
              );

You can now look at the data you’ve just created:

SET FETCH;
SELECT COUNT(*), MIN ( a1 ), MAX ( a1 ) FROM Load_Test;

The basic SAVE utilities in SciDB (the mechanics you use to export data from a SciDB array into the file system) can be exercised using either AQL or AFL:

SET NO FETCH;
SET LANG AQL;
SAVE Load_Test INTO '/tmp/BINARY_SAVE_OF_LOAD_TEST.bin' AS '(double, double)';

SET LANG AFL;
save (
  Load_Test,
  '/tmp/BINARY_SAVE_OF_LOAD_TEST.bin',
  0,
  '(double,double)'
);

Note that I’m focussing on the binary save and load options here. For a longer write-up on SciDB’s load options, have a look at this forum post: http://www.scidb.org/forum/viewtopic.php?f=11&t=1308&p=2852&hilit=load#p2724

In the two statements above, SciDB writes the contents of the entire Load_Test array to a single binary file (note the instance = 0 option, and the filename with absolute pathing to ‘/tmp’) that contains all of the array’s attributes serialized in chunk order and, within each chunk, in row-major order. Note that the binary file does not include any coordinate information. (More on this below.)

To get a handle on what’s going on in a binary file, one option is to use the Linux ‘od’ tool as follows:

od -N 128 --format=f8 /tmp/BINARY_SAVE_OF_LOAD_TEST.bin

   0000000   0.000000000000000e+00   1.020000000000000e+00
   0000020   1.000000000000000e+00   1.050000000000000e+00
   0000040   2.000000000000000e+00   8.730000000000000e+00
   0000060   3.000000000000000e+00   9.570000000000000e+00
   0000100   4.000000000000000e+00   1.640000000000000e+00
   0000120   5.000000000000000e+00   4.980000000000000e+00
   0000140   6.000000000000000e+00   6.860000000000000e+00
   0000160   7.000000000000000e+00   8.720000000000001e+00

Note the values 0.0, 1.0, 2.0, 3.0 … etc in the first column? These correspond to the ‘a1’ attribute. The second column contains the randomly generated double precision values in ‘a2’.

How to pull the data back in? The symmetric operation to save(…) in SciDB is called input(…). In general, folk will use load(…) to put the data directly into a target array. But there’s no absolute requirement that you do this. For example:

SET LANG AQL;
SET FETCH;
SELECT COUNT(*) AS CNT,
       MIN ( a1 ) AS Start,
       MAX ( a1 ) AS End
  FROM input ( < a1 : double, a2 : double > [ RowNum ],
               '/tmp/BINARY_SAVE_OF_LOAD_TEST.bin',
               0,
               '(double, double)'
             );
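
If you do want the data to land directly in a named array, the equivalent load(…) looks like this. (Load_Test_Copy is a placeholder here; it would have to be created first with a matching one-dimensional schema, something like < a1 : double, a2 : double > [ RowNum=0:*,500000,0 ].)

SET LANG AFL;
load (
  Load_Test_Copy,
  '/tmp/BINARY_SAVE_OF_LOAD_TEST.bin',
  0,
  '(double, double)'
);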

Now, so far we’ve been dealing with data that only contains the double precision attribute values. What about the coordinates? In general, load files (binary or text) contain coordinate information in addition to attribute information. So we want to (quickly) create a data set that looks more like what we can expect to get: one that contains each cell’s coordinates, and the values of the attributes at each cell’s location.

The following AFL query “unpacks” the input 2D Load_Test array into a 1D array, where the coordinate information from the 2D Load_Test is turned into attributes in the 1D unpacked form. Then the query uses save(…) to write the 1D array’s contents to a single file in the /tmp directory on the coordinator.

SET LANG AFL;
SET NO FETCH;
save (
  unpack ( Load_Test, RowNum ),
  '/tmp/BINARY_SAVE_OF_UNPACKED_LOAD_TEST.bin',
  0,
  '(int64, int64, double, double)'
);

Looking at the contents of the /tmp/BINARY_SAVE_OF_UNPACKED_LOAD_TEST.bin file is a bit beyond what the ‘od’ tool can do because the data types vary. So when you’re dealing with binary data it’s often a good idea to reach for scripting languages and tools, many of which have been worked on for years to add precisely the kind of functionality that data wranglers need. Back in the 80s Perl introduced us to the pack() and unpack() operations, which provide rich facilities for dealing with these kinds of problems.

#!/usr/bin/perl
#
# Read fixed-length 32-byte records (two int64 coordinates followed by
# two doubles) from stdin and print each one in readable form.
$RECLEN=32;
while ($bytesRead = read (STDIN, $buffer, $RECLEN)) {
  my ( $I, $J, $a1, $a2 ) = unpack "qqdd", $buffer;
  printf ("I = %d, J = %d, a1 = %g, a2 = %g\n", $I, $J, $a1, $a2 );
}

A glance at the pack / unpack documentation should give you some insight into the reasons a general purpose scripting tool is a very good idea for this kind of work!

If you put that little script into /tmp/unpack.pl, you can use it to look at the file SciDB saved for you in /tmp.

cat /tmp/BINARY_SAVE_OF_UNPACKED_LOAD_TEST.bin | perl /tmp/unpack.pl | head -10

I = 0, J = 0, a1 = 0, a2 = 7.81
I = 0, J = 1, a1 = 100, a2 = 5.84
I = 0, J = 2, a1 = 200, a2 = 4.5
I = 0, J = 3, a1 = 300, a2 = 2.75
I = 0, J = 4, a1 = 400, a2 = 7.7
I = 0, J = 5, a1 = 500, a2 = 2.81
I = 0, J = 6, a1 = 600, a2 = 7.17
I = 0, J = 7, a1 = 700, a2 = 2.85
I = 0, J = 8, a1 = 800, a2 = 6.28
I = 0, J = 9, a1 = 900, a2 = 8.23

So far, we have saved data as binary into a single file. But the goal is to be able to load binary data in parallel. A good way to understand how this works is to exercise the parallel unload first, and then see where the files end up.

How to save in parallel in SciDB? Again, you can do this in AFL, as follows:

SET LANG AFL;
SET NO FETCH;
save (
  unpack ( Load_Test, RowNum ),
  'BINARY_SLICE_OF_UNPACKED_LOAD_TEST.bin',
  -1,
  '(int64, int64, double, double)'
);

Note the use of the “instance = -1” flag in the AFL save(…) operator? That tells SciDB to save each instance’s fragment(s) of the array (i.e. the chunks it holds locally) on that instance.

Where do these files containing fragments of the array’s chunks end up? Change directory to the location of the instance data (this is the base-path in the config.ini file) and use ‘find’ to search for the files. You should see something like this.

cd [base-path from config.ini]
find . -name '*.bin' -print
./000/0/BINARY_SLICE_OF_UNPACKED_LOAD_TEST.bin
./000/1/BINARY_SLICE_OF_UNPACKED_LOAD_TEST.bin
./000/3/BINARY_SLICE_OF_UNPACKED_LOAD_TEST.bin
./000/2/BINARY_SLICE_OF_UNPACKED_LOAD_TEST.bin

Note that, on my system, I am running 4 SciDB instances on a single machine. Your mileage will vary, depending on how many instances you’re running, and how these instances are mapped to your physical boxes. The important point is that the parallel save locates each of the files in each instance’s data directories. If you have one SciDB instance per physical box, that’s one file per box. More than one instance per physical node means more than one file per box, but these files are all going to be located in the instance’s data directories.

Of course, there’s a symmetric input(…) operation that can pull the contents of all of these files into the system in parallel.

SET LANG AQL;
SET FETCH;
SELECT MIN ( a1 ) AS start_a1,
       MAX ( a1 ) AS end_a1,
       COUNT(*) AS C
  FROM input (  < I : int64, J : int64, a1 : double, a2 : double > [ RowNum ],
                'BINARY_SLICE_OF_UNPACKED_LOAD_TEST.bin',
                -1,
                '(int64, int64, double, double)'
             );

Now, if you’re starting with a large binary file, then the first thing to do is to break it up into smaller files. Then you need to move each of the smaller files into the appropriate location on each instance. The most general-purpose utility for breaking binary files up in Linux is called ‘split’. (Note that if you have something exotic in your data like a variable-length string, you’re going to have to use a scripting tool to pry it apart. But split is generally extremely convenient for files containing lots of fixed-length data objects.)

You would use split in the following way. Here we split the unpacked file, since the load below needs the I and J coordinates, and we pick a -b value that is a multiple of the 32-byte record size so that no record gets torn across two pieces:

$ split -b 80000 /tmp/BINARY_SAVE_OF_UNPACKED_LOAD_TEST.bin /tmp/BINARY_SPLIT.bin
$ ls -1s /tmp/BINARY_SPLIT.bina*
80 /tmp/BINARY_SPLIT.binaa
80 /tmp/BINARY_SPLIT.binab
80 /tmp/BINARY_SPLIT.binac
80 /tmp/BINARY_SPLIT.binad

Each of those files? The BINARY_SPLIT.binaa through BINARY_SPLIT.binad? Each of them contains a fragment of the original BINARY_SAVE_OF_UNPACKED_LOAD_TEST.bin data file. To parallelize the load, you copy each fragment into the same location where SciDB earlier placed the parallel save(…) output. Each fragment needs the same file name in its new location …

mv /tmp/BINARY_SPLIT.binaa [base-path from config.ini]/000/0/BINARY_SPLIT.bin
mv /tmp/BINARY_SPLIT.binab [base-path from config.ini]/000/1/BINARY_SPLIT.bin
mv /tmp/BINARY_SPLIT.binac [base-path from config.ini]/000/2/BINARY_SPLIT.bin
mv /tmp/BINARY_SPLIT.binad [base-path from config.ini]/000/3/BINARY_SPLIT.bin

And now, you should be able to just load the entire data set, in parallel. The following AFL and AQL queries illustrate how to do this.
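
(These queries assume the target arrays already exist. Sketches of plausible definitions follow; the chunk sizes are an assumption, so pick whatever suits your data.)

SET LANG AQL;
CREATE ARRAY Load_Test_1D
< I : int64, J : int64, a1 : double, a2 : double >
[ RowNum=0:*,500000,0 ];

CREATE ARRAY Load_Test_Two
< a1 : double, a2 : double >
[ I=0:99,10,0, J=0:99,10,0 ];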

SET LANG AFL;
SET NO FETCH;
load (
  Load_Test_1D,
  'BINARY_SPLIT.bin',
  -1,
  '(int64, int64, double, double)'
);

SET LANG AQL;
INSERT INTO Load_Test_Two
SELECT MIN ( a1 ) AS a1,
       MIN ( a2 ) AS a2
  FROM input (  < I : int64, J : int64, a1 : double, a2 : double > [ RowNum ],
                'BINARY_SPLIT.bin',
                -1,
                '(int64, int64, double, double)'
             )
 REDIMENSION BY [ I=0:99,10,0, J=0:99,10,0 ];

Anyway … long explanation, but I hope this helps. The basic idea, though, is to break your binary file up, place each fragment in each instance’s data directory, and then load the data in parallel.